use std::io::{self, Read, Seek, SeekFrom};
use crate::format::parser::ArchiveHeader;
use crate::format::streams::Folder;
use crate::read::Entry;
use crate::{Error, READ_BUFFER_SIZE, Result};
#[cfg(feature = "aes")]
use crate::Password;
use super::config::StreamingConfig;
/// Sequential reader over the entries of an archive.
///
/// Each call to `next` yields one [`StreamingEntry`]; the bytes of the entry
/// just yielded are read back through the iterator itself. A single folder
/// decoder is kept open at a time.
pub struct EntryIterator<'a, R: Read + Seek> {
    // ---- borrowed archive inputs ----
    /// Parsed archive header (pack, unpack and substream information).
    header: &'a ArchiveHeader,
    /// Entries to walk, in the order they are yielded.
    entries: &'a [Entry],
    /// Underlying archive byte source.
    source: &'a mut R,
    /// Password supplied by the caller; stored but not read anywhere in this
    /// module, hence the `dead_code` allowance.
    #[cfg(feature = "aes")]
    #[allow(dead_code)]
    password: &'a Password,
    /// Caller-supplied streaming options.
    config: StreamingConfig,
    // ---- iteration / decoding state ----
    /// Absolute source offset where packed data begins.
    pack_start: u64,
    /// Index into `entries` of the next entry to yield.
    current_index: usize,
    /// Set after an error; the iterator yields nothing further.
    finished: bool,
    /// Folder that `folder_decoder` currently decodes, if any.
    current_folder: Option<usize>,
    /// Decoder producing the unpacked bytes of `current_folder`.
    folder_decoder: Option<Box<dyn Read + Send + 'static>>,
    /// One past the substream index of the most recently yielded entry
    /// within `current_folder`.
    stream_position_in_folder: usize,
    /// Bytes of the current entry's data not yet read.
    bytes_remaining: u64,
}
impl<'a, R: Read + Seek + Send> EntryIterator<'a, R> {
#[cfg(feature = "aes")]
pub(crate) fn new(
header: &'a ArchiveHeader,
entries: &'a [Entry],
source: &'a mut R,
password: &'a Password,
config: StreamingConfig,
) -> Result<Self> {
let pack_start = super::calculate_pack_start(header);
Ok(Self {
header,
entries,
source,
password,
config,
current_index: 0,
current_folder: None,
folder_decoder: None,
stream_position_in_folder: 0,
bytes_remaining: 0,
pack_start,
finished: false,
})
}
#[cfg(not(feature = "aes"))]
pub(crate) fn new(
header: &'a ArchiveHeader,
entries: &'a [Entry],
source: &'a mut R,
config: StreamingConfig,
) -> Result<Self> {
let pack_start = super::calculate_pack_start(header);
Ok(Self {
header,
entries,
source,
config,
current_index: 0,
current_folder: None,
folder_decoder: None,
stream_position_in_folder: 0,
bytes_remaining: 0,
pack_start,
finished: false,
})
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn remaining(&self) -> usize {
self.entries.len().saturating_sub(self.current_index)
}
pub fn config(&self) -> &StreamingConfig {
&self.config
}
fn next_internal(&mut self) -> Result<Option<StreamingEntry<'a>>> {
if self.finished || self.current_index >= self.entries.len() {
return Ok(None);
}
let entry = &self.entries[self.current_index];
self.current_index += 1;
if entry.is_directory {
return Ok(Some(StreamingEntry::directory(entry)));
}
let folder_index = match entry.folder_index {
Some(idx) => idx,
None => {
return Ok(Some(StreamingEntry::empty(entry)));
}
};
let stream_index = entry.stream_index.unwrap_or(0);
if self.current_folder != Some(folder_index) {
self.init_folder_decoder(folder_index)?;
self.stream_position_in_folder = 0;
}
while self.stream_position_in_folder < stream_index {
let skip_size = self.get_stream_size(folder_index, self.stream_position_in_folder);
self.skip_bytes(skip_size)?;
self.stream_position_in_folder += 1;
}
let size = self.get_stream_size(folder_index, stream_index);
self.bytes_remaining = size;
self.stream_position_in_folder = stream_index + 1;
Ok(Some(StreamingEntry::with_size(entry, size)))
}
fn skip_bytes(&mut self, bytes: u64) -> Result<()> {
if let Some(decoder) = &mut self.folder_decoder {
io::copy(&mut decoder.take(bytes), &mut io::sink()).map_err(Error::Io)?;
}
Ok(())
}
fn init_folder_decoder(&mut self, folder_index: usize) -> Result<()> {
let folders = match &self.header.unpack_info {
Some(ui) => &ui.folders,
None => return Err(Error::InvalidFormat("missing unpack info".into())),
};
if folder_index >= folders.len() {
return Err(Error::InvalidFormat(format!(
"folder index {} out of range",
folder_index
)));
}
let folder = &folders[folder_index];
let pack_offset = self.calculate_folder_offset(folder_index)?;
self.source
.seek(SeekFrom::Start(pack_offset))
.map_err(Error::Io)?;
let decoder = self.build_folder_decoder(folder)?;
self.folder_decoder = Some(decoder);
self.current_folder = Some(folder_index);
Ok(())
}
fn get_stream_size(&self, folder_index: usize, stream_index: usize) -> u64 {
let ss = match &self.header.substreams_info {
Some(ss) => ss,
None => {
return self
.header
.unpack_info
.as_ref()
.and_then(|ui| ui.folders.get(folder_index))
.and_then(|f| f.final_unpack_size())
.unwrap_or(0);
}
};
let mut offset = 0usize;
for (i, &count) in ss.num_unpack_streams_in_folders.iter().enumerate() {
if i == folder_index {
return ss
.unpack_sizes
.get(offset + stream_index)
.copied()
.unwrap_or(0);
}
offset += count as usize;
}
0
}
fn calculate_folder_offset(&self, folder_index: usize) -> Result<u64> {
let pack_info = self
.header
.pack_info
.as_ref()
.ok_or_else(|| Error::InvalidFormat("missing pack info".into()))?;
let mut offset = self.pack_start;
for i in 0..folder_index {
if i < pack_info.pack_sizes.len() {
offset += pack_info.pack_sizes[i];
}
}
Ok(offset)
}
fn build_folder_decoder(&mut self, folder: &Folder) -> Result<Box<dyn Read + Send + 'static>> {
if folder.coders.is_empty() {
return Err(Error::InvalidFormat("folder has no coders".into()));
}
let coder = &folder.coders[0];
let uncompressed_size = folder.final_unpack_size().unwrap_or(0);
let folder_index = self.current_folder.unwrap_or(0);
let pack_size = self
.header
.pack_info
.as_ref()
.and_then(|pi| pi.pack_sizes.get(folder_index).copied())
.unwrap_or(0);
let mut packed_data = vec![0u8; pack_size as usize];
self.source
.read_exact(&mut packed_data)
.map_err(Error::Io)?;
let cursor = std::io::Cursor::new(packed_data);
let decoder = crate::codec::build_decoder(cursor, coder, uncompressed_size)?;
Ok(Box::new(decoder) as Box<dyn Read + Send + 'static>)
}
pub fn read_entry_data(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.bytes_remaining == 0 {
return Ok(0);
}
let decoder = match &mut self.folder_decoder {
Some(d) => d,
None => return Ok(0),
};
let to_read = buf.len().min(self.bytes_remaining as usize);
let n = decoder.read(&mut buf[..to_read])?;
self.bytes_remaining -= n as u64;
Ok(n)
}
#[allow(dead_code)] pub(crate) fn skip_remaining(&mut self) -> Result<()> {
self.skip_bytes(self.bytes_remaining)?;
self.bytes_remaining = 0;
Ok(())
}
pub fn extract_current_to<W: io::Write>(&mut self, sink: &mut W) -> Result<u64> {
let mut total_written = 0u64;
let mut buf = [0u8; READ_BUFFER_SIZE];
loop {
let n = self.read_entry_data(&mut buf)?;
if n == 0 {
break;
}
sink.write_all(&buf[..n]).map_err(Error::Io)?;
total_written += n as u64;
}
Ok(total_written)
}
pub fn extract_current_to_with_progress<W, F>(
&mut self,
sink: &mut W,
mut on_progress: F,
) -> Result<u64>
where
W: io::Write,
F: FnMut(u64, u64),
{
let total = self.bytes_remaining;
let mut total_written = 0u64;
let mut buf = [0u8; READ_BUFFER_SIZE];
loop {
let n = self.read_entry_data(&mut buf)?;
if n == 0 {
break;
}
sink.write_all(&buf[..n]).map_err(Error::Io)?;
total_written += n as u64;
on_progress(total_written, total);
}
Ok(total_written)
}
pub fn extract_current_to_vec(&mut self) -> Result<Vec<u8>> {
let mut data = Vec::with_capacity(self.bytes_remaining as usize);
self.extract_current_to(&mut data)?;
Ok(data)
}
pub fn current_entry_remaining(&self) -> u64 {
self.bytes_remaining
}
}
impl<'a, R: Read + Seek + Send> Iterator for EntryIterator<'a, R> {
    type Item = Result<StreamingEntry<'a>>;

    /// Yields the next entry.
    ///
    /// The first error is returned once. After it, the iterator is marked
    /// finished and yields `None` from then on.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_internal() {
            Ok(entry) => entry.map(Ok),
            Err(e) => {
                self.finished = true;
                Some(Err(e))
            }
        }
    }

    /// Exact number of items `next` will still produce.
    ///
    /// Once an error has finished iteration this is zero, even though
    /// entries remain unvisited. `ExactSizeIterator` requires the hint to
    /// match what `next` actually yields.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = if self.finished { 0 } else { self.remaining() };
        (remaining, Some(remaining))
    }
}

/// `size_hint` above is exact, so the remaining length is known.
impl<R: Read + Seek + Send> ExactSizeIterator for EntryIterator<'_, R> {}
/// One entry yielded by `EntryIterator`: the entry's metadata plus the
/// declared size of its data.
///
/// It holds no decoder; entry bytes are read through the iterator.
// `dead_code` is allowed because `buffer` is never read and `bytes_read` is
// never changed after construction in this module.
#[allow(dead_code)]
pub struct StreamingEntry<'a> {
    /// Metadata of the archived item.
    entry: &'a Entry,
    /// Whether the item is a directory (directories carry no data).
    is_directory: bool,
    /// Declared uncompressed size in bytes; 0 for directories and empty files.
    size: u64,
    /// Bytes consumed so far; initialised to 0.
    bytes_read: u64,
    /// Scratch buffer; created empty and otherwise unused here.
    buffer: Vec<u8>,
}
impl<'a> StreamingEntry<'a> {
    /// Shared constructor: `bytes_read` starts at 0 and `buffer` empty.
    fn from_parts(entry: &'a Entry, size: u64, is_directory: bool) -> Self {
        Self {
            entry,
            size,
            is_directory,
            bytes_read: 0,
            buffer: Vec::new(),
        }
    }

    /// A directory entry; it has no data.
    fn directory(entry: &'a Entry) -> Self {
        Self::from_parts(entry, 0, true)
    }

    /// A file entry that has no data stream (no folder assigned).
    fn empty(entry: &'a Entry) -> Self {
        Self::from_parts(entry, 0, false)
    }

    /// A file entry whose data is `size` bytes long.
    fn with_size(entry: &'a Entry, size: u64) -> Self {
        Self::from_parts(entry, size, false)
    }

    /// Archive metadata for this entry.
    ///
    /// The borrow lives as long as the archive's entry list (`'a`), not just
    /// as long as this handle.
    pub fn entry(&self) -> &'a Entry {
        self.entry
    }

    /// Whether this entry is a directory.
    pub fn is_directory(&self) -> bool {
        self.is_directory
    }

    /// Declared uncompressed size in bytes (0 for directories and empty files).
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Path of the entry inside the archive, borrowed for `'a`.
    pub fn name(&self) -> &'a str {
        let entry: &'a Entry = self.entry;
        entry.path.as_str()
    }

    /// Bytes consumed through this handle.
    ///
    /// Nothing in this module advances the counter, so it is currently
    /// always 0. Use `EntryIterator::current_entry_remaining` for live
    /// progress.
    pub fn bytes_read(&self) -> u64 {
        self.bytes_read
    }

    /// `size() - bytes_read()`, saturating at 0. Given the note on
    /// [`bytes_read`](Self::bytes_read), this currently equals `size()`.
    pub fn remaining(&self) -> u64 {
        self.size.saturating_sub(self.bytes_read)
    }

    /// Consumes this handle without reading its data.
    ///
    /// It has no access to the iterator's decoder, so it does not itself
    /// discard any bytes. Unread data is left to `EntryIterator`.
    pub fn skip(self) -> Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ArchivePath;

    /// Minimal entry named "test" with no timestamps, attributes or folder.
    fn bare_entry(is_directory: bool) -> Entry {
        Entry {
            path: ArchivePath::new("test").unwrap(),
            is_directory,
            size: 0,
            crc32: None,
            crc64: None,
            modification_time: None,
            creation_time: None,
            access_time: None,
            attributes: None,
            is_encrypted: false,
            is_symlink: false,
            is_anti: false,
            ownership: None,
            index: 0,
            folder_index: None,
            stream_index: None,
        }
    }

    #[test]
    fn test_streaming_config_defaults() {
        let defaults = StreamingConfig::default();
        assert!(defaults.max_memory_buffer > 0);
        assert!(defaults.read_buffer_size > 0);
    }

    #[test]
    fn test_streaming_entry_directory() {
        let entry = bare_entry(true);
        let dir = StreamingEntry::directory(&entry);
        assert!(dir.is_directory());
        assert_eq!(dir.size(), 0);
    }
}