1use std::{
2 cmp, io,
3 ops::Deref,
4 pin::Pin,
5 sync::Arc,
6 task::{Context, Poll},
7};
8
9use futures_util::future::BoxFuture;
10use positioned_io::{RandomAccessFile, ReadAt, Size};
11use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
12
13use rc_zip::{
14 error::Error,
15 fsm::{ArchiveFsm, EntryFsm, FsmResult},
16 parse::{Archive, Entry},
17};
18use tracing::trace;
19
20use crate::{entry_reader::EntryReader, StreamingEntryReader};
21
/// Reads a zip archive from a source whose total length in bytes is supplied
/// by the caller.
pub trait ReadZipWithSize {
    /// The source type that entry readers are later opened against; it must be
    /// able to hand out readers at arbitrary offsets.
    type File: HasCursor;

    /// Reads the archive metadata from `self`, treating `size` as the total
    /// length of the source, and returns a handle for listing/reading entries.
    // `async fn` in a public trait triggers `async_fn_in_trait` because callers
    // cannot add a `Send` bound on the returned future; accepted deliberately.
    #[allow(async_fn_in_trait)]
    async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, Self::File>, Error>;
}
33
/// Reads a zip archive from a source that can determine its own length.
pub trait ReadZip {
    /// The source type that entry readers are later opened against.
    type File: HasCursor;

    /// Determines the length of `self` and reads the archive metadata,
    /// returning a handle for listing/reading entries.
    // See `ReadZipWithSize`: the lint is about the missing `Send` bound on the
    // returned future, which is accepted here.
    #[allow(async_fn_in_trait)]
    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error>;
}
47
/// A reader into the archive source plus the absolute offset of the next byte
/// it will yield. `read_zip_with_size` keeps one around between FSM reads and
/// reuses it when the next requested offset matches `offset`.
struct CursorState<'a, F: HasCursor + 'a> {
    // Reader obtained from `HasCursor::cursor_at`.
    cursor: <F as HasCursor>::Cursor<'a>,
    // Absolute offset (from the start of the source) of the next byte `cursor`
    // will produce; advanced by the number of bytes read.
    offset: u64,
}
52
53impl<'a, F: HasCursor + 'a> CursorState<'a, F> {
54 fn try_new(has_cursor: &'a F, offset: u64, size: u64) -> Result<Self, Error> {
56 if offset > size {
57 return Err(std::io::Error::other(format!(
58 "archive tried reading beyond zip archive end. {offset} goes beyond {size}"
59 ))
60 .into());
61 }
62 let cursor = has_cursor.cursor_at(offset);
63 Ok(Self { cursor, offset })
64 }
65}
66
67impl<F> ReadZipWithSize for F
68where
69 F: HasCursor,
70{
71 type File = F;
72
73 async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, F>, Error> {
74 let mut cstate: Option<CursorState<'_, F>> = None;
75
76 let mut fsm = ArchiveFsm::new(size);
77 loop {
78 if let Some(offset) = fsm.wants_read() {
79 let mut cstate_next = match cstate.take() {
80 Some(cstate) if cstate.offset == offset => cstate,
82 Some(cstate) => {
83 trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");
84 CursorState::try_new(self, offset, size)?
85 }
86 None => {
87 trace!(%offset, "read_zip_with_size: making new cursor (had none)");
88 CursorState::try_new(self, offset, size)?
89 }
90 };
91
92 match cstate_next.cursor.read(fsm.space()).await {
93 Ok(read_bytes) => {
94 cstate_next.offset += read_bytes as u64;
95 cstate = Some(cstate_next);
96
97 trace!(%read_bytes, "filling fsm");
98 if read_bytes == 0 {
99 return Err(Error::IO(io::ErrorKind::UnexpectedEof.into()));
100 }
101 fsm.fill(read_bytes);
102 }
103 Err(err) => return Err(Error::IO(err)),
104 }
105 }
106
107 fsm = match fsm.process()? {
108 FsmResult::Done(archive) => {
109 return Ok(ArchiveHandle {
110 file: self,
111 archive,
112 })
113 }
114 FsmResult::Continue(fsm) => fsm,
115 }
116 }
117 }
118}
119
120impl ReadZip for &[u8] {
121 type File = Self;
122
123 async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
124 self.read_zip_with_size(self.len() as u64).await
125 }
126}
127
128impl ReadZip for Vec<u8> {
129 type File = Self;
130
131 async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
132 self.read_zip_with_size(self.len() as u64).await
133 }
134}
135
136impl ReadZip for Arc<RandomAccessFile> {
137 type File = Self;
138
139 async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
140 let size = self.size()?.unwrap_or_default();
141 self.read_zip_with_size(size).await
142 }
143}
144
/// A parsed [`Archive`] bundled with a reference to the source it was read
/// from, so entry contents can later be read through [`HasCursor`] cursors.
///
/// Dereferences to [`Archive`].
pub struct ArchiveHandle<'a, F>
where
    F: HasCursor,
{
    // Source the archive was read from; entry readers open cursors on it.
    file: &'a F,
    // Archive metadata produced by the archive FSM.
    archive: Archive,
}
153
154impl<F> Deref for ArchiveHandle<'_, F>
155where
156 F: HasCursor,
157{
158 type Target = Archive;
159
160 fn deref(&self) -> &Self::Target {
161 &self.archive
162 }
163}
164
165impl<F> ArchiveHandle<'_, F>
166where
167 F: HasCursor,
168{
169 pub fn entries(&self) -> impl Iterator<Item = EntryHandle<'_, F>> {
171 self.archive.entries().map(move |entry| EntryHandle {
172 file: self.file,
173 entry,
174 })
175 }
176
177 pub fn by_name<N: AsRef<str>>(&self, name: N) -> Option<EntryHandle<'_, F>> {
180 self.archive
181 .entries()
182 .find(|&x| x.name == name.as_ref())
183 .map(|entry| EntryHandle {
184 file: self.file,
185 entry,
186 })
187 }
188}
189
/// A single archive [`Entry`] paired with the source it lives in.
///
/// Dereferences to [`Entry`]; `reader` and `bytes` give access to its data.
pub struct EntryHandle<'a, F> {
    // Source the entry's data is read from.
    file: &'a F,
    // Metadata of this entry, borrowed from the parsed archive.
    entry: &'a Entry,
}
195
196impl<F> Deref for EntryHandle<'_, F> {
197 type Target = Entry;
198
199 fn deref(&self) -> &Self::Target {
200 self.entry
201 }
202}
203
204impl<'a, F> EntryHandle<'a, F>
205where
206 F: HasCursor,
207{
208 pub fn reader(&self) -> impl AsyncRead + Unpin + '_ {
210 EntryReader::new(self.entry, |offset| self.file.cursor_at(offset))
211 }
212
213 pub async fn bytes(&self) -> io::Result<Vec<u8>> {
215 let mut v = Vec::new();
216 self.reader().read_to_end(&mut v).await?;
217 Ok(v)
218 }
219}
220
/// A data source that can hand out async readers starting at arbitrary byte
/// offsets.
pub trait HasCursor {
    /// The reader type returned by [`HasCursor::cursor_at`]; it may borrow from
    /// the source for `'a`.
    type Cursor<'a>: AsyncRead + Unpin + 'a
    where
        Self: 'a;

    /// Returns a reader positioned `offset` bytes from the start of the source.
    ///
    /// NOTE(review): behavior for offsets past the end of the source is left
    /// to each implementation; confirm before relying on it.
    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_>;
}
231
232impl HasCursor for &[u8] {
233 type Cursor<'a>
234 = &'a [u8]
235 where
236 Self: 'a;
237
238 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
239 &self[offset.try_into().unwrap()..]
240 }
241}
242
243impl HasCursor for Vec<u8> {
244 type Cursor<'a>
245 = &'a [u8]
246 where
247 Self: 'a;
248
249 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
250 &self[offset.try_into().unwrap()..]
251 }
252}
253
254impl HasCursor for Arc<RandomAccessFile> {
255 type Cursor<'a>
256 = AsyncRandomAccessFileCursor
257 where
258 Self: 'a;
259
260 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
261 AsyncRandomAccessFileCursor {
262 state: ARAFCState::Idle(ARAFCCore {
263 file_offset: offset,
264 inner_buf: vec![0u8; 128 * 1024],
265 inner_buf_len: 0,
267 inner_buf_offset: 0,
268 file: self.clone(),
269 }),
270 }
271 }
272}
273
/// Owned state of an [`AsyncRandomAccessFileCursor`]: the file handle, its
/// read-ahead buffer, and the bookkeeping needed to serve bytes from it.
struct ARAFCCore {
    /// Absolute file offset at which the next blocking `read_at` starts, i.e.
    /// just past the bytes most recently loaded into `inner_buf`.
    file_offset: u64,

    /// Read-ahead buffer filled by `read_at`; only `..inner_buf_len` is valid.
    inner_buf: Vec<u8>,

    /// Index into `inner_buf` of the next byte to hand to the caller.
    inner_buf_offset: usize,

    /// Number of valid bytes in `inner_buf`, as returned by the last `read_at`.
    inner_buf_len: usize,

    /// Shared handle to the underlying file.
    file: Arc<RandomAccessFile>,
}
289
/// Output of awaiting a `tokio::task::spawn_blocking` handle; `Err` means the
/// blocking task panicked or was cancelled.
type JoinResult<T> = Result<T, tokio::task::JoinError>;
291
/// State machine behind [`AsyncRandomAccessFileCursor`].
#[derive(Default)]
enum ARAFCState {
    /// No read in flight; buffered bytes (if any) can be served directly.
    Idle(ARAFCCore),
    /// A blocking `read_at` is running on the blocking pool; its future yields
    /// the core with a refilled buffer.
    Reading {
        fut: BoxFuture<'static, JoinResult<Result<ARAFCCore, io::Error>>>,
    },

    /// Placeholder left behind while the state is being replaced. It is the
    /// `Default` variant so `std::mem::take` can move the current state out.
    #[default]
    Transitioning,
}
302
/// Async reader over an `Arc<RandomAccessFile>`. It runs the blocking
/// `read_at` calls on tokio's blocking thread pool and serves bytes from an
/// in-memory buffer in between. Created by `HasCursor::cursor_at` for
/// `Arc<RandomAccessFile>`.
pub struct AsyncRandomAccessFileCursor {
    // Current phase: idle with a buffer, reading, or mid-transition.
    state: ARAFCState,
}
307
308impl AsyncRead for AsyncRandomAccessFileCursor {
309 fn poll_read(
310 mut self: Pin<&mut Self>,
311 cx: &mut Context<'_>,
312 buf: &mut ReadBuf<'_>,
313 ) -> Poll<io::Result<()>> {
314 match &mut self.state {
315 ARAFCState::Idle(core) => {
316 if core.inner_buf_offset < core.inner_buf_len {
317 let read_len =
320 cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);
321
322 buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
323 core.inner_buf_offset += read_len;
324 trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer");
325
326 return Poll::Ready(Ok(()));
327 }
328
329 #[allow(unused_variables, clippy::let_unit_value)]
331 let core = ();
332
333 let (file_offset, file, mut inner_buf) = {
334 let core = match std::mem::take(&mut self.state) {
335 ARAFCState::Idle(core) => core,
336 _ => unreachable!(),
337 };
338 (core.file_offset, core.file, core.inner_buf)
339 };
340
341 let fut = Box::pin(tokio::task::spawn_blocking(move || {
342 let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
343 trace!(%read_bytes, "read from file");
344 Ok(ARAFCCore {
345 file_offset: file_offset + read_bytes as u64,
346 file,
347 inner_buf,
348 inner_buf_len: read_bytes,
349 inner_buf_offset: 0,
350 })
351 }));
352 self.state = ARAFCState::Reading { fut };
353 self.poll_read(cx, buf)
354 }
355 ARAFCState::Reading { fut } => {
356 let core =
357 futures_util::ready!(fut.as_mut().poll(cx).map_err(io::Error::other)??);
358 let is_eof = core.inner_buf_len == 0;
359 self.state = ARAFCState::Idle(core);
360
361 if is_eof {
362 return Poll::Ready(Ok(()));
364 }
365 self.poll_read(cx, buf)
366 }
367 ARAFCState::Transitioning => unreachable!(),
368 }
369 }
370}
371
/// Reads zip entries from an [`AsyncRead`] stream, starting from the stream's
/// current position.
pub trait ReadZipStreaming<R>
where
    R: AsyncRead,
{
    /// Consumes the stream, reads until the first entry header has been parsed,
    /// and returns a [`StreamingEntryReader`] for that entry.
    ///
    /// NOTE(review): the name suggests this mode skips checks that need the
    /// whole archive (e.g. the central directory); confirm against `rc_zip`
    /// documentation before relying on it.
    // Same `async_fn_in_trait` trade-off as the other traits: no `Send` bound
    // can be required on the returned future.
    #[allow(async_fn_in_trait)]
    async fn stream_zip_entries_throwing_caution_to_the_wind(
        self,
    ) -> Result<StreamingEntryReader<R>, Error>;
}
390
391impl<R> ReadZipStreaming<R> for R
392where
393 R: AsyncRead + Unpin,
394{
395 async fn stream_zip_entries_throwing_caution_to_the_wind(
396 mut self,
397 ) -> Result<StreamingEntryReader<Self>, Error> {
398 let mut fsm = EntryFsm::new(None, None);
399
400 loop {
401 if fsm.wants_read() {
402 let n = self.read(fsm.space()).await?;
403 trace!("read {} bytes into buf for first zip entry", n);
404 fsm.fill(n);
405 }
406
407 if let Some(entry) = fsm.process_till_header()? {
408 let entry = entry.clone();
409 return Ok(StreamingEntryReader::new(fsm, entry, self));
410 }
411 }
412 }
413}