hexz_core/ops/parallel_pack.rs

//! Parallel packing pipeline for multi-threaded compression.
//!
//! This module implements a producer-consumer architecture where:
//! - The main thread reads and chunks input
//! - Worker threads compress chunks in parallel
//! - A single dedup thread writes unique blocks
//!
//! (Inside `process_chunks_parallel`, a feeder thread distributes the chunks
//! to the workers and the calling thread plays the dedup/write role.)
//!
//! # Architecture
//!
//! ```text
//!                     ┌─→ Worker 1: Compress + Hash ──┐
//!                     │                               │
//! Main: Read chunks ──┼─→ Worker 2: Compress + Hash ──┼─→ Dedup + Write
//!                     │                               │   (sequential)
//!                     └─→ Worker N: Compress + Hash ──┘
//!
//! Parallel (N cores)                                  Sequential
//! ```
//!
//! This design maximizes CPU utilization while keeping dedup single-threaded
//! to avoid hash-table locking overhead.
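//!
//! # Example
//!
//! A minimal sketch of driving the pipeline with a deduplicating writer. It
//! assumes `Lz4Compressor` and this module are reachable at the paths shown;
//! adjust to the crate's actual re-exports.
//!
//! ```no_run
//! use std::collections::HashSet;
//!
//! use hexz_core::algo::compression::lz4::Lz4Compressor;
//! use hexz_core::ops::parallel_pack::{
//!     process_chunks_parallel, ParallelPackConfig, RawChunk,
//! };
//!
//! let chunks = vec![RawChunk { data: b"hello".to_vec(), logical_offset: 0 }];
//!
//! // The writer runs sequentially on the calling thread, so a plain HashSet
//! // works as a stand-in dedup index: only first occurrences get written.
//! let mut seen = HashSet::new();
//! process_chunks_parallel(
//!     chunks,
//!     Box::new(Lz4Compressor::new()),
//!     ParallelPackConfig::default(),
//!     |chunk| {
//!         if seen.insert(chunk.hash) {
//!             // ... write chunk.compressed to the archive here ...
//!         }
//!         Ok(())
//!     },
//! )
//! .expect("parallel pack failed");
//! ```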

use crossbeam::channel::{Receiver, Sender, bounded};
use std::sync::Arc;
use std::thread;

use crate::algo::compression::Compressor;
use hexz_common::{Error, Result};

/// Maximum number of chunks in flight, to bound memory usage
const MAX_CHUNKS_IN_FLIGHT: usize = 1000;

/// Raw chunk data from input
#[derive(Clone)]
pub struct RawChunk {
    /// Chunk data (uncompressed)
    pub data: Vec<u8>,
    /// Logical offset in the stream
    pub logical_offset: u64,
}

/// Compressed chunk with hash for deduplication
pub struct CompressedChunk {
    /// Compressed data
    pub compressed: Vec<u8>,
    /// BLAKE3 hash of original data (for dedup)
    pub hash: [u8; 32],
    /// Logical offset in the stream
    pub logical_offset: u64,
    /// Original uncompressed size
    pub original_size: usize,
}

/// Configuration for parallel packing
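///
/// # Example
///
/// A sketch of overriding the worker count while keeping the default
/// in-flight bound (module path assumed, as in the module-level example):
///
/// ```no_run
/// use hexz_core::ops::parallel_pack::ParallelPackConfig;
///
/// let config = ParallelPackConfig {
///     num_workers: 4,
///     ..Default::default()
/// };
/// assert_eq!(config.max_chunks_in_flight, 1000);
/// ```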
#[derive(Debug, Clone)]
pub struct ParallelPackConfig {
    /// Number of worker threads
    pub num_workers: usize,
    /// Maximum chunks in flight (controls memory)
    pub max_chunks_in_flight: usize,
}

impl Default for ParallelPackConfig {
    fn default() -> Self {
        Self {
            num_workers: num_cpus::get(),
            max_chunks_in_flight: MAX_CHUNKS_IN_FLIGHT,
        }
    }
}

/// Compression worker: receives raw chunks, compresses them, computes hashes
fn compress_worker(
    rx_chunks: Receiver<RawChunk>,
    tx_compressed: Sender<CompressedChunk>,
    compressor: Arc<dyn Compressor + Send + Sync>,
) -> Result<()> {
    for chunk in rx_chunks {
        // Compress the chunk
        let compressed = compressor.compress(&chunk.data)?;

        // Hash the original data for deduplication
        let hash = blake3::hash(&chunk.data);

        // Send to the dedup/write side; this fails only if the receiver was
        // dropped (e.g. the writer errored and the caller returned early)
        tx_compressed
            .send(CompressedChunk {
                compressed,
                hash: hash.into(),
                logical_offset: chunk.logical_offset,
                original_size: chunk.data.len(),
            })
            .map_err(|_| {
                Error::Io(std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    "Compressed chunk channel send failed",
                ))
            })?;
    }

    Ok(())
}

/// Process chunks in parallel with multiple worker threads
///
/// # Arguments
///
/// * `chunks` - Vector of raw chunks to process
/// * `compressor` - Compressor to use (must be `Send + Sync`)
/// * `config` - Parallel processing configuration
/// * `writer` - Callback invoked for each compressed chunk (handles dedup + write)
///
/// # Returns
///
/// `Ok(())` on success, or the first error from a worker, the writer, or a
/// panicked thread
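///
/// # Example
///
/// A minimal call that just counts compressed chunks; see the module-level
/// docs for a deduplicating writer (`Lz4Compressor` path assumed):
///
/// ```no_run
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use hexz_core::ops::parallel_pack::{process_chunks_parallel, ParallelPackConfig, RawChunk};
///
/// let chunks = vec![RawChunk { data: vec![0u8; 4096], logical_offset: 0 }];
/// let mut count = 0;
/// process_chunks_parallel(
///     chunks,
///     Box::new(Lz4Compressor::new()),
///     ParallelPackConfig::default(),
///     |_chunk| {
///         count += 1;
///         Ok(())
///     },
/// )
/// .expect("parallel pack failed");
/// assert_eq!(count, 1);
/// ```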
pub fn process_chunks_parallel<W>(
    chunks: Vec<RawChunk>,
    compressor: Box<dyn Compressor + Send + Sync>,
    config: ParallelPackConfig,
    mut writer: W,
) -> Result<()>
where
    W: FnMut(CompressedChunk) -> Result<()>,
{
    // Nothing to feed, compress, or write for empty input
    if chunks.is_empty() {
        return Ok(());
    }

    let compressor: Arc<dyn Compressor + Send + Sync> = Arc::from(compressor);

    // Create bounded channels so backpressure limits memory usage
    let (tx_chunks, rx_chunks) = bounded::<RawChunk>(config.max_chunks_in_flight);
    let (tx_compressed, rx_compressed) = bounded::<CompressedChunk>(config.max_chunks_in_flight);
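    // Rough bound, not a hard cap: each channel can hold up to
    // max_chunks_in_flight chunks, so resident data is about
    // 2 * max_chunks_in_flight * chunk_size, plus one chunk per worker.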

    // Spawn compression worker threads
    let workers: Vec<_> = (0..config.num_workers)
        .map(|_| {
            let rx = rx_chunks.clone();
            let tx = tx_compressed.clone();
            let comp = compressor.clone();

            thread::spawn(move || compress_worker(rx, tx, comp))
        })
        .collect();

    // Spawn a feeder thread to send chunks to the workers
    let feeder = {
        let tx = tx_chunks.clone();
        thread::spawn(move || {
            for chunk in chunks {
                if tx.send(chunk).is_err() {
                    // All receivers are gone: every worker has exited
                    // (most likely after an error), so stop feeding
                    break;
                }
            }
            // tx drops here, signalling the workers that input is exhausted
        })
    };

    // Drop our own sender handles: workers finish once the feeder's tx_chunks
    // clone drops, and the rx_compressed loop below only terminates once every
    // tx_compressed clone (ours and the workers') has been dropped
    drop(tx_chunks);
    drop(tx_compressed);

    // Main thread: receive compressed chunks and process them (dedup + write).
    // This stays sequential for dedup correctness. Note that chunks can arrive
    // out of input order; the writer can use `logical_offset` to reorder.
    for compressed in rx_compressed {
        writer(compressed)?;
    }

    // Wait for all workers and surface errors: the outer Result is the join
    // (Err if the thread panicked), the inner one is the worker's own Result
    for worker in workers {
        worker
            .join()
            .map_err(|_| Error::Io(std::io::Error::other("Worker thread panicked")))??;
    }

    feeder
        .join()
        .map_err(|_| Error::Io(std::io::Error::other("Feeder thread panicked")))?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::algo::compression::lz4::Lz4Compressor;

    #[test]
    fn test_parallel_pack_empty_chunks() {
        let chunks = vec![];
        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig::default();

        let result = process_chunks_parallel(chunks, compressor, config, |_| Ok(()));
        assert!(result.is_ok());
    }

    #[test]
    fn test_parallel_pack_single_chunk() {
        let chunks = vec![RawChunk {
            data: vec![1, 2, 3, 4, 5],
            logical_offset: 0,
        }];

        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig {
            num_workers: 2,
            max_chunks_in_flight: 10,
        };

        let mut received = Vec::new();
        let result = process_chunks_parallel(chunks, compressor, config, |chunk| {
            received.push(chunk.original_size);
            Ok(())
        });

        assert!(result.is_ok());
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], 5);
    }

    #[test]
    fn test_parallel_pack_multiple_chunks() {
        let chunks: Vec<_> = (0..100)
            .map(|i| RawChunk {
                data: vec![i as u8; 1000],
                logical_offset: i * 1000,
            })
            .collect();

        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig {
            num_workers: 4,
            max_chunks_in_flight: 20,
        };

        let mut count = 0;
        let result = process_chunks_parallel(chunks, compressor, config, |_chunk| {
            count += 1;
            Ok(())
        });

        assert!(result.is_ok());
        assert_eq!(count, 100);
    }
}