pub struct StreamChunker<R> { /* private fields */ }
Streaming iterator that yields content-defined chunks from a reader.
StreamChunker is the primary interface for applying FastCDC to data streams
in a memory-efficient manner. It reads data incrementally from any Read
source and applies content-defined chunking to produce variable-sized chunks
suitable for compression, deduplication, and storage.
§Design Goals
- Streaming: Process arbitrarily large files without loading entire contents
- Zero-copy (internal): Minimize data copying within the chunker
- Bounded memory: Fixed buffer size regardless of input size
- Deterministic: Same input always produces identical chunk boundaries
- Iterator-based: Idiomatic Rust API that composes with other iterators
§Buffering Strategy
The chunker maintains a fixed-size internal buffer to handle the mismatch between read boundaries (determined by the OS/filesystem) and chunk boundaries (determined by content):
- Buffer size: 2 * max_chunk_size (default: ~128 KB)
- Refill trigger: When available data < max_chunk_size
- Shift threshold: When cursor advances beyond buffer midpoint
- EOF handling: Flushes remaining data as final chunk
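The refill and shift rules above can be sketched with a small stand-in type. This is a minimal illustration under stated assumptions, not the crate's actual implementation: `SketchBuffer`, its fields, and its method names are hypothetical, and the real chunker's private fields are not documented here.

```rust
use std::io::{self, Read};

/// Hypothetical stand-in for the chunker's internal buffer state.
struct SketchBuffer {
    buf: Vec<u8>,
    cursor: usize, // start of unconsumed data
    filled: usize, // end of valid data
    eof: bool,
}

impl SketchBuffer {
    /// Allocates 2 * max_chunk_size bytes, as the buffer-size rule describes.
    fn new(max_chunk_size: usize) -> Self {
        Self { buf: vec![0; 2 * max_chunk_size], cursor: 0, filled: 0, eof: false }
    }

    /// Bytes that have been read but not yet emitted as a chunk.
    fn available(&self) -> usize {
        self.filled - self.cursor
    }

    /// Shift threshold: move unconsumed bytes to the front once the cursor
    /// has advanced beyond the buffer midpoint.
    fn shift_if_needed(&mut self) {
        if self.cursor > self.buf.len() / 2 {
            self.buf.copy_within(self.cursor..self.filled, 0);
            self.filled -= self.cursor;
            self.cursor = 0;
        }
    }

    /// Refill trigger: read until at least `max_chunk_size` bytes are
    /// available, the buffer is full, or the reader reports EOF.
    fn refill<R: Read>(&mut self, reader: &mut R, max_chunk_size: usize) -> io::Result<()> {
        self.shift_if_needed();
        while !self.eof && self.available() < max_chunk_size && self.filled < self.buf.len() {
            let n = reader.read(&mut self.buf[self.filled..])?;
            if n == 0 {
                self.eof = true;
            } else {
                self.filled += n;
            }
        }
        Ok(())
    }

    /// Marks `n` bytes as consumed after a chunk has been emitted.
    fn consume(&mut self, n: usize) {
        self.cursor += n;
    }
}
```

Because the shift only runs once the cursor is past the midpoint, at most half of the buffer is copied per shift, which keeps the copying cost proportional to the data consumed.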
§Buffer Management Example
Initial state (empty buffer):
[__________________________________|__________________________________]
0 cursor 2*z
After reading (filled to cursor):
[##################################|__________________________________]
0 cursor=filled 2*z
After consuming one chunk (cursor advances):
[################XXXXXXXXXXXXXXXXXX|__________________________________]
0 consumed cursor 2*z
(next chunk) filled
After shift (move unconsumed data to start):
[XXXXXXXXXXXXXXXXXX________________|__________________________________]
0 cursor filled 2*z
(new position)
§Chunk Boundary Guarantees
The chunker guarantees that:
- Minimum size: Every chunk (except possibly the last) is ≥ min_size
- Maximum size: Every chunk is ≤ max_size (strictly enforced)
- Determinism: Given the same input and parameters, chunk boundaries are identical
- Completeness: All input bytes appear in exactly one chunk (no gaps, no overlaps)
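The size and completeness guarantees listed above are easy to assert in a test. The helper below is a sketch: `check_guarantees` is a hypothetical name, and it takes chunks as plain `Vec<u8>` values so it does not depend on the chunker's exact item type.

```rust
/// Verifies minimum size, maximum size, and completeness for `chunks`
/// that were cut from `input`. Returns a description of the first violation.
fn check_guarantees(
    input: &[u8],
    chunks: &[Vec<u8>],
    min_size: usize,
    max_size: usize,
) -> Result<(), String> {
    let mut offset = 0;
    for (i, chunk) in chunks.iter().enumerate() {
        let is_last = i + 1 == chunks.len();
        if chunk.len() > max_size {
            return Err(format!("chunk {i} exceeds max_size"));
        }
        // Only the final chunk may be shorter than min_size.
        if !is_last && chunk.len() < min_size {
            return Err(format!("chunk {i} is below min_size"));
        }
        // Completeness: chunks must tile the input with no gaps or overlaps.
        let end = offset + chunk.len();
        if end > input.len() || &input[offset..end] != chunk.as_slice() {
            return Err(format!("chunk {i} does not match input at offset {offset}"));
        }
        offset = end;
    }
    if offset == input.len() {
        Ok(())
    } else {
        Err("chunks do not cover the whole input".to_string())
    }
}
```

Determinism can be checked separately by chunking the same input twice with the same parameters and comparing the two chunk lists for equality.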
§Performance Considerations
- Memory allocation: Each chunk is copied to an owned Vec<u8>. For zero-allocation iteration, consider using StreamChunkerSlice (if available) or process chunks in place within the iterator.
- I/O pattern: Reads in large blocks (up to max_chunk_size) to minimize syscalls. Wrapping the source in a buffered reader works but is not required.
- CPU cost: Gear hash computation is the bottleneck (~500 MB/s). Consider parallel chunking of independent streams if throughput is critical.
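The suggestion to chunk independent streams in parallel can be sketched with scoped threads from the standard library. This is an illustration only: `process_in_parallel` is a hypothetical helper, and the `per_stream` closure stands in for whatever work is done per stream (for example, building a `StreamChunker` over each input and consuming it).

```rust
use std::thread;

/// Runs `per_stream` on every input concurrently, one thread per stream,
/// and returns the results in input order.
fn process_in_parallel<T, F>(inputs: Vec<Vec<u8>>, per_stream: F) -> Vec<T>
where
    T: Send,
    F: Fn(&[u8]) -> T + Sync,
{
    thread::scope(|scope| {
        let work = &per_stream;
        // Spawn all workers first so they run concurrently.
        let handles: Vec<_> = inputs
            .iter()
            .map(|input| scope.spawn(move || work(input.as_slice())))
            .collect();
        // Join in spawn order so results line up with `inputs`.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    })
}
```

One thread per stream is the simplest arrangement. With many streams, a bounded worker pool would be a more reasonable choice, since each chunker also holds its own buffer.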
§Examples
§Basic Usage
use hexz_core::algo::dedup::cdc::StreamChunker;
use hexz_core::algo::dedup::dcam::DedupeParams;
use std::fs::File;

let file = File::open("data.bin")?;
let params = DedupeParams::default();
let chunker = StreamChunker::new(file, params);

for chunk_result in chunker {
    let chunk = chunk_result?;
    println!("Chunk: {} bytes", chunk.len());
}
§With Compression Pipeline
use hexz_core::algo::dedup::cdc::StreamChunker;
use hexz_core::algo::dedup::dcam::DedupeParams;
use std::collections::hash_map::{Entry, HashMap};
use std::fs::File;

let file = File::open("memory.raw")?;
let params = DedupeParams::default();
let chunker = StreamChunker::new(file, params);

// Maps chunk hash -> index into `unique_chunks`.
let mut dedup_map: HashMap<u64, usize> = HashMap::new();
let mut unique_chunks = Vec::new();

for chunk_result in chunker {
    let chunk = chunk_result?;
    // CRC32 keeps the example short; a 32-bit checksum can collide, so a
    // real deduplication store should use a stronger hash.
    let hash = u64::from(crc32fast::hash(&chunk));
    if let Entry::Vacant(slot) = dedup_map.entry(hash) {
        slot.insert(unique_chunks.len());
        // Compress and store chunk
        unique_chunks.push(chunk);
    }
}
println!("Stored {} unique chunks", unique_chunks.len());
§Custom Parameters
use hexz_core::algo::dedup::cdc::StreamChunker;
use hexz_core::algo::dedup::dcam::DedupeParams;
use std::fs::File;

let file = File::open("disk.raw")?;

// Custom parameters: larger chunks for better compression ratio
let mut params = DedupeParams::default();
params.f = 15;     // 32KB average (was 16KB)
params.m = 8192;   // 8KB minimum (was 2KB)
params.z = 131072; // 128KB maximum (was 64KB)

let chunker = StreamChunker::new(file, params);
let chunk_sizes: Vec<usize> = chunker
    .map(|res| res.map(|chunk| chunk.len()))
    .collect::<Result<_, _>>()?;

// An empty input yields no chunks, so guard the division.
if !chunk_sizes.is_empty() {
    let avg_size = chunk_sizes.iter().sum::<usize>() / chunk_sizes.len();
    println!("Average chunk size: {} bytes", avg_size);
}
Implementations§
impl<R: Read> StreamChunker<R>
pub fn new(reader: R, params: DedupeParams) -> Self
Creates a new streaming chunker with the specified deduplication parameters.
This constructor initializes the chunker’s internal buffer and extracts the relevant FastCDC parameters. The chunker is ready to begin iteration immediately after construction; no separate initialization step is required.
§Parameters
- reader: Data source implementing Read. This can be:
  - File for reading from disk
  - Cursor<Vec<u8>> for in-memory data
  - BufReader<File> for buffered I/O (redundant but not harmful)
  - Any other Read implementation (network streams, pipes, etc.)
- params: FastCDC parameters controlling chunk size distribution. Key fields:
  - params.f: Fingerprint bits (average chunk size = 2^f)
  - params.m: Minimum chunk size in bytes
  - params.z: Maximum chunk size in bytes
  - params.w: Rolling hash window size (informational, not used here)
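The relationship between the three size fields can be made concrete with a short sketch. `SizeParams` below is a hypothetical mirror of the `f`, `m`, and `z` fields; the field types of the real `DedupeParams` are assumptions here, and `is_consistent` is an illustrative check, not something the crate is known to perform.

```rust
/// Hypothetical mirror of the size-related fields of `DedupeParams`.
#[derive(Debug, Clone, Copy)]
struct SizeParams {
    f: u32,   // fingerprint bits
    m: usize, // minimum chunk size in bytes
    z: usize, // maximum chunk size in bytes
}

impl SizeParams {
    /// Average chunk size implied by the fingerprint bits: 2^f.
    fn avg_size(&self) -> usize {
        1usize << self.f
    }

    /// A sensible configuration keeps min <= average <= max.
    fn is_consistent(&self) -> bool {
        self.m <= self.avg_size() && self.avg_size() <= self.z
    }
}
```

For example, `f = 14` gives a 16384-byte average and `f = 15` gives 32768 bytes, so raising `f` by one doubles the expected chunk size.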
§Returns
A new StreamChunker ready to yield chunks via iteration. The chunker
takes ownership of the reader and will consume it as iteration proceeds.
§Memory Allocation
Allocates a buffer of 2 * params.z bytes (or 2 MB, whichever is larger).
This is a one-time allocation that persists for the entire stream:
- Default settings (z=64KB): ~128 KB per chunker
- Large chunks (z=128KB): ~256 KB per chunker
- Small chunks (z=16KB): ~2 MB per chunker (due to 2MB minimum)
The 2MB minimum prevents excessive refill operations for small chunk sizes.
§Panics
May panic if memory allocation fails (extremely rare on modern systems with virtual memory, but possible in constrained environments). For default parameters this allocates ~128 KB, well within typical stack/heap limits.
§Performance Notes
- No I/O in constructor: Data reading is deferred until first next() call
- Deterministic: Given the same input and parameters, produces identical chunks
- Thread-safe (if R is): The chunker itself has no shared state
§Examples
§Default Parameters
use hexz_core::algo::dedup::cdc::StreamChunker;
use hexz_core::algo::dedup::dcam::DedupeParams;
use std::fs::File;
let file = File::open("data.bin")?;
let chunker = StreamChunker::new(file, DedupeParams::default());
// Chunker is ready to iterate
§Custom Parameters
use hexz_core::algo::dedup::cdc::StreamChunker;
use hexz_core::algo::dedup::dcam::DedupeParams;
use std::fs::File;
let file = File::open("disk.raw")?;
let params = DedupeParams {
    f: 15,     // 32KB average
    m: 4096,   // 4KB minimum
    z: 131072, // 128KB maximum
    w: 48,     // Window size (not used by StreamChunker)
    v: 16,     // Metadata overhead (not used by StreamChunker)
};
let chunker = StreamChunker::new(file, params);
Trait Implementations§
impl<R: Read> Iterator for StreamChunker<R>
fn next(&mut self) -> Option<Self::Item>
Yields the next content-defined chunk.
This is the core iterator method that drives the chunking process. Each call performs the following operations:
§Algorithm
1. Check for data: If buffer is empty and EOF not reached, refill
2. Handle EOF: If no data available after refill, return None
3. Determine available window: Compute bytes available for chunking
4. Apply FastCDC:
   - If available < min_size: Return all available bytes (last chunk)
   - Else: Run FastCDC on window [cursor..min(cursor+max_size, filled)]
5. Extract chunk: Copy chunk bytes to new Vec<u8>, advance cursor
6. Return: Yield Some(Ok(chunk))
§FastCDC Integration
The method delegates to the fastcdc crate’s FastCDC::new() constructor,
which implements the normalized chunking algorithm. The returned chunk length
respects all size constraints:
- No chunk < min_size (except final chunk)
- No chunk > max_size (strictly enforced)
- Average chunk size ≈ avg_size = 2^f
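To give a feel for how a content-defined cut point respects these constraints, here is a deliberately simplified Gear-hash search. It is not the fastcdc crate's code: it uses a single mask rather than normalized chunking, and the gear table is generated from a fixed seed with splitmix64 purely for illustration. The names `gear_table` and `find_cut` are hypothetical.

```rust
/// Builds a 256-entry gear table from a fixed seed using splitmix64, so the
/// table (and therefore every cut point) is deterministic.
fn gear_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
    for entry in table.iter_mut() {
        state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        *entry = z ^ (z >> 31);
    }
    table
}

/// Returns the length of the next chunk at the start of `data`.
/// `f` is the number of fingerprint bits and must be in 1..=63.
fn find_cut(data: &[u8], gear: &[u64; 256], min_size: usize, max_size: usize, f: u32) -> usize {
    // Never look past max_size: this is what enforces the upper bound.
    let limit = data.len().min(max_size);
    if limit <= min_size {
        return limit; // too little data: emit everything (final chunk)
    }
    let mut hash = 0u64;
    // Positions before min_size are skipped, which enforces the lower bound.
    for i in min_size..limit {
        hash = (hash << 1).wrapping_add(gear[data[i] as usize]);
        // Cut when the top `f` bits are all zero (probability about 2^-f).
        if hash >> (64 - f) == 0 {
            return i + 1;
        }
    }
    limit // no cut point found: force a cut at max_size or end of data
}
```

With a single mask like this, a cut is expected roughly every 2^f bytes past the minimum. Normalized chunking, as used by FastCDC, narrows the size distribution around the average instead.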
§Chunk Length Determination
The algorithm chooses chunk length as follows:
| Condition | Action |
|---|---|
| available < min_size | Return all available bytes |
| FastCDC finds cut point | Use FastCDC-determined length |
| available >= max_size && no cut point | Force cut at max_size |
| EOF reached && no cut point | Return all remaining bytes |
| Buffer full && no cut point | Force cut at max_size |
| Other | Return all available bytes |
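The table reads as a sequence of checks evaluated top to bottom. A minimal sketch as a pure function follows; `choose_len` and its parameters (including the `eof` and `buffer_full` flags) are hypothetical names, not the crate's internals.

```rust
/// Picks the chunk length by evaluating the decision table row by row.
/// `cut_point` is the length reported by the cut-point search, if any.
fn choose_len(
    available: usize,
    min_size: usize,
    max_size: usize,
    cut_point: Option<usize>,
    eof: bool,
    buffer_full: bool,
) -> usize {
    if available < min_size {
        return available; // too little data: emit it all
    }
    if let Some(len) = cut_point {
        return len; // content-defined boundary found
    }
    if available >= max_size {
        return max_size; // no boundary within the window: force a cut
    }
    if eof {
        return available; // nothing more will arrive
    }
    if buffer_full {
        // Clamped defensively so the result never exceeds the data on hand.
        return max_size.min(available);
    }
    available
}
```

Keeping this logic free of I/O makes each row of the table straightforward to unit test.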
§Returns
- Some(Ok(chunk)) - Next chunk successfully extracted and copied
- Some(Err(e)) - I/O error occurred during refill
- None - Stream exhausted (no more data available)
§Errors
Returns Some(Err(e)) if the underlying reader encounters an I/O error
during refill. After an error, the iterator should be considered invalid
(further calls may panic or return inconsistent results).
Common error causes:
- File read errors (permissions, disk errors)
- Network errors (timeouts, connection reset)
- Interrupted system calls (usually auto-retried)
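Since the iterator should not be used after an error, the simplest consumer stops at the first `Err` and drops the iterator. The sketch below assumes the item type is `io::Result<Vec<u8>>`, which is an assumption rather than something stated above; `total_bytes` is a hypothetical helper.

```rust
use std::io;

/// Sums chunk lengths, returning the first I/O error encountered.
/// The iterator is taken by value, so it cannot be polled again afterwards.
fn total_bytes<I>(chunks: I) -> io::Result<usize>
where
    I: Iterator<Item = io::Result<Vec<u8>>>,
{
    let mut total = 0;
    for chunk in chunks {
        // `?` returns early on the first error, which ends iteration.
        total += chunk?.len();
    }
    Ok(total)
}
```

Taking the iterator by value is a small design choice that makes misuse after an error impossible at the call site.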
§Chunk Size Guarantees
The returned chunks satisfy these constraints:
- Minimum: ≥ min_size (except possibly the last chunk)
- Maximum: ≤ max_size (always enforced, never violated)
- Average: ≈ avg_size = 2^f (statistical expectation)
- Last chunk: May be smaller than min_size if EOF reached
§Memory Allocation
Each chunk is copied to a new Vec<u8>. For zero-copy iteration within
the buffer lifetime, consider modifying the API to return slices with
appropriate lifetimes (future enhancement).
§Examples
use hexz_core::algo::dedup::cdc::StreamChunker;
use hexz_core::algo::dedup::dcam::DedupeParams;
use std::fs::File;

let file = File::open("data.bin")?;
let mut chunker = StreamChunker::new(file, DedupeParams::default());

// First chunk (propagate an I/O error instead of silently skipping it)
if let Some(chunk_result) = chunker.next() {
    let chunk = chunk_result?;
    println!("First chunk: {} bytes", chunk.len());
}

// Iterate remaining chunks
for chunk_result in chunker {
    let chunk = chunk_result?;
    // Process chunk...
}
§Performance Characteristics
- Amortized time: O(chunk_size) per chunk (dominated by copying)
- Space: O(chunk_size) allocation per chunk yielded
- Throughput: ~500 MB/s on modern CPUs (bottlenecked by Gear hash)
Auto Trait Implementations§
impl<R> Freeze for StreamChunker<R> where R: Freeze
impl<R> RefUnwindSafe for StreamChunker<R> where R: RefUnwindSafe
impl<R> Send for StreamChunker<R> where R: Send
impl<R> Sync for StreamChunker<R> where R: Sync
impl<R> Unpin for StreamChunker<R> where R: Unpin
impl<R> UnsafeUnpin for StreamChunker<R> where R: UnsafeUnpin
impl<R> UnwindSafe for StreamChunker<R> where R: UnwindSafe