// hexz_ops/parallel_pack.rs
use crossbeam::channel::{Receiver, Sender, bounded};
24use std::sync::Arc;
25use std::thread;
26
27use hexz_common::{Error, Result};
28use hexz_core::algo::compression::Compressor;
29
30const MAX_CHUNKS_IN_FLIGHT: usize = 1000;
32
/// An uncompressed chunk of input data, tagged with its position in the
/// original logical stream so results can be reordered downstream.
#[derive(Debug, Clone)]
pub struct RawChunk {
    /// Raw (uncompressed) payload bytes.
    pub data: Vec<u8>,
    /// Byte offset of this chunk within the logical input stream.
    pub logical_offset: u64,
}
41
/// The result of compressing one `RawChunk`: the compressed bytes plus the
/// metadata needed to index and verify them.
#[derive(Debug)]
pub struct CompressedChunk {
    /// Compressor output for the chunk's payload.
    pub compressed: Vec<u8>,
    /// BLAKE3 digest of the *uncompressed* payload.
    pub hash: [u8; 32],
    /// Byte offset of the chunk within the logical input stream.
    pub logical_offset: u64,
    /// Size in bytes of the payload before compression.
    pub original_size: usize,
}
54
/// Tuning knobs for `process_chunks_parallel`.
#[derive(Debug, Clone)]
pub struct ParallelPackConfig {
    /// Number of compression worker threads to spawn.
    pub num_workers: usize,
    /// Capacity of each bounded channel; caps how many chunks are buffered
    /// in memory at once (backpressure on the feeder).
    pub max_chunks_in_flight: usize,
}
63
64impl Default for ParallelPackConfig {
65 fn default() -> Self {
66 Self {
67 num_workers: num_cpus::get(),
68 max_chunks_in_flight: MAX_CHUNKS_IN_FLIGHT,
69 }
70 }
71}
72
73fn compress_worker(
75 rx_chunks: &Receiver<RawChunk>,
76 tx_compressed: &Sender<CompressedChunk>,
77 compressor: &(dyn Compressor + Send + Sync),
78) -> Result<()> {
79 for chunk in rx_chunks {
80 let compressed = compressor.compress(&chunk.data)?;
82
83 let hash = blake3::hash(&chunk.data);
85
86 tx_compressed
88 .send(CompressedChunk {
89 compressed,
90 hash: hash.into(),
91 logical_offset: chunk.logical_offset,
92 original_size: chunk.data.len(),
93 })
94 .map_err(|_| {
95 Error::Io(std::io::Error::new(
96 std::io::ErrorKind::BrokenPipe,
97 "Compressed chunk channel send failed",
98 ))
99 })?;
100 }
101
102 Ok(())
103}
104
105pub fn process_chunks_parallel<W>(
118 chunks: Vec<RawChunk>,
119 compressor: Box<dyn Compressor + Send + Sync>,
120 config: &ParallelPackConfig,
121 mut writer: W,
122) -> Result<()>
123where
124 W: FnMut(CompressedChunk) -> Result<()>,
125{
126 if chunks.is_empty() {
128 return Ok(());
129 }
130
131 let compressor = Arc::new(compressor);
132
133 let (tx_chunks, rx_chunks) = bounded::<RawChunk>(config.max_chunks_in_flight);
135 let (tx_compressed, rx_compressed) = bounded::<CompressedChunk>(config.max_chunks_in_flight);
136
137 let workers: Vec<_> = (0..config.num_workers)
139 .map(|_| {
140 let rx = rx_chunks.clone();
141 let tx = tx_compressed.clone();
142 let comp = compressor.clone();
143
144 thread::spawn(move || compress_worker(&rx, &tx, comp.as_ref().as_ref()))
145 })
146 .collect();
147
148 let feeder = {
150 let tx = tx_chunks.clone();
151 thread::spawn(move || {
152 for chunk in chunks {
153 if tx.send(chunk).is_err() {
154 break;
156 }
157 }
158 })
160 };
161
162 drop(tx_chunks);
164 drop(tx_compressed);
165
166 for compressed in rx_compressed {
169 writer(compressed)?;
170 }
171
172 for worker in workers {
174 worker
175 .join()
176 .map_err(|_| Error::Io(std::io::Error::other("Worker thread panicked")))??;
177 }
178
179 feeder
180 .join()
181 .map_err(|_| Error::Io(std::io::Error::other("Feeder thread panicked")))?;
182
183 Ok(())
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use hexz_core::algo::compression::lz4::Lz4Compressor;

    /// No input chunks: the pipeline must short-circuit and succeed.
    #[test]
    fn test_parallel_pack_empty_chunks() {
        let result = process_chunks_parallel(
            Vec::new(),
            Box::new(Lz4Compressor::new()),
            &ParallelPackConfig::default(),
            |_| Ok(()),
        );
        assert!(result.is_ok());
    }

    /// A single chunk flows through: the writer sees exactly one result
    /// whose original size matches the input payload.
    #[test]
    fn test_parallel_pack_single_chunk() {
        let input = vec![RawChunk {
            data: vec![1, 2, 3, 4, 5],
            logical_offset: 0,
        }];
        let config = ParallelPackConfig {
            num_workers: 2,
            max_chunks_in_flight: 10,
        };

        let mut sizes = Vec::new();
        let outcome =
            process_chunks_parallel(input, Box::new(Lz4Compressor::new()), &config, |chunk| {
                sizes.push(chunk.original_size);
                Ok(())
            });

        assert!(outcome.is_ok());
        assert_eq!(sizes, vec![5]);
    }

    /// 100 chunks across 4 workers with a small in-flight cap: every chunk
    /// must reach the writer exactly once.
    #[test]
    fn test_parallel_pack_multiple_chunks() {
        let input: Vec<RawChunk> = (0..100u64)
            .map(|i| RawChunk {
                data: vec![i as u8; 1000],
                logical_offset: i * 1000,
            })
            .collect();
        let config = ParallelPackConfig {
            num_workers: 4,
            max_chunks_in_flight: 20,
        };

        let mut seen = 0;
        let outcome =
            process_chunks_parallel(input, Box::new(Lz4Compressor::new()), &config, |_chunk| {
                seen += 1;
                Ok(())
            });

        assert!(outcome.is_ok());
        assert_eq!(seen, 100);
    }
}
249}