rc_zip_sync/
streaming_entry_reader.rs1use oval::Buffer;
2use rc_zip::{
3 error::FormatError,
4 fsm::{EntryFsm, FsmResult},
5 Entry, Error,
6};
7use std::io::{self, Read};
8use tracing::trace;
9
10pub struct StreamingEntryReader<R> {
16 entry: Entry,
17 rd: R,
18 state: State,
19}
20
21#[derive(Default)]
22#[allow(clippy::large_enum_variant)]
23enum State {
24 Reading {
25 fsm: EntryFsm,
26 },
27 Finished {
28 remain: Buffer,
30 },
31 #[default]
32 Transition,
33}
34
35impl<R> StreamingEntryReader<R>
36where
37 R: io::Read,
38{
39 pub(crate) fn new(fsm: EntryFsm, entry: Entry, rd: R) -> Self {
40 Self {
41 entry,
42 rd,
43 state: State::Reading { fsm },
44 }
45 }
46}
47
48impl<R> io::Read for StreamingEntryReader<R>
49where
50 R: io::Read,
51{
52 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
53 trace!("reading from streaming entry reader");
54
55 match std::mem::take(&mut self.state) {
56 State::Reading { mut fsm } => {
57 if fsm.wants_read() {
58 trace!("fsm wants read");
59 let n = self.rd.read(fsm.space())?;
60 trace!("giving fsm {} bytes from rd", n);
61 fsm.fill(n);
62 } else {
63 trace!("fsm does not want read");
64 }
65
66 match fsm.process(buf)? {
67 FsmResult::Continue((fsm, outcome)) => {
68 trace!("fsm wants to continue");
69 self.state = State::Reading { fsm };
70
71 if outcome.bytes_written > 0 {
72 trace!("bytes have been written");
73 Ok(outcome.bytes_written)
74 } else if outcome.bytes_read == 0 {
75 trace!("no bytes have been written or read");
76 Ok(0)
78 } else {
79 trace!("read some bytes, hopefully will write more later");
80 self.read(buf)
82 }
83 }
84 FsmResult::Done(remain) => {
85 self.state = State::Finished { remain };
86
87 Ok(0)
89 }
90 }
91 }
92 State::Finished { remain } => {
93 self.state = State::Finished { remain };
95 Ok(0)
96 }
97 State::Transition => unreachable!(),
98 }
99 }
100}
101
102impl<R> StreamingEntryReader<R>
103where
104 R: io::Read,
105{
106 #[inline(always)]
108 pub fn entry(&self) -> &Entry {
109 &self.entry
110 }
111
112 pub fn finish(mut self) -> Result<Option<StreamingEntryReader<R>>, Error> {
117 trace!("finishing streaming entry reader");
118
119 if matches!(self.state, State::Reading { .. }) {
120 _ = self.read(&mut [0u8; 1])?;
122 }
123
124 match self.state {
125 State::Reading { .. } => {
126 panic!("entry not fully read");
127 }
128 State::Finished { remain } => {
129 let mut fsm = EntryFsm::new(None, Some(remain));
131
132 loop {
133 if fsm.wants_read() {
134 let n = self.rd.read(fsm.space())?;
135 trace!("read {} bytes into buf for first zip entry", n);
136 fsm.fill(n);
137 }
138
139 match fsm.process_till_header() {
140 Ok(Some(entry)) => {
141 let entry = entry.clone();
142 return Ok(Some(StreamingEntryReader::new(fsm, entry, self.rd)));
143 }
144 Ok(None) => {
145 }
147 Err(e) => match e {
148 Error::Format(FormatError::InvalidLocalHeader) => {
149 return Ok(None);
152 }
153 _ => return Err(e),
154 },
155 }
156 }
157 }
158 State::Transition => unreachable!(),
159 }
160 }
161}