hexz_core/algo/dedup/
cdc.rs

1//! Content-Defined Chunking (CDC) for deduplication analysis.
2//!
3//! This module implements the FastCDC (Fast Content-Defined Chunking) algorithm,
4//! a state-of-the-art variable-length chunking algorithm that splits data streams
5//! into content-defined chunks for efficient deduplication. Unlike fixed-size
6//! chunking, CDC adapts chunk boundaries to data content, enabling detection of
7//! duplicate blocks even when shifted or surrounded by different data.
8//!
9//! # Why Content-Defined Chunking?
10//!
11//! Traditional fixed-size chunking fails to handle the "boundary shift problem":
12//! inserting or deleting even a single byte causes all subsequent chunks to shift,
13//! preventing duplicate detection. CDC solves this by:
14//!
15//! - **Content-based boundaries**: Chunk cuts occur at data-dependent positions
16//! - **Shift resilience**: Boundaries remain stable despite insertions/deletions
17//! - **Variable chunk sizes**: Adapts to data structure for optimal deduplication
18//!
19//! Example: Inserting 100 bytes at the start of a file with fixed 4KB chunks
20//! invalidates every chunk. With CDC, only the chunks containing the insertion
21//! are affected; chunks after the insertion point remain identical and deduplicatable.
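//!
//! The boundary shift problem is easy to reproduce. The sketch below is
//! illustrative only (`shared_fixed_chunks` is a hypothetical helper, not part of
//! this module); it counts how many fixed-size chunks survive after bytes are
//! prepended to the input:
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Counts how many fixed-size chunks of the modified input (`inserted`
//! /// followed by `original`) also occur in `original`.
//! fn shared_fixed_chunks(original: &[u8], inserted: &[u8], chunk_size: usize) -> usize {
//!     let mut modified = inserted.to_vec();
//!     modified.extend_from_slice(original);
//!     // Index the original chunks by content.
//!     let before: HashSet<&[u8]> = original.chunks(chunk_size).collect();
//!     modified
//!         .chunks(chunk_size)
//!         .filter(|chunk| before.contains(chunk))
//!         .count()
//! }
//! ```
//!
//! For 4096 bytes made of little-endian `u32` counters and 64-byte chunks,
//! prepending nothing leaves all 64 chunks shared, while prepending a single
//! byte leaves none.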
22//!
23//! # FastCDC Algorithm Details
24//!
25//! FastCDC improves upon earlier CDC algorithms (Rabin fingerprinting, basic CDC)
26//! by using a normalized chunking approach that reduces chunk size variance while
27//! maintaining content-sensitivity.
28//!
29//! ## Rolling Hash Mechanism
30//!
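//! 1. **Sliding Window**: Only recent bytes influence the hash. Gear hash keeps no
//!    explicit `w`-byte window; with a one-bit shift per byte, an N-bit hash
//!    forgets bytes older than N positions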
32//! 2. **Hash Computation**: Computes a rolling hash (Gear hash) over the window
33//! 3. **Cut-Point Detection**: When `hash & mask == 0`, mark a chunk boundary
34//! 4. **Normalization**: Uses different masks in different chunk regions to
35//!    achieve more uniform chunk size distribution
36//!
37//! ## Gear Hash Function
38//!
39//! FastCDC uses the Gear hash, a simple rolling hash that provides:
40//! - **O(1) update time** per byte (critical for performance)
41//! - **Good avalanche properties** (small changes → large hash changes)
42//! - **CPU-friendly operations** (no expensive modulo or division)
43//!
//! The Gear hash works by:
//! 1. Maintaining a precomputed random 256-entry lookup table
//! 2. For each byte `b`, shifting the current hash left by one bit
//! 3. Mixing in `table[b]` (the FastCDC paper adds it; the pseudocode below uses XOR)
//! 4. Checking whether the hash satisfies the cut-point condition
//!
//! **Pseudocode:**
//! ```text
//! hash = 0
//! for each byte b in stream:
//!     hash = (hash << 1) ^ GEAR_TABLE[b]
//!     if (hash & mask) == 0:
//!         mark_chunk_boundary()
//! ```
58//!
59//! The table lookup randomizes each byte's contribution, ensuring that
60//! different byte patterns produce uncorrelated hashes. This is critical
61//! for content-defined chunking to work correctly across diverse data types.
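//!
//! A minimal runnable sketch of the update rule, under stated assumptions: the
//! table is generated with SplitMix64 purely for illustration (it is not the
//! table used by this module), all function names are hypothetical, and the
//! min/max limits and per-chunk hash resets of real FastCDC are omitted:
//!
//! ```rust
//! /// Builds a 256-entry table of random-looking values (illustrative seed).
//! fn gear_table() -> [u64; 256] {
//!     let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
//!     let mut table = [0u64; 256];
//!     for entry in table.iter_mut() {
//!         // One SplitMix64 step per table entry.
//!         state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
//!         let mut z = state;
//!         z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
//!         z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
//!         *entry = z ^ (z >> 31);
//!     }
//!     table
//! }
//!
//! /// Returns every index at which the rolling hash satisfies `hash & mask == 0`.
//! fn cut_points(data: &[u8], mask: u64) -> Vec<usize> {
//!     let table = gear_table();
//!     let mut hash: u64 = 0;
//!     let mut cuts = Vec::new();
//!     for (i, &b) in data.iter().enumerate() {
//!         // The shift discards the oldest bit, so old bytes age out of the hash.
//!         hash = (hash << 1) ^ table[b as usize];
//!         if hash & mask == 0 {
//!             cuts.push(i);
//!         }
//!     }
//!     cuts
//! }
//!
//! /// Prepends one byte to `data` and reports whether every cut point past the
//! /// first 64 bytes moved by exactly one position.
//! fn cuts_realign_after_insert(data: &[u8], mask: u64) -> bool {
//!     let mut shifted = vec![0xAB];
//!     shifted.extend_from_slice(data);
//!     let expected: Vec<usize> = cut_points(data, mask)
//!         .into_iter()
//!         .filter(|&i| i >= 63)
//!         .map(|i| i + 1)
//!         .collect();
//!     let actual: Vec<usize> = cut_points(&shifted, mask)
//!         .into_iter()
//!         .filter(|&i| i >= 64)
//!         .collect();
//!     expected == actual
//! }
//! ```
//!
//! Because a 64-bit hash forgets bytes older than 64 positions,
//! `cuts_realign_after_insert` returns `true` for any input in this simplified
//! model: the cut points follow the content, not the byte offsets.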
62//!
63//! ## Chunk Size Control
64//!
65//! Three parameters control chunk size distribution:
66//!
67//! - **`m` (min)**: Minimum chunk size (prevents tiny chunks, reduces overhead)
68//! - **`f` (bits)**: Fingerprint bits; average chunk size is `2^f` bytes
69//! - **`z` (max)**: Maximum chunk size (bounds worst-case memory usage)
70//!
71//! The algorithm enforces these constraints strictly:
72//! - Never cuts before `m` bytes
73//! - Always cuts at `z` bytes regardless of hash value
74//! - Statistically averages to `2^f` byte chunks
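//!
//! The three constraints can be sketched as a cut-point search. This is a
//! simplified illustration with a single mask and no normalization;
//! `next_chunk_len` and its `table` argument are hypothetical and not this
//! module's API:
//!
//! ```rust
//! /// Returns the length of the next chunk at the start of `data`.
//! /// `table` is any 256-entry table of random-looking values.
//! fn next_chunk_len(data: &[u8], table: &[u64; 256], min: usize, max: usize, mask: u64) -> usize {
//!     if data.len() <= min {
//!         return data.len(); // final short chunk: nothing left to cut
//!     }
//!     let end = data.len().min(max);
//!     let mut hash: u64 = 0;
//!     // Bytes before `min` are skipped entirely, so no cut can occur there.
//!     for i in min..end {
//!         hash = (hash << 1) ^ table[data[i] as usize];
//!         if hash & mask == 0 {
//!             return i + 1; // content-defined cut
//!         }
//!     }
//!     end // forced cut at `max` (or at the end of the input)
//! }
//! ```
//!
//! With a table and mask that never satisfy the cut condition, the function
//! returns `max`; with a mask of `0` (always satisfied) it returns `min + 1`.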
75//!
76//! ## Normalization Zones
77//!
78//! FastCDC divides each chunk into regions with different cut-point masks
79//! to achieve a more uniform chunk size distribution than basic CDC:
80//!
//! - **[0, m)**: No cutting allowed (enforces minimum size)
//! - **[m, avg)**: Strict mask (more bits required → lower cut probability)
//! - **[avg, z)**: Relaxed mask (fewer bits required → higher cut probability)
//! - **[z, ∞)**: Force cut at maximum (enforces size bound)
//!
//! ### Mask Selection
//!
//! The masks are computed based on the number of zero bits required in the
//! hash value, relative to a baseline of `f` bits (`f=14` → mask = `0x3FFF`,
//! cut probability `1 / 2^14 ≈ 0.006%` per byte, expected spacing
//! `2^14 = 16,384` bytes):
//!
//! - **Strict mask** (`[m, avg)`): Requires `f+1` or `f+2` zero bits
//!   - Example: `f=14` → strict uses `f+2=16` → mask = `0xFFFF`
//!   - Cut probability: `1 / 2^16 ≈ 0.0015%` per byte (4× lower)
//!   - Discourages cuts well below the average, reducing very small chunks
//!
//! - **Relaxed mask** (`[avg, z)`): Requires `f-1` or `f-2` zero bits
//!   - Example: `f=14` → relaxed uses `f-2=12` → mask = `0xFFF`
//!   - Cut probability: `1 / 2^12 ≈ 0.024%` per byte (4× higher)
//!   - Encourages cuts soon after the average, reducing long tails
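//!
//! The mask arithmetic can be checked with a short sketch. The helper names are
//! hypothetical, and contiguous low-bit masks are assumed, as in the examples
//! in this section:
//!
//! ```rust
//! /// Builds a mask that requires the low `bits` bits of the hash to be zero.
//! fn low_bits_mask(bits: u32) -> u64 {
//!     assert!(bits < 64, "bit count must fit in a u64 mask");
//!     (1u64 << bits) - 1
//! }
//!
//! /// Per-byte cut probability, assuming uniformly distributed hash bits.
//! fn cut_probability(bits: u32) -> f64 {
//!     1.0 / (1u64 << bits) as f64
//! }
//! ```
//!
//! Here `low_bits_mask(14)` is `0x3FFF` and `low_bits_mask(12)` is `0xFFF`;
//! dropping two bits makes a cut exactly 4× more likely per byte.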
100//!
101//! ### Effect on Distribution
102//!
103//! This normalization reduces the coefficient of variation (std dev / mean) in
104//! chunk sizes from ~1.0 (pure exponential) to ~0.6 (normalized exponential):
105//!
106//! | Distribution Type | Mean | Std Dev | CV |
107//! |-------------------|------|---------|-----|
108//! | Fixed-size | 16 KB | 0 KB | 0.0 |
109//! | Basic CDC | 16 KB | 16 KB | 1.0 |
110//! | FastCDC (normalized) | 16 KB | 10 KB | 0.6 |
111//!
112//! **Benefits:**
113//! - More consistent chunk sizes → better index density
114//! - Fewer outliers (very small/large chunks) → lower metadata overhead
115//! - Maintains content-sensitivity for deduplication effectiveness
116//!
117//! # Performance Characteristics
118//!
119//! ## Throughput
120//!
121//! - **Single-threaded**: 2.7 GB/s on modern CPUs (i7-14700K, validated via `cargo bench --bench cdc_chunking`)
122//! - **Bottleneck**: Rolling hash computation (CPU-bound, not I/O-bound)
123//! - **Comparison**: 4-5x faster than Rabin fingerprinting, 2x faster than basic CDC
124//! - **Overhead**: ~10× slower than fixed-size chunking (26 GB/s vs 2.7 GB/s), but acceptable for dedup benefits
125//!
126//! ## Parallelization
127//!
128//! FastCDC is fundamentally sequential (each byte depends on previous hash state),
129//! but can be parallelized across independent data streams:
130//!
131//! - **Multiple files**: Chunk each file in a separate thread/task
132//! - **Split input**: Pre-split large files at fixed boundaries (e.g., every 1GB)
133//!   and chunk each segment independently (loses some cross-boundary dedup)
134//! - **Pipeline parallelism**: Chunk in one thread, compress in another, write
135//!   in a third (producer-consumer pattern)
136//!
137//! **Note**: Parallel chunking of a single stream is possible using recursive
138//! chunking or sketch-based techniques, but adds significant complexity and is
//! not implemented in this module.
140//!
141//! ## Memory Usage
142//!
143//! - **Per-stream buffer**: `2 * z` bytes (default: ~128 KB for z=64KB)
144//! - **Analysis hash set**: ~8 bytes per unique chunk (amortized)
145//! - **Streaming overhead**: Negligible (no hash set)
146//!
147//! ## Chunk Size Distribution
148//!
149//! For typical data with `f=14` (16KB average, default):
//! - **Mean**: ~16 KB (target is `2^f`; the min/max bounds shift the realized mean slightly)
151//! - **Median**: ~11 KB (skewed by exponential tail)
152//! - **Standard deviation**: ~10 KB (coefficient of variation ~0.6)
153//! - **Range**: [m, z] (typically [2KB, 64KB])
154//!
155//! # Usage Patterns
156//!
157//! This module provides two primary APIs:
158//!
159//! ## 1. Analysis Mode: `analyze_stream()`
160//!
161//! Performs a dry-run to estimate deduplication potential without storing chunks.
162//! Used by the DCAM model to optimize parameters before actual packing.
163//!
164//! - **Input**: Reader + parameters
165//! - **Output**: `CdcStats` (chunk counts, dedup ratio)
166//! - **Memory**: O(unique chunks) for hash set
167//! - **Use case**: Parameter optimization, capacity planning
168//!
169//! ## 2. Streaming Mode: `StreamChunker`
170//!
171//! Iterator that yields chunks on-demand during snapshot creation. Integrates
172//! with the packing pipeline for compression and encryption.
173//!
174//! - **Input**: Reader + parameters
175//! - **Output**: Iterator of `Vec<u8>` chunks
176//! - **Memory**: O(1) - fixed buffer size
177//! - **Use case**: Actual snapshot packing
178//!
179//! # Examples
180//!
181//! ## Basic Analysis
182//!
183//! ```no_run
184//! use hexz_core::algo::dedup::cdc::analyze_stream;
185//! use hexz_core::algo::dedup::dcam::DedupeParams;
186//! use std::fs::File;
187//!
188//! # fn main() -> std::io::Result<()> {
189//! let file = File::open("disk.raw")?;
190//! let params = DedupeParams::default(); // f=14, m=2KB, z=64KB
191//! let stats = analyze_stream(file, &params)?;
192//!
193//! println!("Total chunks: {}", stats.chunk_count);
194//! println!("Unique chunks: {}", stats.unique_chunk_count);
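//! // Estimated savings: the fraction of chunks that were duplicates.
//! // The guard avoids dividing by zero on empty input (zero chunks).
//! if stats.chunk_count > 0 {
//!     let unique_fraction = stats.unique_chunk_count as f64 / stats.chunk_count as f64;
//!     println!("Estimated savings: {:.2}%", (1.0 - unique_fraction) * 100.0);
//! }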
199//! # Ok(())
200//! # }
201//! ```
202//!
203//! ## Streaming Chunks
204//!
205//! ```no_run
206//! use hexz_core::algo::dedup::cdc::StreamChunker;
207//! use hexz_core::algo::dedup::dcam::DedupeParams;
208//! use std::fs::File;
209//!
210//! # fn main() -> std::io::Result<()> {
211//! let file = File::open("memory.raw")?;
212//! let params = DedupeParams::default();
213//! let chunker = StreamChunker::new(file, params);
214//!
215//! for (i, chunk_result) in chunker.enumerate() {
216//!     let chunk = chunk_result?;
217//!     println!("Chunk {}: {} bytes", i, chunk.len());
218//!     // Compress, encrypt, hash, store...
219//! }
220//! # Ok(())
221//! # }
222//! ```
223//!
224//! ## Parameter Comparison
225//!
226//! ```no_run
227//! use hexz_core::algo::dedup::cdc::analyze_stream;
228//! use hexz_core::algo::dedup::dcam::DedupeParams;
230//!
231//! # fn main() -> std::io::Result<()> {
232//! let data = std::fs::read("dataset.bin")?;
233//!
234//! // Small chunks (8KB average) - fine-grained dedup
235//! let mut params_small = DedupeParams::default();
236//! params_small.f = 13; // 2^13 = 8KB
237//! let stats_small = analyze_stream(&data[..], &params_small)?;
238//!
239//! // Large chunks (32KB average) - coarse dedup, less overhead
240//! let mut params_large = DedupeParams::default();
241//! params_large.f = 15; // 2^15 = 32KB
242//! let stats_large = analyze_stream(&data[..], &params_large)?;
243//!
244//! println!("Small chunks: {} unique / {} total",
245//!          stats_small.unique_chunk_count, stats_small.chunk_count);
246//! println!("Large chunks: {} unique / {} total",
247//!          stats_large.unique_chunk_count, stats_large.chunk_count);
248//! # Ok(())
249//! # }
250//! ```
251//!
252//! # Integration with Hexz
253//!
254//! CDC is integrated into the snapshot packing pipeline with multiple stages:
255//!
256//! ## Analysis Phase
257//!
258//! 1. **DCAM Analysis**: `analyze_stream()` performs dry-run chunking to estimate
259//!    deduplication potential across different parameter configurations
260//! 2. **Parameter Selection**: DCAM formulas compute optimal `f` value based on
261//!    observed change rates and metadata overhead tradeoffs
262//! 3. **Validation**: Verify parameters are within hardware constraints (chunk
263//!    sizes fit in memory, hash set doesn't exceed available RAM)
264//!
265//! ## Packing Phase
266//!
267//! 4. **Streaming Chunking**: `StreamChunker` feeds variable-sized chunks to the
268//!    compression pipeline (typically zstd or lz4)
269//! 5. **Deduplication Tracking**: Compute cryptographic hash (SHA-256 or BLAKE3)
270//!    of each chunk and check against existing snapshot index
271//! 6. **Storage**: Write unique chunks to pack files; record chunk metadata
272//!    (offset, compressed size, hash) in snapshot index
273//!
274//! ## Unpacking Phase
275//!
276//! 7. **Reconstruction**: Read snapshot index to determine chunk sequence
277//! 8. **Decompression**: Fetch and decompress chunks from pack files
278//! 9. **Assembly**: Concatenate chunks in original order to reconstruct snapshot
279//!
280//! ## Error Handling
281//!
282//! - **I/O Errors**: Propagated immediately, abort operation
283//! - **Hash Collisions**: Detected during unpacking (content verification fails),
284//!   triggers integrity error and potential re-read or fallback
285//! - **Corrupted Chunks**: Compression errors detected during decompression,
286//!   logged and may trigger snapshot re-creation if critical
287//!
288//! ## Edge Cases
289//!
290//! - **Empty input**: Returns zero chunks (not an error)
291//! - **Tiny files** (< min_size): Single chunk containing entire file
//! - **Huge files**: Streaming through a fixed-size buffer keeps memory bounded
//!   regardless of input size
294//! - **Adversarial input**: Maximum chunk size enforced regardless of hash values
295//!
296//! # Limitations and Considerations
297//!
298//! ## Hash Collision Handling
299//!
300//! The `analyze_stream()` function uses the first 8 bytes of BLAKE3 as a 64-bit
301//! hash. This provides a birthday bound at ~2^32 ≈ 4 billion unique chunks,
302//! which is more than sufficient for analysis workloads:
303//!
304//! - **Negligible collision risk**: Even at 100M unique chunks, P(collision) < 0.1%
305//! - **Not acceptable for storage**: Actual packing uses the full 256-bit BLAKE3
306//!   hash via the dedup hash table to avoid data corruption
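//!
//! These figures follow from the standard birthday approximation, sketched
//! below. The function name is hypothetical and the hash output is assumed to
//! be uniformly distributed:
//!
//! ```rust
//! /// Approximate probability of at least one collision among `n` uniformly
//! /// random `bits`-bit hashes: `1 - exp(-n^2 / 2^(bits + 1))`.
//! fn collision_probability(n: f64, bits: i32) -> f64 {
//!     let exponent = -(n * n) / 2f64.powi(bits + 1);
//!     // `exp_m1` computes e^x - 1 accurately for small |x|; negate for 1 - e^x.
//!     -exponent.exp_m1()
//! }
//! ```
//!
//! For a 64-bit hash this gives roughly 0.03% at 100M unique chunks and about
//! 39% at the `2^32` birthday bound itself.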
307//!
308//! ## Small File Overhead
309//!
310//! For files smaller than `min_size`, CDC produces a single chunk containing
311//! the entire file. This is optimal behavior but means:
312//!
313//! - No deduplication benefit for very small files
314//! - Metadata overhead (hash + pointer) is proportionally higher
315//! - Consider alternative strategies for small file workloads (e.g., tar packing)
316//!
317//! ## Boundary Shift Resilience
318//!
319//! While CDC is resilient to insertions/deletions within data, it is NOT
320//! resilient to:
321//!
322//! - **Encryption**: Changes entire content, prevents all deduplication
323//! - **Compression**: Similar effect to encryption for CDC purposes
324//! - **Byte-level reordering**: Shuffles content, breaks chunk boundaries
325//!
326//! **Recommendation**: Apply CDC before compression/encryption, not after.
327//!
328//! ## Adversarial Input
329//!
330//! An attacker could craft input that produces:
331//!
332//! - **All max-size chunks**: Content designed to never trigger hash cut-points
333//!   (e.g., all zeros, which may or may not trigger depending on hash function)
334//! - **All min-size chunks**: Content that triggers cut-points too frequently
335//! - **Hash flooding**: Attempt to exhaust hash set memory in `analyze_stream()`
336//!
337//! The `max_size` enforcement prevents unbounded chunk growth, but adversarial
338//! input can still degrade deduplication effectiveness. For untrusted input,
339//! consider additional validation or rate limiting.
340//!
341//! ## Determinism Caveats
342//!
343//! Chunking is deterministic given:
344//! - Same input bytes (bit-identical)
345//! - Same parameters (`f`, `m`, `z`)
346//! - Same fastcdc library version
347//!
348//! However, updating the fastcdc library may change internal hash tables or
349//! algorithms, causing chunk boundaries to shift. This breaks cross-version
350//! deduplication. **Mitigation**: Include CDC version in snapshot metadata.
351//!
352//! # References
353//!
354//! - Xia, W. et al. "FastCDC: a Fast and Efficient Content-Defined Chunking
355//!   Approach for Data Deduplication." USENIX ATC 2016.
356//!   [https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia](https://www.usenix.org/conference/atc16/technical-sessions/presentation/xia)
357//! - Muthitacharoen, A. et al. "A Low-bandwidth Network File System." SOSP 2001.
358//!   (Original Rabin fingerprinting CDC)
359//! - Randall et al. "Optimizing Deduplication Parameters via a Change-Estimation
360//!   Analytical Model." 2025. (DCAM integration)
361
362use crate::algo::dedup::dcam::DedupeParams;
363use std::collections::HashSet;
364use std::io::{self, Read};
365
366/// Statistics from a CDC deduplication analysis run.
367///
368/// This structure captures metrics from a content-defined chunking analysis pass,
369/// providing insights into chunk distribution, deduplication effectiveness, and
370/// potential storage savings.
371///
372/// # Use Cases
373///
374/// - **DCAM Model Input**: Feeds into analytical formulas to predict optimal parameters
375/// - **Capacity Planning**: Estimates actual storage requirements after deduplication
376/// - **Performance Analysis**: Evaluates chunk size distribution and variance
377/// - **Comparison Studies**: Benchmarks different parameter configurations
378///
379/// # Invariants
380///
381/// The following relationships always hold for valid statistics:
382///
383/// - `unique_chunk_count <= chunk_count` (can't have more unique than total)
/// - `unique_bytes <= unique_chunk_count * max_chunk_size` (every chunk is at most `max_chunk_size`)
/// - `unique_bytes >= (unique_chunk_count - 1) * min_chunk_size` for non-empty input
///   (only the final chunk of a stream may be shorter than `min_chunk_size`)
386/// - `total_bytes` may be 0 (not tracked in streaming mode)
387///
388/// # Calculating Deduplication Metrics
389///
390/// ## Deduplication Ratio
391///
392/// The deduplication ratio represents what fraction of data is unique:
393///
394/// ```text
395/// dedup_ratio = unique_bytes / total_bytes_before_dedup
396/// ```
397///
398/// Where `total_bytes_before_dedup` (the original data size) can be estimated from:
399/// ```text
400/// total_bytes_before_dedup ≈ unique_bytes × (chunk_count / unique_chunk_count)
401/// ```
402///
403/// **Interpretation:**
404/// - Ratio of **0.5** means 50% of data is unique → **50% storage savings**
405/// - Ratio of **1.0** means 100% unique (no duplicates) → **0% savings**
406/// - Ratio of **0.3** means 30% unique → **70% savings**
407///
408/// ## Deduplication Factor
409///
410/// An alternative metric is the deduplication factor (multiplicative savings):
411///
412/// ```text
413/// dedup_factor = chunk_count / unique_chunk_count
414/// ```
415///
416/// **Interpretation:**
417/// - Factor of **1.0x** means no deduplication occurred
418/// - Factor of **2.0x** means data compressed by half (50% savings)
419/// - Factor of **10.0x** means 90% of chunks were duplicates
420///
421/// ## Average Chunk Size
422///
423/// The realized average chunk size (may differ from `2^f` for small datasets):
424///
425/// ```text
426/// avg_chunk_size = unique_bytes / unique_chunk_count
427/// ```
428///
429/// # Examples
430///
431/// ```rust
432/// # use hexz_core::algo::dedup::cdc::CdcStats;
433/// let stats = CdcStats {
434///     total_bytes: 0, // Not tracked (estimated below)
435///     unique_bytes: 50_000_000,
436///     chunk_count: 10_000,
437///     unique_chunk_count: 3_000,
438/// };
439///
440/// // Estimate original size
441/// let estimated_original = stats.unique_bytes * stats.chunk_count / stats.unique_chunk_count;
442/// assert_eq!(estimated_original, 166_666_666); // ~167 MB original
443///
444/// // Calculate deduplication ratio (fraction unique)
445/// let dedup_ratio = stats.unique_bytes as f64 / estimated_original as f64;
446/// assert!((dedup_ratio - 0.3).abs() < 0.01); // ~30% unique
447///
448/// // Calculate storage savings (percent eliminated)
449/// let savings_percent = (1.0 - dedup_ratio) * 100.0;
450/// assert!((savings_percent - 70.0).abs() < 1.0); // ~70% savings
451///
452/// // Calculate deduplication factor (compression ratio)
453/// let dedup_factor = stats.chunk_count as f64 / stats.unique_chunk_count as f64;
454/// assert!((dedup_factor - 3.33).abs() < 0.01); // ~3.33x compression
455///
456/// // Average chunk size
457/// let avg_chunk_size = stats.unique_bytes / stats.unique_chunk_count;
458/// println!("Average unique chunk size: {} bytes", avg_chunk_size);
459/// ```
460#[derive(Debug)]
461pub struct CdcStats {
462    /// Total bytes processed from the input stream.
463    ///
464    /// **Note**: This field is currently not tracked during streaming analysis
465    /// (always set to 0) to avoid memory overhead. It may be populated in future
466    /// versions if needed for enhanced metrics.
467    ///
468    /// To estimate total bytes, use:
469    /// ```text
470    /// total_bytes ≈ unique_bytes * (chunk_count / unique_chunk_count)
471    /// ```
472    pub total_bytes: u64,
473
474    /// Number of unique bytes after deduplication.
475    ///
476    /// This is the sum of sizes of all unique chunks (first occurrence only).
477    /// Represents the actual storage space required if deduplication is applied.
478    ///
479    /// # Interpretation
480    ///
481    /// - Lower values indicate higher redundancy in the dataset
482    /// - Equals total data size if no duplicates exist
483    /// - Bounded by `unique_chunk_count * min_chunk_size` and
484    ///   `unique_chunk_count * max_chunk_size`
485    pub unique_bytes: u64,
486
487    /// Total number of chunks identified by FastCDC.
488    ///
489    /// Includes both unique and duplicate chunks. This represents how many chunks
490    /// would be created if the data were processed through the chunking pipeline.
491    ///
492    /// # Expected Values
493    ///
494    /// For a dataset of size `N` bytes with average chunk size `2^f`:
495    /// ```text
496    /// chunk_count ≈ N / (2^f)
497    /// ```
498    ///
499    /// Example: 1GB file with f=14 (16KB avg) → ~65,536 chunks
500    pub chunk_count: u64,
501
502    /// Number of unique chunks after deduplication.
503    ///
    /// This counts only distinct chunks (based on comparison of 64-bit truncated BLAKE3 hashes).
505    /// Duplicate chunks are not counted in this total.
506    ///
507    /// # Interpretation
508    ///
509    /// - `unique_chunk_count == chunk_count` → No deduplication (all unique)
510    /// - `unique_chunk_count << chunk_count` → High deduplication (many duplicates)
511    /// - Ratio `unique_chunk_count / chunk_count` indicates dedup effectiveness
512    ///
513    /// # Hash Collision Note
514    ///
515    /// Uses the first 8 bytes of BLAKE3 as a 64-bit hash, providing negligible
516    /// collision probability (birthday bound at ~2^32 ≈ 4 billion unique chunks).
517    pub unique_chunk_count: u64,
518}
519
520/// Streaming iterator that yields content-defined chunks from a reader.
521///
522/// `StreamChunker` is the primary interface for applying FastCDC to data streams
523/// in a memory-efficient manner. It reads data incrementally from any `Read`
524/// source and applies content-defined chunking to produce variable-sized chunks
525/// suitable for compression, deduplication, and storage.
526///
527/// # Design Goals
528///
529/// - **Streaming**: Process arbitrarily large files without loading entire contents
530/// - **Zero-copy (internal)**: Minimize data copying within the chunker
531/// - **Bounded memory**: Fixed buffer size regardless of input size
532/// - **Deterministic**: Same input always produces identical chunk boundaries
533/// - **Iterator-based**: Idiomatic Rust API that composes with other iterators
534///
535/// # Buffering Strategy
536///
537/// The chunker maintains a fixed-size internal buffer to handle the mismatch
538/// between read boundaries (determined by the OS/filesystem) and chunk boundaries
539/// (determined by content):
540///
541/// - **Buffer size**: `2 * max_chunk_size` (default: ~128 KB)
542/// - **Refill trigger**: When available data < `max_chunk_size`
543/// - **Shift threshold**: When cursor advances beyond buffer midpoint
544/// - **EOF handling**: Flushes remaining data as final chunk
545///
546/// ## Buffer Management Example
547///
548/// ```text
/// Legend: '.' consumed, '#' valid unconsumed data, '_' free space
///
/// Initial state (empty buffer, cursor = filled = 0):
/// [____________________________________________________________________]
///  cursor=filled=0                                                   2*z
///
/// After the first refill (cursor still at 0):
/// [##################################__________________________________]
///  cursor                            filled                          2*z
///
/// After yielding one chunk (cursor advances past it):
/// [................##################__________________________________]
///  0               cursor            filled                          2*z
///
/// After shift (unconsumed data moved to the start):
/// [##################__________________________________________________]
///  cursor            filled                                          2*z
566/// ```
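///
/// The shift step can be sketched with `copy_within`. `Staging` is a
/// hypothetical stand-in that reuses the `cursor`/`filled` roles described
/// here; it is not the chunker's actual implementation:
///
/// ```rust
/// /// Illustrative staging buffer.
/// struct Staging {
///     buffer: Vec<u8>,
///     cursor: usize, // start of unconsumed data
///     filled: usize, // end of valid data
/// }
///
/// impl Staging {
///     /// Moves the unconsumed bytes `[cursor, filled)` to the front of the
///     /// buffer so that the space after them can be refilled.
///     fn compact(&mut self) {
///         self.buffer.copy_within(self.cursor..self.filled, 0);
///         self.filled -= self.cursor;
///         self.cursor = 0;
///     }
/// }
/// ```
///
/// After `compact()`, the invariant `0 <= cursor <= filled <= buffer.len()`
/// still holds and no allocation has taken place.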
567///
568/// # Chunk Boundary Guarantees
569///
570/// The chunker guarantees that:
571///
572/// 1. **Minimum size**: Every chunk (except possibly the last) is ≥ `min_size`
573/// 2. **Maximum size**: Every chunk is ≤ `max_size` (strictly enforced)
574/// 3. **Determinism**: Given the same input and parameters, chunk boundaries are identical
575/// 4. **Completeness**: All input bytes appear in exactly one chunk (no gaps, no overlaps)
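///
/// The size and completeness guarantees can be expressed as a check over the
/// chunks collected from a chunker. `chunks_are_valid` is a hypothetical test
/// helper, shown as a sketch of how these properties might be asserted:
///
/// ```rust
/// /// Returns `true` if `chunks` respect the size bounds and reassemble `input`.
/// fn chunks_are_valid(input: &[u8], chunks: &[Vec<u8>], min: usize, max: usize) -> bool {
///     let last = chunks.len().saturating_sub(1);
///     // Every chunk is at most `max`; all but the last are at least `min`.
///     let sizes_ok = chunks
///         .iter()
///         .enumerate()
///         .all(|(i, c)| c.len() <= max && (i == last || c.len() >= min));
///     // Completeness: concatenating the chunks must reproduce the input exactly.
///     let rebuilt: Vec<u8> = chunks.concat();
///     sizes_ok && rebuilt == input
/// }
/// ```
///
/// Determinism can be checked the same way by chunking the same input twice
/// and comparing the two chunk lists for equality.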
576///
577/// # Performance Considerations
578///
579/// - **Memory allocation**: Each chunk is copied to an owned `Vec<u8>`. For
580///   zero-allocation iteration, consider using `StreamChunkerSlice` (if available)
581///   or process chunks in-place within the iterator.
/// - **I/O pattern**: Reads in large blocks (up to `max_chunk_size`) to minimize
///   syscalls, so wrapping the reader in a `BufReader` is unnecessary.
/// - **CPU cost**: Gear hash computation is the bottleneck (~2.7 GB/s
///   single-threaded, per the module-level benchmarks). Consider parallel
///   chunking of independent streams if throughput is critical.
586///
587/// # Examples
588///
589/// ## Basic Usage
590///
591/// ```no_run
592/// use hexz_core::algo::dedup::cdc::StreamChunker;
593/// use hexz_core::algo::dedup::dcam::DedupeParams;
594/// use std::fs::File;
595///
596/// # fn main() -> std::io::Result<()> {
597/// let file = File::open("data.bin")?;
598/// let params = DedupeParams::default();
599/// let chunker = StreamChunker::new(file, params);
600///
601/// for chunk_result in chunker {
602///     let chunk = chunk_result?;
603///     println!("Chunk: {} bytes", chunk.len());
604/// }
605/// # Ok(())
606/// # }
607/// ```
608///
609/// ## With Compression Pipeline
610///
611/// ```no_run
612/// use hexz_core::algo::dedup::cdc::StreamChunker;
613/// use hexz_core::algo::dedup::dcam::DedupeParams;
614/// use std::fs::File;
615/// use std::collections::HashMap;
616///
617/// # fn main() -> std::io::Result<()> {
618/// let file = File::open("memory.raw")?;
619/// let params = DedupeParams::default();
620/// let chunker = StreamChunker::new(file, params);
621///
622/// let mut dedup_map: HashMap<u64, usize> = HashMap::new();
623/// let mut unique_chunks = Vec::new();
624///
625/// for chunk_result in chunker {
626///     let chunk = chunk_result?;
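///     // Illustration only: a 32-bit CRC collides far too easily for real
///     // storage; use the full BLAKE3 hash when packing.
///     let hash = crc32fast::hash(&chunk) as u64;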
628///
629///     if !dedup_map.contains_key(&hash) {
630///         let chunk_id = unique_chunks.len();
631///         dedup_map.insert(hash, chunk_id);
632///         // Compress and store chunk
633///         unique_chunks.push(chunk);
634///     }
635/// }
636///
637/// println!("Stored {} unique chunks", unique_chunks.len());
638/// # Ok(())
639/// # }
640/// ```
641///
642/// ## Custom Parameters
643///
644/// ```no_run
645/// use hexz_core::algo::dedup::cdc::StreamChunker;
646/// use hexz_core::algo::dedup::dcam::DedupeParams;
647/// use std::fs::File;
648///
649/// # fn main() -> std::io::Result<()> {
650/// let file = File::open("disk.raw")?;
651///
652/// // Custom parameters: larger chunks for better compression ratio
653/// let mut params = DedupeParams::default();
654/// params.f = 15; // 32KB average (was 16KB)
655/// params.m = 8192; // 8KB minimum (was 2KB)
656/// params.z = 131072; // 128KB maximum (was 64KB)
657///
658/// let chunker = StreamChunker::new(file, params);
659///
660/// let chunk_sizes: Vec<usize> = chunker
661///     .map(|res| res.map(|chunk| chunk.len()))
662///     .collect::<Result<_, _>>()?;
663///
/// // `max(1)` avoids dividing by zero when the input is empty.
/// let avg_size: usize = chunk_sizes.iter().sum::<usize>() / chunk_sizes.len().max(1);
665/// println!("Average chunk size: {} bytes", avg_size);
666/// # Ok(())
667/// # }
668/// ```
669pub struct StreamChunker<R> {
670    /// Underlying data source.
671    ///
672    /// Can be any type implementing `Read`: files, network streams, memory
673    /// buffers, etc. The chunker makes no assumptions about the read pattern
674    /// or blocking behavior.
675    reader: R,
676
677    /// Internal buffer for staging data between reads and chunk boundaries.
678    ///
679    /// Size is `2 * max_chunk_size` to allow efficient buffering and shifting.
680    /// Allocated once during construction and reused for the entire stream.
681    buffer: Vec<u8>,
682
683    /// Current read position within the buffer.
684    ///
685    /// Points to the start of the next chunk to be yielded. Invariant:
686    /// `0 <= cursor <= filled <= buffer.len()`
687    cursor: usize,
688
689    /// Number of valid bytes currently in the buffer.
690    ///
691    /// Bytes in range `[0, filled)` contain valid data. Bytes in range
692    /// `[filled, buffer.len())` are unused scratch space.
693    filled: usize,
694
695    /// Minimum chunk size in bytes (FastCDC parameter `m`).
696    ///
697    /// No chunk will be smaller than this (except possibly the final chunk
698    /// if EOF is reached). Prevents tiny chunks that would increase metadata
699    /// overhead disproportionately.
700    min_size: usize,
701
702    /// Average chunk size in bytes (FastCDC parameter `2^f`).
703    ///
704    /// This is the statistical expectation of chunk sizes. Actual chunks follow
705    /// a bounded exponential distribution around this value due to the rolling
706    /// hash cut-point probability.
707    avg_size: usize,
708
709    /// Maximum chunk size in bytes (FastCDC parameter `z`).
710    ///
711    /// Chunks are forcibly cut at this size regardless of hash values. Bounds
712    /// worst-case memory usage and ensures progress even in adversarial inputs
713    /// (e.g., runs of zeros that never trigger hash-based cuts).
714    max_size: usize,
715
716    /// Whether EOF has been reached on the underlying reader.
717    ///
718    /// Once set to `true`, no further reads will be attempted. The chunker
719    /// will drain remaining buffered data and then terminate iteration.
720    eof: bool,
721}
722
723impl<R: Read> StreamChunker<R> {
724    /// Creates a new streaming chunker with the specified deduplication parameters.
725    ///
726    /// This constructor initializes the chunker's internal buffer and extracts the
727    /// relevant FastCDC parameters. The chunker is ready to begin iteration immediately
728    /// after construction; no separate initialization step is required.
729    ///
730    /// # Parameters
731    ///
732    /// - `reader`: Data source implementing `Read`. This can be:
733    ///   - `File` for reading from disk
734    ///   - `Cursor<Vec<u8>>` for in-memory data
735    ///   - `BufReader<File>` for buffered I/O (redundant but not harmful)
736    ///   - Any other `Read` implementation (network streams, pipes, etc.)
737    ///
738    /// - `params`: FastCDC parameters controlling chunk size distribution. Key fields:
739    ///   - `params.f`: Fingerprint bits (average chunk size = 2^f)
740    ///   - `params.m`: Minimum chunk size in bytes
741    ///   - `params.z`: Maximum chunk size in bytes
742    ///   - `params.w`: Rolling hash window size (informational, not used here)
743    ///
744    /// # Returns
745    ///
746    /// A new `StreamChunker` ready to yield chunks via iteration. The chunker
747    /// takes ownership of the reader and will consume it as iteration proceeds.
748    ///
749    /// # Memory Allocation
750    ///
    /// Allocates a zero-initialized buffer of `2 * max(params.z, 1 MiB)` bytes, so
    /// the buffer is never smaller than 2 MiB. This is a one-time allocation that
    /// persists for the entire stream:
    ///
    /// - Default settings (`z=64KB`): **2 MiB** per chunker (1 MiB floor applies)
    /// - Large chunks (`z=128KB`): **2 MiB** per chunker (1 MiB floor applies)
    /// - Small chunks (`z=16KB`): **2 MiB** per chunker (1 MiB floor applies)
    /// - Very large chunks (`z=4MB`): **8 MiB** per chunker
    ///
    /// The 1 MiB floor prevents excessive refill operations for small chunk sizes.
759    ///
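    /// The sizing expression can be checked in isolation. The helper below is a
    /// minimal sketch; the name `buffer_size` is hypothetical and only mirrors the
    /// arithmetic performed in this constructor.
    ///
    /// ```rust
    /// // Hypothetical helper: twice the larger of the maximum chunk size and 1 MiB.
    /// fn buffer_size(max_chunk: usize) -> usize {
    ///     max_chunk.max(1024 * 1024) * 2
    /// }
    /// ```
    ///
    /// With `z = 64 KiB` this yields 2 MiB; the result only grows once `z`
    /// exceeds 1 MiB.
    ///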
760    /// # Panics
761    ///
    /// Can panic if the buffer size computation or capacity overflows, which is only
    /// possible for extreme `params.z` values. If the allocator cannot satisfy the
    /// request, Rust's default behavior is to abort the process rather than panic.
    /// The buffer is heap-allocated and is 2 MiB for any `params.z` up to 1 MiB.
765    ///
766    /// # Performance Notes
767    ///
768    /// - **No I/O in constructor**: Data reading is deferred until first `next()` call
769    /// - **Deterministic**: Given the same input and parameters, produces identical chunks
    /// - **`Send` when `R: Send`**: The chunker owns its buffer and reader and has no shared state
771    ///
772    /// # Examples
773    ///
774    /// ## Default Parameters
775    ///
776    /// ```no_run
777    /// use hexz_core::algo::dedup::cdc::StreamChunker;
778    /// use hexz_core::algo::dedup::dcam::DedupeParams;
779    /// use std::fs::File;
780    ///
781    /// # fn main() -> std::io::Result<()> {
782    /// let file = File::open("data.bin")?;
783    /// let chunker = StreamChunker::new(file, DedupeParams::default());
784    /// // Chunker is ready to iterate
785    /// # Ok(())
786    /// # }
787    /// ```
788    ///
789    /// ## Custom Parameters
790    ///
791    /// ```no_run
792    /// use hexz_core::algo::dedup::cdc::StreamChunker;
793    /// use hexz_core::algo::dedup::dcam::DedupeParams;
794    /// use std::fs::File;
795    ///
796    /// # fn main() -> std::io::Result<()> {
797    /// let file = File::open("disk.raw")?;
798    /// let params = DedupeParams {
799    ///     f: 15,       // 32KB average
800    ///     m: 4096,     // 4KB minimum
801    ///     z: 131072,   // 128KB maximum
802    ///     w: 48,       // Window size (not used by StreamChunker)
803    ///     v: 16,       // Metadata overhead (not used by StreamChunker)
804    /// };
805    /// let chunker = StreamChunker::new(file, params);
806    /// # Ok(())
807    /// # }
808    /// ```
809    pub fn new(reader: R, params: DedupeParams) -> Self {
        // The buffer must hold at least one maximum-size chunk.
        // We allocate 2 * max(max_size, 1 MiB) to leave room for shifting and
        // to keep reads large even when max_size is small.
812        let buf_size = (params.z as usize).max(1024 * 1024) * 2;
813        Self {
814            reader,
815            buffer: vec![0u8; buf_size],
816            cursor: 0,
817            filled: 0,
818            min_size: params.m as usize,
819            avg_size: 1 << params.f,
820            max_size: params.z as usize,
821            eof: false,
822        }
823    }
824
825    /// Refills the internal buffer from the reader.
826    ///
827    /// This private method manages the buffer state to maintain a continuous window
828    /// of data for chunking. It performs buffer compaction (shifting) and reads new
829    /// data to maximize available space for chunk detection.
830    ///
831    /// # Algorithm
832    ///
    /// 1. **Shift unprocessed data**: If `cursor` has passed the midpoint of the
    ///    buffer, move bytes `[cursor..filled)` to the start to reclaim space
    /// 2. **Read new data**: Call `reader.read()` repeatedly to fill available space
    /// 3. **Update EOF flag**: Set `eof = true` when the reader returns 0 bytes
    /// 4. **Early exit**: Stop reading once the buffer holds ≥ `max_size` bytes
838    ///
839    /// # Buffer State Before/After
840    ///
841    /// Before:
842    /// ```text
843    /// [consumed|unprocessed______|____________available____________]
844    ///  0       cursor            filled                          capacity
845    /// ```
846    ///
847    /// After shift:
848    /// ```text
849    /// [unprocessed______|____________available___________________]
850    ///  0                new_filled                             capacity
851    /// ```
852    ///
853    /// After refill:
854    /// ```text
855    /// [unprocessed______|new_data__________________________|____]
856    ///  0                old_filled                        new_filled
857    /// ```
858    ///
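    /// The shift step can be illustrated on a plain byte slice. This is a minimal
    /// sketch: the free function `compact` is hypothetical, and its `cursor` and
    /// `filled` arguments merely play the roles of the fields of the same name.
    ///
    /// ```rust
    /// // Hypothetical stand-alone version of the shift step.
    /// // Returns the new `(cursor, filled)` pair.
    /// fn compact(buffer: &mut [u8], cursor: usize, filled: usize) -> (usize, usize) {
    ///     // Move the unprocessed bytes [cursor..filled) to the front of the buffer.
    ///     buffer.copy_within(cursor..filled, 0);
    ///     // The cursor restarts at 0 and `filled` shrinks by the consumed prefix.
    ///     (0, filled - cursor)
    /// }
    /// ```
    ///
    /// Bytes past the new `filled` position keep their stale contents; they are
    /// overwritten by the next read.
    ///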
859    /// # Performance
860    ///
861    /// - **Shift cost**: O(filled - cursor) via `copy_within()` (memmove)
862    /// - **Read cost**: Depends on underlying reader (syscall overhead)
    /// - **Amortization**: Shifts happen only after `cursor` passes the buffer
    ///   midpoint, not on every chunk consumed
864    ///
865    /// # Errors
866    ///
867    /// Returns any I/O errors encountered during reading. Common errors:
868    ///
869    /// - `ErrorKind::UnexpectedEof`: Reader closed unexpectedly (shouldn't happen
870    ///   for valid streams; `read()` returning 0 is the proper EOF signal)
    /// - `ErrorKind::Interrupted`: Signal interrupted read. This implementation
    ///   propagates it via `?` like any other error; it is not retried here
873    /// - Other errors: Propagated from underlying reader (e.g., permission denied,
874    ///   device errors, network timeouts)
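    ///
    /// Because `refill` propagates read errors with `?`, a caller that wants
    /// interrupted reads retried can wrap the reader before handing it to the
    /// chunker. The adapter below is a minimal sketch; `RetryInterrupted` is a
    /// hypothetical name and is not part of this module.
    ///
    /// ```rust
    /// use std::io::{self, ErrorKind, Read};
    ///
    /// // Hypothetical adapter that retries reads interrupted by a signal
    /// // instead of surfacing the error to the caller.
    /// struct RetryInterrupted<R>(R);
    ///
    /// impl<R: Read> Read for RetryInterrupted<R> {
    ///     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
    ///         loop {
    ///             match self.0.read(buf) {
    ///                 // Retry only on `Interrupted`; pass everything else through.
    ///                 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
    ///                 other => return other,
    ///             }
    ///         }
    ///     }
    /// }
    /// ```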
875    fn refill(&mut self) -> io::Result<()> {
876        // Only shift when cursor has consumed more than half the buffer.
877        // This reduces memmove frequency by ~50% while still ensuring enough
878        // space for the next read.
879        if self.cursor > self.buffer.len() / 2 {
880            self.buffer.copy_within(self.cursor..self.filled, 0);
881            self.filled -= self.cursor;
882            self.cursor = 0;
883        }
884
885        while self.filled < self.buffer.len() && !self.eof {
886            let n = self.reader.read(&mut self.buffer[self.filled..])?;
887            if n == 0 {
888                self.eof = true;
889            } else {
890                self.filled += n;
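                // Stop once the unprocessed region can hold a maximum-size chunk.
                // Comparing `filled` alone would also count already-consumed bytes,
                // so short reads could leave the window smaller than `max_size`.
                if self.filled - self.cursor >= self.max_size {
                    break;
                }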
895            }
896        }
897        Ok(())
898    }
899}
900
901impl<R: Read> Iterator for StreamChunker<R> {
902    type Item = io::Result<Vec<u8>>;
903
904    /// Yields the next content-defined chunk.
905    ///
906    /// This is the core iterator method that drives the chunking process. Each call
907    /// performs the following operations:
908    ///
909    /// # Algorithm
910    ///
911    /// 1. **Check for data**: If buffer is empty and EOF not reached, refill
912    /// 2. **Handle EOF**: If no data available after refill, return `None`
913    /// 3. **Determine available window**: Compute bytes available for chunking
914    /// 4. **Apply FastCDC**:
915    ///    - If available < `min_size`: Return all available bytes (last chunk)
916    ///    - Else: Run FastCDC on window `[cursor..min(cursor+max_size, filled)]`
917    /// 5. **Extract chunk**: Copy chunk bytes to new `Vec<u8>`, advance cursor
918    /// 6. **Return**: Yield `Some(Ok(chunk))`
919    ///
920    /// # FastCDC Integration
921    ///
    /// The method delegates to the `fastcdc` crate (`fastcdc::v2020::FastCDC`),
    /// which implements the normalized chunking algorithm. The returned chunk length
924    /// respects all size constraints:
925    ///
926    /// - No chunk < `min_size` (except final chunk)
927    /// - No chunk > `max_size` (strictly enforced)
928    /// - Average chunk size ≈ `avg_size = 2^f`
929    ///
930    /// # Chunk Length Determination
931    ///
932    /// The algorithm chooses chunk length as follows:
933    ///
934    /// | Condition | Action |
935    /// |-----------|--------|
936    /// | `available < min_size` | Return all available bytes |
937    /// | FastCDC finds cut point | Use FastCDC-determined length |
938    /// | `available >= max_size` && no cut point | Force cut at `max_size` |
939    /// | EOF reached && no cut point | Return all remaining bytes |
940    /// | Buffer full && no cut point | Force cut at `max_size` |
941    /// | Other | Return all available bytes |
942    ///
943    /// # Returns
944    ///
945    /// - `Some(Ok(chunk))` - Next chunk successfully extracted and copied
946    /// - `Some(Err(e))` - I/O error occurred during refill
947    /// - `None` - Stream exhausted (no more data available)
948    ///
949    /// # Errors
950    ///
951    /// Returns `Some(Err(e))` if the underlying reader encounters an I/O error
952    /// during refill. After an error, the iterator should be considered invalid
953    /// (further calls may panic or return inconsistent results).
954    ///
955    /// Common error causes:
956    /// - File read errors (permissions, disk errors)
957    /// - Network errors (timeouts, connection reset)
    /// - Interrupted system calls (`ErrorKind::Interrupted` is propagated, not retried)
959    ///
960    /// # Chunk Size Guarantees
961    ///
962    /// The returned chunks satisfy these constraints:
963    ///
964    /// - **Minimum**: `≥ min_size` (except possibly the last chunk)
965    /// - **Maximum**: `≤ max_size` (always enforced, never violated)
966    /// - **Average**: `≈ avg_size = 2^f` (statistical expectation)
967    /// - **Last chunk**: May be smaller than `min_size` if EOF reached
968    ///
969    /// # Memory Allocation
970    ///
971    /// Each chunk is copied to a new `Vec<u8>`. For zero-copy iteration within
972    /// the buffer lifetime, consider modifying the API to return slices with
973    /// appropriate lifetimes (future enhancement).
974    ///
975    /// # Examples
976    ///
977    /// ```no_run
978    /// use hexz_core::algo::dedup::cdc::StreamChunker;
979    /// use hexz_core::algo::dedup::dcam::DedupeParams;
980    /// use std::fs::File;
981    ///
982    /// # fn main() -> std::io::Result<()> {
983    /// let file = File::open("data.bin")?;
984    /// let mut chunker = StreamChunker::new(file, DedupeParams::default());
985    ///
986    /// // First chunk
987    /// if let Some(Ok(chunk)) = chunker.next() {
988    ///     println!("First chunk: {} bytes", chunk.len());
989    /// }
990    ///
991    /// // Iterate remaining chunks
992    /// for chunk_result in chunker {
993    ///     let chunk = chunk_result?;
994    ///     // Process chunk...
995    /// }
996    /// # Ok(())
997    /// # }
998    /// ```
999    ///
1000    /// # Performance Characteristics
1001    ///
1002    /// - **Amortized time**: O(chunk_size) per chunk (dominated by copying)
1003    /// - **Space**: O(chunk_size) allocation per chunk yielded
1004    /// - **Throughput**: ~500 MB/s on modern CPUs (bottlenecked by Gear hash)
1005    fn next(&mut self) -> Option<Self::Item> {
1006        if self.cursor >= self.filled {
1007            if self.eof {
1008                return None;
1009            }
1010            if let Err(e) = self.refill() {
1011                return Some(Err(e));
1012            }
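            // Nothing left to chunk. Checking `filled == 0` alone would miss the
            // case where EOF is hit with `cursor == filled > 0`, which would
            // otherwise yield a spurious empty chunk.
            if self.cursor >= self.filled {
                return None;
            }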
1016        }
1017
1018        let available = self.filled - self.cursor;
1019
1020        // Refill if we don't have enough data for a full chunk and more data is available
1021        if available < self.max_size && !self.eof {
1022            if let Err(e) = self.refill() {
1023                return Some(Err(e));
1024            }
1025        }
1026        let available = self.filled - self.cursor;
1027
1028        let chunk_len = if available < self.min_size {
1029            available
1030        } else {
1031            // Run FastCDC on the available window
1032            let data = &self.buffer[self.cursor..self.filled];
1033            let search_limit = std::cmp::min(data.len(), self.max_size);
1034
1035            let chunker = fastcdc::v2020::FastCDC::new(
1036                &data[..search_limit],
1037                self.min_size as u32,
1038                self.avg_size as u32,
1039                self.max_size as u32,
1040            );
1041
1042            if let Some(chunk) = chunker.into_iter().next() {
1043                chunk.length
1044            } else {
1045                // No cut point found
1046                if available >= self.max_size {
1047                    self.max_size
1048                } else if self.eof {
1049                    available
1050                } else if self.filled == self.buffer.len() {
1051                    self.max_size
1052                } else {
1053                    available
1054                }
1055            }
1056        };
1057
1058        let start = self.cursor;
1059        self.cursor += chunk_len;
1060        Some(Ok(self.buffer[start..start + chunk_len].to_vec()))
1061    }
1062}
1063
1064/// Performs a single-pass deduplication analysis on a data stream.
1065///
1066/// This function applies FastCDC chunking to a data stream and tracks unique chunks
1067/// via hash-based deduplication. It is designed as a lightweight analysis pass for
1068/// the DCAM model to estimate deduplication potential before committing to full
1069/// snapshot packing.
1070///
1071/// # Algorithm
1072///
1073/// 1. **Chunk the input** using `StreamChunker` with the provided FastCDC parameters
/// 2. **Hash each chunk** using the first 8 bytes of its BLAKE3 digest as a 64-bit hash
1075/// 3. **Track uniqueness** via a `HashSet<u64>` of seen hashes
1076/// 4. **Accumulate statistics**:
1077///    - Total chunk count (all chunks, including duplicates)
1078///    - Unique chunk count (distinct hashes)
1079///    - Unique bytes (sum of unique chunk sizes)
1080///
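/// The counting logic can be sketched with the standard library alone. This is
/// an illustration under stated assumptions, not the function's implementation:
/// fixed-size chunks stand in for FastCDC, `DefaultHasher` stands in for the
/// truncated BLAKE3 digest, and the name `count_unique` is hypothetical.
///
/// ```rust
/// use std::collections::hash_map::DefaultHasher;
/// use std::collections::HashSet;
/// use std::hash::{Hash, Hasher};
///
/// // Hypothetical stand-in for the counting loop.
/// // Returns (chunk_count, unique_chunk_count, unique_bytes).
/// // `chunk_size` must be non-zero, otherwise `chunks` panics.
/// fn count_unique(data: &[u8], chunk_size: usize) -> (u64, u64, u64) {
///     let mut seen: HashSet<u64> = HashSet::new();
///     let (mut chunks, mut unique_chunks, mut unique_bytes) = (0u64, 0u64, 0u64);
///     for chunk in data.chunks(chunk_size) {
///         let mut hasher = DefaultHasher::new();
///         chunk.hash(&mut hasher);
///         chunks += 1;
///         // `insert` returns true only the first time a hash is seen.
///         if seen.insert(hasher.finish()) {
///             unique_chunks += 1;
///             unique_bytes += chunk.len() as u64;
///         }
///     }
///     (chunks, unique_chunks, unique_bytes)
/// }
/// ```
///
/// Duplicate chunks still increase the total count but contribute nothing to
/// the unique byte total.
///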
1081/// # Parameters
1082///
1083/// - `reader`: Data source to analyze. Can be any `Read` implementation:
1084///   - `File` for disk images or backups
1085///   - `Cursor<Vec<u8>>` for in-memory data
1086///   - Network streams, pipes, or other I/O sources
1087///
1088/// - `params`: FastCDC parameters controlling chunking behavior:
1089///   - `params.f`: Fingerprint bits (average chunk size = 2^f bytes)
1090///   - `params.m`: Minimum chunk size in bytes
1091///   - `params.z`: Maximum chunk size in bytes
1092///   - Other fields (`w`, `v`) are used by DCAM but not by the chunker
1093///
1094/// # Returns
1095///
1096/// `Ok(CdcStats)` containing:
1097/// - `chunk_count`: Total chunks processed (including duplicates)
1098/// - `unique_chunk_count`: Number of distinct chunks (unique hashes)
1099/// - `unique_bytes`: Total bytes in unique chunks (storage requirement estimate)
1100/// - `total_bytes`: Currently unused (always 0) for performance reasons
1101///
1102/// # Errors
1103///
/// Returns `Err` if the underlying reader encounters an I/O error during reading.
/// Errors from opening the source (e.g., `std::io::ErrorKind::NotFound` from
/// `File::open`) occur before this function is called. Common error cases:
///
1108/// - `std::io::ErrorKind::PermissionDenied`: Insufficient permissions to read
1109/// - `std::io::ErrorKind::UnexpectedEof`: Reader closed unexpectedly
1110/// - Network-related errors (timeouts, connection resets) for network streams
1111/// - Disk errors (bad sectors, hardware failures) for file-based readers
1112///
/// On error the function returns `Err` immediately; no partial statistics are
/// produced and no recovery is attempted.
1115///
1116/// # Performance
1117///
1118/// ## Time Complexity
1119///
1120/// - **Single-threaded throughput**: ~500 MB/s on modern CPUs (2020+)
1121/// - **Bottleneck**: Gear hash computation (CPU-bound, not I/O-bound)
1122/// - **Scaling**: Linear in input size, O(N) where N = total bytes
1123///
1124/// ## Space Complexity
1125///
1126/// - **Hash set**: O(unique chunks) × 8 bytes per hash ≈ O(N / avg_chunk_size) × 8
1127/// - **Example**: 1GB input, 16KB avg chunks → ~65K unique chunks → ~520 KB hash set
/// - **Streaming buffer**: Fixed at `2 * max(params.z, 1 MiB)` (2 MiB for typical settings)
1129///
1130/// ## Memory Optimization
1131///
1132/// For extremely large datasets where hash set size becomes prohibitive, consider:
1133/// - Using a probabilistic data structure (Bloom filter, HyperLogLog)
1134/// - Sampling (analyze every Nth chunk instead of all chunks)
1135/// - External deduplication index (disk-based hash storage)
1136///
1137/// # Hash Collision Probability
1138///
1139/// This function uses the first 8 bytes of BLAKE3 as a 64-bit hash.
1140/// The probability of collision is governed by the birthday paradox:
1141///
1142/// ```text
1143/// P(collision) ≈ n² / (2 × 2^64) where n = unique_chunk_count
1144/// ```
1145///
1146/// For typical workloads:
/// - **1M unique chunks**: P(collision) ≈ 2.7e-8 (negligible)
/// - **100M unique chunks**: P(collision) ≈ 2.7e-4 (0.027%, still small)
/// - **~4.3 billion (2^32) unique chunks**: the approximation gives 50%; the
///   exact birthday estimate `1 - exp(-n² / 2^65)` gives ≈ 39%
1150///
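/// The estimate can be evaluated numerically. The helper below is a minimal
/// sketch using the exact birthday form `1 - exp(-n² / 2^65)` for a 64-bit
/// hash; the name `collision_probability` is hypothetical.
///
/// ```rust
/// // Hypothetical helper: probability of at least one collision among
/// // `unique_chunks` uniformly distributed 64-bit hashes.
/// fn collision_probability(unique_chunks: f64) -> f64 {
///     let x = unique_chunks * unique_chunks / 2f64.powi(65);
///     // 1 - exp(-x), written with exp_m1 to keep precision when x is tiny.
///     -(-x).exp_m1()
/// }
/// ```
///
/// For small `n` this agrees with the `n² / (2 × 2^64)` approximation; the two
/// diverge as the probability approaches 1.
///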
1151/// # Examples
1152///
1153/// ## Basic Usage
1154///
1155/// ```no_run
1156/// use hexz_core::algo::dedup::cdc::analyze_stream;
1157/// use hexz_core::algo::dedup::dcam::DedupeParams;
1158/// use std::fs::File;
1159///
1160/// # fn main() -> std::io::Result<()> {
1161/// let file = File::open("dataset.bin")?;
1162/// let params = DedupeParams::default();
1163/// let stats = analyze_stream(file, &params)?;
1164///
1165/// println!("Total chunks: {}", stats.chunk_count);
1166/// println!("Unique chunks: {}", stats.unique_chunk_count);
1167/// println!("Unique bytes: {} MB", stats.unique_bytes / 1_000_000);
1168///
1169/// let dedup_factor = stats.chunk_count as f64 / stats.unique_chunk_count as f64;
1170/// println!("Deduplication factor: {:.2}x", dedup_factor);
1171/// # Ok(())
1172/// # }
1173/// ```
1174///
1175/// ## Estimating Storage Savings
1176///
1177/// ```no_run
1178/// use hexz_core::algo::dedup::cdc::analyze_stream;
1179/// use hexz_core::algo::dedup::dcam::DedupeParams;
1180/// use std::fs::File;
1181///
1182/// # fn main() -> std::io::Result<()> {
1183/// let file = File::open("snapshot.raw")?;
1184/// let params = DedupeParams::default();
1185/// let stats = analyze_stream(file, &params)?;
1186///
/// // Estimate savings from chunk counts (total bytes are not tracked directly).
/// // This assumes duplicate chunks have the same average size as unique ones.
/// // The guard avoids a division by zero on empty input.
/// if stats.chunk_count > 0 {
///     let unique_fraction = stats.unique_chunk_count as f64 / stats.chunk_count as f64;
///     let savings_ratio = 1.0 - unique_fraction;
///     println!("Estimated storage savings: {:.1}%", savings_ratio * 100.0);
/// }
1192/// # Ok(())
1193/// # }
1194/// ```
1195///
1196/// ## Comparing Parameter Sets
1197///
1198/// ```no_run
1199/// use hexz_core::algo::dedup::cdc::analyze_stream;
1200/// use hexz_core::algo::dedup::dcam::DedupeParams;
1201/// use std::io::Cursor;
1202///
1203/// # fn main() -> std::io::Result<()> {
1204/// let data = vec![0u8; 10_000_000]; // 10 MB test data
1205///
1206/// // Analyze with small chunks
1207/// let mut params_small = DedupeParams::default();
1208/// params_small.f = 13; // 8KB average
1209/// let stats_small = analyze_stream(Cursor::new(&data), &params_small)?;
1210///
1211/// // Analyze with large chunks
1212/// let mut params_large = DedupeParams::default();
1213/// params_large.f = 15; // 32KB average
1214/// let stats_large = analyze_stream(Cursor::new(&data), &params_large)?;
1215///
1216/// println!("Small chunks: {} total, {} unique",
1217///          stats_small.chunk_count, stats_small.unique_chunk_count);
1218/// println!("Large chunks: {} total, {} unique",
1219///          stats_large.chunk_count, stats_large.unique_chunk_count);
1220/// # Ok(())
1221/// # }
1222/// ```
1223///
1224/// # Use Cases
1225///
1226/// - **DCAM Model Input**: Feeds statistics into analytical formulas to predict
1227///   optimal FastCDC parameters for a given dataset
1228/// - **Capacity Planning**: Estimates storage requirements before provisioning
1229///   deduplication infrastructure
1230/// - **Parameter Tuning**: Compares deduplication effectiveness across different
1231///   chunk size configurations
1232/// - **Workload Characterization**: Analyzes redundancy patterns in datasets
1233///
1234/// # Relationship to `StreamChunker`
1235///
1236/// This function is a convenience wrapper around `StreamChunker` that adds
1237/// deduplication tracking via a hash set. For actual snapshot packing, use
1238/// `StreamChunker` directly and handle deduplication in the compression/storage
1239/// pipeline. The analysis function is intended as a lightweight dry-run only.
1240pub fn analyze_stream<R: Read>(reader: R, params: &DedupeParams) -> io::Result<CdcStats> {
1241    let mut unique_bytes = 0;
1242    let mut chunk_count = 0;
1243    let mut unique_chunk_count = 0;
1244    let mut seen_chunks: HashSet<u64> = HashSet::new();
1245
1246    // Use the streaming chunker for analysis too, to ensure consistency
1247    let chunker = StreamChunker::new(reader, *params);
1248
1249    for chunk_res in chunker {
1250        let chunk = chunk_res?;
1251        let len = chunk.len() as u64;
1252        // Use first 8 bytes of BLAKE3 digest as a 64-bit hash.
1253        // BLAKE3 at 16KB chunks is >3 GB/s — only ~2-3x slower than CRC32 —
1254        // but provides 2^64 hash space (birthday bound at ~4 billion chunks)
1255        // vs CRC32's 2^32 (birthday bound at ~65K chunks).
1256        let digest = *blake3::hash(&chunk).as_bytes();
1257        let hash = u64::from_le_bytes([
1258            digest[0], digest[1], digest[2], digest[3], digest[4], digest[5], digest[6], digest[7],
1259        ]);
1260
1261        chunk_count += 1;
1262        if seen_chunks.insert(hash) {
1263            unique_bytes += len;
1264            unique_chunk_count += 1;
1265        }
1266    }
1267
1268    Ok(CdcStats {
1269        total_bytes: 0,
1270        unique_bytes,
1271        chunk_count,
1272        unique_chunk_count,
1273    })
1274}