1use crate::block::{Block, MAX_BLOCK_SIZE};
29use crate::chunking::{
30 ChunkingStrategy, DagLink, DagNode, DeduplicationStats, DEFAULT_CHUNK_SIZE, MAX_LINKS_PER_NODE,
31 MIN_CHUNK_SIZE,
32};
33use crate::cid::{Cid, HashAlgorithm};
34use crate::error::{Error, Result};
35use crate::metrics::global_metrics;
36use bytes::Bytes;
37use rayon::prelude::*;
38use std::sync::{Arc, Mutex};
39use std::time::{Duration, Instant};
40
41#[cfg(test)]
42use crate::cid::CidBuilder;
43
44#[derive(Debug, Clone)]
46pub struct ParallelChunkingConfig {
47 pub chunk_size: usize,
49 pub strategy: ChunkingStrategy,
51 pub max_links_per_node: usize,
53 pub hash_algorithm: HashAlgorithm,
55 pub num_threads: Option<usize>,
57}
58
59impl Default for ParallelChunkingConfig {
60 fn default() -> Self {
61 Self {
62 chunk_size: DEFAULT_CHUNK_SIZE,
63 strategy: ChunkingStrategy::FixedSize,
64 max_links_per_node: MAX_LINKS_PER_NODE,
65 hash_algorithm: HashAlgorithm::Sha256,
66 num_threads: None,
67 }
68 }
69}
70
71impl ParallelChunkingConfig {
72 pub fn with_chunk_size(chunk_size: usize) -> Result<Self> {
74 if chunk_size < MIN_CHUNK_SIZE {
75 return Err(Error::InvalidInput(format!(
76 "Chunk size {} is below minimum {}",
77 chunk_size, MIN_CHUNK_SIZE
78 )));
79 }
80 if chunk_size > MAX_BLOCK_SIZE {
81 return Err(Error::InvalidInput(format!(
82 "Chunk size {} exceeds maximum {}",
83 chunk_size, MAX_BLOCK_SIZE
84 )));
85 }
86 Ok(Self {
87 chunk_size,
88 ..Default::default()
89 })
90 }
91
92 pub fn with_threads(mut self, num_threads: usize) -> Self {
94 self.num_threads = Some(num_threads);
95 self
96 }
97
98 pub fn with_hash_algorithm(mut self, algorithm: HashAlgorithm) -> Self {
100 self.hash_algorithm = algorithm;
101 self
102 }
103
104 pub fn with_content_defined(mut self) -> Self {
106 self.strategy = ChunkingStrategy::ContentDefined;
107 self
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct ParallelChunkingResult {
114 pub root_cid: Cid,
116 pub chunk_count: usize,
118 pub total_bytes: usize,
120 pub dedup_stats: DeduplicationStats,
122 pub duration: Duration,
124 pub chunk_cids: Vec<Cid>,
126 pub dag_nodes: Vec<DagNode>,
128}
129
130pub struct ParallelChunker {
132 config: ParallelChunkingConfig,
133}
134
135impl ParallelChunker {
136 pub fn new() -> Self {
138 Self {
139 config: ParallelChunkingConfig::default(),
140 }
141 }
142
143 pub fn with_config(config: ParallelChunkingConfig) -> Self {
145 Self { config }
146 }
147
148 pub fn chunk_parallel(&self, data: &[u8]) -> Result<ParallelChunkingResult> {
153 let start = Instant::now();
154 let metrics = global_metrics();
155
156 if data.len() < 1_000_000 {
158 return self.chunk_sequential(data, start);
159 }
160
161 let chunk_ranges = self.calculate_chunk_ranges(data.len());
163
164 let chunk_results: Vec<_> = chunk_ranges
166 .par_iter()
167 .map(|(start, end)| {
168 let chunk_data = &data[*start..*end];
169 let block = Block::new(Bytes::copy_from_slice(chunk_data))
170 .map_err(|e| Error::InvalidData(e.to_string()))?;
171 Ok((*block.cid(), block.data().len()))
172 })
173 .collect::<Result<Vec<_>>>()?;
174
175 let dag_result = self.build_dag_parallel(&chunk_results)?;
177
178 let duration = start.elapsed();
179 metrics.record_chunking(chunk_results.len(), duration.as_micros() as u64);
180
181 Ok(ParallelChunkingResult {
182 root_cid: dag_result.root_cid,
183 chunk_count: chunk_results.len(),
184 total_bytes: data.len(),
185 dedup_stats: DeduplicationStats {
186 unique_chunks: chunk_results.len(),
187 total_chunks: chunk_results.len(),
188 reused_chunks: 0,
189 space_savings_percent: 0.0,
190 total_data_size: data.len() as u64,
191 deduplicated_size: data.len() as u64,
192 },
193 duration,
194 chunk_cids: chunk_results.iter().map(|(cid, _)| *cid).collect(),
195 dag_nodes: dag_result.nodes,
196 })
197 }
198
199 fn calculate_chunk_ranges(&self, data_len: usize) -> Vec<(usize, usize)> {
201 let chunk_size = self.config.chunk_size;
202 let mut ranges = Vec::new();
203 let mut offset = 0;
204
205 while offset < data_len {
206 let end = (offset + chunk_size).min(data_len);
207 ranges.push((offset, end));
208 offset = end;
209 }
210
211 ranges
212 }
213
214 fn build_dag_parallel(&self, chunks: &[(Cid, usize)]) -> Result<DagBuildResult> {
216 if chunks.is_empty() {
217 return Err(Error::InvalidInput(
218 "no chunks to build DAG from".to_string(),
219 ));
220 }
221
222 if chunks.len() == 1 {
224 return Ok(DagBuildResult {
225 root_cid: chunks[0].0,
226 nodes: vec![],
227 });
228 }
229
230 let mut current_level: Vec<Cid> = chunks.iter().map(|(cid, _)| *cid).collect();
232 let all_nodes = Arc::new(Mutex::new(Vec::new()));
233
234 while current_level.len() > 1 {
235 let max_links = self.config.max_links_per_node;
236
237 let groups: Vec<_> = current_level.chunks(max_links).collect();
239
240 let parent_results: Vec<_> = groups
241 .par_iter()
242 .map(|group| {
243 let links: Vec<DagLink> = group
245 .iter()
246 .enumerate()
247 .map(|(idx, cid)| DagLink::with_name(*cid, 0, format!("chunk-{}", idx)))
248 .collect();
249
250 let node = DagNode {
251 links,
252 total_size: 0, data: None,
254 };
255
256 let ipld = node.to_ipld();
258 let cbor = ipld
259 .to_dag_cbor()
260 .map_err(|e| Error::Serialization(e.to_string()))?;
261
262 let block = Block::new(Bytes::from(cbor))
263 .map_err(|e| Error::InvalidData(e.to_string()))?;
264
265 Ok((*block.cid(), node))
266 })
267 .collect::<Result<Vec<_>>>()?;
268
269 let mut nodes_lock = all_nodes.lock().unwrap();
271 nodes_lock.extend(parent_results.iter().map(|(_, node)| node.clone()));
272 drop(nodes_lock);
273
274 current_level = parent_results.into_iter().map(|(cid, _)| cid).collect();
276 }
277
278 let nodes = Arc::try_unwrap(all_nodes).unwrap().into_inner().unwrap();
279
280 Ok(DagBuildResult {
281 root_cid: current_level[0],
282 nodes,
283 })
284 }
285
286 fn chunk_sequential(&self, data: &[u8], start: Instant) -> Result<ParallelChunkingResult> {
288 let chunk_ranges = self.calculate_chunk_ranges(data.len());
289
290 let mut chunk_cids = Vec::new();
291 for (start_offset, end_offset) in chunk_ranges {
292 let chunk_data = &data[start_offset..end_offset];
293 let block = Block::new(Bytes::copy_from_slice(chunk_data))?;
294 chunk_cids.push((*block.cid(), block.data().len()));
295 }
296
297 let dag_result = self.build_dag_parallel(&chunk_cids)?;
298
299 Ok(ParallelChunkingResult {
300 root_cid: dag_result.root_cid,
301 chunk_count: chunk_cids.len(),
302 total_bytes: data.len(),
303 dedup_stats: DeduplicationStats {
304 unique_chunks: chunk_cids.len(),
305 total_chunks: chunk_cids.len(),
306 reused_chunks: 0,
307 space_savings_percent: 0.0,
308 total_data_size: data.len() as u64,
309 deduplicated_size: data.len() as u64,
310 },
311 duration: start.elapsed(),
312 chunk_cids: chunk_cids.iter().map(|(cid, _)| *cid).collect(),
313 dag_nodes: dag_result.nodes,
314 })
315 }
316
317 pub fn chunk_files_parallel(&self, files: &[Vec<u8>]) -> Result<Vec<ParallelChunkingResult>> {
319 files
320 .par_iter()
321 .map(|data| self.chunk_parallel(data))
322 .collect()
323 }
324}
325
326impl Default for ParallelChunker {
327 fn default() -> Self {
328 Self::new()
329 }
330}
331
332struct DagBuildResult {
334 root_cid: Cid,
335 nodes: Vec<DagNode>,
336}
337
338pub struct ParallelDeduplicator {
340 seen_cids: Arc<Mutex<std::collections::HashSet<Cid>>>,
341 stats: Arc<Mutex<DeduplicationStats>>,
342}
343
344impl ParallelDeduplicator {
345 pub fn new() -> Self {
347 Self {
348 seen_cids: Arc::new(Mutex::new(std::collections::HashSet::new())),
349 stats: Arc::new(Mutex::new(DeduplicationStats {
350 unique_chunks: 0,
351 total_chunks: 0,
352 reused_chunks: 0,
353 space_savings_percent: 0.0,
354 total_data_size: 0,
355 deduplicated_size: 0,
356 })),
357 }
358 }
359
360 pub fn check_unique(&self, cid: &Cid, size: usize) -> bool {
362 let mut seen = self.seen_cids.lock().unwrap();
363 let mut stats = self.stats.lock().unwrap();
364
365 stats.total_chunks += 1;
366 stats.total_data_size += size as u64;
367
368 if seen.insert(*cid) {
369 stats.unique_chunks += 1;
370 stats.deduplicated_size += size as u64;
371 true
372 } else {
373 stats.reused_chunks += 1;
374 false
375 }
376 }
377
378 pub fn stats(&self) -> DeduplicationStats {
380 let stats = self.stats.lock().unwrap();
381 let mut result = stats.clone();
382 if result.total_data_size > 0 {
383 result.space_savings_percent =
384 (1.0 - (result.deduplicated_size as f64 / result.total_data_size as f64)) * 100.0;
385 }
386 result
387 }
388}
389
390impl Default for ParallelDeduplicator {
391 fn default() -> Self {
392 Self::new()
393 }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Chunks `len` bytes of `fill` with a default chunker, panicking if
    /// chunking fails.
    fn chunk_filled(fill: u8, len: usize) -> ParallelChunkingResult {
        let payload = vec![fill; len];
        ParallelChunker::new().chunk_parallel(&payload).unwrap()
    }

    #[test]
    fn test_parallel_chunking_basic() {
        // 1_000_000 bytes is the smallest input that is not delegated to the
        // sequential path.
        let outcome = chunk_filled(0, 1_000_000);

        assert!(outcome.chunk_count > 0);
        assert_eq!(outcome.total_bytes, 1_000_000);
        assert!(outcome.duration.as_micros() > 0);
    }

    #[test]
    fn test_parallel_chunking_small_file() {
        let outcome = chunk_filled(0, 1024);

        assert_eq!(outcome.chunk_count, 1);
        assert_eq!(outcome.total_bytes, 1024);
    }

    #[test]
    fn test_parallel_chunking_custom_size() {
        let custom = ParallelChunkingConfig::with_chunk_size(128 * 1024).unwrap();
        let payload = vec![0u8; 1_000_000];

        let outcome = ParallelChunker::with_config(custom)
            .chunk_parallel(&payload)
            .unwrap();

        assert!(outcome.chunk_count > 0);
    }

    #[test]
    fn test_parallel_chunking_multiple_files() {
        let inputs: Vec<Vec<u8>> = (0u8..3).map(|fill| vec![fill; 500_000]).collect();

        let outcomes = ParallelChunker::new()
            .chunk_files_parallel(&inputs)
            .unwrap();

        assert_eq!(outcomes.len(), 3);
        for outcome in outcomes {
            assert!(outcome.chunk_count > 0);
        }
    }

    #[test]
    fn test_chunk_ranges() {
        let ranges = ParallelChunker::new().calculate_chunk_ranges(1_000_000);

        assert!(!ranges.is_empty());
        assert_eq!(ranges[0].0, 0);

        // Each range must start exactly where the previous one ended.
        for pair in ranges.windows(2) {
            assert_eq!(pair[0].1, pair[1].0);
        }

        assert_eq!(ranges.last().unwrap().1, 1_000_000);
    }

    #[test]
    fn test_parallel_deduplicator() {
        let tracker = ParallelDeduplicator::new();
        let cid = CidBuilder::new().build(b"test").unwrap();

        // First sighting is unique, the repeat is not.
        assert!(tracker.check_unique(&cid, 100));
        assert!(!tracker.check_unique(&cid, 100));

        let summary = tracker.stats();
        assert_eq!(summary.unique_chunks, 1);
        assert_eq!(summary.total_chunks, 2);
        assert!(summary.space_savings_percent > 0.0);
    }

    #[test]
    fn test_config_validation() {
        let too_small = ParallelChunkingConfig::with_chunk_size(100);
        let acceptable = ParallelChunkingConfig::with_chunk_size(128 * 1024);
        let too_large = ParallelChunkingConfig::with_chunk_size(10_000_000);

        assert!(too_small.is_err());
        assert!(acceptable.is_ok());
        assert!(too_large.is_err());
    }

    #[test]
    fn test_config_builder() {
        let built = ParallelChunkingConfig::default()
            .with_threads(4)
            .with_hash_algorithm(HashAlgorithm::Sha3_256)
            .with_content_defined();

        assert_eq!(built.num_threads, Some(4));
        assert_eq!(built.hash_algorithm, HashAlgorithm::Sha3_256);
        assert_eq!(built.strategy, ChunkingStrategy::ContentDefined);
    }

    #[test]
    fn test_empty_data() {
        let nothing: Vec<u8> = Vec::new();
        let attempt = ParallelChunker::new().chunk_parallel(&nothing);

        assert!(attempt.is_err());
    }

    #[test]
    fn test_single_chunk() {
        let outcome = chunk_filled(42, 1024);

        assert_eq!(outcome.chunk_count, 1);
        assert!(!outcome.chunk_cids.is_empty());
    }

    #[test]
    fn test_dag_building() {
        let outcome = chunk_filled(0, 5_000_000);

        assert!(outcome.chunk_count > 1);
        assert!(!outcome.chunk_cids.is_empty());
    }
}