hexz_core/ops/parallel_pack.rs

use crossbeam::channel::{Receiver, Sender, bounded};
use std::sync::Arc;
use std::thread;

use crate::algo::compression::Compressor;
use hexz_common::{Error, Result};

/// Default upper bound on the number of chunks buffered in each channel.
const MAX_CHUNKS_IN_FLIGHT: usize = 1000;

/// An uncompressed chunk of input data, tagged with its logical offset.
#[derive(Clone)]
pub struct RawChunk {
    pub data: Vec<u8>,
    pub logical_offset: u64,
}

/// The result of compressing a single `RawChunk`.
pub struct CompressedChunk {
    /// Compressed bytes produced by the configured `Compressor`.
    pub compressed: Vec<u8>,
    /// BLAKE3 hash of the uncompressed chunk data.
    pub hash: [u8; 32],
    pub logical_offset: u64,
    pub original_size: usize,
}

/// Tuning knobs for the parallel packing pipeline.
#[derive(Debug, Clone)]
pub struct ParallelPackConfig {
    /// Number of compression worker threads.
    pub num_workers: usize,
    /// Capacity of the bounded channels between pipeline stages.
    pub max_chunks_in_flight: usize,
}

impl Default for ParallelPackConfig {
    fn default() -> Self {
        Self {
            num_workers: num_cpus::get(),
            max_chunks_in_flight: MAX_CHUNKS_IN_FLIGHT,
        }
    }
}

/// Worker loop: pulls raw chunks, compresses and hashes them, and forwards
/// the results until the input channel is closed.
fn compress_worker(
    rx_chunks: Receiver<RawChunk>,
    tx_compressed: Sender<CompressedChunk>,
    compressor: Arc<Box<dyn Compressor + Send + Sync>>,
) -> Result<()> {
    for chunk in rx_chunks {
        let compressed = compressor.compress(&chunk.data)?;

        // Hash the original (uncompressed) data, not the compressed output.
        let hash = blake3::hash(&chunk.data);

        tx_compressed
            .send(CompressedChunk {
                compressed,
                hash: hash.into(),
                logical_offset: chunk.logical_offset,
                original_size: chunk.data.len(),
            })
            .map_err(|_| {
                Error::Io(std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    "Compressed chunk channel send failed",
                ))
            })?;
    }

    Ok(())
}

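/// Compresses `chunks` on `config.num_workers` worker threads and passes each
/// `CompressedChunk` to `writer` as it becomes available.
///
/// Completed chunks may reach `writer` out of order; callers that need the
/// original layout can reorder by `logical_offset`.
///
/// Illustrative usage sketch (not from the original docs); it assumes the LZ4
/// compressor used in the tests below:
///
/// ```ignore
/// use crate::algo::compression::lz4::Lz4Compressor;
///
/// let chunks = vec![RawChunk { data: vec![0u8; 4096], logical_offset: 0 }];
/// process_chunks_parallel(
///     chunks,
///     Box::new(Lz4Compressor::new()),
///     ParallelPackConfig::default(),
///     |chunk| {
///         // Persist or index the compressed chunk here.
///         println!("offset {}: {} bytes", chunk.logical_offset, chunk.compressed.len());
///         Ok(())
///     },
/// )?;
/// ```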
pub fn process_chunks_parallel<W>(
    chunks: Vec<RawChunk>,
    compressor: Box<dyn Compressor + Send + Sync>,
    config: ParallelPackConfig,
    mut writer: W,
) -> Result<()>
where
    W: FnMut(CompressedChunk) -> Result<()>,
{
    if chunks.is_empty() {
        return Ok(());
    }

    let compressor = Arc::new(compressor);

    let (tx_chunks, rx_chunks) = bounded::<RawChunk>(config.max_chunks_in_flight);
    let (tx_compressed, rx_compressed) = bounded::<CompressedChunk>(config.max_chunks_in_flight);

    let workers: Vec<_> = (0..config.num_workers)
        .map(|_| {
            let rx = rx_chunks.clone();
            let tx = tx_compressed.clone();
            let comp = compressor.clone();

            thread::spawn(move || compress_worker(rx, tx, comp))
        })
        .collect();

    let feeder = {
        let tx = tx_chunks.clone();
        thread::spawn(move || {
            for chunk in chunks {
                if tx.send(chunk).is_err() {
                    break;
                }
            }
        })
    };

    // Drop the local sender handles so the channels close once the feeder and
    // all workers have finished; otherwise the receive loops below never end.
    drop(tx_chunks);
    drop(tx_compressed);

    for compressed in rx_compressed {
        writer(compressed)?;
    }

    for worker in workers {
        worker
            .join()
            .map_err(|_| Error::Io(std::io::Error::other("Worker thread panicked")))??;
    }

    feeder
        .join()
        .map_err(|_| Error::Io(std::io::Error::other("Feeder thread panicked")))?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::algo::compression::lz4::Lz4Compressor;

    #[test]
    fn test_parallel_pack_empty_chunks() {
        let chunks = vec![];
        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig::default();

        let result = process_chunks_parallel(chunks, compressor, config, |_| Ok(()));
        assert!(result.is_ok());
    }

    #[test]
    fn test_parallel_pack_single_chunk() {
        let chunks = vec![RawChunk {
            data: vec![1, 2, 3, 4, 5],
            logical_offset: 0,
        }];

        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig {
            num_workers: 2,
            max_chunks_in_flight: 10,
        };

        let mut received = Vec::new();
        let result = process_chunks_parallel(chunks, compressor, config, |chunk| {
            received.push(chunk.original_size);
            Ok(())
        });

        assert!(result.is_ok());
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], 5);
    }

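    // Added illustrative test (not part of the original suite): checks that the
    // hash recorded for a chunk is the BLAKE3 digest of the uncompressed data,
    // which is what compress_worker computes.
    #[test]
    fn test_parallel_pack_hashes_original_data() {
        let data = vec![42u8; 512];
        let expected: [u8; 32] = blake3::hash(&data).into();

        let chunks = vec![RawChunk {
            data,
            logical_offset: 0,
        }];

        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig::default();

        let mut hashes = Vec::new();
        let result = process_chunks_parallel(chunks, compressor, config, |chunk| {
            hashes.push(chunk.hash);
            Ok(())
        });

        assert!(result.is_ok());
        assert_eq!(hashes, vec![expected]);
    }
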
    #[test]
    fn test_parallel_pack_multiple_chunks() {
        let chunks: Vec<_> = (0..100)
            .map(|i| RawChunk {
                data: vec![i as u8; 1000],
                logical_offset: i * 1000,
            })
            .collect();

        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig {
            num_workers: 4,
            max_chunks_in_flight: 20,
        };

        let mut count = 0;
        let result = process_chunks_parallel(chunks, compressor, config, |_chunk| {
            count += 1;
            Ok(())
        });

        assert!(result.is_ok());
        assert_eq!(count, 100);
    }
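
    // Added illustrative test (not part of the original suite): an error
    // returned by the writer callback should propagate to the caller of
    // process_chunks_parallel.
    #[test]
    fn test_parallel_pack_writer_error_propagates() {
        let chunks = vec![RawChunk {
            data: vec![7u8; 64],
            logical_offset: 0,
        }];

        let compressor = Box::new(Lz4Compressor::new());
        let config = ParallelPackConfig::default();

        let result = process_chunks_parallel(chunks, compressor, config, |_chunk| {
            Err(Error::Io(std::io::Error::other("writer failed")))
        });

        assert!(result.is_err());
    }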
}