Skip to main content

hexz_ops/
parallel_pack.rs

1//! Parallel packing pipeline for multi-threaded compression.
2//!
3//! This module implements a producer-consumer architecture where:
4//! - Main thread reads and chunks input
5//! - Worker threads compress chunks in parallel
6//! - Single dedup thread writes unique blocks
7//!
8//! # Architecture
9//!
10//! ```text
11//!                     ┌─→ Worker 1: Compress + Hash ─┐
12//!                     │                                │
13//! Main: Read chunks ──┼─→ Worker 2: Compress + Hash ──┼─→ Dedup + Write
14//!                     │                                │   (sequential)
15//!                     └─→ Worker N: Compress + Hash ─┘
16//!
17//! Parallel (N cores)                                   Sequential
18//! ```
19//!
20//! This design maximizes CPU utilization while keeping dedup single-threaded
21//! to avoid hash table locking overhead.
22
23use crossbeam::channel::{Receiver, Sender, bounded};
24use std::sync::Arc;
25use std::thread;
26
27use hexz_common::{Error, Result};
28use hexz_core::algo::compression::Compressor;
29
30/// Maximum chunks in flight to control memory usage
31const MAX_CHUNKS_IN_FLIGHT: usize = 1000;
32
33/// Raw chunk data from input
34#[derive(Debug, Clone)]
35pub struct RawChunk {
36    /// Chunk data (uncompressed)
37    pub data: Vec<u8>,
38    /// Logical offset in the stream
39    pub logical_offset: u64,
40}
41
42/// Compressed chunk with hash for deduplication
43#[derive(Debug)]
44pub struct CompressedChunk {
45    /// Compressed data
46    pub compressed: Vec<u8>,
47    /// BLAKE3 hash of original data (for dedup)
48    pub hash: [u8; 32],
49    /// Logical offset in the stream
50    pub logical_offset: u64,
51    /// Original uncompressed size
52    pub original_size: usize,
53}
54
55/// Configuration for parallel packing
56#[derive(Debug, Clone)]
57pub struct ParallelPackConfig {
58    /// Number of worker threads
59    pub num_workers: usize,
60    /// Maximum chunks in flight (controls memory)
61    pub max_chunks_in_flight: usize,
62}
63
64impl Default for ParallelPackConfig {
65    fn default() -> Self {
66        Self {
67            num_workers: num_cpus::get(),
68            max_chunks_in_flight: MAX_CHUNKS_IN_FLIGHT,
69        }
70    }
71}
72
73/// Compress worker: receives raw chunks, compresses them, computes hashes
74fn compress_worker(
75    rx_chunks: &Receiver<RawChunk>,
76    tx_compressed: &Sender<CompressedChunk>,
77    compressor: &(dyn Compressor + Send + Sync),
78) -> Result<()> {
79    for chunk in rx_chunks {
80        // Compress the chunk
81        let compressed = compressor.compress(&chunk.data)?;
82
83        // Hash original data for deduplication
84        let hash = blake3::hash(&chunk.data);
85
86        // Send to dedup thread
87        tx_compressed
88            .send(CompressedChunk {
89                compressed,
90                hash: hash.into(),
91                logical_offset: chunk.logical_offset,
92                original_size: chunk.data.len(),
93            })
94            .map_err(|_| {
95                Error::Io(std::io::Error::new(
96                    std::io::ErrorKind::BrokenPipe,
97                    "Compressed chunk channel send failed",
98                ))
99            })?;
100    }
101
102    Ok(())
103}
104
105/// Process chunks in parallel with multiple worker threads
106///
107/// # Arguments
108///
109/// * `chunks` - Vector of raw chunks to process
110/// * `compressor` - Compressor to use (must be Send + Sync)
111/// * `config` - Parallel processing configuration
112/// * `writer` - Callback for each compressed chunk (handles dedup + write)
113///
114/// # Returns
115///
116/// Ok(()) on success, or Error if any worker fails
117pub fn process_chunks_parallel<W>(
118    chunks: Vec<RawChunk>,
119    compressor: Box<dyn Compressor + Send + Sync>,
120    config: &ParallelPackConfig,
121    mut writer: W,
122) -> Result<()>
123where
124    W: FnMut(CompressedChunk) -> Result<()>,
125{
126    // Can't parallelize if we have fewer chunks than workers
127    if chunks.is_empty() {
128        return Ok(());
129    }
130
131    let compressor = Arc::new(compressor);
132
133    // Create bounded channels to control memory usage
134    let (tx_chunks, rx_chunks) = bounded::<RawChunk>(config.max_chunks_in_flight);
135    let (tx_compressed, rx_compressed) = bounded::<CompressedChunk>(config.max_chunks_in_flight);
136
137    // Spawn compression worker threads
138    let workers: Vec<_> = (0..config.num_workers)
139        .map(|_| {
140            let rx = rx_chunks.clone();
141            let tx = tx_compressed.clone();
142            let comp = compressor.clone();
143
144            thread::spawn(move || compress_worker(&rx, &tx, comp.as_ref().as_ref()))
145        })
146        .collect();
147
148    // Spawn feeder thread to send chunks to workers
149    let feeder = {
150        let tx = tx_chunks.clone();
151        thread::spawn(move || {
152            for chunk in chunks {
153                if tx.send(chunk).is_err() {
154                    // Channel closed, workers are done
155                    break;
156                }
157            }
158            // Drop tx to signal workers we're done
159        })
160    };
161
162    // Drop our sender handles so workers will finish when feeder is done
163    drop(tx_chunks);
164    drop(tx_compressed);
165
166    // Main thread: receive compressed chunks and process (dedup + write)
167    // This must be sequential for dedup correctness
168    for compressed in rx_compressed {
169        writer(compressed)?;
170    }
171
172    // Wait for all workers to finish and check for errors
173    for worker in workers {
174        worker
175            .join()
176            .map_err(|_| Error::Io(std::io::Error::other("Worker thread panicked")))??;
177    }
178
179    feeder
180        .join()
181        .map_err(|_| Error::Io(std::io::Error::other("Feeder thread panicked")))?;
182
183    Ok(())
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189    use hexz_core::algo::compression::lz4::Lz4Compressor;
190
191    #[test]
192    fn test_parallel_pack_empty_chunks() {
193        let chunks = vec![];
194        let compressor = Box::new(Lz4Compressor::new());
195        let config = ParallelPackConfig::default();
196
197        let result = process_chunks_parallel(chunks, compressor, &config, |_| Ok(()));
198        assert!(result.is_ok());
199    }
200
201    #[test]
202    fn test_parallel_pack_single_chunk() {
203        let chunks = vec![RawChunk {
204            data: vec![1, 2, 3, 4, 5],
205            logical_offset: 0,
206        }];
207
208        let compressor = Box::new(Lz4Compressor::new());
209        let config = ParallelPackConfig {
210            num_workers: 2,
211            max_chunks_in_flight: 10,
212        };
213
214        let mut received = Vec::new();
215        let result = process_chunks_parallel(chunks, compressor, &config, |chunk| {
216            received.push(chunk.original_size);
217            Ok(())
218        });
219
220        assert!(result.is_ok());
221        assert_eq!(received.len(), 1);
222        assert_eq!(received[0], 5);
223    }
224
225    #[test]
226    fn test_parallel_pack_multiple_chunks() {
227        let chunks: Vec<_> = (0..100)
228            .map(|i| RawChunk {
229                data: vec![i as u8; 1000],
230                logical_offset: i * 1000,
231            })
232            .collect();
233
234        let compressor = Box::new(Lz4Compressor::new());
235        let config = ParallelPackConfig {
236            num_workers: 4,
237            max_chunks_in_flight: 20,
238        };
239
240        let mut count = 0;
241        let result = process_chunks_parallel(chunks, compressor, &config, |_chunk| {
242            count += 1;
243            Ok(())
244        });
245
246        assert!(result.is_ok());
247        assert_eq!(count, 100);
248    }
249}