rc_zip_sync/
streaming_entry_reader.rs

1use oval::Buffer;
2use rc_zip::{
3    error::FormatError,
4    fsm::{EntryFsm, FsmResult},
5    Entry, Error,
6};
7use std::io::{self, Read};
8use tracing::trace;
9
10/// Reads a zip entry based on a local header. Some information is missing,
11/// not all name encodings may work, and only by reading it in its entirety
12/// can you move on to the next entry.
13///
14/// However, it only requires an [io::Read], and does not need to seek.
15pub struct StreamingEntryReader<R> {
16    entry: Entry,
17    rd: R,
18    state: State,
19}
20
21#[derive(Default)]
22#[allow(clippy::large_enum_variant)]
23enum State {
24    Reading {
25        fsm: EntryFsm,
26    },
27    Finished {
28        /// remaining buffer for next entry
29        remain: Buffer,
30    },
31    #[default]
32    Transition,
33}
34
35impl<R> StreamingEntryReader<R>
36where
37    R: io::Read,
38{
39    pub(crate) fn new(fsm: EntryFsm, entry: Entry, rd: R) -> Self {
40        Self {
41            entry,
42            rd,
43            state: State::Reading { fsm },
44        }
45    }
46}
47
48impl<R> io::Read for StreamingEntryReader<R>
49where
50    R: io::Read,
51{
52    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
53        trace!("reading from streaming entry reader");
54
55        match std::mem::take(&mut self.state) {
56            State::Reading { mut fsm } => {
57                if fsm.wants_read() {
58                    trace!("fsm wants read");
59                    let n = self.rd.read(fsm.space())?;
60                    trace!("giving fsm {} bytes from rd", n);
61                    fsm.fill(n);
62                } else {
63                    trace!("fsm does not want read");
64                }
65
66                match fsm.process(buf)? {
67                    FsmResult::Continue((fsm, outcome)) => {
68                        trace!("fsm wants to continue");
69                        self.state = State::Reading { fsm };
70
71                        if outcome.bytes_written > 0 {
72                            trace!("bytes have been written");
73                            Ok(outcome.bytes_written)
74                        } else if outcome.bytes_read == 0 {
75                            trace!("no bytes have been written or read");
76                            // that's EOF, baby!
77                            Ok(0)
78                        } else {
79                            trace!("read some bytes, hopefully will write more later");
80                            // loop, it happens
81                            self.read(buf)
82                        }
83                    }
84                    FsmResult::Done(remain) => {
85                        self.state = State::Finished { remain };
86
87                        // neat!
88                        Ok(0)
89                    }
90                }
91            }
92            State::Finished { remain } => {
93                // wait for them to call finish
94                self.state = State::Finished { remain };
95                Ok(0)
96            }
97            State::Transition => unreachable!(),
98        }
99    }
100}
101
102impl<R> StreamingEntryReader<R>
103where
104    R: io::Read,
105{
106    /// Return entry information for this reader
107    #[inline(always)]
108    pub fn entry(&self) -> &Entry {
109        &self.entry
110    }
111
112    /// Finish reading this entry, returning the next streaming entry reader, if
113    /// any. This panics if the entry is not fully read.
114    ///
115    /// If this returns None, there's no entries left.
116    pub fn finish(mut self) -> Result<Option<StreamingEntryReader<R>>, Error> {
117        trace!("finishing streaming entry reader");
118
119        if matches!(self.state, State::Reading { .. }) {
120            // this should transition to finished if there's no data
121            _ = self.read(&mut [0u8; 1])?;
122        }
123
124        match self.state {
125            State::Reading { .. } => {
126                panic!("entry not fully read");
127            }
128            State::Finished { remain } => {
129                // parse the next entry, if any
130                let mut fsm = EntryFsm::new(None, Some(remain));
131
132                loop {
133                    if fsm.wants_read() {
134                        let n = self.rd.read(fsm.space())?;
135                        trace!("read {} bytes into buf for first zip entry", n);
136                        fsm.fill(n);
137                    }
138
139                    match fsm.process_till_header() {
140                        Ok(Some(entry)) => {
141                            let entry = entry.clone();
142                            return Ok(Some(StreamingEntryReader::new(fsm, entry, self.rd)));
143                        }
144                        Ok(None) => {
145                            // needs more turns
146                        }
147                        Err(e) => match e {
148                            Error::Format(FormatError::InvalidLocalHeader) => {
149                                // we probably reached the end of central directory!
150                                // TODO: we should probably check for the end of central directory
151                                return Ok(None);
152                            }
153                            _ => return Err(e),
154                        },
155                    }
156                }
157            }
158            State::Transition => unreachable!(),
159        }
160    }
161}