rc_zip_tokio/
read_zip.rs

1use std::{
2    cmp, io,
3    ops::Deref,
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7};
8
9use futures_util::future::BoxFuture;
10use positioned_io::{RandomAccessFile, ReadAt, Size};
11use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
12
13use rc_zip::{
14    error::Error,
15    fsm::{ArchiveFsm, EntryFsm, FsmResult},
16    parse::{Archive, Entry},
17};
18use tracing::trace;
19
20use crate::{entry_reader::EntryReader, StreamingEntryReader};
21
/// A trait for reading something as a zip archive when the caller supplies
/// the archive's size.
///
/// A blanket implementation covers every type that implements [HasCursor].
///
/// See also [ReadZip].
pub trait ReadZipWithSize {
    /// The type of the file to read from.
    type File: HasCursor;

    /// Reads self as a zip archive.
    ///
    /// `size` is the total size of the archive, in bytes.
    ///
    /// # Errors
    ///
    /// Returns an [Error] if reading from the underlying resource fails or
    /// the data cannot be processed as a zip archive.
    // silences the lint that fires on `async fn` in a public trait
    #[allow(async_fn_in_trait)]
    async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, Self::File>, Error>;
}
33
/// A trait for reading something as a zip archive when its size can be
/// determined without help from the caller.
///
/// The returned [ArchiveHandle] only contains metadata for the archive and
/// its entries. Separate readers can be created for arbitrary entries
/// on-demand using [EntryHandle::reader].
///
/// See also [ReadZipWithSize].
pub trait ReadZip {
    /// The type of the file to read from.
    type File: HasCursor;

    /// Reads self as a zip archive.
    ///
    /// # Errors
    ///
    /// Returns an [Error] if the size cannot be obtained, if reading fails,
    /// or if the data cannot be processed as a zip archive.
    // silences the lint that fires on `async fn` in a public trait
    #[allow(async_fn_in_trait)]
    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error>;
}
47
/// A cursor bundled with the offset it is currently positioned at, so that
/// it can be re-used for the next read when the requested offset matches.
struct CursorState<'a, F: HasCursor + 'a> {
    /// Reader obtained from [HasCursor::cursor_at].
    cursor: <F as HasCursor>::Cursor<'a>,
    /// Offset at which the next read from `cursor` starts; advanced by the
    /// number of bytes read after every read.
    offset: u64,
}
52
53impl<'a, F: HasCursor + 'a> CursorState<'a, F> {
54    /// Constructs a cursor only _after_ doing a bounds check with `offset` and `size`
55    fn try_new(has_cursor: &'a F, offset: u64, size: u64) -> Result<Self, Error> {
56        if offset > size {
57            return Err(std::io::Error::other(format!(
58                "archive tried reading beyond zip archive end. {offset} goes beyond {size}"
59            ))
60            .into());
61        }
62        let cursor = has_cursor.cursor_at(offset);
63        Ok(Self { cursor, offset })
64    }
65}
66
67impl<F> ReadZipWithSize for F
68where
69    F: HasCursor,
70{
71    type File = F;
72
73    async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, F>, Error> {
74        let mut cstate: Option<CursorState<'_, F>> = None;
75
76        let mut fsm = ArchiveFsm::new(size);
77        loop {
78            if let Some(offset) = fsm.wants_read() {
79                let mut cstate_next = match cstate.take() {
80                    // all good, re-using
81                    Some(cstate) if cstate.offset == offset => cstate,
82                    Some(cstate) => {
83                        trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");
84                        CursorState::try_new(self, offset, size)?
85                    }
86                    None => {
87                        trace!(%offset, "read_zip_with_size: making new cursor (had none)");
88                        CursorState::try_new(self, offset, size)?
89                    }
90                };
91
92                match cstate_next.cursor.read(fsm.space()).await {
93                    Ok(read_bytes) => {
94                        cstate_next.offset += read_bytes as u64;
95                        cstate = Some(cstate_next);
96
97                        trace!(%read_bytes, "filling fsm");
98                        if read_bytes == 0 {
99                            return Err(Error::IO(io::ErrorKind::UnexpectedEof.into()));
100                        }
101                        fsm.fill(read_bytes);
102                    }
103                    Err(err) => return Err(Error::IO(err)),
104                }
105            }
106
107            fsm = match fsm.process()? {
108                FsmResult::Done(archive) => {
109                    return Ok(ArchiveHandle {
110                        file: self,
111                        archive,
112                    })
113                }
114                FsmResult::Continue(fsm) => fsm,
115            }
116        }
117    }
118}
119
120impl ReadZip for &[u8] {
121    type File = Self;
122
123    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
124        self.read_zip_with_size(self.len() as u64).await
125    }
126}
127
128impl ReadZip for Vec<u8> {
129    type File = Self;
130
131    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
132        self.read_zip_with_size(self.len() as u64).await
133    }
134}
135
136impl ReadZip for Arc<RandomAccessFile> {
137    type File = Self;
138
139    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
140        let size = self.size()?.unwrap_or_default();
141        self.read_zip_with_size(size).await
142    }
143}
144
/// A zip archive, read asynchronously from a file or other I/O resource.
///
/// This only contains metadata for the archive and its entries, plus a
/// reference to the resource it was read from. Separate readers can be
/// created for arbitrary entries on-demand using [EntryHandle::reader].
pub struct ArchiveHandle<'a, F>
where
    F: HasCursor,
{
    /// The resource the archive was read from; handed on to entry handles.
    file: &'a F,
    /// The archive metadata produced once reading the archive completed.
    archive: Archive,
}
153
/// Lets an [ArchiveHandle] be used wherever a reference to its [Archive]
/// metadata is expected.
impl<F> Deref for ArchiveHandle<'_, F>
where
    F: HasCursor,
{
    type Target = Archive;

    /// Returns the archive metadata held by this handle.
    fn deref(&self) -> &Self::Target {
        &self.archive
    }
}
164
165impl<F> ArchiveHandle<'_, F>
166where
167    F: HasCursor,
168{
169    /// Iterate over all files in this zip, read from the central directory.
170    pub fn entries(&self) -> impl Iterator<Item = EntryHandle<'_, F>> {
171        self.archive.entries().map(move |entry| EntryHandle {
172            file: self.file,
173            entry,
174        })
175    }
176
177    /// Attempts to look up an entry by name. This is usually a bad idea,
178    /// as names aren't necessarily normalized in zip archives.
179    pub fn by_name<N: AsRef<str>>(&self, name: N) -> Option<EntryHandle<'_, F>> {
180        self.archive
181            .entries()
182            .find(|&x| x.name == name.as_ref())
183            .map(|entry| EntryHandle {
184                file: self.file,
185                entry,
186            })
187    }
188}
189
/// A single entry in a zip archive, read asynchronously from a file or other I/O resource.
///
/// Holds the entry's metadata together with a reference to the resource
/// the archive was read from; no entry data is read until a reader is
/// requested.
pub struct EntryHandle<'a, F> {
    /// The resource cursors are created from when reading the entry.
    file: &'a F,
    /// The entry's metadata, borrowed from the archive.
    entry: &'a Entry,
}
195
/// Lets an [EntryHandle] be used wherever a reference to its [Entry]
/// metadata is expected.
impl<F> Deref for EntryHandle<'_, F> {
    type Target = Entry;

    /// Returns the entry metadata held by this handle.
    fn deref(&self) -> &Self::Target {
        self.entry
    }
}
203
204impl<'a, F> EntryHandle<'a, F>
205where
206    F: HasCursor,
207{
208    /// Returns a reader for the entry.
209    pub fn reader(&self) -> impl AsyncRead + Unpin + '_ {
210        EntryReader::new(self.entry, |offset| self.file.cursor_at(offset))
211    }
212
213    /// Reads the entire entry into a vector.
214    pub async fn bytes(&self) -> io::Result<Vec<u8>> {
215        let mut v = Vec::new();
216        self.reader().read_to_end(&mut v).await?;
217        Ok(v)
218    }
219}
220
/// A sliceable I/O resource: we can ask for an [AsyncRead] at a given offset.
///
/// Implementations are provided for `&[u8]`, `Vec<u8>` and
/// `Arc<RandomAccessFile>`.
pub trait HasCursor {
    /// The type returned by [HasCursor::cursor_at].
    ///
    /// It is allowed to borrow from `self` for the lifetime `'a`.
    type Cursor<'a>: AsyncRead + Unpin + 'a
    where
        Self: 'a;

    /// Returns an [AsyncRead] at the given offset.
    ///
    /// `offset` is counted in bytes from the start of the resource.
    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_>;
}
231
232impl HasCursor for &[u8] {
233    type Cursor<'a>
234        = &'a [u8]
235    where
236        Self: 'a;
237
238    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
239        &self[offset.try_into().unwrap()..]
240    }
241}
242
243impl HasCursor for Vec<u8> {
244    type Cursor<'a>
245        = &'a [u8]
246    where
247        Self: 'a;
248
249    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
250        &self[offset.try_into().unwrap()..]
251    }
252}
253
254impl HasCursor for Arc<RandomAccessFile> {
255    type Cursor<'a>
256        = AsyncRandomAccessFileCursor
257    where
258        Self: 'a;
259
260    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
261        AsyncRandomAccessFileCursor {
262            state: ARAFCState::Idle(ARAFCCore {
263                file_offset: offset,
264                inner_buf: vec![0u8; 128 * 1024],
265                // inner_buf: vec![0u8; 128],
266                inner_buf_len: 0,
267                inner_buf_offset: 0,
268                file: self.clone(),
269            }),
270        }
271    }
272}
273
/// File handle, read position and buffer of an [AsyncRandomAccessFileCursor].
///
/// The whole struct is moved into the blocking read task and handed back
/// when the read completes.
struct ARAFCCore {
    // offset in the file at which the next blocking read starts
    file_offset: u64,

    // note: the length of this vec is the inner buffer capacity
    inner_buf: Vec<u8>,

    // the start of the data we haven't copied into a caller's buffer yet
    inner_buf_offset: usize,

    // the end of the data we haven't copied into a caller's buffer yet
    // (set to the number of bytes the last file read returned)
    inner_buf_len: usize,

    // the file being read from
    file: Arc<RandomAccessFile>,
}
289
/// What awaiting a spawned tokio task yields: the task's output, or the
/// [tokio::task::JoinError] reported when joining it failed.
type JoinResult<T> = Result<T, tokio::task::JoinError>;
291
/// The states an [AsyncRandomAccessFileCursor] moves through.
#[derive(Default)]
enum ARAFCState {
    /// No read is in flight; the cursor owns the core and its buffer.
    Idle(ARAFCCore),
    /// A blocking read is in flight; the future hands the core back (or an
    /// error) once the read has finished.
    Reading {
        fut: BoxFuture<'static, JoinResult<Result<ARAFCCore, io::Error>>>,
    },

    /// Placeholder left behind by `std::mem::take` while the core is moved
    /// out of `Idle`; `poll_read` treats seeing it as unreachable.
    #[default]
    Transitioning,
}
302
/// A cursor for reading from a [RandomAccessFile] asynchronously.
///
/// File reads are performed in blocking tasks and go through an internal
/// buffer, from which callers' buffers are then filled.
pub struct AsyncRandomAccessFileCursor {
    /// Whether the cursor is idle or waiting on a blocking read.
    state: ARAFCState,
}
307
308impl AsyncRead for AsyncRandomAccessFileCursor {
309    fn poll_read(
310        mut self: Pin<&mut Self>,
311        cx: &mut Context<'_>,
312        buf: &mut ReadBuf<'_>,
313    ) -> Poll<io::Result<()>> {
314        match &mut self.state {
315            ARAFCState::Idle(core) => {
316                if core.inner_buf_offset < core.inner_buf_len {
317                    // we have data in the inner buffer, don't even need
318                    // to spawn a blocking task
319                    let read_len =
320                        cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);
321
322                    buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
323                    core.inner_buf_offset += read_len;
324                    trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer");
325
326                    return Poll::Ready(Ok(()));
327                }
328
329                // this is just used to shadow core
330                #[allow(unused_variables, clippy::let_unit_value)]
331                let core = ();
332
333                let (file_offset, file, mut inner_buf) = {
334                    let core = match std::mem::take(&mut self.state) {
335                        ARAFCState::Idle(core) => core,
336                        _ => unreachable!(),
337                    };
338                    (core.file_offset, core.file, core.inner_buf)
339                };
340
341                let fut = Box::pin(tokio::task::spawn_blocking(move || {
342                    let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
343                    trace!(%read_bytes, "read from file");
344                    Ok(ARAFCCore {
345                        file_offset: file_offset + read_bytes as u64,
346                        file,
347                        inner_buf,
348                        inner_buf_len: read_bytes,
349                        inner_buf_offset: 0,
350                    })
351                }));
352                self.state = ARAFCState::Reading { fut };
353                self.poll_read(cx, buf)
354            }
355            ARAFCState::Reading { fut } => {
356                let core =
357                    futures_util::ready!(fut.as_mut().poll(cx).map_err(io::Error::other)??);
358                let is_eof = core.inner_buf_len == 0;
359                self.state = ARAFCState::Idle(core);
360
361                if is_eof {
362                    // we're at EOF
363                    return Poll::Ready(Ok(()));
364                }
365                self.poll_read(cx, buf)
366            }
367            ARAFCState::Transitioning => unreachable!(),
368        }
369    }
370}
371
/// Allows reading zip entries in a streaming fashion, without seeking,
/// based only on local headers. THIS IS NOT RECOMMENDED, as correctly
/// reading zip files requires reading the central directory (located at
/// the end of the file).
///
/// A blanket implementation covers every `AsyncRead + Unpin` type.
pub trait ReadZipStreaming<R>
where
    R: AsyncRead,
{
    /// Get the first zip entry from the stream as a [StreamingEntryReader].
    ///
    /// The stream is consumed: it is moved into the returned reader.
    ///
    /// See the trait's documentation for why using this is
    /// generally a bad idea: you might want to use [ReadZip] or
    /// [ReadZipWithSize] instead.
    ///
    /// # Errors
    ///
    /// Returns an [Error] if reading from the stream fails or the entry's
    /// header cannot be processed.
    // silences the lint that fires on `async fn` in a public trait
    #[allow(async_fn_in_trait)]
    async fn stream_zip_entries_throwing_caution_to_the_wind(
        self,
    ) -> Result<StreamingEntryReader<R>, Error>;
}
390
391impl<R> ReadZipStreaming<R> for R
392where
393    R: AsyncRead + Unpin,
394{
395    async fn stream_zip_entries_throwing_caution_to_the_wind(
396        mut self,
397    ) -> Result<StreamingEntryReader<Self>, Error> {
398        let mut fsm = EntryFsm::new(None, None);
399
400        loop {
401            if fsm.wants_read() {
402                let n = self.read(fsm.space()).await?;
403                trace!("read {} bytes into buf for first zip entry", n);
404                fsm.fill(n);
405            }
406
407            if let Some(entry) = fsm.process_till_header()? {
408                let entry = entry.clone();
409                return Ok(StreamingEntryReader::new(fsm, entry, self));
410            }
411        }
412    }
413}