1use std::{
2 cmp, io,
3 ops::Deref,
4 pin::Pin,
5 sync::Arc,
6 task::{Context, Poll},
7};
8
9use futures_util::future::BoxFuture;
10use positioned_io::{RandomAccessFile, ReadAt, Size};
11use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
12
13use rc_zip::{
14 fsm::{ArchiveFsm, EntryFsm, FsmResult},
15 Archive, Entry, Error,
16};
17use tracing::trace;
18
19use crate::{entry_reader::EntryReader, StreamingEntryReader};
20
/// Reads a zip archive from a source whose total size in bytes is supplied by the caller.
pub trait ReadZipWithSize {
    /// The source type the archive bytes are read from (via its cursors).
    type File: HasCursor;

    /// Reads and parses the archive structure, given the total size of the source in bytes.
    ///
    /// On success the returned [`ArchiveHandle`] borrows `self`, so entries can later be read
    /// from the same source.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if reading from the source fails or the data cannot be parsed as an
    /// archive.
    // `async fn` in a public trait triggers the `async_fn_in_trait` lint (callers cannot name or
    // bound the returned future, e.g. require it to be `Send`); it is accepted deliberately here.
    #[allow(async_fn_in_trait)]
    async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, Self::File>, Error>;
}
32
/// Reads a zip archive from a source that can determine its own size.
pub trait ReadZip {
    /// The source type the archive bytes are read from (via its cursors).
    type File: HasCursor;

    /// Reads and parses the archive structure from `self`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the size cannot be obtained, reading fails, or the data cannot be
    /// parsed as an archive.
    // See `ReadZipWithSize::read_zip_with_size` for why `async_fn_in_trait` is allowed.
    #[allow(async_fn_in_trait)]
    async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error>;
}
46
47struct CursorState<'a, F: HasCursor + 'a> {
48 cursor: <F as HasCursor>::Cursor<'a>,
49 offset: u64,
50}
51
52impl<'a, F: HasCursor + 'a> CursorState<'a, F> {
53 fn try_new(has_cursor: &'a F, offset: u64, size: u64) -> Result<Self, Error> {
55 if offset > size {
56 return Err(std::io::Error::other(format!(
57 "archive tried reading beyond zip archive end. {offset} goes beyond {size}"
58 ))
59 .into());
60 }
61 let cursor = has_cursor.cursor_at(offset);
62 Ok(Self { cursor, offset })
63 }
64}
65
66impl<F> ReadZipWithSize for F
67where
68 F: HasCursor,
69{
70 type File = F;
71
72 async fn read_zip_with_size(&self, size: u64) -> Result<ArchiveHandle<'_, F>, Error> {
73 let mut cstate: Option<CursorState<'_, F>> = None;
74
75 let mut fsm = ArchiveFsm::new(size);
76 loop {
77 if let Some(offset) = fsm.wants_read() {
78 let mut cstate_next = match cstate.take() {
79 Some(cstate) if cstate.offset == offset => cstate,
81 Some(cstate) => {
82 trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");
83 CursorState::try_new(self, offset, size)?
84 }
85 None => {
86 trace!(%offset, "read_zip_with_size: making new cursor (had none)");
87 CursorState::try_new(self, offset, size)?
88 }
89 };
90
91 match cstate_next.cursor.read(fsm.space()).await {
92 Ok(read_bytes) => {
93 cstate_next.offset += read_bytes as u64;
94 cstate = Some(cstate_next);
95
96 trace!(%read_bytes, "filling fsm");
97 if read_bytes == 0 {
98 return Err(Error::IO(io::ErrorKind::UnexpectedEof.into()));
99 }
100 fsm.fill(read_bytes);
101 }
102 Err(err) => return Err(Error::IO(err)),
103 }
104 }
105
106 fsm = match fsm.process()? {
107 FsmResult::Done(archive) => {
108 return Ok(ArchiveHandle {
109 file: self,
110 archive,
111 })
112 }
113 FsmResult::Continue(fsm) => fsm,
114 }
115 }
116 }
117}
118
119impl ReadZip for &[u8] {
120 type File = Self;
121
122 async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
123 self.read_zip_with_size(self.len() as u64).await
124 }
125}
126
127impl ReadZip for Vec<u8> {
128 type File = Self;
129
130 async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
131 self.read_zip_with_size(self.len() as u64).await
132 }
133}
134
135impl ReadZip for Arc<RandomAccessFile> {
136 type File = Self;
137
138 async fn read_zip(&self) -> Result<ArchiveHandle<'_, Self::File>, Error> {
139 let size = self.size()?.unwrap_or_default();
140 self.read_zip_with_size(size).await
141 }
142}
143
/// A parsed [`Archive`] together with a borrow of the source it was read from.
///
/// Dereferences to [`Archive`]. Use [`ArchiveHandle::entries`] or [`ArchiveHandle::by_name`] to
/// obtain [`EntryHandle`]s whose contents can be read.
pub struct ArchiveHandle<'a, F>
where
    F: HasCursor,
{
    /// The source the archive was read from; entry readers open cursors on it.
    file: &'a F,
    /// The parsed archive structure.
    archive: Archive,
}
152
153impl<F> Deref for ArchiveHandle<'_, F>
154where
155 F: HasCursor,
156{
157 type Target = Archive;
158
159 fn deref(&self) -> &Self::Target {
160 &self.archive
161 }
162}
163
164impl<F> ArchiveHandle<'_, F>
165where
166 F: HasCursor,
167{
168 pub fn entries(&self) -> impl Iterator<Item = EntryHandle<'_, F>> {
170 self.archive.entries().map(move |entry| EntryHandle {
171 file: self.file,
172 entry,
173 })
174 }
175
176 pub fn by_name<N: AsRef<str>>(&self, name: N) -> Option<EntryHandle<'_, F>> {
179 self.archive
180 .entries()
181 .find(|&x| x.name == name.as_ref())
182 .map(|entry| EntryHandle {
183 file: self.file,
184 entry,
185 })
186 }
187}
188
/// A single archive entry together with a borrow of the source it lives in.
///
/// Dereferences to [`Entry`] for metadata. Call [`EntryHandle::reader`] or
/// [`EntryHandle::bytes`] to read its contents.
pub struct EntryHandle<'a, F> {
    /// Source the entry's data is read from.
    file: &'a F,
    /// The entry's metadata, borrowed from the parent archive.
    entry: &'a Entry,
}
194
195impl<F> Deref for EntryHandle<'_, F> {
196 type Target = Entry;
197
198 fn deref(&self) -> &Self::Target {
199 self.entry
200 }
201}
202
203impl<'a, F> EntryHandle<'a, F>
204where
205 F: HasCursor,
206{
207 pub fn reader(&self) -> impl AsyncRead + Unpin + '_ {
209 EntryReader::new(self.entry, |offset| self.file.cursor_at(offset))
210 }
211
212 pub async fn bytes(&self) -> io::Result<Vec<u8>> {
214 let mut v = Vec::new();
215 self.reader().read_to_end(&mut v).await?;
216 Ok(v)
217 }
218}
219
/// A source of bytes that can hand out async readers starting at arbitrary offsets.
pub trait HasCursor {
    /// The reader type returned by [`HasCursor::cursor_at`]; it may borrow from `self`.
    type Cursor<'a>: AsyncRead + Unpin + 'a
    where
        Self: 'a;

    /// Returns a reader positioned `offset` bytes from the start of the source.
    // NOTE(review): how offsets past the end are handled is left to each implementation.
    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_>;
}
230
231impl HasCursor for &[u8] {
232 type Cursor<'a>
233 = &'a [u8]
234 where
235 Self: 'a;
236
237 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
238 &self[offset.try_into().unwrap()..]
239 }
240}
241
242impl HasCursor for Vec<u8> {
243 type Cursor<'a>
244 = &'a [u8]
245 where
246 Self: 'a;
247
248 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
249 &self[offset.try_into().unwrap()..]
250 }
251}
252
253impl HasCursor for Arc<RandomAccessFile> {
254 type Cursor<'a>
255 = AsyncRandomAccessFileCursor
256 where
257 Self: 'a;
258
259 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
260 AsyncRandomAccessFileCursor {
261 state: ARAFCState::Idle(ARAFCCore {
262 file_offset: offset,
263 inner_buf: vec![0u8; 128 * 1024],
264 inner_buf_len: 0,
266 inner_buf_offset: 0,
267 file: self.clone(),
268 }),
269 }
270 }
271}
272
/// State owned by an idle [`AsyncRandomAccessFileCursor`]: the file handle, the file position,
/// and a buffer of bytes already read from the file but not yet handed to the caller.
struct ARAFCCore {
    /// File offset at which the next positioned read will start.
    file_offset: u64,

    /// Buffer filled by each positioned read (its full length is requested every time).
    inner_buf: Vec<u8>,

    /// Index in `inner_buf` of the next byte to hand out.
    inner_buf_offset: usize,

    /// Number of valid bytes in `inner_buf` from the most recent read; 0 initially and after a
    /// read that returned 0 bytes.
    inner_buf_len: usize,

    /// Shared handle to the underlying file.
    file: Arc<RandomAccessFile>,
}
288
/// Output of awaiting a tokio task handle: the task's value, or a `JoinError` if joining failed.
type JoinResult<T> = Result<T, tokio::task::JoinError>;
290
/// State machine driving [`AsyncRandomAccessFileCursor`].
///
/// `Default` (yielding `Transitioning`) exists so that `std::mem::take` can move the current
/// state out of the cursor.
#[derive(Default)]
enum ARAFCState {
    /// No read in flight; the core (buffer, offsets, file handle) is owned here.
    Idle(ARAFCCore),
    /// A positioned read is running via `tokio::task::spawn_blocking`. It resolves to the
    /// updated core, or to an error.
    Reading {
        fut: BoxFuture<'static, JoinResult<Result<ARAFCCore, io::Error>>>,
    },

    /// Placeholder left behind while the core is moved out of `Idle`. `poll_read` treats
    /// observing it as unreachable.
    #[default]
    Transitioning,
}
301
/// An [`AsyncRead`] over a shared [`RandomAccessFile`].
///
/// File reads are positioned reads executed with `tokio::task::spawn_blocking`, so they do not
/// run on the async executor thread. Each one refills an internal buffer, which later
/// `poll_read` calls then drain.
pub struct AsyncRandomAccessFileCursor {
    /// Current state: idle with a buffer, or waiting on a blocking read.
    state: ARAFCState,
}
306
impl AsyncRead for AsyncRandomAccessFileCursor {
    /// Serves bytes from the internal buffer when it has unread data. Otherwise it starts (or
    /// continues polling) a blocking positioned read that refills the buffer.
    ///
    /// A completed read of 0 bytes is reported as EOF: `Ok(())` with nothing written to `buf`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        match &mut self.state {
            ARAFCState::Idle(core) => {
                // Fast path: hand out already-buffered bytes without touching the file.
                if core.inner_buf_offset < core.inner_buf_len {
                    let read_len =
                        cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);

                    buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
                    core.inner_buf_offset += read_len;
                    trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer");

                    return Poll::Ready(Ok(()));
                }

                // Shadow the borrowed `core` so it cannot be used past this point; the state it
                // borrows from is about to be moved out and replaced.
                #[allow(unused_variables, clippy::let_unit_value)]
                let core = ();

                // Move the core out of the state (leaving `Transitioning`) so the file handle
                // and buffer can be sent to the blocking task.
                let (file_offset, file, mut inner_buf) = {
                    let core = match std::mem::take(&mut self.state) {
                        ARAFCState::Idle(core) => core,
                        _ => unreachable!(),
                    };
                    (core.file_offset, core.file, core.inner_buf)
                };

                // Positioned read off the async executor. The task hands back a fresh core with
                // the buffer refilled and the file offset advanced by the bytes read.
                let fut = Box::pin(tokio::task::spawn_blocking(move || {
                    let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
                    trace!(%read_bytes, "read from file");
                    Ok(ARAFCCore {
                        file_offset: file_offset + read_bytes as u64,
                        file,
                        inner_buf,
                        inner_buf_len: read_bytes,
                        inner_buf_offset: 0,
                    })
                }));
                self.state = ARAFCState::Reading { fut };
                // Poll the new future right away so `cx`'s waker is registered with it.
                self.poll_read(cx, buf)
            }
            ARAFCState::Reading { fut } => {
                // A `JoinError` is converted to an `io::Error`; an I/O error from the read itself
                // is returned as-is.
                // NOTE(review): on either error the state is left as `Reading` holding the
                // already-completed future, and the core (file handle, buffer) is gone. Polling
                // this cursor again afterwards would poll that future a second time; confirm
                // callers never do.
                let core =
                    futures_util::ready!(fut.as_mut().poll(cx).map_err(io::Error::other)??);
                let is_eof = core.inner_buf_len == 0;
                self.state = ARAFCState::Idle(core);

                if is_eof {
                    // Nothing was read: leave `buf` untouched, which signals EOF to the caller.
                    return Poll::Ready(Ok(()));
                }
                // Serve the freshly filled buffer through the `Idle` fast path.
                self.poll_read(cx, buf)
            }
            ARAFCState::Transitioning => unreachable!(),
        }
    }
}
370
371pub trait ReadZipStreaming<R>
376where
377 R: AsyncRead,
378{
379 #[allow(async_fn_in_trait)]
385 async fn stream_zip_entries_throwing_caution_to_the_wind(
386 self,
387 ) -> Result<StreamingEntryReader<R>, Error>;
388}
389
390impl<R> ReadZipStreaming<R> for R
391where
392 R: AsyncRead + Unpin,
393{
394 async fn stream_zip_entries_throwing_caution_to_the_wind(
395 mut self,
396 ) -> Result<StreamingEntryReader<Self>, Error> {
397 let mut fsm = EntryFsm::new(None, None);
398
399 loop {
400 if fsm.wants_read() {
401 let n = self.read(fsm.space()).await?;
402 trace!("read {} bytes into buf for first zip entry", n);
403 fsm.fill(n);
404 }
405
406 if let Some(entry) = fsm.process_till_header()? {
407 let entry = entry.clone();
408 return Ok(StreamingEntryReader::new(fsm, entry, self));
409 }
410 }
411 }
412}