use std::collections::HashMap;
use std::io::{self, Read, Seek, SeekFrom};
use std::num::NonZeroUsize;
use crate::format::parser::ArchiveHeader;
use crate::s3fifo::S3FifoCache;
use crate::{Error, READ_BUFFER_SIZE, Result};
/// A partially-consumed folder decoder retained for reuse, so a later read at
/// a higher offset in the same folder can resume from `byte_offset` instead of
/// re-decompressing from the start of the folder.
struct CachedDecoder {
    // Underlying decompression stream for one folder.
    decoder: Box<dyn Read + Send>,
    // Uncompressed bytes already consumed from `decoder`.
    byte_offset: u64,
    // Total uncompressed size of the folder this decoder serves.
    total_size: u64,
}
/// Caches partially-consumed folder decoders (keyed by folder index) so that
/// sequential reads within a solid archive can resume decompression instead of
/// restarting from the beginning of a folder. Eviction is delegated to the
/// S3-FIFO cache.
pub struct DecoderPool {
    // Folder index -> resumable decoder state.
    cache: S3FifoCache<usize, CachedDecoder>,
    // Hit/miss and byte-level accounting for this pool.
    stats: PoolStats,
}
/// Counters describing how effective the decoder cache has been.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Lookups satisfied by a cached decoder.
    pub hits: u64,
    /// Lookups that required building a fresh decoder.
    pub misses: u64,
    /// Uncompressed bytes that did not have to be decompressed again.
    pub bytes_skipped: u64,
    /// Uncompressed bytes decompressed only to be discarded.
    pub bytes_redecompressed: u64,
}

impl PoolStats {
    /// Fraction of lookups that hit the cache, in `[0.0, 1.0]`.
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_ratio(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            total => self.hits as f64 / total as f64,
        }
    }

    /// Total bytes of decompression work avoided through cache reuse.
    pub fn bytes_saved(&self) -> u64 {
        self.bytes_skipped
    }
}
impl DecoderPool {
    /// Creates a pool holding at most `capacity` decoders.
    ///
    /// A `capacity` of 0 is clamped to 1, since the cache requires a
    /// non-zero capacity.
    pub fn new(capacity: usize) -> Self {
        let cap = NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::MIN);
        Self {
            cache: S3FifoCache::new(cap),
            stats: PoolStats::default(),
        }
    }

    /// Creates a pool sized to the detected CPU count.
    pub fn auto_sized() -> Self {
        // Delegate to detected_cpu_count() so the CPU-detection fallback
        // cannot drift between the two (previously duplicated inline).
        Self::new(Self::detected_cpu_count())
    }

    /// Creates a pool sized to `detected_cpu_count() * multiplier`,
    /// clamped to at least 1 (so `multiplier == 0` still yields a usable
    /// pool).
    pub fn with_multiplier(multiplier: usize) -> Self {
        Self::new(
            Self::detected_cpu_count()
                .saturating_mul(multiplier)
                .max(1),
        )
    }

    /// Number of logical CPUs, falling back to 4 when detection fails.
    pub fn detected_cpu_count() -> usize {
        std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4)
    }
}
impl Default for DecoderPool {
    /// Equivalent to [`DecoderPool::auto_sized`]: capacity matches the
    /// detected CPU count.
    fn default() -> Self {
        Self::auto_sized()
    }
}
impl DecoderPool {
    /// Maximum number of decoders the cache can hold.
    pub fn capacity(&self) -> usize {
        self.cache.capacity()
    }

    /// Number of decoders currently cached.
    pub fn len(&self) -> usize {
        self.cache.len()
    }

    /// True when no decoders are cached.
    pub fn is_empty(&self) -> bool {
        self.cache.is_empty()
    }

    /// Read-only view of the pool's hit/miss counters.
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Resets all counters to zero without touching cached decoders.
    pub fn reset_stats(&mut self) {
        self.stats = PoolStats::default();
    }

    /// Drops every cached decoder.
    pub fn clear(&mut self) {
        self.cache.clear();
    }

    /// Returns a decoder for `folder_index` that will be positioned at
    /// `target_offset` within the folder's uncompressed stream.
    ///
    /// A cached decoder is reused when its current position is at or before
    /// `target_offset`; otherwise (or when nothing is cached) a fresh decoder
    /// is built and the whole `target_offset` prefix must be decompressed and
    /// discarded. The skip itself is deferred: it runs on the returned
    /// decoder's first read (see `PooledDecoder::skip_to_offset`).
    ///
    /// # Errors
    /// Fails when the header lacks pack/unpack info, `folder_index` is out of
    /// range, or reading the packed stream from `source` fails.
    pub fn get_decoder<R: Read + Seek + Send>(
        &mut self,
        header: &ArchiveHeader,
        source: &mut R,
        folder_index: usize,
        target_offset: u64,
        pack_start: u64,
    ) -> Result<PooledDecoder> {
        // pop() removes the entry; it is re-inserted via return_decoder().
        // A cached decoder already past target_offset is dropped here, since
        // a forward-only stream cannot rewind.
        if let Some(cached) = self.cache.pop(&folder_index) {
            if cached.byte_offset <= target_offset {
                let skip_bytes = target_offset - cached.byte_offset;
                self.stats.hits += 1;
                // The cached position's worth of bytes did not have to be
                // re-decompressed — that is the amount "saved" relative to a
                // fresh decoder (see PoolStats::bytes_saved), not the number
                // of bytes skipped over.
                self.stats.bytes_skipped += cached.byte_offset;
                return Ok(PooledDecoder {
                    folder_index,
                    decoder: cached.decoder,
                    byte_offset: cached.byte_offset,
                    total_size: cached.total_size,
                    skip_remaining: skip_bytes,
                });
            }
        }
        self.stats.misses += 1;
        // A fresh decoder must decompress-and-discard everything up to
        // target_offset to get into position.
        self.stats.bytes_redecompressed += target_offset;
        let decoder = self.create_decoder(header, source, folder_index, pack_start)?;
        let total_size = self.get_folder_uncompressed_size(header, folder_index);
        Ok(PooledDecoder {
            folder_index,
            decoder,
            byte_offset: 0,
            total_size,
            skip_remaining: target_offset,
        })
    }

    /// Returns a decoder to the cache for future reuse. Fully-consumed
    /// decoders are dropped — they cannot serve any further offset.
    pub fn return_decoder(&mut self, decoder: PooledDecoder) {
        if decoder.byte_offset < decoder.total_size {
            let cached = CachedDecoder {
                decoder: decoder.decoder,
                byte_offset: decoder.byte_offset,
                total_size: decoder.total_size,
            };
            self.cache.insert(decoder.folder_index, cached);
        }
    }

    /// Builds a fresh decoder for one folder by reading its packed stream
    /// fully into memory and wrapping it in the folder's first coder.
    ///
    /// NOTE(review): only `coders[0]` is used, and `pack_sizes[folder_index]`
    /// assumes exactly one packed stream per folder. Multi-coder chains and
    /// folders with several pack streams are presumably rejected or handled
    /// elsewhere — confirm against the format parser.
    fn create_decoder<R: Read + Seek + Send>(
        &self,
        header: &ArchiveHeader,
        source: &mut R,
        folder_index: usize,
        pack_start: u64,
    ) -> Result<Box<dyn Read + Send>> {
        let unpack_info = header
            .unpack_info
            .as_ref()
            .ok_or_else(|| Error::InvalidFormat("missing unpack info".into()))?;
        let folder = unpack_info.folders.get(folder_index).ok_or_else(|| {
            Error::InvalidFormat(format!("folder index {} out of range", folder_index))
        })?;
        if folder.coders.is_empty() {
            return Err(Error::InvalidFormat("folder has no coders".into()));
        }
        let pack_offset = self.calculate_pack_offset(header, folder_index, pack_start)?;
        // Missing pack info yields size 0 here rather than an error; the
        // subsequent read_exact on an empty buffer is then a no-op.
        let pack_size = header
            .pack_info
            .as_ref()
            .and_then(|pi| pi.pack_sizes.get(folder_index).copied())
            .unwrap_or(0);
        source
            .seek(SeekFrom::Start(pack_offset))
            .map_err(Error::Io)?;
        // The entire packed stream is buffered in memory before decoding.
        let mut packed_data = vec![0u8; pack_size as usize];
        source.read_exact(&mut packed_data).map_err(Error::Io)?;
        let coder = &folder.coders[0];
        let uncompressed_size = folder.final_unpack_size().unwrap_or(0);
        let cursor = std::io::Cursor::new(packed_data);
        let decoder = crate::codec::build_decoder(cursor, coder, uncompressed_size)?;
        Ok(Box::new(decoder))
    }

    /// Absolute offset of this folder's packed stream: archive base
    /// (`pack_start`) + `pack_pos` + the sizes of all preceding folders'
    /// packed streams. Missing sizes are silently treated as 0.
    fn calculate_pack_offset(
        &self,
        header: &ArchiveHeader,
        folder_index: usize,
        pack_start: u64,
    ) -> Result<u64> {
        let pack_info = header
            .pack_info
            .as_ref()
            .ok_or_else(|| Error::InvalidFormat("missing pack info".into()))?;
        let mut offset = pack_start + pack_info.pack_pos;
        for i in 0..folder_index {
            if let Some(&size) = pack_info.pack_sizes.get(i) {
                offset += size;
            }
        }
        Ok(offset)
    }

    /// Uncompressed size of the folder, or 0 when unknown or out of range.
    fn get_folder_uncompressed_size(&self, header: &ArchiveHeader, folder_index: usize) -> u64 {
        header
            .unpack_info
            .as_ref()
            .and_then(|ui| ui.folders.get(folder_index))
            .and_then(|f| f.final_unpack_size())
            .unwrap_or(0)
    }
}
/// A decoder checked out of a [`DecoderPool`], carrying a deferred skip:
/// the first `read` first decompresses and discards `skip_remaining` bytes
/// to reach the offset requested from the pool.
pub struct PooledDecoder {
    // Folder this decoder belongs to (cache key when returned to the pool).
    folder_index: usize,
    // Underlying decompression stream.
    decoder: Box<dyn Read + Send>,
    // Current position within the folder's uncompressed stream.
    byte_offset: u64,
    // Total uncompressed size of the folder.
    total_size: u64,
    // Bytes still to be decompressed and discarded before real reads.
    skip_remaining: u64,
}
impl PooledDecoder {
    /// Folder index this decoder serves.
    pub fn folder_index(&self) -> usize {
        self.folder_index
    }

    /// Current position within the folder's uncompressed stream.
    pub fn byte_offset(&self) -> u64 {
        self.byte_offset
    }

    /// Total uncompressed size of the folder.
    pub fn total_size(&self) -> u64 {
        self.total_size
    }

    /// Uncompressed bytes left in the folder from the current position.
    pub fn remaining(&self) -> u64 {
        self.total_size.saturating_sub(self.byte_offset)
    }

    /// Decompresses and discards bytes until the pending skip is consumed.
    ///
    /// `skip_remaining` and `byte_offset` are now updated in lockstep after
    /// every read. Previously the countdown lived in a local variable: a read
    /// error mid-skip left `skip_remaining` at its full original value while
    /// `byte_offset` had already advanced, so a retried read would skip past
    /// the intended target.
    ///
    /// # Errors
    /// Propagates read errors from the underlying decoder; returns
    /// `UnexpectedEof` if the stream ends before the target offset is
    /// reached.
    pub fn skip_to_offset(&mut self) -> io::Result<()> {
        if self.skip_remaining == 0 {
            return Ok(());
        }
        let mut buf = [0u8; READ_BUFFER_SIZE];
        while self.skip_remaining > 0 {
            let to_read = (self.skip_remaining as usize).min(buf.len());
            let n = self.decoder.read(&mut buf[..to_read])?;
            if n == 0 {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "unexpected EOF while skipping",
                ));
            }
            // Keep both counters consistent even if a later read fails.
            self.skip_remaining -= n as u64;
            self.byte_offset += n as u64;
        }
        Ok(())
    }
}
impl Read for PooledDecoder {
    /// Reads decompressed bytes, first consuming any pending skip so the
    /// stream is positioned at the offset requested from the pool.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.skip_to_offset()?;
        self.decoder.read(buf).map(|count| {
            self.byte_offset += count as u64;
            count
        })
    }
}
/// Maps file-entry indices to the location of their data (folder, offset,
/// size) inside a solid archive's uncompressed folder streams.
#[derive(Debug, Clone)]
pub struct SolidEntryLocator {
    // File entry index -> location within its folder.
    entries: HashMap<usize, EntryLocation>,
}
/// Where one archive entry's data lives inside an uncompressed folder.
#[derive(Debug, Clone, Copy)]
pub struct EntryLocation {
    /// Index of the folder containing the entry.
    pub folder_index: usize,
    /// Byte offset of the entry within the folder's uncompressed stream.
    pub offset: u64,
    /// Uncompressed size of the entry in bytes.
    pub size: u64,
}
impl SolidEntryLocator {
    /// Builds the index by walking each folder's substreams in order and
    /// pairing every substream with the next non-directory file entry.
    ///
    /// Returns an empty locator when the header has no files info or no
    /// substreams info.
    ///
    /// NOTE(review): only `is_directory` entries are skipped here. If empty
    /// files (entries that occupy no substream) can appear in
    /// `files_info.entries`, this pairing would desynchronize — confirm the
    /// parser excludes them or that the format guarantees one substream per
    /// non-directory entry.
    pub fn from_header(header: &ArchiveHeader) -> Self {
        let mut entries = HashMap::new();
        let files_info = match &header.files_info {
            Some(fi) => fi,
            None => return Self { entries },
        };
        let substreams = match &header.substreams_info {
            Some(ss) => ss,
            None => return Self { entries },
        };
        // Next candidate position in the flat file-entry list.
        let mut entry_idx = 0;
        // Running start index of the current folder's sizes within the flat
        // `unpack_sizes` array.
        let mut stream_offset = 0usize;
        for (folder_idx, &num_streams) in
            substreams.num_unpack_streams_in_folders.iter().enumerate()
        {
            // Byte offset of the current substream within this folder's
            // uncompressed stream.
            let mut folder_offset = 0u64;
            for stream_idx in 0..(num_streams as usize) {
                // Advance past directory entries; they consume no substream.
                while entry_idx < files_info.entries.len() {
                    let file = &files_info.entries[entry_idx];
                    if file.is_directory {
                        entry_idx += 1;
                        continue;
                    }
                    break;
                }
                if entry_idx >= files_info.entries.len() {
                    break;
                }
                // Missing size information is treated as a zero-length entry.
                let size = substreams
                    .unpack_sizes
                    .get(stream_offset + stream_idx)
                    .copied()
                    .unwrap_or(0);
                entries.insert(
                    entry_idx,
                    EntryLocation {
                        folder_index: folder_idx,
                        offset: folder_offset,
                        size,
                    },
                );
                folder_offset += size;
                entry_idx += 1;
            }
            stream_offset += num_streams as usize;
        }
        Self { entries }
    }

    /// Location of the given file entry, or `None` for directories and
    /// unknown indices.
    pub fn get(&self, entry_index: usize) -> Option<&EntryLocation> {
        self.entries.get(&entry_index)
    }

    /// Number of located (non-directory) entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True when no entries were located.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_stats_hit_ratio() {
        let stats = PoolStats {
            hits: 3,
            misses: 1,
            bytes_skipped: 1000,
            bytes_redecompressed: 500,
        };
        assert!((stats.hit_ratio() - 0.75).abs() < f64::EPSILON);
    }

    #[test]
    fn test_pool_stats_zero_total() {
        let stats = PoolStats::default();
        assert!((stats.hit_ratio() - 0.0).abs() < f64::EPSILON);
    }

    // Previously bytes_saved() had no coverage at all.
    #[test]
    fn test_pool_stats_bytes_saved() {
        let stats = PoolStats {
            hits: 0,
            misses: 0,
            bytes_skipped: 1234,
            bytes_redecompressed: 0,
        };
        assert_eq!(stats.bytes_saved(), 1234);
        assert_eq!(PoolStats::default().bytes_saved(), 0);
    }

    #[test]
    fn test_decoder_pool_capacity() {
        let pool = DecoderPool::new(4);
        assert_eq!(pool.capacity(), 4);
        assert!(pool.is_empty());
    }

    #[test]
    fn test_decoder_pool_clear() {
        let mut pool = DecoderPool::new(4);
        pool.clear();
        assert!(pool.is_empty());
    }

    #[test]
    fn test_decoder_pool_auto_sized() {
        let pool = DecoderPool::auto_sized();
        let expected = DecoderPool::detected_cpu_count();
        assert_eq!(pool.capacity(), expected);
        assert!(pool.capacity() >= 1);
    }

    #[test]
    fn test_decoder_pool_with_multiplier() {
        let pool = DecoderPool::with_multiplier(2);
        let expected = DecoderPool::detected_cpu_count() * 2;
        assert_eq!(pool.capacity(), expected);
    }

    #[test]
    fn test_decoder_pool_with_multiplier_zero() {
        // multiplier of 0 is clamped to a minimum capacity of 1.
        let pool = DecoderPool::with_multiplier(0);
        assert_eq!(pool.capacity(), 1);
    }

    #[test]
    fn test_decoder_pool_default() {
        let pool = DecoderPool::default();
        let auto_pool = DecoderPool::auto_sized();
        assert_eq!(pool.capacity(), auto_pool.capacity());
    }

    #[test]
    fn test_detected_cpu_count() {
        let count = DecoderPool::detected_cpu_count();
        assert!(count >= 1, "CPU count should be at least 1");
    }
}