hexz_core/algo/dedup/cdc.rs
1//! Content-Defined Chunking (CDC) for deduplication analysis.
2//!
3//! This module implements the FastCDC (Fast Content-Defined Chunking) algorithm,
4//! a state-of-the-art variable-length chunking algorithm that splits data streams
5//! into content-defined chunks for efficient deduplication. Unlike fixed-size
6//! chunking, CDC adapts chunk boundaries to data content, enabling detection of
7//! duplicate blocks even when shifted or surrounded by different data.
8//!
9//! # Why Content-Defined Chunking?
10//!
11//! Traditional fixed-size chunking fails to handle the "boundary shift problem":
12//! inserting or deleting even a single byte causes all subsequent chunks to shift,
13//! preventing duplicate detection. CDC solves this by:
14//!
15//! - **Content-based boundaries**: Chunk cuts occur at data-dependent positions
16//! - **Shift resilience**: Boundaries remain stable despite insertions/deletions
17//! - **Variable chunk sizes**: Adapts to data structure for optimal deduplication
18//!
19//! Example: Inserting 100 bytes at the start of a file with fixed 4KB chunks
20//! invalidates every chunk. With CDC, only the chunks containing the insertion
21//! are affected; chunks after the insertion point remain identical and deduplicatable.
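//!
//! A small self-contained demonstration of this resilience (synthetic data; the
//! exact number of matching chunks will vary with content and parameters):
//!
//! ```
//! use hexz_core::algo::dedup::cdc::StreamChunker;
//! use hexz_core::algo::dedup::dcam::DedupeParams;
//! use std::collections::HashSet;
//! use std::io::Cursor;
//!
//! fn chunk_hashes(data: Vec<u8>) -> std::io::Result<HashSet<[u8; 32]>> {
//!     StreamChunker::new(Cursor::new(data), DedupeParams::default())
//!         .map(|res| res.map(|chunk| *blake3::hash(&chunk).as_bytes()))
//!         .collect()
//! }
//!
//! # fn main() -> std::io::Result<()> {
//! let original: Vec<u8> = (0..2_000_000u32).flat_map(|i| i.to_le_bytes()).collect();
//! let mut shifted = vec![0xAAu8; 100]; // insert 100 bytes at the front
//! shifted.extend_from_slice(&original);
//!
//! let a = chunk_hashes(original)?;
//! let b = chunk_hashes(shifted)?;
//! // Boundaries re-synchronize shortly after the insertion point, so most of the
//! // original chunks reappear unchanged in the shifted stream.
//! println!("{} of {} original chunks also occur in the shifted data",
//!     a.intersection(&b).count(), a.len());
//! # Ok(())
//! # }
//! ```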
22//!
23//! # FastCDC Algorithm Details
24//!
25//! FastCDC improves upon earlier CDC algorithms (Rabin fingerprinting, basic CDC)
26//! by using a normalized chunking approach that reduces chunk size variance while
27//! maintaining content-sensitivity.
28//!
29//! ## Rolling Hash Mechanism
30//!
31//! 1. **Sliding Window**: Maintains a window of `w` bytes (typically 48 bytes)
32//! 2. **Hash Computation**: Computes a rolling hash (Gear hash) over the window
33//! 3. **Cut-Point Detection**: When `hash & mask == 0`, mark a chunk boundary
34//! 4. **Normalization**: Uses different masks in different chunk regions to
35//! achieve more uniform chunk size distribution
36//!
37//! ## Gear Hash Function
38//!
39//! FastCDC uses the Gear hash, a simple rolling hash that provides:
40//! - **O(1) update time** per byte (critical for performance)
41//! - **Good avalanche properties** (small changes → large hash changes)
42//! - **CPU-friendly operations** (no expensive modulo or division)
43//!
44//! The Gear hash works by:
45//! 1. Maintaining a precomputed random 256-entry lookup table
46//! 2. For each byte `b`, XOR the current hash with `table[b]`
47//! 3. Shift the hash left or right (depending on variant)
48//! 4. Check if the hash satisfies the cut-point condition
49//!
50//! **Pseudocode:**
51//! ```text
//! hash = 0
//! for each byte b in window:
//!     hash = (hash << 1) ^ GEAR_TABLE[b]
//!     if (hash & mask) == 0:
//!         mark_chunk_boundary()
57//! ```
58//!
59//! The table lookup randomizes each byte's contribution, ensuring that
60//! different byte patterns produce uncorrelated hashes. This is critical
61//! for content-defined chunking to work correctly across diverse data types.
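//!
//! The following Rust sketch shows the shape of this update loop. The table
//! construction and its seed constant are illustrative stand-ins only; the
//! `fastcdc` crate ships its own fixed table and mask layout.
//!
//! ```
//! // Build an illustrative 256-entry table (a stand-in, not the real Gear table).
//! fn gear_table() -> [u64; 256] {
//!     let mut table = [0u64; 256];
//!     let mut x: u64 = 0x9E37_79B9_7F4A_7C15; // arbitrary seed (splitmix64 constant)
//!     for entry in table.iter_mut() {
//!         // splitmix64 step: fills the table with well-mixed pseudo-random values
//!         x = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
//!         let mut z = x;
//!         z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
//!         z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
//!         *entry = z ^ (z >> 31);
//!     }
//!     table
//! }
//!
//! // Return the index one past the first content-defined cut, or `data.len()`
//! // if the mask condition is never met.
//! fn find_cut(data: &[u8], mask: u64) -> usize {
//!     let table = gear_table();
//!     let mut hash: u64 = 0;
//!     for (i, &b) in data.iter().enumerate() {
//!         hash = (hash << 1) ^ table[b as usize];
//!         if hash & mask == 0 {
//!             return i + 1;
//!         }
//!     }
//!     data.len()
//! }
//!
//! let cut = find_cut(b"some example input", (1 << 14) - 1);
//! assert!(cut <= 18);
//! ```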
62//!
63//! ## Chunk Size Control
64//!
65//! Three parameters control chunk size distribution:
66//!
67//! - **`m` (min)**: Minimum chunk size (prevents tiny chunks, reduces overhead)
68//! - **`f` (bits)**: Fingerprint bits; average chunk size is `2^f` bytes
69//! - **`z` (max)**: Maximum chunk size (bounds worst-case memory usage)
70//!
71//! The algorithm enforces these constraints strictly:
72//! - Never cuts before `m` bytes
73//! - Always cuts at `z` bytes regardless of hash value
74//! - Statistically averages to `2^f` byte chunks
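//!
//! For the defaults quoted throughout this module (`f=14`, `m=2KB`, `z=64KB`),
//! the three values nest as required:
//!
//! ```
//! let f = 14u32;
//! let avg: u64 = 1 << f;                 // 16 KiB expected chunk size
//! let (m, z) = (2u64 * 1024, 64 * 1024); // minimum and maximum
//! assert!(m <= avg && avg <= z);
//! ```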
75//!
76//! ## Normalization Zones
77//!
78//! FastCDC divides each chunk into regions with different cut-point masks
79//! to achieve a more uniform chunk size distribution than basic CDC:
80//!
81//! - **[0, m)**: No cutting allowed (enforces minimum size)
//! - **[m, avg)**: Strict mask (more bits required → lower cut probability)
//! - **[avg, z)**: Relaxed mask (fewer bits required → higher cut probability)
84//! - **[z, ∞)**: Force cut at maximum (enforces size bound)
85//!
86//! ### Mask Selection
87//!
88//! The masks are computed based on the number of trailing zero bits required
89//! in the hash value:
90//!
//! A non-normalized chunker would require exactly `f` trailing zero bits everywhere
//! (`f=14` → mask = `0x3FFF` → `hash & 0x3FFF == 0`, expected spacing `2^14 = 16,384`
//! bytes). FastCDC instead splits this into two masks around the average:
//!
//! - **Strict mask** (`[m, avg)`): Requires more than `f` trailing zero bits
//!   (`f+1` or `f+2`, depending on the normalization level)
//!   - Example: `f=14`, level 2 → strict uses `f+2=16` → mask = `0xFFFF`
//!   - Cut probability: `1 / 2^16 ≈ 0.0015%` per byte (4× lower)
//!   - Suppresses cuts shortly after the minimum, reducing small-chunk overhead
//!
//! - **Relaxed mask** (`[avg, z)`): Requires fewer than `f` trailing zero bits
//!   (`f-1` or `f-2`)
//!   - Example: `f=14`, level 2 → relaxed uses `f-2=12` → mask = `0xFFF`
//!   - Cut probability: `1 / 2^12 ≈ 0.024%` per byte (4× higher)
//!   - Encourages cuts soon after the average, reducing the long tail of large chunks
100//!
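//! For `f=14` at normalization level 2, the simplified low-bit masks work out as
//! follows (a sketch; the `fastcdc` crate builds its masks internally and spreads
//! the selected bits rather than using the lowest ones):
//!
//! ```
//! let f = 14u32;
//! let mask_baseline: u64 = (1u64 << f) - 1;       // 0x3FFF — unnormalized reference
//! let mask_strict: u64 = (1u64 << (f + 2)) - 1;   // 0xFFFF — used in [m, avg)
//! let mask_relaxed: u64 = (1u64 << (f - 2)) - 1;  // 0x0FFF — used in [avg, z)
//! assert_eq!(mask_baseline, 0x3FFF);
//! assert_eq!(mask_strict, 0xFFFF);
//! assert_eq!(mask_relaxed, 0x0FFF);
//! ```
//!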
101//! ### Effect on Distribution
102//!
103//! This normalization reduces the coefficient of variation (std dev / mean) in
104//! chunk sizes from ~1.0 (pure exponential) to ~0.6 (normalized exponential):
105//!
106//! | Distribution Type | Mean | Std Dev | CV |
107//! |-------------------|------|---------|-----|
108//! | Fixed-size | 16 KB | 0 KB | 0.0 |
109//! | Basic CDC | 16 KB | 16 KB | 1.0 |
110//! | FastCDC (normalized) | 16 KB | 10 KB | 0.6 |
111//!
112//! **Benefits:**
113//! - More consistent chunk sizes → better index density
114//! - Fewer outliers (very small/large chunks) → lower metadata overhead
115//! - Maintains content-sensitivity for deduplication effectiveness
116//!
117//! # Performance Characteristics
118//!
119//! ## Throughput
120//!
121//! - **Single-threaded**: 2.7 GB/s on modern CPUs (i7-14700K, validated via `cargo bench --bench cdc_chunking`)
122//! - **Bottleneck**: Rolling hash computation (CPU-bound, not I/O-bound)
123//! - **Comparison**: 4-5x faster than Rabin fingerprinting, 2x faster than basic CDC
124//! - **Overhead**: ~10× slower than fixed-size chunking (26 GB/s vs 2.7 GB/s), but acceptable for dedup benefits
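//!
//! Throughput can be spot-checked on your own hardware with a rough measurement
//! such as the one below (synthetic input; this is not the `cargo bench` harness
//! referenced above, and results will vary):
//!
//! ```no_run
//! use hexz_core::algo::dedup::cdc::StreamChunker;
//! use hexz_core::algo::dedup::dcam::DedupeParams;
//! use std::io::Cursor;
//! use std::time::Instant;
//!
//! let data = vec![0x5Au8; 256 * 1024 * 1024]; // 256 MiB of synthetic input
//! let start = Instant::now();
//! let mut total = 0usize;
//! for chunk in StreamChunker::new(Cursor::new(data), DedupeParams::default()) {
//!     total += chunk.expect("in-memory reads cannot fail").len();
//! }
//! let secs = start.elapsed().as_secs_f64();
//! println!("chunked {} bytes at {:.2} GB/s", total, total as f64 / secs / 1e9);
//! ```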
125//!
126//! ## Parallelization
127//!
128//! FastCDC is fundamentally sequential (each byte depends on previous hash state),
129//! but can be parallelized across independent data streams:
130//!
131//! - **Multiple files**: Chunk each file in a separate thread/task
132//! - **Split input**: Pre-split large files at fixed boundaries (e.g., every 1GB)
133//! and chunk each segment independently (loses some cross-boundary dedup)
134//! - **Pipeline parallelism**: Chunk in one thread, compress in another, write
135//! in a third (producer-consumer pattern)
136//!
137//! **Note**: Parallel chunking of a single stream is possible using recursive
138//! chunking or sketch-based techniques, but adds significant complexity and is
//! not implemented in this module.
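//!
//! A minimal sketch of the per-file strategy using `std::thread` (file names are
//! placeholders and error handling is simplified):
//!
//! ```no_run
//! use hexz_core::algo::dedup::cdc::StreamChunker;
//! use hexz_core::algo::dedup::dcam::DedupeParams;
//! use std::fs::File;
//! use std::thread;
//!
//! # fn main() -> std::io::Result<()> {
//! let paths = ["a.raw", "b.raw", "c.raw"];
//! let handles: Vec<_> = paths
//!     .iter()
//!     .map(|path| {
//!         let path = path.to_string();
//!         thread::spawn(move || -> std::io::Result<u64> {
//!             let file = File::open(&path)?;
//!             let chunker = StreamChunker::new(file, DedupeParams::default());
//!             let mut chunks = 0u64;
//!             for chunk in chunker {
//!                 chunk?; // propagate I/O errors from this stream
//!                 chunks += 1;
//!             }
//!             Ok(chunks)
//!         })
//!     })
//!     .collect();
//!
//! for handle in handles {
//!     let chunks = handle.join().expect("chunking thread panicked")?;
//!     println!("{} chunks", chunks);
//! }
//! # Ok(())
//! # }
//! ```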
140//!
141//! ## Memory Usage
142//!
//! - **Per-stream buffer**: `2 * max(z, 1 MiB)` bytes (default: ~2 MB for z=64KB)
144//! - **Analysis hash set**: ~8 bytes per unique chunk (amortized)
145//! - **Streaming overhead**: Negligible (no hash set)
146//!
147//! ## Chunk Size Distribution
148//!
149//! For typical data with `f=14` (16KB average, default):
150//! - **Mean**: ~16 KB (exactly 2^f)
151//! - **Median**: ~11 KB (skewed by exponential tail)
152//! - **Standard deviation**: ~10 KB (coefficient of variation ~0.6)
153//! - **Range**: [m, z] (typically [2KB, 64KB])
154//!
155//! # Usage Patterns
156//!
157//! This module provides two primary APIs:
158//!
159//! ## 1. Analysis Mode: `analyze_stream()`
160//!
161//! Performs a dry-run to estimate deduplication potential without storing chunks.
162//! Used by the DCAM model to optimize parameters before actual packing.
163//!
164//! - **Input**: Reader + parameters
165//! - **Output**: `CdcStats` (chunk counts, dedup ratio)
166//! - **Memory**: O(unique chunks) for hash set
167//! - **Use case**: Parameter optimization, capacity planning
168//!
169//! ## 2. Streaming Mode: `StreamChunker`
170//!
171//! Iterator that yields chunks on-demand during snapshot creation. Integrates
172//! with the packing pipeline for compression and encryption.
173//!
174//! - **Input**: Reader + parameters
175//! - **Output**: Iterator of `Vec<u8>` chunks
176//! - **Memory**: O(1) - fixed buffer size
177//! - **Use case**: Actual snapshot packing
178//!
179//! # Examples
180//!
181//! ## Basic Analysis
182//!
183//! ```no_run
184//! use hexz_core::algo::dedup::cdc::analyze_stream;
185//! use hexz_core::algo::dedup::dcam::DedupeParams;
186//! use std::fs::File;
187//!
188//! # fn main() -> std::io::Result<()> {
189//! let file = File::open("disk.raw")?;
190//! let params = DedupeParams::default(); // f=14, m=2KB, z=64KB
//! let stats = analyze_stream(file, &params)?;
192//!
193//! println!("Total chunks: {}", stats.chunk_count);
194//! println!("Unique chunks: {}", stats.unique_chunk_count);
195//! println!("Dedup ratio: {:.2}%",
196//! (1.0 - stats.unique_bytes as f64 /
197//! (stats.unique_bytes as f64 * stats.chunk_count as f64 /
198//! stats.unique_chunk_count as f64)) * 100.0);
199//! # Ok(())
200//! # }
201//! ```
202//!
203//! ## Streaming Chunks
204//!
205//! ```no_run
206//! use hexz_core::algo::dedup::cdc::StreamChunker;
207//! use hexz_core::algo::dedup::dcam::DedupeParams;
208//! use std::fs::File;
209//!
210//! # fn main() -> std::io::Result<()> {
211//! let file = File::open("memory.raw")?;
212//! let params = DedupeParams::default();
213//! let chunker = StreamChunker::new(file, params);
214//!
215//! for (i, chunk_result) in chunker.enumerate() {
//!     let chunk = chunk_result?;
//!     println!("Chunk {}: {} bytes", i, chunk.len());
//!     // Compress, encrypt, hash, store...
219//! }
220//! # Ok(())
221//! # }
222//! ```
223//!
224//! ## Parameter Comparison
225//!
226//! ```no_run
227//! use hexz_core::algo::dedup::cdc::analyze_stream;
228//! use hexz_core::algo::dedup::dcam::DedupeParams;
229//! use std::fs::File;
230//!
231//! # fn main() -> std::io::Result<()> {
232//! let data = std::fs::read("dataset.bin")?;
233//!
234//! // Small chunks (8KB average) - fine-grained dedup
235//! let mut params_small = DedupeParams::default();
236//! params_small.f = 13; // 2^13 = 8KB
//! let stats_small = analyze_stream(&data[..], &params_small)?;
238//!
239//! // Large chunks (32KB average) - coarse dedup, less overhead
240//! let mut params_large = DedupeParams::default();
241//! params_large.f = 15; // 2^15 = 32KB
//! let stats_large = analyze_stream(&data[..], &params_large)?;
243//!
244//! println!("Small chunks: {} unique / {} total",
245//! stats_small.unique_chunk_count, stats_small.chunk_count);
246//! println!("Large chunks: {} unique / {} total",
247//! stats_large.unique_chunk_count, stats_large.chunk_count);
248//! # Ok(())
249//! # }
250//! ```
251//!
252//! # Integration with Hexz
253//!
254//! CDC is integrated into the snapshot packing pipeline with multiple stages:
255//!
256//! ## Analysis Phase
257//!
258//! 1. **DCAM Analysis**: `analyze_stream()` performs dry-run chunking to estimate
259//! deduplication potential across different parameter configurations
260//! 2. **Parameter Selection**: DCAM formulas compute optimal `f` value based on
261//! observed change rates and metadata overhead tradeoffs
262//! 3. **Validation**: Verify parameters are within hardware constraints (chunk
263//! sizes fit in memory, hash set doesn't exceed available RAM)
264//!
265//! ## Packing Phase
266//!
267//! 4. **Streaming Chunking**: `StreamChunker` feeds variable-sized chunks to the
268//! compression pipeline (typically zstd or lz4)
269//! 5. **Deduplication Tracking**: Compute cryptographic hash (SHA-256 or BLAKE3)
270//! of each chunk and check against existing snapshot index
271//! 6. **Storage**: Write unique chunks to pack files; record chunk metadata
272//! (offset, compressed size, hash) in snapshot index
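//!
//! A simplified sketch of steps 5-6, tracking deduplication with the full BLAKE3
//! digest (compression, encryption, and pack-file I/O are omitted; the real
//! pipeline lives elsewhere in Hexz):
//!
//! ```no_run
//! use hexz_core::algo::dedup::cdc::StreamChunker;
//! use hexz_core::algo::dedup::dcam::DedupeParams;
//! use std::collections::HashMap;
//! use std::fs::File;
//!
//! # fn main() -> std::io::Result<()> {
//! let file = File::open("memory.raw")?;
//! let chunker = StreamChunker::new(file, DedupeParams::default());
//!
//! // Map from full 256-bit BLAKE3 digest to a chunk id in the pack.
//! let mut index: HashMap<[u8; 32], usize> = HashMap::new();
//!
//! for chunk in chunker {
//!     let chunk = chunk?;
//!     let digest = *blake3::hash(&chunk).as_bytes();
//!     if !index.contains_key(&digest) {
//!         let id = index.len();
//!         // A real pipeline would compress, encrypt, and write the chunk here,
//!         // recording (offset, compressed size, hash) in the snapshot index.
//!         index.insert(digest, id);
//!     }
//! }
//! println!("{} unique chunks", index.len());
//! # Ok(())
//! # }
//! ```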
273//!
274//! ## Unpacking Phase
275//!
276//! 7. **Reconstruction**: Read snapshot index to determine chunk sequence
277//! 8. **Decompression**: Fetch and decompress chunks from pack files
278//! 9. **Assembly**: Concatenate chunks in original order to reconstruct snapshot
279//!
280//! ## Error Handling
281//!
282//! - **I/O Errors**: Propagated immediately, abort operation
283//! - **Hash Collisions**: Detected during unpacking (content verification fails),
284//! triggers integrity error and potential re-read or fallback
285//! - **Corrupted Chunks**: Compression errors detected during decompression,
286//! logged and may trigger snapshot re-creation if critical
287//!
288//! ## Edge Cases
289//!
290//! - **Empty input**: Returns zero chunks (not an error)
291//! - **Tiny files** (< min_size): Single chunk containing entire file
//! - **Huge files**: Streaming with a fixed-size buffer keeps memory bounded
//! regardless of input size
294//! - **Adversarial input**: Maximum chunk size enforced regardless of hash values
295//!
296//! # Limitations and Considerations
297//!
298//! ## Hash Collision Handling
299//!
300//! The `analyze_stream()` function uses the first 8 bytes of BLAKE3 as a 64-bit
301//! hash. This provides a birthday bound at ~2^32 ≈ 4 billion unique chunks,
302//! which is more than sufficient for analysis workloads:
303//!
304//! - **Negligible collision risk**: Even at 100M unique chunks, P(collision) < 0.1%
305//! - **Not acceptable for storage**: Actual packing uses the full 256-bit BLAKE3
306//! hash via the dedup hash table to avoid data corruption
307//!
308//! ## Small File Overhead
309//!
310//! For files smaller than `min_size`, CDC produces a single chunk containing
311//! the entire file. This is optimal behavior but means:
312//!
313//! - No deduplication benefit for very small files
314//! - Metadata overhead (hash + pointer) is proportionally higher
315//! - Consider alternative strategies for small file workloads (e.g., tar packing)
316//!
317//! ## Boundary Shift Resilience
318//!
319//! While CDC is resilient to insertions/deletions within data, it is NOT
320//! resilient to:
321//!
322//! - **Encryption**: Changes entire content, prevents all deduplication
323//! - **Compression**: Similar effect to encryption for CDC purposes
324//! - **Byte-level reordering**: Shuffles content, breaks chunk boundaries
325//!
326//! **Recommendation**: Apply CDC before compression/encryption, not after.
327//!
328//! ## Adversarial Input
329//!
330//! An attacker could craft input that produces:
331//!
332//! - **All max-size chunks**: Content designed to never trigger hash cut-points
//! (e.g., long runs of a single byte value, whose repetitive hash sequence may never satisfy the cut mask)
334//! - **All min-size chunks**: Content that triggers cut-points too frequently
335//! - **Hash flooding**: Attempt to exhaust hash set memory in `analyze_stream()`
336//!
337//! The `max_size` enforcement prevents unbounded chunk growth, but adversarial
338//! input can still degrade deduplication effectiveness. For untrusted input,
339//! consider additional validation or rate limiting.
340//!
341//! ## Determinism Caveats
342//!
343//! Chunking is deterministic given:
344//! - Same input bytes (bit-identical)
345//! - Same parameters (`f`, `m`, `z`)
346//! - Same fastcdc library version
347//!
348//! However, updating the fastcdc library may change internal hash tables or
349//! algorithms, causing chunk boundaries to shift. This breaks cross-version
350//! deduplication. **Mitigation**: Include CDC version in snapshot metadata.
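//!
//! One possible shape for such a metadata record (illustrative only; the actual
//! Hexz snapshot format is defined elsewhere):
//!
//! ```
//! #[allow(dead_code)]
//! struct ChunkingMetadata {
//!     cdc_algorithm: &'static str,     // e.g. "fastcdc-v2020"
//!     cdc_crate_version: &'static str, // exact version of the chunking dependency
//!     f: u32,
//!     m: u64,
//!     z: u64,
//! }
//! ```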
351//!
352//! # References
353//!
354//! - Xia, W. et al. "FastCDC: a Fast and Efficient Content-Defined Chunking
355//! Approach for Data Deduplication." USENIX ATC 2016.
356//! [https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia](https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia)
357//! - Muthitacharoen, A. et al. "A Low-bandwidth Network File System." SOSP 2001.
358//! (Original Rabin fingerprinting CDC)
359//! - Randall et al. "Optimizing Deduplication Parameters via a Change-Estimation
360//! Analytical Model." 2025. (DCAM integration)
361
362use crate::algo::dedup::dcam::DedupeParams;
363use std::collections::HashSet;
364use std::io::{self, Read};
365
366/// Statistics from a CDC deduplication analysis run.
367///
368/// This structure captures metrics from a content-defined chunking analysis pass,
369/// providing insights into chunk distribution, deduplication effectiveness, and
370/// potential storage savings.
371///
372/// # Use Cases
373///
374/// - **DCAM Model Input**: Feeds into analytical formulas to predict optimal parameters
375/// - **Capacity Planning**: Estimates actual storage requirements after deduplication
376/// - **Performance Analysis**: Evaluates chunk size distribution and variance
377/// - **Comparison Studies**: Benchmarks different parameter configurations
378///
379/// # Invariants
380///
381/// The following relationships always hold for valid statistics:
382///
383/// - `unique_chunk_count <= chunk_count` (can't have more unique than total)
/// - `unique_bytes <= unique_chunk_count * max_chunk_size` (bounded by the maximum chunk size)
/// - `unique_bytes >= (unique_chunk_count - 1) * min_chunk_size` (only the final chunk may be smaller than the minimum)
386/// - `total_bytes` may be 0 (not tracked in streaming mode)
387///
388/// # Calculating Deduplication Metrics
389///
390/// ## Deduplication Ratio
391///
392/// The deduplication ratio represents what fraction of data is unique:
393///
394/// ```text
395/// dedup_ratio = unique_bytes / total_bytes_before_dedup
396/// ```
397///
398/// Where `total_bytes_before_dedup` (the original data size) can be estimated from:
399/// ```text
400/// total_bytes_before_dedup ≈ unique_bytes × (chunk_count / unique_chunk_count)
401/// ```
402///
403/// **Interpretation:**
404/// - Ratio of **0.5** means 50% of data is unique → **50% storage savings**
405/// - Ratio of **1.0** means 100% unique (no duplicates) → **0% savings**
406/// - Ratio of **0.3** means 30% unique → **70% savings**
407///
408/// ## Deduplication Factor
409///
410/// An alternative metric is the deduplication factor (multiplicative savings):
411///
412/// ```text
413/// dedup_factor = chunk_count / unique_chunk_count
414/// ```
415///
416/// **Interpretation:**
417/// - Factor of **1.0x** means no deduplication occurred
418/// - Factor of **2.0x** means data compressed by half (50% savings)
419/// - Factor of **10.0x** means 90% of chunks were duplicates
420///
421/// ## Average Chunk Size
422///
423/// The realized average chunk size (may differ from `2^f` for small datasets):
424///
425/// ```text
426/// avg_chunk_size = unique_bytes / unique_chunk_count
427/// ```
428///
429/// # Examples
430///
431/// ```rust
432/// # use hexz_core::algo::dedup::cdc::CdcStats;
433/// let stats = CdcStats {
///     total_bytes: 0, // Not tracked (estimated below)
///     unique_bytes: 50_000_000,
///     chunk_count: 10_000,
///     unique_chunk_count: 3_000,
438/// };
439///
440/// // Estimate original size
441/// let estimated_original = stats.unique_bytes * stats.chunk_count / stats.unique_chunk_count;
442/// assert_eq!(estimated_original, 166_666_666); // ~167 MB original
443///
444/// // Calculate deduplication ratio (fraction unique)
445/// let dedup_ratio = stats.unique_bytes as f64 / estimated_original as f64;
446/// assert!((dedup_ratio - 0.3).abs() < 0.01); // ~30% unique
447///
448/// // Calculate storage savings (percent eliminated)
449/// let savings_percent = (1.0 - dedup_ratio) * 100.0;
450/// assert!((savings_percent - 70.0).abs() < 1.0); // ~70% savings
451///
452/// // Calculate deduplication factor (compression ratio)
453/// let dedup_factor = stats.chunk_count as f64 / stats.unique_chunk_count as f64;
454/// assert!((dedup_factor - 3.33).abs() < 0.01); // ~3.33x compression
455///
456/// // Average chunk size
457/// let avg_chunk_size = stats.unique_bytes / stats.unique_chunk_count;
458/// println!("Average unique chunk size: {} bytes", avg_chunk_size);
459/// ```
460#[derive(Debug)]
461pub struct CdcStats {
462 /// Total bytes processed from the input stream.
463 ///
464 /// **Note**: This field is currently not tracked during streaming analysis
465 /// (always set to 0) to avoid memory overhead. It may be populated in future
466 /// versions if needed for enhanced metrics.
467 ///
468 /// To estimate total bytes, use:
469 /// ```text
470 /// total_bytes ≈ unique_bytes * (chunk_count / unique_chunk_count)
471 /// ```
472 pub total_bytes: u64,
473
474 /// Number of unique bytes after deduplication.
475 ///
476 /// This is the sum of sizes of all unique chunks (first occurrence only).
477 /// Represents the actual storage space required if deduplication is applied.
478 ///
479 /// # Interpretation
480 ///
481 /// - Lower values indicate higher redundancy in the dataset
482 /// - Equals total data size if no duplicates exist
483 /// - Bounded by `unique_chunk_count * min_chunk_size` and
484 /// `unique_chunk_count * max_chunk_size`
485 pub unique_bytes: u64,
486
487 /// Total number of chunks identified by FastCDC.
488 ///
489 /// Includes both unique and duplicate chunks. This represents how many chunks
490 /// would be created if the data were processed through the chunking pipeline.
491 ///
492 /// # Expected Values
493 ///
494 /// For a dataset of size `N` bytes with average chunk size `2^f`:
495 /// ```text
496 /// chunk_count ≈ N / (2^f)
497 /// ```
498 ///
499 /// Example: 1GB file with f=14 (16KB avg) → ~65,536 chunks
500 pub chunk_count: u64,
501
502 /// Number of unique chunks after deduplication.
503 ///
    /// This counts only distinct chunks (based on a 64-bit truncated BLAKE3 hash).
505 /// Duplicate chunks are not counted in this total.
506 ///
507 /// # Interpretation
508 ///
509 /// - `unique_chunk_count == chunk_count` → No deduplication (all unique)
510 /// - `unique_chunk_count << chunk_count` → High deduplication (many duplicates)
511 /// - Ratio `unique_chunk_count / chunk_count` indicates dedup effectiveness
512 ///
513 /// # Hash Collision Note
514 ///
515 /// Uses the first 8 bytes of BLAKE3 as a 64-bit hash, providing negligible
516 /// collision probability (birthday bound at ~2^32 ≈ 4 billion unique chunks).
517 pub unique_chunk_count: u64,
518}
519
520/// Streaming iterator that yields content-defined chunks from a reader.
521///
522/// `StreamChunker` is the primary interface for applying FastCDC to data streams
523/// in a memory-efficient manner. It reads data incrementally from any `Read`
524/// source and applies content-defined chunking to produce variable-sized chunks
525/// suitable for compression, deduplication, and storage.
526///
527/// # Design Goals
528///
529/// - **Streaming**: Process arbitrarily large files without loading entire contents
530/// - **Zero-copy (internal)**: Minimize data copying within the chunker
531/// - **Bounded memory**: Fixed buffer size regardless of input size
532/// - **Deterministic**: Same input always produces identical chunk boundaries
533/// - **Iterator-based**: Idiomatic Rust API that composes with other iterators
534///
535/// # Buffering Strategy
536///
537/// The chunker maintains a fixed-size internal buffer to handle the mismatch
538/// between read boundaries (determined by the OS/filesystem) and chunk boundaries
539/// (determined by content):
540///
/// - **Buffer size**: `2 * max(max_chunk_size, 1 MiB)` (default: ~2 MB)
542/// - **Refill trigger**: When available data < `max_chunk_size`
543/// - **Shift threshold**: When cursor advances beyond buffer midpoint
544/// - **EOF handling**: Flushes remaining data as final chunk
545///
546/// ## Buffer Management Example
547///
548/// ```text
549/// Initial state (empty buffer):
550/// [__________________________________|__________________________________]
551/// 0 cursor 2*z
552///
553/// After reading (filled to cursor):
554/// [##################################|__________________________________]
555/// 0 cursor=filled 2*z
556///
557/// After consuming one chunk (cursor advances):
558/// [################XXXXXXXXXXXXXXXXXX|__________________________________]
559/// 0 consumed cursor 2*z
560/// (next chunk) filled
561///
562/// After shift (move unconsumed data to start):
563/// [XXXXXXXXXXXXXXXXXX________________|__________________________________]
564/// 0 cursor filled 2*z
565/// (new position)
566/// ```
567///
568/// # Chunk Boundary Guarantees
569///
570/// The chunker guarantees that:
571///
572/// 1. **Minimum size**: Every chunk (except possibly the last) is ≥ `min_size`
573/// 2. **Maximum size**: Every chunk is ≤ `max_size` (strictly enforced)
574/// 3. **Determinism**: Given the same input and parameters, chunk boundaries are identical
575/// 4. **Completeness**: All input bytes appear in exactly one chunk (no gaps, no overlaps)
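///
/// The determinism and completeness guarantees can be checked directly on
/// in-memory data (a small self-contained example with synthetic input):
///
/// ```
/// use hexz_core::algo::dedup::cdc::StreamChunker;
/// use hexz_core::algo::dedup::dcam::DedupeParams;
/// use std::io::Cursor;
///
/// # fn main() -> std::io::Result<()> {
/// let data: Vec<u8> = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect();
/// let sizes = |d: &[u8]| -> std::io::Result<Vec<usize>> {
///     StreamChunker::new(Cursor::new(d.to_vec()), DedupeParams::default())
///         .map(|res| res.map(|chunk| chunk.len()))
///         .collect()
/// };
///
/// let first = sizes(&data)?;
/// let second = sizes(&data)?;
/// assert_eq!(first, second); // identical boundaries for identical input
/// assert_eq!(first.iter().sum::<usize>(), data.len()); // no gaps, no overlaps
/// # Ok(())
/// # }
/// ```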
576///
577/// # Performance Considerations
578///
579/// - **Memory allocation**: Each chunk is copied to an owned `Vec<u8>`. For
580/// zero-allocation iteration, consider using `StreamChunkerSlice` (if available)
581/// or process chunks in-place within the iterator.
582/// - **I/O pattern**: Reads in large blocks (up to `max_chunk_size`) to minimize
583/// syscalls. Works efficiently with buffered readers but not required.
/// - **CPU cost**: Gear hash computation is the bottleneck (~2.7 GB/s single-threaded;
///   see the module-level benchmarks). Consider
585/// parallel chunking of independent streams if throughput is critical.
586///
587/// # Examples
588///
589/// ## Basic Usage
590///
591/// ```no_run
592/// use hexz_core::algo::dedup::cdc::StreamChunker;
593/// use hexz_core::algo::dedup::dcam::DedupeParams;
594/// use std::fs::File;
595///
596/// # fn main() -> std::io::Result<()> {
597/// let file = File::open("data.bin")?;
598/// let params = DedupeParams::default();
599/// let chunker = StreamChunker::new(file, params);
600///
601/// for chunk_result in chunker {
///     let chunk = chunk_result?;
///     println!("Chunk: {} bytes", chunk.len());
604/// }
605/// # Ok(())
606/// # }
607/// ```
608///
609/// ## With Compression Pipeline
610///
611/// ```no_run
612/// use hexz_core::algo::dedup::cdc::StreamChunker;
613/// use hexz_core::algo::dedup::dcam::DedupeParams;
614/// use std::fs::File;
615/// use std::collections::HashMap;
616///
617/// # fn main() -> std::io::Result<()> {
618/// let file = File::open("memory.raw")?;
619/// let params = DedupeParams::default();
620/// let chunker = StreamChunker::new(file, params);
621///
622/// let mut dedup_map: HashMap<u64, usize> = HashMap::new();
623/// let mut unique_chunks = Vec::new();
624///
625/// for chunk_result in chunker {
///     let chunk = chunk_result?;
///     let hash = crc32fast::hash(&chunk) as u64;
///
///     if !dedup_map.contains_key(&hash) {
///         let chunk_id = unique_chunks.len();
///         dedup_map.insert(hash, chunk_id);
///         // Compress and store chunk
///         unique_chunks.push(chunk);
///     }
635/// }
636///
637/// println!("Stored {} unique chunks", unique_chunks.len());
638/// # Ok(())
639/// # }
640/// ```
641///
642/// ## Custom Parameters
643///
644/// ```no_run
645/// use hexz_core::algo::dedup::cdc::StreamChunker;
646/// use hexz_core::algo::dedup::dcam::DedupeParams;
647/// use std::fs::File;
648///
649/// # fn main() -> std::io::Result<()> {
650/// let file = File::open("disk.raw")?;
651///
652/// // Custom parameters: larger chunks for better compression ratio
653/// let mut params = DedupeParams::default();
654/// params.f = 15; // 32KB average (was 16KB)
655/// params.m = 8192; // 8KB minimum (was 2KB)
656/// params.z = 131072; // 128KB maximum (was 64KB)
657///
658/// let chunker = StreamChunker::new(file, params);
659///
660/// let chunk_sizes: Vec<usize> = chunker
///     .map(|res| res.map(|chunk| chunk.len()))
///     .collect::<Result<_, _>>()?;
663///
664/// let avg_size: usize = chunk_sizes.iter().sum::<usize>() / chunk_sizes.len();
665/// println!("Average chunk size: {} bytes", avg_size);
666/// # Ok(())
667/// # }
668/// ```
669pub struct StreamChunker<R> {
670 /// Underlying data source.
671 ///
672 /// Can be any type implementing `Read`: files, network streams, memory
673 /// buffers, etc. The chunker makes no assumptions about the read pattern
674 /// or blocking behavior.
675 reader: R,
676
677 /// Internal buffer for staging data between reads and chunk boundaries.
678 ///
    /// Size is `2 * max(max_chunk_size, 1 MiB)` to allow efficient buffering and shifting.
680 /// Allocated once during construction and reused for the entire stream.
681 buffer: Vec<u8>,
682
683 /// Current read position within the buffer.
684 ///
685 /// Points to the start of the next chunk to be yielded. Invariant:
686 /// `0 <= cursor <= filled <= buffer.len()`
687 cursor: usize,
688
689 /// Number of valid bytes currently in the buffer.
690 ///
691 /// Bytes in range `[0, filled)` contain valid data. Bytes in range
692 /// `[filled, buffer.len())` are unused scratch space.
693 filled: usize,
694
695 /// Minimum chunk size in bytes (FastCDC parameter `m`).
696 ///
697 /// No chunk will be smaller than this (except possibly the final chunk
698 /// if EOF is reached). Prevents tiny chunks that would increase metadata
699 /// overhead disproportionately.
700 min_size: usize,
701
702 /// Average chunk size in bytes (FastCDC parameter `2^f`).
703 ///
704 /// This is the statistical expectation of chunk sizes. Actual chunks follow
705 /// a bounded exponential distribution around this value due to the rolling
706 /// hash cut-point probability.
707 avg_size: usize,
708
709 /// Maximum chunk size in bytes (FastCDC parameter `z`).
710 ///
711 /// Chunks are forcibly cut at this size regardless of hash values. Bounds
712 /// worst-case memory usage and ensures progress even in adversarial inputs
713 /// (e.g., runs of zeros that never trigger hash-based cuts).
714 max_size: usize,
715
716 /// Whether EOF has been reached on the underlying reader.
717 ///
718 /// Once set to `true`, no further reads will be attempted. The chunker
719 /// will drain remaining buffered data and then terminate iteration.
720 eof: bool,
721}
722
723impl<R: Read> StreamChunker<R> {
724 /// Creates a new streaming chunker with the specified deduplication parameters.
725 ///
726 /// This constructor initializes the chunker's internal buffer and extracts the
727 /// relevant FastCDC parameters. The chunker is ready to begin iteration immediately
728 /// after construction; no separate initialization step is required.
729 ///
730 /// # Parameters
731 ///
732 /// - `reader`: Data source implementing `Read`. This can be:
733 /// - `File` for reading from disk
734 /// - `Cursor<Vec<u8>>` for in-memory data
735 /// - `BufReader<File>` for buffered I/O (redundant but not harmful)
736 /// - Any other `Read` implementation (network streams, pipes, etc.)
737 ///
738 /// - `params`: FastCDC parameters controlling chunk size distribution. Key fields:
739 /// - `params.f`: Fingerprint bits (average chunk size = 2^f)
740 /// - `params.m`: Minimum chunk size in bytes
741 /// - `params.z`: Maximum chunk size in bytes
742 /// - `params.w`: Rolling hash window size (informational, not used here)
743 ///
744 /// # Returns
745 ///
746 /// A new `StreamChunker` ready to yield chunks via iteration. The chunker
747 /// takes ownership of the reader and will consume it as iteration proceeds.
748 ///
749 /// # Memory Allocation
750 ///
751 /// Allocates a buffer of `2 * params.z` bytes (or 2 MB, whichever is larger).
752 /// This is a one-time allocation that persists for the entire stream:
753 ///
    /// - Default settings (`z=64KB`): **~2 MB** per chunker (the 2 MB minimum applies)
    /// - Large chunks (`z=128KB`): **~2 MB** per chunker (still below the 2 MB minimum)
    /// - Small chunks (`z=16KB`): **~2 MB** per chunker (due to the 2 MB minimum)
757 ///
758 /// The 2MB minimum prevents excessive refill operations for small chunk sizes.
759 ///
760 /// # Panics
761 ///
762 /// May panic if memory allocation fails (extremely rare on modern systems with
763 /// virtual memory, but possible in constrained environments). For default parameters
    /// this allocates ~2 MB on the heap, well within typical limits.
765 ///
766 /// # Performance Notes
767 ///
768 /// - **No I/O in constructor**: Data reading is deferred until first `next()` call
769 /// - **Deterministic**: Given the same input and parameters, produces identical chunks
770 /// - **Thread-safe (if R is)**: The chunker itself has no shared state
771 ///
772 /// # Examples
773 ///
774 /// ## Default Parameters
775 ///
776 /// ```no_run
777 /// use hexz_core::algo::dedup::cdc::StreamChunker;
778 /// use hexz_core::algo::dedup::dcam::DedupeParams;
779 /// use std::fs::File;
780 ///
781 /// # fn main() -> std::io::Result<()> {
782 /// let file = File::open("data.bin")?;
783 /// let chunker = StreamChunker::new(file, DedupeParams::default());
784 /// // Chunker is ready to iterate
785 /// # Ok(())
786 /// # }
787 /// ```
788 ///
789 /// ## Custom Parameters
790 ///
791 /// ```no_run
792 /// use hexz_core::algo::dedup::cdc::StreamChunker;
793 /// use hexz_core::algo::dedup::dcam::DedupeParams;
794 /// use std::fs::File;
795 ///
796 /// # fn main() -> std::io::Result<()> {
797 /// let file = File::open("disk.raw")?;
798 /// let params = DedupeParams {
799 /// f: 15, // 32KB average
800 /// m: 4096, // 4KB minimum
801 /// z: 131072, // 128KB maximum
802 /// w: 48, // Window size (not used by StreamChunker)
803 /// v: 16, // Metadata overhead (not used by StreamChunker)
804 /// };
805 /// let chunker = StreamChunker::new(file, params);
806 /// # Ok(())
807 /// # }
808 /// ```
809 pub fn new(reader: R, params: DedupeParams) -> Self {
        // The buffer must hold at least one max-size chunk. We allocate
        // 2 * max(max_size, 1 MiB) so there is room to shift unconsumed data
        // and to read from the source in large blocks.
812 let buf_size = (params.z as usize).max(1024 * 1024) * 2;
813 Self {
814 reader,
815 buffer: vec![0u8; buf_size],
816 cursor: 0,
817 filled: 0,
818 min_size: params.m as usize,
819 avg_size: 1 << params.f,
820 max_size: params.z as usize,
821 eof: false,
822 }
823 }
824
825 /// Refills the internal buffer from the reader.
826 ///
827 /// This private method manages the buffer state to maintain a continuous window
828 /// of data for chunking. It performs buffer compaction (shifting) and reads new
829 /// data to maximize available space for chunk detection.
830 ///
831 /// # Algorithm
832 ///
833 /// 1. **Shift unprocessed data**: If `cursor > 0`, move bytes `[cursor..filled)`
834 /// to the start of the buffer to reclaim space
835 /// 2. **Read new data**: Call `reader.read()` repeatedly to fill available space
836 /// 3. **Update EOF flag**: Set `eof = true` when reader returns 0 bytes
837 /// 4. **Early exit**: Stop reading once buffer contains ≥ `max_chunk_size` bytes
838 ///
839 /// # Buffer State Before/After
840 ///
841 /// Before:
842 /// ```text
843 /// [consumed|unprocessed______|____________available____________]
844 /// 0 cursor filled capacity
845 /// ```
846 ///
847 /// After shift:
848 /// ```text
849 /// [unprocessed______|____________available___________________]
850 /// 0 new_filled capacity
851 /// ```
852 ///
853 /// After refill:
854 /// ```text
855 /// [unprocessed______|new_data__________________________|____]
856 /// 0 old_filled new_filled
857 /// ```
858 ///
859 /// # Performance
860 ///
861 /// - **Shift cost**: O(filled - cursor) via `copy_within()` (memmove)
862 /// - **Read cost**: Depends on underlying reader (syscall overhead)
863 /// - **Amortization**: Shifts happen infrequently (once per chunk consumed)
864 ///
865 /// # Errors
866 ///
867 /// Returns any I/O errors encountered during reading. Common errors:
868 ///
869 /// - `ErrorKind::UnexpectedEof`: Reader closed unexpectedly (shouldn't happen
870 /// for valid streams; `read()` returning 0 is the proper EOF signal)
    /// - `ErrorKind::Interrupted`: Signal interrupted the read (not retried here;
    ///   the `?` in the read loop propagates it to the caller)
873 /// - Other errors: Propagated from underlying reader (e.g., permission denied,
874 /// device errors, network timeouts)
875 fn refill(&mut self) -> io::Result<()> {
876 // Only shift when cursor has consumed more than half the buffer.
877 // This reduces memmove frequency by ~50% while still ensuring enough
878 // space for the next read.
879 if self.cursor > self.buffer.len() / 2 {
880 self.buffer.copy_within(self.cursor..self.filled, 0);
881 self.filled -= self.cursor;
882 self.cursor = 0;
883 }
884
885 while self.filled < self.buffer.len() && !self.eof {
886 let n = self.reader.read(&mut self.buffer[self.filled..])?;
887 if n == 0 {
888 self.eof = true;
889 } else {
890 self.filled += n;
891 // If we have enough data to potentially find a chunk, we can stop reading for now
892 if self.filled >= self.max_size {
893 break;
894 }
895 }
896 }
897 Ok(())
898 }
899}
900
901impl<R: Read> Iterator for StreamChunker<R> {
902 type Item = io::Result<Vec<u8>>;
903
904 /// Yields the next content-defined chunk.
905 ///
906 /// This is the core iterator method that drives the chunking process. Each call
907 /// performs the following operations:
908 ///
909 /// # Algorithm
910 ///
911 /// 1. **Check for data**: If buffer is empty and EOF not reached, refill
912 /// 2. **Handle EOF**: If no data available after refill, return `None`
913 /// 3. **Determine available window**: Compute bytes available for chunking
914 /// 4. **Apply FastCDC**:
915 /// - If available < `min_size`: Return all available bytes (last chunk)
916 /// - Else: Run FastCDC on window `[cursor..min(cursor+max_size, filled)]`
917 /// 5. **Extract chunk**: Copy chunk bytes to new `Vec<u8>`, advance cursor
918 /// 6. **Return**: Yield `Some(Ok(chunk))`
919 ///
920 /// # FastCDC Integration
921 ///
922 /// The method delegates to the `fastcdc` crate's `FastCDC::new()` constructor,
923 /// which implements the normalized chunking algorithm. The returned chunk length
924 /// respects all size constraints:
925 ///
926 /// - No chunk < `min_size` (except final chunk)
927 /// - No chunk > `max_size` (strictly enforced)
928 /// - Average chunk size ≈ `avg_size = 2^f`
929 ///
930 /// # Chunk Length Determination
931 ///
932 /// The algorithm chooses chunk length as follows:
933 ///
934 /// | Condition | Action |
935 /// |-----------|--------|
936 /// | `available < min_size` | Return all available bytes |
937 /// | FastCDC finds cut point | Use FastCDC-determined length |
938 /// | `available >= max_size` && no cut point | Force cut at `max_size` |
939 /// | EOF reached && no cut point | Return all remaining bytes |
940 /// | Buffer full && no cut point | Force cut at `max_size` |
941 /// | Other | Return all available bytes |
942 ///
943 /// # Returns
944 ///
945 /// - `Some(Ok(chunk))` - Next chunk successfully extracted and copied
946 /// - `Some(Err(e))` - I/O error occurred during refill
947 /// - `None` - Stream exhausted (no more data available)
948 ///
949 /// # Errors
950 ///
951 /// Returns `Some(Err(e))` if the underlying reader encounters an I/O error
952 /// during refill. After an error, the iterator should be considered invalid
953 /// (further calls may panic or return inconsistent results).
954 ///
955 /// Common error causes:
956 /// - File read errors (permissions, disk errors)
957 /// - Network errors (timeouts, connection reset)
958 /// - Interrupted system calls (usually auto-retried)
959 ///
960 /// # Chunk Size Guarantees
961 ///
962 /// The returned chunks satisfy these constraints:
963 ///
964 /// - **Minimum**: `≥ min_size` (except possibly the last chunk)
965 /// - **Maximum**: `≤ max_size` (always enforced, never violated)
966 /// - **Average**: `≈ avg_size = 2^f` (statistical expectation)
967 /// - **Last chunk**: May be smaller than `min_size` if EOF reached
968 ///
969 /// # Memory Allocation
970 ///
971 /// Each chunk is copied to a new `Vec<u8>`. For zero-copy iteration within
972 /// the buffer lifetime, consider modifying the API to return slices with
973 /// appropriate lifetimes (future enhancement).
974 ///
975 /// # Examples
976 ///
977 /// ```no_run
978 /// use hexz_core::algo::dedup::cdc::StreamChunker;
979 /// use hexz_core::algo::dedup::dcam::DedupeParams;
980 /// use std::fs::File;
981 ///
982 /// # fn main() -> std::io::Result<()> {
983 /// let file = File::open("data.bin")?;
984 /// let mut chunker = StreamChunker::new(file, DedupeParams::default());
985 ///
986 /// // First chunk
987 /// if let Some(Ok(chunk)) = chunker.next() {
    ///     println!("First chunk: {} bytes", chunk.len());
989 /// }
990 ///
991 /// // Iterate remaining chunks
992 /// for chunk_result in chunker {
    ///     let chunk = chunk_result?;
    ///     // Process chunk...
995 /// }
996 /// # Ok(())
997 /// # }
998 /// ```
999 ///
1000 /// # Performance Characteristics
1001 ///
1002 /// - **Amortized time**: O(chunk_size) per chunk (dominated by copying)
1003 /// - **Space**: O(chunk_size) allocation per chunk yielded
    /// - **Throughput**: ~2.7 GB/s single-threaded (bottlenecked by the Gear hash; see the module-level benchmarks)
1005 fn next(&mut self) -> Option<Self::Item> {
1006 if self.cursor >= self.filled {
1007 if self.eof {
1008 return None;
1009 }
1010 if let Err(e) = self.refill() {
1011 return Some(Err(e));
1012 }
1013 if self.filled == 0 {
1014 return None;
1015 }
1016 }
1017
1018 let available = self.filled - self.cursor;
1019
1020 // Refill if we don't have enough data for a full chunk and more data is available
1021 if available < self.max_size && !self.eof {
1022 if let Err(e) = self.refill() {
1023 return Some(Err(e));
1024 }
1025 }
1026 let available = self.filled - self.cursor;
1027
1028 let chunk_len = if available < self.min_size {
1029 available
1030 } else {
1031 // Run FastCDC on the available window
1032 let data = &self.buffer[self.cursor..self.filled];
1033 let search_limit = std::cmp::min(data.len(), self.max_size);
1034
1035 let chunker = fastcdc::v2020::FastCDC::new(
1036 &data[..search_limit],
1037 self.min_size as u32,
1038 self.avg_size as u32,
1039 self.max_size as u32,
1040 );
1041
1042 if let Some(chunk) = chunker.into_iter().next() {
1043 chunk.length
1044 } else {
1045 // No cut point found
1046 if available >= self.max_size {
1047 self.max_size
1048 } else if self.eof {
1049 available
1050 } else if self.filled == self.buffer.len() {
1051 self.max_size
1052 } else {
1053 available
1054 }
1055 }
1056 };
1057
1058 let start = self.cursor;
1059 self.cursor += chunk_len;
1060 Some(Ok(self.buffer[start..start + chunk_len].to_vec()))
1061 }
1062}
1063
1064/// Performs a single-pass deduplication analysis on a data stream.
1065///
1066/// This function applies FastCDC chunking to a data stream and tracks unique chunks
1067/// via hash-based deduplication. It is designed as a lightweight analysis pass for
1068/// the DCAM model to estimate deduplication potential before committing to full
1069/// snapshot packing.
1070///
1071/// # Algorithm
1072///
1073/// 1. **Chunk the input** using `StreamChunker` with the provided FastCDC parameters
/// 2. **Hash each chunk** using a 64-bit truncation of its BLAKE3 digest (fast,
///    with negligible collision risk for analysis)
1075/// 3. **Track uniqueness** via a `HashSet<u64>` of seen hashes
1076/// 4. **Accumulate statistics**:
1077/// - Total chunk count (all chunks, including duplicates)
1078/// - Unique chunk count (distinct hashes)
1079/// - Unique bytes (sum of unique chunk sizes)
1080///
1081/// # Parameters
1082///
1083/// - `reader`: Data source to analyze. Can be any `Read` implementation:
1084/// - `File` for disk images or backups
1085/// - `Cursor<Vec<u8>>` for in-memory data
1086/// - Network streams, pipes, or other I/O sources
1087///
1088/// - `params`: FastCDC parameters controlling chunking behavior:
1089/// - `params.f`: Fingerprint bits (average chunk size = 2^f bytes)
1090/// - `params.m`: Minimum chunk size in bytes
1091/// - `params.z`: Maximum chunk size in bytes
1092/// - Other fields (`w`, `v`) are used by DCAM but not by the chunker
1093///
1094/// # Returns
1095///
1096/// `Ok(CdcStats)` containing:
1097/// - `chunk_count`: Total chunks processed (including duplicates)
1098/// - `unique_chunk_count`: Number of distinct chunks (unique hashes)
1099/// - `unique_bytes`: Total bytes in unique chunks (storage requirement estimate)
1100/// - `total_bytes`: Currently unused (always 0) for performance reasons
1101///
1102/// # Errors
1103///
1104/// Returns `Err` if the underlying reader encounters an I/O error during reading.
1105/// Common error cases:
1106///
1107/// - `std::io::ErrorKind::NotFound`: File doesn't exist (if using `File`)
1108/// - `std::io::ErrorKind::PermissionDenied`: Insufficient permissions to read
1109/// - `std::io::ErrorKind::UnexpectedEof`: Reader closed unexpectedly
1110/// - Network-related errors (timeouts, connection resets) for network streams
1111/// - Disk errors (bad sectors, hardware failures) for file-based readers
1112///
/// On error, no statistics are returned; the function does not attempt recovery
/// or produce partial results.
1115///
1116/// # Performance
1117///
1118/// ## Time Complexity
1119///
/// - **Single-threaded throughput**: ~2.7 GB/s on modern CPUs (see the module-level benchmark notes)
1121/// - **Bottleneck**: Gear hash computation (CPU-bound, not I/O-bound)
1122/// - **Scaling**: Linear in input size, O(N) where N = total bytes
1123///
1124/// ## Space Complexity
1125///
1126/// - **Hash set**: O(unique chunks) × 8 bytes per hash ≈ O(N / avg_chunk_size) × 8
1127/// - **Example**: 1GB input, 16KB avg chunks → ~65K unique chunks → ~520 KB hash set
/// - **Streaming buffer**: Fixed at `2 * max(params.z, 1 MiB)` (typically ~2 MB)
1129///
1130/// ## Memory Optimization
1131///
1132/// For extremely large datasets where hash set size becomes prohibitive, consider:
1133/// - Using a probabilistic data structure (Bloom filter, HyperLogLog)
1134/// - Sampling (analyze every Nth chunk instead of all chunks)
1135/// - External deduplication index (disk-based hash storage)
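///
/// A sampling sketch along those lines, hashing only every Nth chunk to bound
/// the set size (the sampling factor of 8 is arbitrary; the estimate becomes
/// correspondingly coarser):
///
/// ```no_run
/// use hexz_core::algo::dedup::cdc::StreamChunker;
/// use hexz_core::algo::dedup::dcam::DedupeParams;
/// use std::collections::HashSet;
/// use std::fs::File;
///
/// # fn main() -> std::io::Result<()> {
/// let file = File::open("huge.raw")?;
/// let chunker = StreamChunker::new(file, DedupeParams::default());
///
/// let mut seen: HashSet<u64> = HashSet::new();
/// let (mut sampled, mut unique) = (0u64, 0u64);
/// for (i, chunk) in chunker.enumerate() {
///     let chunk = chunk?;
///     if i % 8 != 0 {
///         continue; // skip hashing for 7 out of every 8 chunks
///     }
///     sampled += 1;
///     let digest = *blake3::hash(&chunk).as_bytes();
///     let hash = u64::from_le_bytes([
///         digest[0], digest[1], digest[2], digest[3],
///         digest[4], digest[5], digest[6], digest[7],
///     ]);
///     if seen.insert(hash) {
///         unique += 1;
///     }
/// }
/// println!("sampled {} chunks, {} unique among the sample", sampled, unique);
/// # Ok(())
/// # }
/// ```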
1136///
1137/// # Hash Collision Probability
1138///
1139/// This function uses the first 8 bytes of BLAKE3 as a 64-bit hash.
1140/// The probability of collision is governed by the birthday paradox:
1141///
1142/// ```text
1143/// P(collision) ≈ n² / (2 × 2^64) where n = unique_chunk_count
1144/// ```
1145///
1146/// For typical workloads:
1147/// - **1M unique chunks**: P(collision) ≈ 2.7e-8 — negligible
1148/// - **100M unique chunks**: P(collision) ≈ 2.7e-4 (0.027%) — negligible
1149/// - **4 billion unique chunks**: P(collision) ≈ 50% (birthday bound)
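///
/// The bound above can be evaluated directly; for example, at 100 million unique
/// chunks the estimated collision probability stays below 0.1%:
///
/// ```
/// let n: f64 = 1.0e8; // 100 million unique chunks
/// let p = n * n / (2.0 * 2f64.powi(64));
/// assert!(p < 1e-3); // ≈ 2.7e-4
/// ```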
1150///
1151/// # Examples
1152///
1153/// ## Basic Usage
1154///
1155/// ```no_run
1156/// use hexz_core::algo::dedup::cdc::analyze_stream;
1157/// use hexz_core::algo::dedup::dcam::DedupeParams;
1158/// use std::fs::File;
1159///
1160/// # fn main() -> std::io::Result<()> {
1161/// let file = File::open("dataset.bin")?;
1162/// let params = DedupeParams::default();
/// let stats = analyze_stream(file, &params)?;
1164///
1165/// println!("Total chunks: {}", stats.chunk_count);
1166/// println!("Unique chunks: {}", stats.unique_chunk_count);
1167/// println!("Unique bytes: {} MB", stats.unique_bytes / 1_000_000);
1168///
1169/// let dedup_factor = stats.chunk_count as f64 / stats.unique_chunk_count as f64;
1170/// println!("Deduplication factor: {:.2}x", dedup_factor);
1171/// # Ok(())
1172/// # }
1173/// ```
1174///
1175/// ## Estimating Storage Savings
1176///
1177/// ```no_run
1178/// use hexz_core::algo::dedup::cdc::analyze_stream;
1179/// use hexz_core::algo::dedup::dcam::DedupeParams;
1180/// use std::fs::File;
1181///
1182/// # fn main() -> std::io::Result<()> {
1183/// let file = File::open("snapshot.raw")?;
1184/// let params = DedupeParams::default();
/// let stats = analyze_stream(file, &params)?;
1186///
1187/// // Estimate total bytes (not tracked directly for performance)
1188/// let estimated_total = stats.unique_bytes * stats.chunk_count / stats.unique_chunk_count;
1189/// let savings_ratio = 1.0 - (stats.unique_bytes as f64 / estimated_total as f64);
1190///
1191/// println!("Estimated storage savings: {:.1}%", savings_ratio * 100.0);
1192/// # Ok(())
1193/// # }
1194/// ```
1195///
1196/// ## Comparing Parameter Sets
1197///
1198/// ```no_run
1199/// use hexz_core::algo::dedup::cdc::analyze_stream;
1200/// use hexz_core::algo::dedup::dcam::DedupeParams;
1201/// use std::io::Cursor;
1202///
1203/// # fn main() -> std::io::Result<()> {
1204/// let data = vec![0u8; 10_000_000]; // 10 MB test data
1205///
1206/// // Analyze with small chunks
1207/// let mut params_small = DedupeParams::default();
1208/// params_small.f = 13; // 8KB average
/// let stats_small = analyze_stream(Cursor::new(&data), &params_small)?;
1210///
1211/// // Analyze with large chunks
1212/// let mut params_large = DedupeParams::default();
1213/// params_large.f = 15; // 32KB average
/// let stats_large = analyze_stream(Cursor::new(&data), &params_large)?;
1215///
1216/// println!("Small chunks: {} total, {} unique",
1217/// stats_small.chunk_count, stats_small.unique_chunk_count);
1218/// println!("Large chunks: {} total, {} unique",
1219/// stats_large.chunk_count, stats_large.unique_chunk_count);
1220/// # Ok(())
1221/// # }
1222/// ```
1223///
1224/// # Use Cases
1225///
1226/// - **DCAM Model Input**: Feeds statistics into analytical formulas to predict
1227/// optimal FastCDC parameters for a given dataset
1228/// - **Capacity Planning**: Estimates storage requirements before provisioning
1229/// deduplication infrastructure
1230/// - **Parameter Tuning**: Compares deduplication effectiveness across different
1231/// chunk size configurations
1232/// - **Workload Characterization**: Analyzes redundancy patterns in datasets
1233///
1234/// # Relationship to `StreamChunker`
1235///
1236/// This function is a convenience wrapper around `StreamChunker` that adds
1237/// deduplication tracking via a hash set. For actual snapshot packing, use
1238/// `StreamChunker` directly and handle deduplication in the compression/storage
1239/// pipeline. The analysis function is intended as a lightweight dry-run only.
1240pub fn analyze_stream<R: Read>(reader: R, params: &DedupeParams) -> io::Result<CdcStats> {
1241 let mut unique_bytes = 0;
1242 let mut chunk_count = 0;
1243 let mut unique_chunk_count = 0;
1244 let mut seen_chunks: HashSet<u64> = HashSet::new();
1245
1246 // Use the streaming chunker for analysis too, to ensure consistency
1247 let chunker = StreamChunker::new(reader, *params);
1248
1249 for chunk_res in chunker {
1250 let chunk = chunk_res?;
1251 let len = chunk.len() as u64;
1252 // Use first 8 bytes of BLAKE3 digest as a 64-bit hash.
1253 // BLAKE3 at 16KB chunks is >3 GB/s — only ~2-3x slower than CRC32 —
1254 // but provides 2^64 hash space (birthday bound at ~4 billion chunks)
1255 // vs CRC32's 2^32 (birthday bound at ~65K chunks).
1256 let digest = *blake3::hash(&chunk).as_bytes();
1257 let hash = u64::from_le_bytes([
1258 digest[0], digest[1], digest[2], digest[3], digest[4], digest[5], digest[6], digest[7],
1259 ]);
1260
1261 chunk_count += 1;
1262 if seen_chunks.insert(hash) {
1263 unique_bytes += len;
1264 unique_chunk_count += 1;
1265 }
1266 }
1267
1268 Ok(CdcStats {
1269 total_bytes: 0,
1270 unique_bytes,
1271 chunk_count,
1272 unique_chunk_count,
1273 })
1274}