rc_zip_tokio/
read_zip.rs

1use std::{
2    cmp, io,
3    ops::Deref,
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7};
8
9use futures_util::future::BoxFuture;
10use positioned_io::{RandomAccessFile, ReadAt, Size};
11use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
12
13use rc_zip::{
14    fsm::{ArchiveFsm, EntryFsm, FsmResult},
15    Archive, Entry, Error,
16};
17use tracing::trace;
18
19use crate::{entry_reader::EntryReader, StreamingEntryReader};
20
21/// A trait for reading something as a zip archive.
22///
23/// See also [ReadZip].
24pub trait ReadZipWithSize {
25    /// The type of the file to read from.
26    type File: HasCursor;
27
28    /// Reads self as a zip archive.
29    #[allow(async_fn_in_trait)]
30    async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, Self::File>, Error>;
31}
32
33/// A zip archive, read asynchronously from a file or other I/O resource.
34///
35/// This only contains metadata for the archive and its entries. Separate
36/// readers can be created for arbitraries entries on-demand using
37/// [EntryHandle::reader].
38pub trait ReadZip {
39    /// The type of the file to read from.
40    type File: HasCursor;
41
42    /// Reads self as a zip archive.
43    #[allow(async_fn_in_trait)]
44    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error>;
45}
46
47struct CursorState<'a, F: HasCursor + 'a> {
48    cursor: <F as HasCursor>::Cursor<'a>,
49    offset: u64,
50}
51
52impl<'a, F: HasCursor + 'a> CursorState<'a, F> {
53    /// Constructs a cursor only _after_ doing a bounds check with `offset` and `size`
54    fn try_new(has_cursor: &'a F, offset: u64, size: u64) -> Result<Self, Error> {
55        if offset > size {
56            return Err(std::io::Error::other(format!(
57                "archive tried reading beyond zip archive end. {offset} goes beyond {size}"
58            ))
59            .into());
60        }
61        let cursor = has_cursor.cursor_at(offset);
62        Ok(Self { cursor, offset })
63    }
64}
65
66impl<F> ReadZipWithSize for F
67where
68    F: HasCursor,
69{
70    type File = F;
71
72    async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, F>, Error> {
73        let mut cstate: Option<CursorState<'_, F>> = None;
74
75        let mut fsm = ArchiveFsm::new(size);
76        loop {
77            if let Some(offset) = fsm.wants_read() {
78                let mut cstate_next = match cstate.take() {
79                    // all good, re-using
80                    Some(cstate) if cstate.offset == offset => cstate,
81                    Some(cstate) => {
82                        trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");
83                        CursorState::try_new(self, offset, size)?
84                    }
85                    None => {
86                        trace!(%offset, "read_zip_with_size: making new cursor (had none)");
87                        CursorState::try_new(self, offset, size)?
88                    }
89                };
90
91                match cstate_next.cursor.read(fsm.space()).await {
92                    Ok(read_bytes) => {
93                        cstate_next.offset += read_bytes as u64;
94                        cstate = Some(cstate_next);
95
96                        trace!(%read_bytes, "filling fsm");
97                        if read_bytes == 0 {
98                            return Err(Error::IO(io::ErrorKind::UnexpectedEof.into()));
99                        }
100                        fsm.fill(read_bytes);
101                    }
102                    Err(err) => return Err(Error::IO(err)),
103                }
104            }
105
106            fsm = match fsm.process()? {
107                FsmResult::Done(archive) => {
108                    return Ok(ArchiveHandle {
109                        file: self,
110                        archive,
111                    })
112                }
113                FsmResult::Continue(fsm) => fsm,
114            }
115        }
116    }
117}
118
119impl ReadZip for &[u8] {
120    type File = Self;
121
122    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
123        self.read_zip_with_size(self.len() as u64).await
124    }
125}
126
127impl ReadZip for Vec<u8> {
128    type File = Self;
129
130    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
131        self.read_zip_with_size(self.len() as u64).await
132    }
133}
134
135impl ReadZip for Arc<RandomAccessFile> {
136    type File = Self;
137
138    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
139        let size = self.size()?.unwrap_or_default();
140        self.read_zip_with_size(size).await
141    }
142}
143
144/// A zip archive, read asynchronously from a file or other I/O resource.
145pub struct ArchiveHandle<'a, F>
146where
147    F: HasCursor,
148{
149    file: &'a F,
150    archive: Archive,
151}
152
153impl<F> Deref for ArchiveHandle<'_, F>
154where
155    F: HasCursor,
156{
157    type Target = Archive;
158
159    fn deref(&self) -> &Self::Target {
160        &self.archive
161    }
162}
163
164impl<F> ArchiveHandle<'_, F>
165where
166    F: HasCursor,
167{
168    /// Iterate over all files in this zip, read from the central directory.
169    pub fn entries(&self) -> impl Iterator<Item = EntryHandle<'_, F>> {
170        self.archive.entries().map(move |entry| EntryHandle {
171            file: self.file,
172            entry,
173        })
174    }
175
176    /// Attempts to look up an entry by name. This is usually a bad idea,
177    /// as names aren't necessarily normalized in zip archives.
178    pub fn by_name<N: AsRef<str>>(&self, name: N) -> Option<EntryHandle<'_, F>> {
179        self.archive
180            .entries()
181            .find(|&x| x.name == name.as_ref())
182            .map(|entry| EntryHandle {
183                file: self.file,
184                entry,
185            })
186    }
187}
188
189/// A single entry in a zip archive, read asynchronously from a file or other I/O resource.
190pub struct EntryHandle<'a, F> {
191    file: &'a F,
192    entry: &'a Entry,
193}
194
195impl<F> Deref for EntryHandle<'_, F> {
196    type Target = Entry;
197
198    fn deref(&self) -> &Self::Target {
199        self.entry
200    }
201}
202
203impl<'a, F> EntryHandle<'a, F>
204where
205    F: HasCursor,
206{
207    /// Returns a reader for the entry.
208    pub fn reader(&self) -> impl AsyncRead + Unpin + '_ {
209        EntryReader::new(self.entry, |offset| self.file.cursor_at(offset))
210    }
211
212    /// Reads the entire entry into a vector.
213    pub async fn bytes(&self) -> io::Result<Vec<u8>> {
214        let mut v = Vec::new();
215        self.reader().read_to_end(&mut v).await?;
216        Ok(v)
217    }
218}
219
220/// A sliceable I/O resource: we can ask for an [AsyncRead] at a given offset.
221pub trait HasCursor {
222    /// The type returned by [HasCursor::cursor_at].
223    type Cursor<'a>: AsyncRead + Unpin + 'a
224    where
225        Self: 'a;
226
227    /// Returns an [AsyncRead] at the given offset.
228    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_>;
229}
230
231impl HasCursor for &[u8] {
232    type Cursor<'a>
233        = &'a [u8]
234    where
235        Self: 'a;
236
237    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
238        &self[offset.try_into().unwrap()..]
239    }
240}
241
242impl HasCursor for Vec<u8> {
243    type Cursor<'a>
244        = &'a [u8]
245    where
246        Self: 'a;
247
248    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
249        &self[offset.try_into().unwrap()..]
250    }
251}
252
253impl HasCursor for Arc<RandomAccessFile> {
254    type Cursor<'a>
255        = AsyncRandomAccessFileCursor
256    where
257        Self: 'a;
258
259    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
260        AsyncRandomAccessFileCursor {
261            state: ARAFCState::Idle(ARAFCCore {
262                file_offset: offset,
263                inner_buf: vec![0u8; 128 * 1024],
264                // inner_buf: vec![0u8; 128],
265                inner_buf_len: 0,
266                inner_buf_offset: 0,
267                file: self.clone(),
268            }),
269        }
270    }
271}
272
273struct ARAFCCore {
274    // offset we're reading from in the file
275    file_offset: u64,
276
277    // note: the length of this vec is the inner buffer capacity
278    inner_buf: Vec<u8>,
279
280    // the start of data we haven't returned put to caller buffets yet
281    inner_buf_offset: usize,
282
283    // the end of data we haven't returned put to caller buffets yet
284    inner_buf_len: usize,
285
286    file: Arc<RandomAccessFile>,
287}
288
289type JoinResult<T> = Result<T, tokio::task::JoinError>;
290
291#[derive(Default)]
292enum ARAFCState {
293    Idle(ARAFCCore),
294    Reading {
295        fut: BoxFuture<'static, JoinResult<Result<ARAFCCore, io::Error>>>,
296    },
297
298    #[default]
299    Transitioning,
300}
301
302/// A cursor for reading from a [RandomAccessFile] asynchronously.
303pub struct AsyncRandomAccessFileCursor {
304    state: ARAFCState,
305}
306
307impl AsyncRead for AsyncRandomAccessFileCursor {
308    fn poll_read(
309        mut self: Pin<&mut Self>,
310        cx: &mut Context<'_>,
311        buf: &mut ReadBuf<'_>,
312    ) -> Poll<io::Result<()>> {
313        match &mut self.state {
314            ARAFCState::Idle(core) => {
315                if core.inner_buf_offset < core.inner_buf_len {
316                    // we have data in the inner buffer, don't even need
317                    // to spawn a blocking task
318                    let read_len =
319                        cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);
320
321                    buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
322                    core.inner_buf_offset += read_len;
323                    trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer");
324
325                    return Poll::Ready(Ok(()));
326                }
327
328                // this is just used to shadow core
329                #[allow(unused_variables, clippy::let_unit_value)]
330                let core = ();
331
332                let (file_offset, file, mut inner_buf) = {
333                    let core = match std::mem::take(&mut self.state) {
334                        ARAFCState::Idle(core) => core,
335                        _ => unreachable!(),
336                    };
337                    (core.file_offset, core.file, core.inner_buf)
338                };
339
340                let fut = Box::pin(tokio::task::spawn_blocking(move || {
341                    let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
342                    trace!(%read_bytes, "read from file");
343                    Ok(ARAFCCore {
344                        file_offset: file_offset + read_bytes as u64,
345                        file,
346                        inner_buf,
347                        inner_buf_len: read_bytes,
348                        inner_buf_offset: 0,
349                    })
350                }));
351                self.state = ARAFCState::Reading { fut };
352                self.poll_read(cx, buf)
353            }
354            ARAFCState::Reading { fut } => {
355                let core =
356                    futures_util::ready!(fut.as_mut().poll(cx).map_err(io::Error::other)??);
357                let is_eof = core.inner_buf_len == 0;
358                self.state = ARAFCState::Idle(core);
359
360                if is_eof {
361                    // we're at EOF
362                    return Poll::Ready(Ok(()));
363                }
364                self.poll_read(cx, buf)
365            }
366            ARAFCState::Transitioning => unreachable!(),
367        }
368    }
369}
370
371/// Allows reading zip entries in a streaming fashion, without seeking,
372/// based only on local headers. THIS IS NOT RECOMMENDED, as correctly
373/// reading zip files requires reading the central directory (located at
374/// the end of the file).
375pub trait ReadZipStreaming<R>
376where
377    R: AsyncRead,
378{
379    /// Get the first zip entry from the stream as a [StreamingEntryReader].
380    ///
381    /// See the trait's documentation for why using this is
382    /// generally a bad idea: you might want to use [ReadZip] or
383    /// [ReadZipWithSize] instead.
384    #[allow(async_fn_in_trait)]
385    async fn stream_zip_entries_throwing_caution_to_the_wind(
386        self,
387    ) -> Result<StreamingEntryReader<R>, Error>;
388}
389
390impl<R> ReadZipStreaming<R> for R
391where
392    R: AsyncRead + Unpin,
393{
394    async fn stream_zip_entries_throwing_caution_to_the_wind(
395        mut self,
396    ) -> Result<StreamingEntryReader<Self>, Error> {
397        let mut fsm = EntryFsm::new(None, None);
398
399        loop {
400            if fsm.wants_read() {
401                let n = self.read(fsm.space()).await?;
402                trace!("read {} bytes into buf for first zip entry", n);
403                fsm.fill(n);
404            }
405
406            if let Some(entry) = fsm.process_till_header()? {
407                let entry = entry.clone();
408                return Ok(StreamingEntryReader::new(fsm, entry, self));
409            }
410        }
411    }
412}