ipfrs_core/batch.rs

//! Batch processing utilities with parallel execution
//!
//! This module provides high-performance batch operations for processing
//! multiple blocks, CIDs, and hashes in parallel using Rayon.
//!
//! ## Example
//!
//! ```rust
//! use ipfrs_core::batch::BatchProcessor;
//! use bytes::Bytes;
//!
//! let data_chunks = vec![
//!     Bytes::from("chunk 1"),
//!     Bytes::from("chunk 2"),
//!     Bytes::from("chunk 3"),
//! ];
//!
//! // Create blocks in parallel
//! let processor = BatchProcessor::new();
//! let blocks = processor.create_blocks_parallel(data_chunks).unwrap();
//! assert_eq!(blocks.len(), 3);
//! ```

use crate::error::{Error, Result};
use crate::hash::global_hash_registry;
use crate::{compress, compression_ratio, decompress, Block, BlockBuilder, Cid, CidBuilder};
use crate::{CompressionAlgorithm, HashAlgorithm};
use bytes::Bytes;
use rayon::prelude::*;

/// High-performance batch processor for parallel operations
///
/// Provides parallel processing of multiple blocks, CIDs, and hash computations
/// using Rayon's thread pool.
pub struct BatchProcessor {
    hash_algorithm: HashAlgorithm,
}

impl BatchProcessor {
    /// Create a new batch processor with default settings (SHA2-256)
    pub fn new() -> Self {
        Self {
            hash_algorithm: HashAlgorithm::Sha256,
        }
    }

    /// Create a batch processor with a specific hash algorithm
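    ///
    /// A minimal sketch, mirroring `test_with_different_hash_algorithms` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use ipfrs_core::HashAlgorithm;
    ///
    /// let processor = BatchProcessor::with_hash_algorithm(HashAlgorithm::Sha3_256);
    /// ```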
    pub fn with_hash_algorithm(hash_algorithm: HashAlgorithm) -> Self {
        Self { hash_algorithm }
    }

    /// Create multiple blocks in parallel from data chunks
    ///
    /// This is significantly faster than creating blocks sequentially
    /// when processing many chunks.
    pub fn create_blocks_parallel(&self, data_chunks: Vec<Bytes>) -> Result<Vec<Block>> {
        let hash_algo = self.hash_algorithm;

        data_chunks
            .into_par_iter()
            .map(|data| BlockBuilder::new().hash_algorithm(hash_algo).build(data))
            .collect()
    }

    /// Generate CIDs in parallel for multiple data chunks
    ///
    /// Returns a vector of (data, CID) pairs.
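    ///
    /// A minimal sketch, mirroring `test_generate_cids_parallel` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let pairs = processor
    ///     .generate_cids_parallel(vec![Bytes::from("a"), Bytes::from("b")])
    ///     .unwrap();
    /// assert_eq!(pairs.len(), 2);
    /// ```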
    pub fn generate_cids_parallel(&self, data_chunks: Vec<Bytes>) -> Result<Vec<(Bytes, Cid)>> {
        let hash_algo = self.hash_algorithm;

        data_chunks
            .into_par_iter()
            .map(|data| {
                let cid = CidBuilder::new().hash_algorithm(hash_algo).build(&data)?;
                Ok((data, cid))
            })
            .collect()
    }

    /// Verify multiple blocks in parallel
    ///
    /// Returns `Ok(())` if every block verifies successfully. Errors from
    /// individual `verify` calls are propagated; if any block verifies as
    /// invalid, a generic verification error is returned.
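    ///
    /// A minimal sketch, mirroring `test_verify_blocks_parallel` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let blocks = processor
    ///     .create_blocks_parallel(vec![Bytes::from("a"), Bytes::from("b")])
    ///     .unwrap();
    /// assert!(processor.verify_blocks_parallel(&blocks).is_ok());
    /// ```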
    pub fn verify_blocks_parallel(&self, blocks: &[Block]) -> Result<()> {
        // `try_reduce` short-circuits on the first `Err` and ANDs the
        // per-block verification results together.
        let all_valid = blocks
            .par_iter()
            .map(|block| block.verify())
            .try_reduce(|| true, |a, b| Ok(a && b))?;

        if all_valid {
            Ok(())
        } else {
            Err(Error::Verification(
                "One or more blocks failed verification".into(),
            ))
        }
    }

    /// Compute hashes in parallel for multiple data chunks
    ///
    /// Returns a vector of hash digests.
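    ///
    /// A minimal sketch, mirroring `test_compute_hashes_parallel` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    ///
    /// let processor = BatchProcessor::new();
    /// let data: Vec<&[u8]> = vec![b"a", b"b"];
    /// let hashes = processor.compute_hashes_parallel(&data).unwrap();
    /// assert_eq!(hashes.len(), 2);
    /// assert_eq!(hashes[0].len(), 32); // SHA2-256 digests are 32 bytes
    /// ```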
    pub fn compute_hashes_parallel(&self, data_chunks: &[&[u8]]) -> Result<Vec<Vec<u8>>> {
        let code = self.hash_algorithm.code();

        let registry = global_hash_registry();
        let engine = registry.get(code).ok_or_else(|| {
            Error::InvalidInput(format!(
                "Hash algorithm {} not supported",
                self.hash_algorithm.name()
            ))
        })?;

        Ok(data_chunks
            .par_iter()
            .map(|data| engine.digest(data))
            .collect())
    }

    /// Count total bytes across multiple blocks in parallel
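    ///
    /// A minimal sketch (block sizes as in `test_total_bytes_parallel` below):
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let blocks = processor
    ///     .create_blocks_parallel(vec![Bytes::from("12345"), Bytes::from("123")])
    ///     .unwrap();
    /// assert_eq!(processor.total_bytes_parallel(&blocks), 8); // 5 + 3 bytes
    /// ```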
    pub fn total_bytes_parallel(&self, blocks: &[Block]) -> usize {
        blocks.par_iter().map(|block| block.data().len()).sum()
    }

    /// Find blocks matching a predicate in parallel
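    ///
    /// A minimal sketch, mirroring `test_filter_blocks_parallel` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let blocks = processor
    ///     .create_blocks_parallel(vec![Bytes::from("short"), Bytes::from("a longer chunk")])
    ///     .unwrap();
    /// let long = processor.filter_blocks_parallel(blocks, |b| b.data().len() > 10);
    /// assert_eq!(long.len(), 1);
    /// ```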
    pub fn filter_blocks_parallel<F>(&self, blocks: Vec<Block>, predicate: F) -> Vec<Block>
    where
        F: Fn(&Block) -> bool + Sync + Send,
    {
        blocks
            .into_par_iter()
            .filter(|block| predicate(block))
            .collect()
    }

    /// Collect unique CIDs from blocks in parallel
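    ///
    /// A minimal sketch, mirroring `test_unique_cids_parallel` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let blocks = processor
    ///     .create_blocks_parallel(vec![
    ///         Bytes::from("a"),
    ///         Bytes::from("b"),
    ///         Bytes::from("a"), // duplicate
    ///     ])
    ///     .unwrap();
    /// assert_eq!(processor.unique_cids_parallel(&blocks).len(), 2);
    /// ```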
    pub fn unique_cids_parallel(&self, blocks: &[Block]) -> Vec<Cid> {
        use std::collections::HashMap;

        // Build per-thread maps keyed by the CID's string form, then merge
        // them, rather than serializing every insertion behind a shared lock.
        blocks
            .par_iter()
            .fold(HashMap::new, |mut seen, block| {
                let cid = *block.cid();
                seen.entry(cid.to_string()).or_insert(cid);
                seen
            })
            .reduce(HashMap::new, |mut merged, partial| {
                for (key, cid) in partial {
                    merged.entry(key).or_insert(cid);
                }
                merged
            })
            .into_values()
            .collect()
    }

    /// Compress multiple data chunks in parallel
    ///
    /// Compresses each data chunk using the specified algorithm and level.
    /// Returns a vector of compressed data, maintaining the same order as input.
    ///
    /// # Arguments
    ///
    /// * `data_chunks` - Vector of data chunks to compress
    /// * `algorithm` - Compression algorithm to use
    /// * `level` - Compression level (0-9)
    ///
    /// # Returns
    ///
    /// Vector of compressed data chunks
    ///
    /// # Example
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use ipfrs_core::CompressionAlgorithm;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let data = vec![
    ///     Bytes::from(vec![0u8; 1000]),
    ///     Bytes::from(vec![1u8; 1000]),
    /// ];
    ///
    /// let compressed = processor.compress_data_parallel(
    ///     data,
    ///     CompressionAlgorithm::Zstd,
    ///     3
    /// ).unwrap();
    /// assert_eq!(compressed.len(), 2);
    /// ```
    pub fn compress_data_parallel(
        &self,
        data_chunks: Vec<Bytes>,
        algorithm: CompressionAlgorithm,
        level: u8,
    ) -> Result<Vec<Bytes>> {
        data_chunks
            .into_par_iter()
            .map(|data| compress(&data, algorithm, level))
            .collect()
    }

    /// Decompress multiple compressed chunks in parallel
    ///
    /// Decompresses each chunk using the specified algorithm.
    /// Returns a vector of decompressed data, maintaining the same order as input.
    ///
    /// # Arguments
    ///
    /// * `compressed_chunks` - Vector of compressed data chunks
    /// * `algorithm` - Compression algorithm that was used
    ///
    /// # Returns
    ///
    /// Vector of decompressed data chunks
    ///
    /// # Example
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use ipfrs_core::CompressionAlgorithm;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let data = vec![Bytes::from(vec![0u8; 1000])];
    ///
    /// let compressed = processor.compress_data_parallel(
    ///     data.clone(),
    ///     CompressionAlgorithm::Lz4,
    ///     3
    /// ).unwrap();
    ///
    /// let decompressed = processor.decompress_data_parallel(
    ///     compressed,
    ///     CompressionAlgorithm::Lz4
    /// ).unwrap();
    /// assert_eq!(decompressed, data);
    /// ```
    pub fn decompress_data_parallel(
        &self,
        compressed_chunks: Vec<Bytes>,
        algorithm: CompressionAlgorithm,
    ) -> Result<Vec<Bytes>> {
        compressed_chunks
            .into_par_iter()
            .map(|data| decompress(&data, algorithm))
            .collect()
    }

    /// Analyze compression ratios for multiple data chunks in parallel
    ///
    /// Computes a compression ratio estimate (compressed_size / original_size)
    /// for each chunk, where lower is better.
    ///
    /// # Arguments
    ///
    /// * `data_chunks` - Vector of data chunks to analyze
    /// * `algorithm` - Compression algorithm to use for estimation
    /// * `level` - Compression level (0-9)
    ///
    /// # Returns
    ///
    /// Vector of compression ratios, typically between 0.0 and 1.0 (0.5 means
    /// a 50% size reduction); values above 1.0 indicate the chunk expanded
    /// under compression.
    ///
    /// # Example
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchProcessor;
    /// use ipfrs_core::CompressionAlgorithm;
    /// use bytes::Bytes;
    ///
    /// let processor = BatchProcessor::new();
    /// let data = vec![
    ///     Bytes::from(vec![0u8; 1000]), // Highly compressible
    ///     Bytes::from(vec![1u8; 1000]), // Highly compressible
    /// ];
    ///
    /// let ratios = processor.analyze_compression_ratios_parallel(
    ///     &data,
    ///     CompressionAlgorithm::Zstd,
    ///     3
    /// ).unwrap();
    /// assert_eq!(ratios.len(), 2);
    /// // Repetitive data should compress well (ratio < 0.5)
    /// assert!(ratios[0] < 0.5);
    /// ```
    pub fn analyze_compression_ratios_parallel(
        &self,
        data_chunks: &[Bytes],
        algorithm: CompressionAlgorithm,
        level: u8,
    ) -> Result<Vec<f64>> {
        data_chunks
            .par_iter()
            .map(|data| compression_ratio(data, algorithm, level))
            .collect()
    }
}

impl Default for BatchProcessor {
    fn default() -> Self {
        Self::new()
    }
}

/// Statistics for batch operations
#[derive(Debug, Clone, PartialEq)]
pub struct BatchStats {
    /// Number of items processed
    pub items_processed: usize,
    /// Total bytes processed
    pub total_bytes: usize,
    /// Number of unique CIDs
    pub unique_cids: usize,
    /// Number of failed items
    pub failed_items: usize,
    /// Total bytes after compression (0 if not compressed)
    pub compressed_bytes: usize,
    /// Average compression ratio (0.0 if not compressed)
    pub avg_compression_ratio: f64,
}

impl BatchStats {
    /// Create new batch statistics
    pub fn new() -> Self {
        Self {
            items_processed: 0,
            total_bytes: 0,
            unique_cids: 0,
            failed_items: 0,
            compressed_bytes: 0,
            avg_compression_ratio: 0.0,
        }
    }

    /// Calculate deduplication ratio (0.0 = no dedup, 1.0 = all duplicates)
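    ///
    /// A minimal sketch, mirroring `test_batch_stats` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchStats;
    ///
    /// let mut stats = BatchStats::new();
    /// stats.items_processed = 10;
    /// stats.unique_cids = 7;
    /// assert!((stats.dedup_ratio() - 0.3).abs() < 1e-9);
    /// ```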
    pub fn dedup_ratio(&self) -> f64 {
        if self.items_processed == 0 {
            return 0.0;
        }
        1.0 - (self.unique_cids as f64 / self.items_processed as f64)
    }

    /// Calculate success rate (0.0 to 1.0)
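    ///
    /// A minimal sketch, mirroring `test_batch_stats` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchStats;
    ///
    /// let mut stats = BatchStats::new();
    /// stats.items_processed = 10;
    /// stats.failed_items = 1;
    /// assert!((stats.success_rate() - 0.9).abs() < 1e-9);
    /// ```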
    pub fn success_rate(&self) -> f64 {
        if self.items_processed == 0 {
            return 1.0;
        }
        let successful = self.items_processed - self.failed_items;
        successful as f64 / self.items_processed as f64
    }

    /// Calculate compression savings in bytes
    ///
    /// Returns the number of bytes saved by compression.
    /// Positive values indicate compression saved space.
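    ///
    /// A minimal sketch, mirroring `test_batch_stats_compression` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchStats;
    ///
    /// let mut stats = BatchStats::new();
    /// stats.total_bytes = 10000;
    /// stats.compressed_bytes = 5000;
    /// assert_eq!(stats.compression_savings(), 5000);
    /// ```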
    pub fn compression_savings(&self) -> i64 {
        if self.compressed_bytes == 0 {
            return 0;
        }
        self.total_bytes as i64 - self.compressed_bytes as i64
    }

    /// Calculate compression efficiency percentage (0.0 to 100.0)
    ///
    /// Returns the percentage of space saved by compression.
    /// For example, 50.0 means the compressed data is 50% smaller.
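    ///
    /// A minimal sketch, mirroring `test_batch_stats_compression` below:
    ///
    /// ```rust
    /// use ipfrs_core::batch::BatchStats;
    ///
    /// let mut stats = BatchStats::new();
    /// stats.total_bytes = 10000;
    /// stats.compressed_bytes = 5000;
    /// assert!((stats.compression_efficiency() - 50.0).abs() < 0.01);
    /// ```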
    pub fn compression_efficiency(&self) -> f64 {
        if self.total_bytes == 0 || self.compressed_bytes == 0 {
            return 0.0;
        }
        (1.0 - (self.compressed_bytes as f64 / self.total_bytes as f64)) * 100.0
    }
}

impl Default for BatchStats {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_blocks_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("chunk 1"),
            Bytes::from("chunk 2"),
            Bytes::from("chunk 3"),
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        assert_eq!(blocks.len(), 3);

        // Verify all blocks are valid (unwrap the Result and assert the bool)
        for block in &blocks {
            assert!(block.verify().unwrap());
        }
    }

    #[test]
    fn test_generate_cids_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("data 1"),
            Bytes::from("data 2"),
            Bytes::from("data 3"),
        ];

        let results = processor.generate_cids_parallel(chunks.clone()).unwrap();
        assert_eq!(results.len(), 3);

        // Verify data matches
        for (i, (data, _cid)) in results.iter().enumerate() {
            assert_eq!(data, &chunks[i]);
        }
    }

    #[test]
    fn test_verify_blocks_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![Bytes::from("test 1"), Bytes::from("test 2")];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        assert!(processor.verify_blocks_parallel(&blocks).is_ok());
    }

    #[test]
    fn test_compute_hashes_parallel() {
        let processor = BatchProcessor::new();
        let data: Vec<&[u8]> = vec![b"hash1", b"hash2", b"hash3"];

        let hashes = processor.compute_hashes_parallel(&data).unwrap();
        assert_eq!(hashes.len(), 3);

        // All hashes should be 32 bytes (SHA-256)
        for hash in &hashes {
            assert_eq!(hash.len(), 32);
        }

        // Same input should produce same hash
        let hashes2 = processor.compute_hashes_parallel(&data).unwrap();
        assert_eq!(hashes, hashes2);
    }

    #[test]
    fn test_total_bytes_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("12345"),      // 5 bytes
            Bytes::from("1234567890"), // 10 bytes
            Bytes::from("123"),        // 3 bytes
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        let total = processor.total_bytes_parallel(&blocks);
        assert_eq!(total, 18);
    }

    #[test]
    fn test_filter_blocks_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("short"),
            Bytes::from("this is a longer chunk"),
            Bytes::from("tiny"),
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();

        // Filter blocks with data length > 10
        let filtered = processor.filter_blocks_parallel(blocks, |block| block.data().len() > 10);

        assert_eq!(filtered.len(), 1);
        assert!(filtered[0].data().len() > 10);
    }

    #[test]
    fn test_unique_cids_parallel() {
        let processor = BatchProcessor::new();
        let chunks = vec![
            Bytes::from("unique1"),
            Bytes::from("unique2"),
            Bytes::from("unique1"), // duplicate
            Bytes::from("unique3"),
        ];

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        let unique = processor.unique_cids_parallel(&blocks);

        assert_eq!(unique.len(), 3); // 3 unique CIDs
    }

    #[test]
    fn test_batch_stats() {
        let mut stats = BatchStats::new();
        assert_eq!(stats.dedup_ratio(), 0.0);
        assert_eq!(stats.success_rate(), 1.0);

        stats.items_processed = 10;
        stats.unique_cids = 7;
        stats.failed_items = 1;

        // Use approximate comparison for floating point
        assert!((stats.dedup_ratio() - 0.3).abs() < 0.0001);
        assert!((stats.success_rate() - 0.9).abs() < 0.0001);
    }

    #[test]
    fn test_with_different_hash_algorithms() {
        let processor_sha256 = BatchProcessor::with_hash_algorithm(HashAlgorithm::Sha256);
        let processor_sha3 = BatchProcessor::with_hash_algorithm(HashAlgorithm::Sha3_256);

        let data = vec![Bytes::from("test data")];

        let blocks_sha256 = processor_sha256
            .create_blocks_parallel(data.clone())
            .unwrap();
        let blocks_sha3 = processor_sha3.create_blocks_parallel(data).unwrap();

        // Different hash algorithms should produce different CIDs
        assert_ne!(blocks_sha256[0].cid(), blocks_sha3[0].cid());
    }

    #[test]
    fn test_large_batch_performance() {
        let processor = BatchProcessor::new();

        // Create 1000 small chunks
        let chunks: Vec<Bytes> = (0..1000)
            .map(|i| Bytes::from(format!("chunk {}", i)))
            .collect();

        let blocks = processor.create_blocks_parallel(chunks).unwrap();
        assert_eq!(blocks.len(), 1000);

        // Verify all in parallel
        assert!(processor.verify_blocks_parallel(&blocks).is_ok());
    }

    #[test]
    fn test_empty_batch() {
        let processor = BatchProcessor::new();
        let empty: Vec<Bytes> = vec![];

        let blocks = processor.create_blocks_parallel(empty).unwrap();
        assert_eq!(blocks.len(), 0);
    }

    #[test]
    fn test_compress_data_parallel() {
        let processor = BatchProcessor::new();
        let data = vec![
            Bytes::from(vec![0u8; 1000]),
            Bytes::from(vec![1u8; 1000]),
            Bytes::from(vec![2u8; 1000]),
        ];

        // Test Zstd compression
        let compressed = processor
            .compress_data_parallel(data.clone(), CompressionAlgorithm::Zstd, 3)
            .unwrap();
        assert_eq!(compressed.len(), 3);

        // Compressed data should be smaller than original for repetitive data
        for (i, comp) in compressed.iter().enumerate() {
            assert!(comp.len() < data[i].len());
        }
    }

    #[test]
    fn test_decompress_data_parallel() {
        let processor = BatchProcessor::new();
        let original = vec![Bytes::from(vec![0u8; 500]), Bytes::from(vec![1u8; 500])];

        // Compress then decompress
        let compressed = processor
            .compress_data_parallel(original.clone(), CompressionAlgorithm::Lz4, 3)
            .unwrap();

        let decompressed = processor
            .decompress_data_parallel(compressed, CompressionAlgorithm::Lz4)
            .unwrap();

        assert_eq!(decompressed.len(), original.len());
        for (i, decomp) in decompressed.iter().enumerate() {
            assert_eq!(decomp, &original[i]);
        }
    }

    #[test]
    fn test_analyze_compression_ratios_parallel() {
        let processor = BatchProcessor::new();
        let data = vec![
            Bytes::from(vec![0u8; 1000]), // Highly compressible
            Bytes::from(vec![1u8; 1000]), // Highly compressible
        ];

        let ratios = processor
            .analyze_compression_ratios_parallel(&data, CompressionAlgorithm::Zstd, 6)
            .unwrap();

        assert_eq!(ratios.len(), 2);

        // Ratios should be between 0.0 and 1.0
        for ratio in &ratios {
            assert!(*ratio >= 0.0 && *ratio <= 1.0);
        }

        // Repetitive data should have good compression ratio (< 0.5)
        for ratio in &ratios {
            assert!(*ratio < 0.5);
        }
    }

    #[test]
    fn test_compression_with_none_algorithm() {
        let processor = BatchProcessor::new();
        let data = vec![Bytes::from("test data"), Bytes::from("more data")];

        // None algorithm should return data unchanged
        let compressed = processor
            .compress_data_parallel(data.clone(), CompressionAlgorithm::None, 0)
            .unwrap();

        assert_eq!(compressed.len(), data.len());
        for (i, comp) in compressed.iter().enumerate() {
            assert_eq!(comp, &data[i]);
        }
    }

    #[test]
    fn test_batch_stats_compression() {
        let mut stats = BatchStats::new();
        stats.items_processed = 100;
        stats.total_bytes = 10000;
        stats.compressed_bytes = 5000;

        // Test compression savings
        assert_eq!(stats.compression_savings(), 5000);

        // Test compression efficiency (50% reduction)
        assert!((stats.compression_efficiency() - 50.0).abs() < 0.01);
    }

    #[test]
    fn test_batch_stats_no_compression() {
        let stats = BatchStats::new();

        // With no compression data, savings should be 0
        assert_eq!(stats.compression_savings(), 0);
        assert_eq!(stats.compression_efficiency(), 0.0);
    }

    #[test]
    fn test_large_batch_compression() {
        let processor = BatchProcessor::new();

        // Create 100 chunks of compressible data
        let data: Vec<Bytes> = (0..100).map(|i| Bytes::from(vec![i as u8; 500])).collect();

        let compressed = processor
            .compress_data_parallel(data.clone(), CompressionAlgorithm::Zstd, 3)
            .unwrap();

        assert_eq!(compressed.len(), 100);

        // Verify roundtrip
        let decompressed = processor
            .decompress_data_parallel(compressed, CompressionAlgorithm::Zstd)
            .unwrap();

        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_empty_compression_batch() {
        let processor = BatchProcessor::new();
        let empty: Vec<Bytes> = vec![];

        let compressed = processor
            .compress_data_parallel(empty, CompressionAlgorithm::Lz4, 3)
            .unwrap();

        assert_eq!(compressed.len(), 0);
    }
}