use crossbeam::channel::{Receiver, Sender, bounded};
use std::sync::Arc;
use std::thread;
use hexz_common::{Error, Result};
use hexz_core::algo::compression::Compressor;
/// Default bound on how many chunks may sit in each channel at once; caps the
/// memory held by in-flight (raw or compressed) chunk buffers.
const MAX_CHUNKS_IN_FLIGHT: usize = 1000;
/// An uncompressed chunk of input data, queued for a compression worker.
#[derive(Debug, Clone)]
pub struct RawChunk {
/// Raw payload bytes to be compressed.
pub data: Vec<u8>,
/// Byte offset of this chunk within the original logical stream.
pub logical_offset: u64,
}
/// Output of a compression worker: one chunk's compressed bytes plus the
/// metadata needed to place and verify it.
#[derive(Debug)]
pub struct CompressedChunk {
/// Compressed payload bytes.
pub compressed: Vec<u8>,
/// BLAKE3 digest of the *uncompressed* chunk data (see `compress_worker`).
pub hash: [u8; 32],
/// Byte offset of the chunk within the original logical stream.
pub logical_offset: u64,
/// Length in bytes of the chunk before compression.
pub original_size: usize,
}
/// Tuning knobs for `process_chunks_parallel`.
#[derive(Debug, Clone)]
pub struct ParallelPackConfig {
/// Number of compression worker threads to spawn.
pub num_workers: usize,
/// Capacity of each bounded channel; limits chunks buffered in flight.
pub max_chunks_in_flight: usize,
}
impl Default for ParallelPackConfig {
/// Defaults to one worker per logical CPU and the module-wide
/// `MAX_CHUNKS_IN_FLIGHT` buffering bound.
fn default() -> Self {
Self {
num_workers: num_cpus::get(),
max_chunks_in_flight: MAX_CHUNKS_IN_FLIGHT,
}
}
}
fn compress_worker(
rx_chunks: &Receiver<RawChunk>,
tx_compressed: &Sender<CompressedChunk>,
compressor: &(dyn Compressor + Send + Sync),
) -> Result<()> {
for chunk in rx_chunks {
let compressed = compressor.compress(&chunk.data)?;
let hash = blake3::hash(&chunk.data);
tx_compressed
.send(CompressedChunk {
compressed,
hash: hash.into(),
logical_offset: chunk.logical_offset,
original_size: chunk.data.len(),
})
.map_err(|_| {
Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Compressed chunk channel send failed",
))
})?;
}
Ok(())
}
pub fn process_chunks_parallel<W>(
chunks: Vec<RawChunk>,
compressor: Box<dyn Compressor + Send + Sync>,
config: &ParallelPackConfig,
mut writer: W,
) -> Result<()>
where
W: FnMut(CompressedChunk) -> Result<()>,
{
if chunks.is_empty() {
return Ok(());
}
let compressor = Arc::new(compressor);
let (tx_chunks, rx_chunks) = bounded::<RawChunk>(config.max_chunks_in_flight);
let (tx_compressed, rx_compressed) = bounded::<CompressedChunk>(config.max_chunks_in_flight);
let workers: Vec<_> = (0..config.num_workers)
.map(|_| {
let rx = rx_chunks.clone();
let tx = tx_compressed.clone();
let comp = compressor.clone();
thread::spawn(move || compress_worker(&rx, &tx, comp.as_ref().as_ref()))
})
.collect();
let feeder = {
let tx = tx_chunks.clone();
thread::spawn(move || {
for chunk in chunks {
if tx.send(chunk).is_err() {
break;
}
}
})
};
drop(tx_chunks);
drop(tx_compressed);
for compressed in rx_compressed {
writer(compressed)?;
}
for worker in workers {
worker
.join()
.map_err(|_| Error::Io(std::io::Error::other("Worker thread panicked")))??;
}
feeder
.join()
.map_err(|_| Error::Io(std::io::Error::other("Feeder thread panicked")))?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use hexz_core::algo::compression::lz4::Lz4Compressor;

    /// No input chunks: the pipeline returns Ok without invoking the writer.
    #[test]
    fn test_parallel_pack_empty_chunks() {
        let outcome = process_chunks_parallel(
            Vec::new(),
            Box::new(Lz4Compressor::new()),
            &ParallelPackConfig::default(),
            |_| Ok(()),
        );
        assert!(outcome.is_ok());
    }

    /// A single chunk comes out the other end with its original size intact.
    #[test]
    fn test_parallel_pack_single_chunk() {
        let input = vec![RawChunk {
            data: vec![1, 2, 3, 4, 5],
            logical_offset: 0,
        }];
        let cfg = ParallelPackConfig {
            num_workers: 2,
            max_chunks_in_flight: 10,
        };
        let mut sizes = Vec::new();
        let outcome =
            process_chunks_parallel(input, Box::new(Lz4Compressor::new()), &cfg, |chunk| {
                sizes.push(chunk.original_size);
                Ok(())
            });
        assert!(outcome.is_ok());
        assert_eq!(sizes.len(), 1);
        assert_eq!(sizes[0], 5);
    }

    /// All 100 chunks are delivered to the writer exactly once.
    #[test]
    fn test_parallel_pack_multiple_chunks() {
        let input: Vec<RawChunk> = (0u64..100)
            .map(|i| RawChunk {
                data: vec![i as u8; 1000],
                logical_offset: i * 1000,
            })
            .collect();
        let cfg = ParallelPackConfig {
            num_workers: 4,
            max_chunks_in_flight: 20,
        };
        let mut seen = 0;
        let outcome =
            process_chunks_parallel(input, Box::new(Lz4Compressor::new()), &cfg, |_chunk| {
                seen += 1;
                Ok(())
            });
        assert!(outcome.is_ok());
        assert_eq!(seen, 100);
    }
}