use oval::Buffer;
use rc_zip::{
error::FormatError,
fsm::{EntryFsm, FsmResult},
Entry, Error,
};
use std::io::{self, Read};
use tracing::trace;
pub struct StreamingEntryReader<R> {
entry: Entry,
rd: R,
state: State,
}
#[derive(Default)]
#[allow(clippy::large_enum_variant)]
enum State {
Reading {
fsm: EntryFsm,
},
Finished {
remain: Buffer,
},
#[default]
Transition,
}
impl<R> StreamingEntryReader<R>
where
R: io::Read,
{
pub(crate) fn new(fsm: EntryFsm, entry: Entry, rd: R) -> Self {
Self {
entry,
rd,
state: State::Reading { fsm },
}
}
}
impl<R> io::Read for StreamingEntryReader<R>
where
R: io::Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
trace!("reading from streaming entry reader");
match std::mem::take(&mut self.state) {
State::Reading { mut fsm } => {
if fsm.wants_read() {
trace!("fsm wants read");
let n = self.rd.read(fsm.space())?;
trace!("giving fsm {} bytes from rd", n);
fsm.fill(n);
} else {
trace!("fsm does not want read");
}
match fsm.process(buf)? {
FsmResult::Continue((fsm, outcome)) => {
trace!("fsm wants to continue");
self.state = State::Reading { fsm };
if outcome.bytes_written > 0 {
trace!("bytes have been written");
Ok(outcome.bytes_written)
} else if outcome.bytes_read == 0 {
trace!("no bytes have been written or read");
Ok(0)
} else {
trace!("read some bytes, hopefully will write more later");
self.read(buf)
}
}
FsmResult::Done(remain) => {
self.state = State::Finished { remain };
Ok(0)
}
}
}
State::Finished { remain } => {
self.state = State::Finished { remain };
Ok(0)
}
State::Transition => unreachable!(),
}
}
}
impl<R> StreamingEntryReader<R>
where
R: io::Read,
{
#[inline(always)]
pub fn entry(&self) -> &Entry {
&self.entry
}
pub fn finish(mut self) -> Result<Option<StreamingEntryReader<R>>, Error> {
trace!("finishing streaming entry reader");
if matches!(self.state, State::Reading { .. }) {
_ = self.read(&mut [0u8; 1])?;
}
match self.state {
State::Reading { .. } => {
panic!("entry not fully read");
}
State::Finished { remain } => {
let mut fsm = EntryFsm::new(None, Some(remain));
loop {
if fsm.wants_read() {
let n = self.rd.read(fsm.space())?;
trace!("read {} bytes into buf for first zip entry", n);
fsm.fill(n);
}
match fsm.process_till_header() {
Ok(Some(entry)) => {
let entry = entry.clone();
return Ok(Some(StreamingEntryReader::new(fsm, entry, self.rd)));
}
Ok(None) => {
}
Err(e) => match e {
Error::Format(FormatError::InvalidLocalHeader) => {
return Ok(None);
}
_ => return Err(e),
},
}
}
}
State::Transition => unreachable!(),
}
}
}