use crate::error::{Error, Result};
use crate::hash::global_hash_registry;
use crate::{compress, compression_ratio, decompress, Block, BlockBuilder, Cid, CidBuilder};
use crate::{CompressionAlgorithm, HashAlgorithm};
use bytes::Bytes;
use rayon::prelude::*;

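/// Parallel batch operations over blocks, CIDs, hashes, and compression,
/// built on the rayon thread pool.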
pub struct BatchProcessor {
    /// Hash algorithm applied to every block, CID, and digest operation.
    hash_algorithm: HashAlgorithm,
}

impl BatchProcessor {
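    /// Creates a processor that hashes with SHA-256, the default algorithm.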
    pub fn new() -> Self {
        Self {
            hash_algorithm: HashAlgorithm::Sha256,
        }
    }

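    /// Creates a processor that hashes with the given algorithm.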
    pub fn with_hash_algorithm(hash_algorithm: HashAlgorithm) -> Self {
        Self { hash_algorithm }
    }

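    /// Builds a block for each data chunk in parallel.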
    pub fn create_blocks_parallel(&self, data_chunks: Vec<Bytes>) -> Result<Vec<Block>> {
        // Copy the algorithm out of `self` so the parallel closure does not
        // borrow `self` across threads.
        let hash_algo = self.hash_algorithm;

        data_chunks
            .into_par_iter()
            .map(|data| BlockBuilder::new().hash_algorithm(hash_algo).build(data))
            .collect()
    }

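    /// Generates a CID for each chunk in parallel, returning each chunk
    /// paired with its CID in input order.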
    pub fn generate_cids_parallel(&self, data_chunks: Vec<Bytes>) -> Result<Vec<(Bytes, Cid)>> {
        // Copy the algorithm so the parallel closure does not borrow `self`.
        let hash_algo = self.hash_algorithm;

        data_chunks
            .into_par_iter()
            .map(|data| {
                let cid = CidBuilder::new().hash_algorithm(hash_algo).build(&data)?;
                Ok((data, cid))
            })
            .collect()
    }

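    /// Verifies every block in parallel, returning an error if any block
    /// fails verification or verification itself errors.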
    pub fn verify_blocks_parallel(&self, blocks: &[Block]) -> Result<()> {
        // Each thread folds its share of blocks into a single boolean, and the
        // partial results are AND-ed together; an `Err` from `verify` ends the
        // reduction early.
        let all_valid: Result<bool> = blocks
            .par_iter()
            .try_fold(
                || true,
                |acc, block| -> Result<bool> {
                    let valid = block.verify()?;
                    Ok(acc && valid)
                },
            )
            .try_reduce(|| true, |a, b| -> Result<bool> { Ok(a && b) });

        if all_valid? {
            Ok(())
        } else {
            Err(Error::Verification(
                "One or more blocks failed verification".into(),
            ))
        }
    }

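    /// Computes the digest of each chunk in parallel; errors if the configured
    /// algorithm is not in the global hash registry.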
    pub fn compute_hashes_parallel(&self, data_chunks: &[&[u8]]) -> Result<Vec<Vec<u8>>> {
        let code = self.hash_algorithm.code();

        // Resolve the hash engine once, outside the parallel loop.
        let registry = global_hash_registry();
        let engine = registry.get(code).ok_or_else(|| {
            Error::InvalidInput(format!(
                "Hash algorithm {} not supported",
                self.hash_algorithm.name()
            ))
        })?;

        Ok(data_chunks
            .par_iter()
            .map(|data| engine.digest(data))
            .collect())
    }

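    /// Sums the payload sizes of all blocks in parallel.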
    pub fn total_bytes_parallel(&self, blocks: &[Block]) -> usize {
        blocks.par_iter().map(|block| block.data().len()).sum()
    }

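    /// Filters blocks in parallel, keeping those that satisfy the predicate.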
    pub fn filter_blocks_parallel<F>(&self, blocks: Vec<Block>, predicate: F) -> Vec<Block>
    where
        F: Fn(&Block) -> bool + Sync + Send,
    {
        blocks
            .into_par_iter()
            .filter(|block| predicate(block))
            .collect()
    }

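    /// Collects the distinct CIDs among the given blocks in parallel.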
    pub fn unique_cids_parallel(&self, blocks: &[Block]) -> Vec<Cid> {
        use std::collections::HashSet;
        use std::sync::Mutex;

        // A shared, mutex-guarded set of CID strings tracks what has been
        // seen; the first thread to insert a CID keeps it.
        let seen = Mutex::new(HashSet::new());
        let unique: Vec<Cid> = blocks
            .par_iter()
            .filter_map(|block| {
                let cid = *block.cid();
                let mut seen = seen.lock().unwrap();
                if seen.insert(cid.to_string()) {
                    Some(cid)
                } else {
                    None
                }
            })
            .collect();

        unique
    }

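    /// Compresses each chunk in parallel with the given algorithm and level.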
    pub fn compress_data_parallel(
        &self,
        data_chunks: Vec<Bytes>,
        algorithm: CompressionAlgorithm,
        level: u8,
    ) -> Result<Vec<Bytes>> {
        data_chunks
            .into_par_iter()
            .map(|data| compress(&data, algorithm, level))
            .collect()
    }

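    /// Decompresses each chunk in parallel with the given algorithm.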
    pub fn decompress_data_parallel(
        &self,
        compressed_chunks: Vec<Bytes>,
        algorithm: CompressionAlgorithm,
    ) -> Result<Vec<Bytes>> {
        compressed_chunks
            .into_par_iter()
            .map(|data| decompress(&data, algorithm))
            .collect()
    }

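    /// Computes, in parallel, the compression ratio each chunk would achieve
    /// with the given algorithm and level.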
    pub fn analyze_compression_ratios_parallel(
        &self,
        data_chunks: &[Bytes],
        algorithm: CompressionAlgorithm,
        level: u8,
    ) -> Result<Vec<f64>> {
        data_chunks
            .par_iter()
            .map(|data| compression_ratio(data, algorithm, level))
            .collect()
    }
}

impl Default for BatchProcessor {
    fn default() -> Self {
        Self::new()
    }
}

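/// Summary statistics for a batch-processing run.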
#[derive(Debug, Clone, PartialEq)]
pub struct BatchStats {
    /// Number of items processed in the batch.
    pub items_processed: usize,
    /// Total uncompressed size of all processed data, in bytes.
    pub total_bytes: usize,
    /// Number of distinct CIDs observed.
    pub unique_cids: usize,
    /// Number of items that failed processing.
    pub failed_items: usize,
    /// Total compressed size of all processed data, in bytes.
    pub compressed_bytes: usize,
    /// Mean compression ratio across the batch.
    pub avg_compression_ratio: f64,
}

impl BatchStats {
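    /// Creates an empty stats record with all counters at zero.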
    pub fn new() -> Self {
        Self {
            items_processed: 0,
            total_bytes: 0,
            unique_cids: 0,
            failed_items: 0,
            compressed_bytes: 0,
            avg_compression_ratio: 0.0,
        }
    }

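    /// Fraction of processed items that were duplicates (0.0 for an empty batch).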
    pub fn dedup_ratio(&self) -> f64 {
        // An empty batch has no duplicates.
        if self.items_processed == 0 {
            return 0.0;
        }
        1.0 - (self.unique_cids as f64 / self.items_processed as f64)
    }

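    /// Fraction of items processed without failure (1.0 for an empty batch).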
    pub fn success_rate(&self) -> f64 {
        // Vacuously successful when nothing was processed.
        if self.items_processed == 0 {
            return 1.0;
        }
        let successful = self.items_processed - self.failed_items;
        successful as f64 / self.items_processed as f64
    }

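    /// Bytes saved by compression (negative if compression grew the data);
    /// zero when no compression was recorded.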
    pub fn compression_savings(&self) -> i64 {
        // A compressed size of zero means compression was never run.
        if self.compressed_bytes == 0 {
            return 0;
        }
        self.total_bytes as i64 - self.compressed_bytes as i64
    }

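    /// Percentage reduction in size achieved by compression (0.0 when no
    /// data or no compression was recorded).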
    pub fn compression_efficiency(&self) -> f64 {
        // Guard against division by zero and the "never compressed" case.
        if self.total_bytes == 0 || self.compressed_bytes == 0 {
            return 0.0;
        }
        (1.0 - (self.compressed_bytes as f64 / self.total_bytes as f64)) * 100.0
    }
}

impl Default for BatchStats {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_blocks_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("chunk 1"),
            Bytes::from("chunk 2"),
            Bytes::from("chunk 3"),
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        assert_eq!(blocks.len(), 3);

        // Every block should pass self-verification.
        for block in &blocks {
            assert!(block.verify().is_ok());
        }
    }

    #[test]
    fn test_generate_cids_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("data 1"),
            Bytes::from("data 2"),
            Bytes::from("data 3"),
        ];

        let results = processor.generate_cids_parallel(chunks.clone()).unwrap();
        assert_eq!(results.len(), 3);

        // Input order and data are preserved alongside each CID.
        for (i, (data, _cid)) in results.iter().enumerate() {
            assert_eq!(data, &chunks[i]);
        }
    }

    #[test]
    fn test_verify_blocks_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![Bytes::from("test 1"), Bytes::from("test 2")];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        assert!(processor.verify_blocks_parallel(&blocks).is_ok());
    }

    #[test]
    fn test_compute_hashes_parallel() {
        let processor = BatchProcessor::new();
        let data: Vec<&[u8]> = vec![b"hash1", b"hash2", b"hash3"];

        let hashes = processor.compute_hashes_parallel(&data).unwrap();
        assert_eq!(hashes.len(), 3);

        // SHA-256 digests are 32 bytes long.
        for hash in &hashes {
            assert_eq!(hash.len(), 32);
        }

        // Hashing the same input twice must be deterministic.
        let hashes2 = processor.compute_hashes_parallel(&data).unwrap();
        assert_eq!(hashes, hashes2);
    }

    #[test]
    fn test_total_bytes_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("12345"),      // 5 bytes
            Bytes::from("1234567890"), // 10 bytes
            Bytes::from("123"),        // 3 bytes
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        let total = processor.total_bytes_parallel(&blocks);
        assert_eq!(total, 18);
    }

    #[test]
    fn test_filter_blocks_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("short"),
            Bytes::from("this is a longer chunk"),
            Bytes::from("tiny"),
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();

        // Keep only blocks whose payload is longer than 10 bytes.
        let filtered = processor.filter_blocks_parallel(blocks, |block| block.data().len() > 10);

        assert_eq!(filtered.len(), 1);
        assert!(filtered[0].data().len() > 10);
    }

    #[test]
    fn test_unique_cids_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("unique1"),
            Bytes::from("unique2"),
            Bytes::from("unique1"), // duplicate of the first chunk
            Bytes::from("unique3"),
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        let unique = processor.unique_cids_parallel(&blocks);

        // Four chunks, but only three distinct payloads.
        assert_eq!(unique.len(), 3);
    }

    #[test]
    fn test_batch_stats() {
        let mut stats = BatchStats::new();
        assert_eq!(stats.dedup_ratio(), 0.0);
        assert_eq!(stats.success_rate(), 1.0);

        stats.items_processed = 10;
        stats.unique_cids = 7;
        stats.failed_items = 1;

        // 7 unique out of 10 processed -> 30% deduplication.
        assert!((stats.dedup_ratio() - 0.3).abs() < 0.0001);
        // 9 successes out of 10 -> 90% success rate.
        assert!((stats.success_rate() - 0.9).abs() < 0.0001);
    }

    #[test]
    fn test_with_different_hash_algorithms() {
        let processor_sha256 = BatchProcessor::with_hash_algorithm(HashAlgorithm::Sha256);
        let processor_sha3 = BatchProcessor::with_hash_algorithm(HashAlgorithm::Sha3_256);

        let data = vec![Bytes::from("test data")];

        let blocks_sha256 = processor_sha256
            .create_blocks_parallel(data.clone())
            .unwrap();
        let blocks_sha3 = processor_sha3.create_blocks_parallel(data).unwrap();

        // Different hash algorithms must yield different CIDs for the same data.
        assert_ne!(blocks_sha256[0].cid(), blocks_sha3[0].cid());
    }

    #[test]
    fn test_large_batch_performance() {
        let processor = BatchProcessor::new();

        // Build a batch large enough to exercise the thread pool.
        let chunks: Vec<Bytes> = (0..1000)
            .map(|i| Bytes::from(format!("chunk {}", i)))
            .collect();

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        assert_eq!(blocks.len(), 1000);

        assert!(processor.verify_blocks_parallel(&blocks).is_ok());
    }

    #[test]
    fn test_empty_batch() {
        let processor = BatchProcessor::new();
        let empty: Vec<Bytes> = vec![];

        let blocks = processor.create_blocks_parallel(empty).unwrap();
        assert_eq!(blocks.len(), 0);
    }

    #[test]
    fn test_compress_data_parallel() {
        let processor = BatchProcessor::new();
        let data = vec![
            Bytes::from(vec![0u8; 1000]),
            Bytes::from(vec![1u8; 1000]),
            Bytes::from(vec![2u8; 1000]),
        ];

        let compressed = processor
            .compress_data_parallel(data.clone(), CompressionAlgorithm::Zstd, 3)
            .unwrap();
        assert_eq!(compressed.len(), 3);

        // Runs of identical bytes should compress well below the original size.
        for (i, comp) in compressed.iter().enumerate() {
            assert!(comp.len() < data[i].len());
        }
    }

    #[test]
    fn test_decompress_data_parallel() {
        let processor = BatchProcessor::new();
        let original = vec![Bytes::from(vec![0u8; 500]), Bytes::from(vec![1u8; 500])];

        // Round-trip: compress, then decompress with the same algorithm.
        let compressed = processor
            .compress_data_parallel(original.clone(), CompressionAlgorithm::Lz4, 3)
            .unwrap();

        let decompressed = processor
            .decompress_data_parallel(compressed, CompressionAlgorithm::Lz4)
            .unwrap();

        assert_eq!(decompressed.len(), original.len());
        for (i, decomp) in decompressed.iter().enumerate() {
            assert_eq!(decomp, &original[i]);
        }
    }

    #[test]
    fn test_analyze_compression_ratios_parallel() {
        let processor = BatchProcessor::new();
        let data = vec![
            Bytes::from(vec![0u8; 1000]),
            Bytes::from(vec![1u8; 1000]),
        ];

        let ratios = processor
            .analyze_compression_ratios_parallel(&data, CompressionAlgorithm::Zstd, 6)
            .unwrap();

        assert_eq!(ratios.len(), 2);

        // Ratios are fractions of the original size.
        for ratio in &ratios {
            assert!(*ratio >= 0.0 && *ratio <= 1.0);
        }

        // Highly repetitive data should compress to well under half its size.
        for ratio in &ratios {
            assert!(*ratio < 0.5);
        }
    }

    #[test]
    fn test_compression_with_none_algorithm() {
        let processor = BatchProcessor::new();
        let data = vec![Bytes::from("test data"), Bytes::from("more data")];

        // `None` is a pass-through: output must equal input.
        let compressed = processor
            .compress_data_parallel(data.clone(), CompressionAlgorithm::None, 0)
            .unwrap();

        assert_eq!(compressed.len(), data.len());
        for (i, comp) in compressed.iter().enumerate() {
            assert_eq!(comp, &data[i]);
        }
    }

    #[test]
    fn test_batch_stats_compression() {
        let mut stats = BatchStats::new();
        stats.items_processed = 100;
        stats.total_bytes = 10000;
        stats.compressed_bytes = 5000;

        // 10000 -> 5000 bytes saves 5000 bytes.
        assert_eq!(stats.compression_savings(), 5000);

        // Halving the size is 50% efficiency.
        assert!((stats.compression_efficiency() - 50.0).abs() < 0.01);
    }

    #[test]
    fn test_batch_stats_no_compression() {
        let stats = BatchStats::new();

        // With no compression recorded, both metrics report zero.
        assert_eq!(stats.compression_savings(), 0);
        assert_eq!(stats.compression_efficiency(), 0.0);
    }

    #[test]
    fn test_large_batch_compression() {
        let processor = BatchProcessor::new();

        // 100 chunks of 500 repeated bytes each.
        let data: Vec<Bytes> = (0..100).map(|i| Bytes::from(vec![i as u8; 500])).collect();

        let compressed = processor
            .compress_data_parallel(data.clone(), CompressionAlgorithm::Zstd, 3)
            .unwrap();

        assert_eq!(compressed.len(), 100);

        let decompressed = processor
            .decompress_data_parallel(compressed, CompressionAlgorithm::Zstd)
            .unwrap();

        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_empty_compression_batch() {
        let processor = BatchProcessor::new();
        let empty: Vec<Bytes> = vec![];

        let compressed = processor
            .compress_data_parallel(empty, CompressionAlgorithm::Lz4, 3)
            .unwrap();

        assert_eq!(compressed.len(), 0);
    }
}