use crate::chunked_read::ChunkInfo;
use crate::error::FormatError;
use crate::filter_pipeline::FilterPipeline;
use crate::filters::decompress_chunk;
use crate::lane_partition::{self, LaneStats, PartitionStats};
const PARALLEL_THRESHOLD: usize = 4;
struct DecompressedChunk {
index: usize,
data: Vec<u8>,
}
pub fn should_use_parallel(chunk_count: usize) -> bool {
chunk_count > PARALLEL_THRESHOLD
}
pub fn decompress_chunks_lane_partitioned(
file_data: &[u8],
chunks: &[ChunkInfo],
pipeline: &FilterPipeline,
chunk_total_bytes: usize,
element_size: u32,
seed: u64,
num_lanes: Option<usize>,
) -> Result<(Vec<Vec<u8>>, PartitionStats), FormatError> {
use rayon::prelude::*;
let lanes = num_lanes.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
});
let assignments = lane_partition::partition_chunks(chunks.len(), lanes, seed);
let num_lanes = assignments.len();
let lane_results: Result<Vec<(Vec<DecompressedChunk>, LaneStats)>, FormatError> = assignments
.into_par_iter()
.map(|indices| {
let mut results = Vec::with_capacity(indices.len());
let mut stats = LaneStats::default();
for &index in &indices {
let chunk_info = &chunks[index];
let c_addr = chunk_info.address as usize;
let size = chunk_info.chunk_size as usize;
if c_addr + size > file_data.len() {
return Err(FormatError::UnexpectedEof {
expected: c_addr + size,
available: file_data.len(),
});
}
let raw_chunk = &file_data[c_addr..c_addr + size];
let decompressed = if chunk_info.filter_mask == 0 {
decompress_chunk(raw_chunk, pipeline, chunk_total_bytes, element_size)?
} else {
raw_chunk.to_vec()
};
stats.chunks_processed += 1;
stats.compressed_bytes += size as u64;
stats.decompressed_bytes += decompressed.len() as u64;
results.push(DecompressedChunk { index, data: decompressed });
}
Ok((results, stats))
})
.collect();
let lane_results = lane_results?;
let mut partition_stats = PartitionStats::new(num_lanes);
partition_stats.total_chunks = chunks.len();
for (lane_idx, (_, stats)) in lane_results.iter().enumerate() {
partition_stats.per_lane[lane_idx] = stats.clone();
}
let mut all_chunks: Vec<DecompressedChunk> = lane_results
.into_iter()
.flat_map(|(chunks, _)| chunks)
.collect();
all_chunks.sort_by_key(|dc| dc.index);
let ordered = all_chunks.into_iter().map(|dc| dc.data).collect();
Ok((ordered, partition_stats))
}
pub fn decompress_chunks_parallel(
file_data: &[u8],
chunks: &[ChunkInfo],
pipeline: &FilterPipeline,
chunk_total_bytes: usize,
element_size: u32,
) -> Result<Vec<Vec<u8>>, FormatError> {
use rayon::prelude::*;
let results: Result<Vec<DecompressedChunk>, FormatError> = chunks
.par_iter()
.enumerate()
.map(|(index, chunk_info)| {
let c_addr = chunk_info.address as usize;
let size = chunk_info.chunk_size as usize;
if c_addr + size > file_data.len() {
return Err(FormatError::UnexpectedEof {
expected: c_addr + size,
available: file_data.len(),
});
}
let raw_chunk = &file_data[c_addr..c_addr + size];
let decompressed = if chunk_info.filter_mask == 0 {
decompress_chunk(raw_chunk, pipeline, chunk_total_bytes, element_size)?
} else {
raw_chunk.to_vec()
};
Ok(DecompressedChunk { index, data: decompressed })
})
.collect();
let mut result_vec = results?;
result_vec.sort_by_key(|dc| dc.index);
Ok(result_vec.into_iter().map(|dc| dc.data).collect())
}
pub fn decompress_chunks_sequential(
file_data: &[u8],
chunks: &[ChunkInfo],
pipeline: Option<&FilterPipeline>,
chunk_total_bytes: usize,
element_size: u32,
) -> Result<Vec<Vec<u8>>, FormatError> {
let mut result = Vec::with_capacity(chunks.len());
for chunk_info in chunks {
let c_addr = chunk_info.address as usize;
let size = chunk_info.chunk_size as usize;
if c_addr + size > file_data.len() {
return Err(FormatError::UnexpectedEof {
expected: c_addr + size,
available: file_data.len(),
});
}
let raw_chunk = &file_data[c_addr..c_addr + size];
let decompressed = if let Some(pl) = pipeline {
if chunk_info.filter_mask == 0 {
decompress_chunk(raw_chunk, pl, chunk_total_bytes, element_size)?
} else {
raw_chunk.to_vec()
}
} else {
raw_chunk.to_vec()
};
result.push(decompressed);
}
Ok(result)
}