use std::fs::File;
use std::io::{BufReader, Read, Seek};
use std::path::Path;
use crate::format::header::StartHeader;
use crate::format::parser::{ArchiveHeader, read_archive_header};
use crate::format::streams::ResourceLimits;
use crate::read::{Entry, ExtractOptions};
use crate::{ArchivePath, Error, Result};
#[cfg(feature = "aes")]
use crate::Password;
use super::config::StreamingConfig;
use super::iterator::EntryIterator;
use super::memory::MemoryTracker;
use super::parallel::{
ParallelExtractionOptions, ParallelExtractionResult, ParallelFolderExtractor,
};
use super::pool::{DecoderPool, PoolStats};
/// A 7z archive opened for streaming access over a seekable reader `R`.
///
/// Construction parses the complete archive header up front; compressed entry
/// data is only read later, when an [`EntryIterator`] (via `entries()`) or one
/// of the `extract_*` methods pulls bytes through `reader`.
pub struct StreamingArchive<R> {
    /// Source of all archive bytes; borrowed mutably by iterators/extractors.
    reader: R,
    /// Parsed start header; currently unread (hence the `dead_code` allow).
    #[allow(dead_code)]
    start_header: StartHeader,
    /// Fully parsed archive header (stream layout + file metadata).
    header: ArchiveHeader,
    /// Entries whose paths validated, in archive order; skipped entries excluded.
    entries: Vec<Entry>,
    /// Entries dropped during construction (e.g. invalid paths), with reasons.
    skipped_entries: Vec<super::SkippedEntry>,
    /// Password used to decrypt AES-encoded folders.
    #[cfg(feature = "aes")]
    password: Password,
    /// Configuration this archive was opened with.
    config: StreamingConfig,
    /// Tracks buffer usage against `config.max_memory_buffer`.
    memory_tracker: MemoryTracker,
    /// Decoder cache; `Some` only for solid archives with a nonzero resolved capacity.
    decoder_pool: Option<DecoderPool>,
    /// Result of `super::check_is_solid`; gates pooling and parallel extraction.
    is_solid: bool,
}
impl StreamingArchive<BufReader<File>> {
    /// Opens the archive file at `path` using the default [`StreamingConfig`].
    #[cfg(feature = "aes")]
    pub fn open_path(path: impl AsRef<Path>, password: impl Into<Password>) -> Result<Self> {
        let config = StreamingConfig::default();
        Self::open_path_with_config(path, password, config)
    }

    /// Opens the archive file at `path` using the default [`StreamingConfig`].
    #[cfg(not(feature = "aes"))]
    pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
        let config = StreamingConfig::default();
        Self::open_path_with_config(path, config)
    }

    /// Opens the archive file at `path` with an explicit configuration.
    ///
    /// The file is wrapped in a [`BufReader`] before header parsing begins.
    ///
    /// # Errors
    /// Returns [`Error::Io`] when the file cannot be opened, plus any error
    /// produced while validating the config or parsing the archive header.
    #[cfg(feature = "aes")]
    pub fn open_path_with_config(
        path: impl AsRef<Path>,
        password: impl Into<Password>,
        config: StreamingConfig,
    ) -> Result<Self> {
        let file = File::open(path.as_ref()).map_err(Error::Io)?;
        Self::open_with_config(BufReader::new(file), password, config)
    }

    /// Opens the archive file at `path` with an explicit configuration.
    ///
    /// # Errors
    /// Returns [`Error::Io`] when the file cannot be opened, plus any error
    /// produced while validating the config or parsing the archive header.
    #[cfg(not(feature = "aes"))]
    pub fn open_path_with_config(path: impl AsRef<Path>, config: StreamingConfig) -> Result<Self> {
        let file = File::open(path.as_ref()).map_err(Error::Io)?;
        Self::open_with_config(BufReader::new(file), config)
    }
}
impl<R: Read + Seek + Send> StreamingArchive<R> {
    /// Opens an archive from any seekable reader with the default configuration.
    #[cfg(feature = "aes")]
    pub fn open(reader: R, password: impl Into<Password>) -> Result<Self> {
        Self::open_with_config(reader, password, StreamingConfig::default())
    }

    /// Opens an archive from any seekable reader with the default configuration.
    #[cfg(not(feature = "aes"))]
    pub fn open(reader: R) -> Result<Self> {
        Self::open_with_config(reader, StreamingConfig::default())
    }

    /// Opens an archive with an explicit [`StreamingConfig`].
    ///
    /// Validates the configuration first, then parses the archive header under
    /// resource limits (entry-count cap and compression-ratio cap) so a
    /// malicious header cannot exhaust resources before extraction starts.
    ///
    /// # Errors
    /// Returns an error when the configuration is invalid or the header cannot
    /// be parsed within the configured limits.
    #[cfg(feature = "aes")]
    pub fn open_with_config(
        mut reader: R,
        password: impl Into<Password>,
        config: StreamingConfig,
    ) -> Result<Self> {
        config.validate()?;
        // Bound header parsing with the configured limits (anti-bomb guard).
        let limits = ResourceLimits::default()
            .max_entries(config.max_entries)
            .ratio_limit(Some(crate::format::streams::RatioLimit::new(
                config.max_compression_ratio,
            )));
        let (start_header, header) = read_archive_header(&mut reader, Some(limits))?;
        let (entries, skipped_entries) = Self::build_entries(&header);
        let is_solid = super::check_is_solid(&header);
        let memory_tracker = MemoryTracker::new(config.max_memory_buffer);
        // Pooling is only created for solid archives (see create_decoder_pool).
        let decoder_pool = Self::create_decoder_pool(&config, is_solid);
        Ok(Self {
            reader,
            start_header,
            header,
            entries,
            skipped_entries,
            password: password.into(),
            config,
            memory_tracker,
            decoder_pool,
            is_solid,
        })
    }

    /// Non-AES twin of the `open_with_config` variant in this impl; the two
    /// bodies are kept in sync manually (they differ only in the password).
    ///
    /// # Errors
    /// Returns an error when the configuration is invalid or the header cannot
    /// be parsed within the configured limits.
    #[cfg(not(feature = "aes"))]
    pub fn open_with_config(mut reader: R, config: StreamingConfig) -> Result<Self> {
        config.validate()?;
        // Bound header parsing with the configured limits (anti-bomb guard).
        let limits = ResourceLimits::default()
            .max_entries(config.max_entries)
            .ratio_limit(Some(crate::format::streams::RatioLimit::new(
                config.max_compression_ratio,
            )));
        let (start_header, header) = read_archive_header(&mut reader, Some(limits))?;
        let (entries, skipped_entries) = Self::build_entries(&header);
        let is_solid = super::check_is_solid(&header);
        let memory_tracker = MemoryTracker::new(config.max_memory_buffer);
        let decoder_pool = Self::create_decoder_pool(&config, is_solid);
        Ok(Self {
            reader,
            start_header,
            header,
            entries,
            skipped_entries,
            config,
            memory_tracker,
            decoder_pool,
            is_solid,
        })
    }

    /// Builds the decoder pool for solid archives only; a resolved capacity of
    /// zero disables pooling entirely.
    fn create_decoder_pool(config: &StreamingConfig, is_solid: bool) -> Option<DecoderPool> {
        if !is_solid {
            return None;
        }
        let capacity = config.resolved_decoder_pool_capacity();
        if capacity == 0 {
            return None;
        }
        Some(DecoderPool::new(capacity))
    }

    /// Converts the parsed header's file records into public [`Entry`] values.
    ///
    /// Entries whose names fail [`ArchivePath`] validation are not dropped
    /// silently: they are reported via the returned `SkippedEntry` list.
    /// While walking, each non-directory entry is assigned its
    /// `(folder_index, stream_index)` pair by advancing a cursor over the
    /// per-folder substream counts.
    fn build_entries(header: &ArchiveHeader) -> (Vec<Entry>, Vec<super::SkippedEntry>) {
        let files_info = match &header.files_info {
            Some(info) => info,
            None => return (Vec::new(), Vec::new()),
        };
        let substreams = header.substreams_info.as_ref();
        let mut entries = Vec::with_capacity(files_info.entries.len());
        let mut skipped_entries = Vec::new();
        // Cursor: (folder, substream-within-folder), advanced per file entry.
        let mut stream_idx: usize = 0;
        let mut folder_idx: usize = 0;
        for (idx, archive_entry) in files_info.entries.iter().enumerate() {
            let path = match ArchivePath::new(&archive_entry.name) {
                Ok(p) => p,
                Err(e) => {
                    // Invalid path: record (with the raw bytes) and keep going.
                    skipped_entries.push(super::SkippedEntry {
                        index: idx,
                        raw_path: Some(archive_entry.name.as_bytes().to_vec()),
                        reason: super::SkipReason::InvalidPath(e.to_string()),
                    });
                    continue;
                }
            };
            let (folder_index, stream_index) = if archive_entry.is_directory {
                // Directories carry no stream data.
                (None, None)
            } else {
                // Capture the cursor position for this entry, then advance it.
                let fi = folder_idx;
                let si = stream_idx;
                if let Some(ss) = substreams {
                    if folder_idx < ss.num_unpack_streams_in_folders.len() {
                        stream_idx += 1;
                        let num_streams = ss.num_unpack_streams_in_folders[folder_idx] as usize;
                        if stream_idx >= num_streams {
                            // Folder exhausted: first stream of the next folder.
                            stream_idx = 0;
                            folder_idx += 1;
                        }
                    }
                } else {
                    // No substream info: exactly one stream per folder.
                    folder_idx += 1;
                }
                // NOTE(review): every non-directory entry consumes a substream
                // slot here; if the format allows empty files with no stream,
                // this cursor could drift -- confirm against the parser.
                (Some(fi), Some(si))
            };
            let is_encrypted = folder_index
                .and_then(|fi| Self::check_folder_encryption(header, fi))
                .unwrap_or(false);
            // Symlink detection from the attribute word: the high 16 bits hold
            // a Unix mode (0o120000 == S_IFLNK); bit 0x400 is Windows
            // FILE_ATTRIBUTE_REPARSE_POINT.
            let is_symlink = !archive_entry.is_directory
                && archive_entry.attributes.is_some_and(|attrs| {
                    let unix_mode = (attrs >> 16) & 0xFFFF;
                    let is_unix_symlink = unix_mode != 0 && (unix_mode & 0o170000) == 0o120000;
                    let is_windows_symlink = (attrs & 0x400) != 0;
                    is_unix_symlink || is_windows_symlink
                });
            entries.push(Entry {
                path,
                is_directory: archive_entry.is_directory,
                size: archive_entry.size,
                crc32: archive_entry.crc,
                crc64: None,
                modification_time: archive_entry.mtime,
                creation_time: archive_entry.ctime,
                access_time: archive_entry.atime,
                attributes: archive_entry.attributes,
                is_encrypted,
                is_symlink,
                is_anti: archive_entry.is_anti,
                ownership: None,
                index: idx,
                folder_index,
                stream_index,
            });
        }
        (entries, skipped_entries)
    }

    /// Returns `Some(true)` when any coder in the folder uses the AES method
    /// id, `None` when the unpack info or the folder itself is absent.
    fn check_folder_encryption(header: &ArchiveHeader, folder_index: usize) -> Option<bool> {
        let unpack_info = header.unpack_info.as_ref()?;
        let folder = unpack_info.folders.get(folder_index)?;
        Some(
            folder
                .coders
                .iter()
                .any(|coder| coder.method_id.as_slice() == crate::codec::method::AES),
        )
    }

    /// Parsed archive header (stream layout and file metadata).
    pub fn header(&self) -> &ArchiveHeader {
        &self.header
    }

    /// Configuration this archive was opened with.
    pub fn config(&self) -> &StreamingConfig {
        &self.config
    }

    /// Memory budget tracker used during extraction.
    pub fn memory_tracker(&self) -> &MemoryTracker {
        &self.memory_tracker
    }

    /// Whether the archive was detected as solid.
    pub fn is_solid(&self) -> bool {
        self.is_solid
    }

    /// All successfully parsed entries, in archive order.
    pub fn entries_list(&self) -> &[Entry] {
        &self.entries
    }

    /// Number of usable entries (skipped entries are not counted).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// `true` when the archive exposes no usable entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Entries that could not be exposed (currently: invalid paths).
    pub fn skipped_entries(&self) -> &[super::SkippedEntry] {
        &self.skipped_entries
    }

    /// `true` when at least one entry was skipped during construction.
    pub fn has_skipped_entries(&self) -> bool {
        !self.skipped_entries.is_empty()
    }

    /// Linear lookup of an entry by its exact archive path string.
    pub fn entry(&self, path: &str) -> Option<&Entry> {
        self.entries.iter().find(|e| e.path.as_str() == path)
    }

    /// Sum of the (uncompressed) sizes of all usable entries.
    pub fn total_size(&self) -> u64 {
        self.entries.iter().map(|e| e.size).sum()
    }

    /// Number of folders (compressed blocks) in the archive, 0 when absent.
    pub fn num_folders(&self) -> usize {
        self.header
            .unpack_info
            .as_ref()
            .map(|ui| ui.folders.len())
            .unwrap_or(0)
    }

    /// `true` when a decoder pool was created at open time.
    pub fn has_decoder_pool(&self) -> bool {
        self.decoder_pool.is_some()
    }

    /// Pool usage statistics, when a pool exists.
    pub fn pool_stats(&self) -> Option<&PoolStats> {
        self.decoder_pool.as_ref().map(|p| p.stats())
    }

    /// Pool capacity, when a pool exists.
    pub fn pool_capacity(&self) -> Option<usize> {
        self.decoder_pool.as_ref().map(|p| p.capacity())
    }

    /// Resets the pool's statistics; no-op without a pool.
    pub fn reset_pool_stats(&mut self) {
        if let Some(pool) = &mut self.decoder_pool {
            pool.reset_stats();
        }
    }

    /// Drops all cached decoders; the pool itself remains usable. No-op
    /// without a pool.
    pub fn clear_decoder_pool(&mut self) {
        if let Some(pool) = &mut self.decoder_pool {
            pool.clear();
        }
    }

    /// Streaming iterator over entries. Borrows the archive mutably because
    /// entry data is read through `self.reader`.
    #[cfg(feature = "aes")]
    pub fn entries(&mut self) -> Result<EntryIterator<'_, R>> {
        EntryIterator::new(
            &self.header,
            &self.entries,
            &mut self.reader,
            &self.password,
            self.config.clone(),
        )
    }

    /// Streaming iterator over entries (non-AES build).
    #[cfg(not(feature = "aes"))]
    pub fn entries(&mut self) -> Result<EntryIterator<'_, R>> {
        EntryIterator::new(
            &self.header,
            &self.entries,
            &mut self.reader,
            self.config.clone(),
        )
    }

    /// Extracts every entry beneath `dest`, continuing past per-entry failures
    /// and reporting all outcomes in the returned [`ExtractAllResult`].
    ///
    /// NOTE(review): `_options` is currently ignored -- wire it up or drop it.
    /// NOTE(review): destination paths come from names already validated by
    /// `ArchivePath::new` in `build_entries`; assumes that validation rejects
    /// traversal components -- confirm.
    pub fn extract_all(
        &mut self,
        dest: impl AsRef<Path>,
        _options: &ExtractOptions,
    ) -> Result<ExtractAllResult> {
        let dest = dest.as_ref();
        // create_dir_all is a no-op on an existing directory, so the exists()
        // guard is only an optimization.
        if !dest.exists() {
            std::fs::create_dir_all(dest).map_err(Error::Io)?;
        }
        let mut result = ExtractAllResult::default();
        let mut iter = self.entries()?;
        // `while let` rather than `for`: the loop body re-borrows `iter` to
        // call extract_current_to (lending-iterator pattern).
        while let Some(entry_result) = iter.next() {
            match entry_result {
                Ok(entry) => {
                    let entry_name = entry.name().to_string();
                    let entry_path = dest.join(&entry_name);
                    if entry.is_directory() {
                        // Directories are materialized but carry no data.
                        if let Err(e) = std::fs::create_dir_all(&entry_path) {
                            result.entries_failed += 1;
                            result.failures.push((entry_name, e.to_string()));
                        } else {
                            result.entries_extracted += 1;
                        }
                        continue;
                    }
                    // Ensure intermediate directories exist for the file.
                    if let Some(parent) = entry_path.parent() {
                        if let Err(e) = std::fs::create_dir_all(parent) {
                            result.entries_failed += 1;
                            result.failures.push((entry_name, e.to_string()));
                            continue;
                        }
                    }
                    match std::fs::File::create(&entry_path) {
                        Ok(mut file) => match iter.extract_current_to(&mut file) {
                            Ok(bytes) => {
                                result.entries_extracted += 1;
                                result.bytes_extracted += bytes;
                            }
                            Err(e) => {
                                result.entries_failed += 1;
                                result.failures.push((entry_name, e.to_string()));
                            }
                        },
                        Err(e) => {
                            result.entries_failed += 1;
                            result.failures.push((entry_name, e.to_string()));
                        }
                    }
                }
                Err(e) => {
                    // Iterator-level failure: the entry name is unknown here.
                    result.entries_failed += 1;
                    result.failures.push(("unknown".to_string(), e.to_string()));
                }
            }
        }
        Ok(result)
    }

    /// Extracts every file entry into a caller-provided sink.
    ///
    /// `sink_factory` is called once per file entry; returning `None` skips
    /// that entry (counted in `entries_skipped`). Directories are counted as
    /// extracted without invoking the factory.
    pub fn extract_all_to_sinks<W, F>(&mut self, mut sink_factory: F) -> Result<ExtractAllResult>
    where
        W: std::io::Write,
        F: FnMut(&Entry) -> Option<W>,
    {
        let mut result = ExtractAllResult::default();
        let mut iter = self.entries()?;
        // Lending-iterator loop; see extract_all for why this isn't a `for`.
        while let Some(entry_result) = iter.next() {
            match entry_result {
                Ok(entry) => {
                    if entry.is_directory() {
                        result.entries_extracted += 1;
                        continue;
                    }
                    let entry_name = entry.name().to_string();
                    match sink_factory(entry.entry()) {
                        Some(mut sink) => match iter.extract_current_to(&mut sink) {
                            Ok(bytes) => {
                                result.entries_extracted += 1;
                                result.bytes_extracted += bytes;
                            }
                            Err(e) => {
                                result.entries_failed += 1;
                                result.failures.push((entry_name, e.to_string()));
                            }
                        },
                        None => {
                            // Caller declined this entry.
                            result.entries_skipped += 1;
                        }
                    }
                }
                Err(e) => {
                    result.entries_failed += 1;
                    result.failures.push(("unknown".to_string(), e.to_string()));
                }
            }
        }
        Ok(result)
    }

    /// Extracts folders in parallel into `dest`.
    ///
    /// # Errors
    /// Returns [`Error::UnsupportedFeature`] for solid archives, where folder
    /// contents cannot be decoded independently.
    pub fn extract_all_parallel(
        &mut self,
        dest: impl AsRef<Path>,
        options: &ParallelExtractionOptions,
    ) -> Result<ParallelExtractionResult> {
        if self.is_solid {
            return Err(Error::UnsupportedFeature {
                feature: "parallel extraction for solid archives",
            });
        }
        let extractor = ParallelFolderExtractor::new(&self.header, &self.entries, options.clone());
        extractor.extract_to_directory(&mut self.reader, dest)
    }

    /// `true` when `extract_all_parallel` is usable (i.e. non-solid archive).
    pub fn supports_parallel_extraction(&self) -> bool {
        !self.is_solid
    }
}
/// Aggregate outcome of a bulk extraction (`extract_all` / `extract_all_to_sinks`).
#[derive(Debug, Default)]
pub struct ExtractAllResult {
    /// Entries (files and directories) successfully written.
    pub entries_extracted: usize,
    /// Entries skipped because the sink factory returned `None`.
    pub entries_skipped: usize,
    /// Entries that failed (directory creation, file creation, or decode errors).
    pub entries_failed: usize,
    /// Total bytes written for successfully extracted file entries.
    pub bytes_extracted: u64,
    /// One `(entry name, error message)` pair per failure; iterator-level
    /// failures are recorded under the name `"unknown"`.
    pub failures: Vec<(String, String)>,
}
impl ExtractAllResult {
    /// `true` when no entry failed; skipped entries do not count as failures.
    pub fn is_success(&self) -> bool {
        matches!(self.entries_failed, 0)
    }

    /// Total number of entries the extraction loop handled, whatever the
    /// per-entry outcome was.
    pub fn total_processed(&self) -> usize {
        [self.entries_extracted, self.entries_skipped, self.entries_failed]
            .into_iter()
            .sum()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default streaming configuration must pass its own validation.
    #[test]
    fn test_streaming_config_default() {
        assert!(StreamingConfig::default().validate().is_ok());
    }

    /// A freshly-defaulted result reports success with nothing processed.
    #[test]
    fn test_extract_all_result_default() {
        let fresh = ExtractAllResult::default();
        assert!(fresh.is_success());
        assert_eq!(fresh.total_processed(), 0);
    }

    /// A nonzero failure count flips `is_success`, and all three counters
    /// contribute to `total_processed`.
    #[test]
    fn test_extract_all_result_with_failures() {
        let outcome = ExtractAllResult {
            entries_extracted: 5,
            entries_skipped: 2,
            entries_failed: 1,
            bytes_extracted: 1000,
            failures: vec![("test.txt".to_string(), "error".to_string())],
        };
        assert!(!outcome.is_success());
        assert_eq!(outcome.total_processed(), 5 + 2 + 1);
    }
}