Skip to main content

oximedia_dedup/
stream_dedup.rs

1//! Streaming duplicate detection without loading entire files into memory.
2//!
3//! This module implements content-defined chunking over arbitrary `io::Read`
4//! sources so that very large media files can be fingerprinted without mapping
5//! the full file into RAM.  The approach:
6//!
//! 1. A [`StreamChunker`] wraps any `io::Read` and emits content-defined
//!    chunk digests one at a time via [`StreamChunker::next_chunk`].  Data
//!    flows through a fixed-size read buffer (`BUF_SIZE` bytes) plus a
//!    per-chunk accumulation buffer bounded by the configured chunk-size
//!    limits, so memory use is bounded regardless of file size.
11//! 2. [`StreamFingerprint`] aggregates chunk digests into a compact file-level
12//!    fingerprint that survives byte-level insertions and deletions (unlike a
13//!    whole-file BLAKE3 hash).
14//! 3. [`StreamDedupIndex`] stores fingerprints and answers "is this stream a
15//!    near-duplicate of something already indexed?" via chunk-level Jaccard
16//!    similarity.
17//!
18//! # Rationale
19//!
20//! The existing [`crate::rolling_hash`] module provides content-defined chunking
21//! over in-memory byte slices.  This module extends the deduplication pipeline
22//! with a streaming interface that satisfies the TODO item:
23//! *"Optimize `rolling_hash.rs` for streaming duplicate detection without
24//! loading entire files"*.
25//!
26//! # Example
27//!
28//! ```rust
29//! use oximedia_dedup::stream_dedup::{StreamChunkerConfig, StreamDedupIndex};
30//! use std::io::Cursor;
31//!
32//! let config = StreamChunkerConfig::default();
33//! let mut index = StreamDedupIndex::new(config.clone());
34//!
35//! let data = vec![42u8; 32_768];
36//! let fp = index.ingest("file-a", Cursor::new(data.clone())).expect("ingest ok");
37//! assert!(fp.chunk_count() > 0);
38//!
39//! // A second identical stream should be detected as a duplicate.
40//! let fp2 = index.ingest("file-b", Cursor::new(data)).expect("ingest ok");
41//! let sim = index.jaccard_similarity(&fp, &fp2);
42//! assert!((sim - 1.0).abs() < 1e-9);
43//! ```
44
45#![allow(dead_code)]
46#![allow(clippy::cast_precision_loss)]
47#![allow(clippy::cast_possible_truncation)]
48
49use std::collections::{HashMap, HashSet};
50use std::io::{self, Read};
51
/// Capacity, in bytes, of the read buffer each `StreamChunker` fills from its
/// reader (64 KiB).
const BUF_SIZE: usize = 64 * 1024;
54
55// ---------------------------------------------------------------------------
56// StreamChunkerConfig
57// ---------------------------------------------------------------------------
58
59/// Configuration for the streaming content-defined chunker.
60#[derive(Debug, Clone)]
61pub struct StreamChunkerConfig {
62    /// Minimum chunk length in bytes.
63    pub min_chunk: usize,
64    /// Maximum chunk length in bytes.
65    pub max_chunk: usize,
66    /// Rolling hash window size.
67    pub window_size: usize,
68    /// Number of low-order bits of the rolling hash used for boundary detection.
69    /// A boundary occurs when `(hash & boundary_mask) == 0`.
70    pub mask_bits: u32,
71}
72
73impl Default for StreamChunkerConfig {
74    fn default() -> Self {
75        Self {
76            min_chunk: 4_096,
77            max_chunk: 131_072,
78            window_size: 48,
79            mask_bits: 12, // average chunk ≈ 4096 bytes
80        }
81    }
82}
83
84impl StreamChunkerConfig {
85    /// Compute the boundary mask from `mask_bits`.
86    #[must_use]
87    pub fn boundary_mask(&self) -> u64 {
88        (1u64 << self.mask_bits) - 1
89    }
90
91    /// Validate the configuration.
92    ///
93    /// Returns `true` when all invariants hold.
94    #[must_use]
95    pub fn is_valid(&self) -> bool {
96        self.min_chunk > 0
97            && self.max_chunk >= self.min_chunk
98            && self.window_size > 0
99            && self.mask_bits > 0
100            && self.mask_bits < 32
101    }
102}
103
104// ---------------------------------------------------------------------------
105// ChunkDigest
106// ---------------------------------------------------------------------------
107
108/// The hash of a single content-defined chunk.
109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
110pub struct ChunkDigest {
111    /// FNV-1a 64-bit digest of the chunk bytes.
112    pub hash: u64,
113    /// Number of bytes in this chunk.
114    pub len: usize,
115}
116
117/// Compute a FNV-1a 64-bit hash of a byte slice.
118fn fnv1a_64(data: &[u8]) -> u64 {
119    const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
120    const PRIME: u64 = 0x0100_0000_01b3;
121    let mut h = OFFSET;
122    for &b in data {
123        h ^= u64::from(b);
124        h = h.wrapping_mul(PRIME);
125    }
126    h
127}
128
129// ---------------------------------------------------------------------------
130// Streaming rolling hash (Buzhash-lite)
131// ---------------------------------------------------------------------------
132
133/// A minimal rolling hash used internally by [`StreamChunker`].
134///
135/// Uses a power-of-2 lookup table (byte → 64-bit random value) with XOR
136/// rotation to provide the rolling property.
137struct RollingHash {
138    table: [u64; 256],
139    window: Vec<u8>,
140    window_size: usize,
141    head: usize,
142    value: u64,
143    count: usize,
144}
145
146impl RollingHash {
147    fn new(window_size: usize) -> Self {
148        // Deterministic table derived from FNV-1a of the byte value.
149        let mut table = [0u64; 256];
150        for (i, slot) in table.iter_mut().enumerate() {
151            *slot = fnv1a_64(&[i as u8, 0x5A, 0xA5]);
152        }
153        Self {
154            table,
155            window: vec![0u8; window_size],
156            window_size,
157            head: 0,
158            value: 0,
159            count: 0,
160        }
161    }
162
163    /// Feed one byte; returns the updated rolling hash.
164    fn update(&mut self, byte: u8) -> u64 {
165        let outgoing = self.window[self.head];
166        self.window[self.head] = byte;
167        self.head = (self.head + 1) % self.window_size;
168        // Rotate left by 1, XOR in new byte, XOR out old byte (rotated by window_size).
169        self.value = self.value.rotate_left(1)
170            ^ self.table[byte as usize]
171            ^ self.table[outgoing as usize].rotate_left(self.window_size as u32 & 63);
172        self.count += 1;
173        self.value
174    }
175}
176
177// ---------------------------------------------------------------------------
178// StreamChunker
179// ---------------------------------------------------------------------------
180
181/// An iterator over content-defined [`ChunkDigest`]s read from an `io::Read`.
182///
183/// Internally buffers data in a fixed-size heap buffer so the caller's memory
184/// usage is bounded regardless of file size.  The I/O buffer `io_buf` holds
185/// the most recently read batch; `io_pos` tracks where processing should resume
186/// within that batch, allowing the chunker to return mid-buffer and resume on
187/// the next call without discarding unprocessed bytes.
188pub struct StreamChunker<R: Read> {
189    reader: R,
190    config: StreamChunkerConfig,
191    rolling: RollingHash,
192    /// I/O read buffer.
193    io_buf: Vec<u8>,
194    /// How many bytes are valid in `io_buf`.
195    io_len: usize,
196    /// Current read position within `io_buf`.
197    io_pos: usize,
198    /// Accumulation buffer for the current in-flight chunk.
199    chunk_buf: Vec<u8>,
200    /// Set to true once the underlying reader returns EOF.
201    done: bool,
202}
203
204impl<R: Read> StreamChunker<R> {
205    /// Create a new `StreamChunker` wrapping `reader`.
206    #[must_use]
207    pub fn new(reader: R, config: StreamChunkerConfig) -> Self {
208        let window_size = config.window_size;
209        Self {
210            reader,
211            config,
212            rolling: RollingHash::new(window_size),
213            io_buf: vec![0u8; BUF_SIZE],
214            io_len: 0,
215            io_pos: 0,
216            chunk_buf: Vec::with_capacity(8_192),
217            done: false,
218        }
219    }
220
221    /// Collect all chunk digests eagerly, consuming `self`.
222    ///
223    /// # Errors
224    ///
225    /// Propagates any `io::Error` from the underlying reader.
226    pub fn collect_all(mut self) -> io::Result<Vec<ChunkDigest>> {
227        let mut out = Vec::new();
228        loop {
229            match self.next_chunk() {
230                Ok(Some(d)) => out.push(d),
231                Ok(None) => break,
232                Err(e) => return Err(e),
233            }
234        }
235        Ok(out)
236    }
237
238    /// Advance to the next chunk.
239    ///
240    /// Returns `Ok(None)` when the stream is exhausted.
241    ///
242    /// # Errors
243    ///
244    /// Returns an `io::Error` on read failure.
245    pub fn next_chunk(&mut self) -> io::Result<Option<ChunkDigest>> {
246        if self.done && self.io_pos >= self.io_len {
247            return Ok(None);
248        }
249        let mask = self.config.boundary_mask();
250
251        loop {
252            // Refill I/O buffer when the current batch is exhausted.
253            if self.io_pos >= self.io_len {
254                if self.done {
255                    break;
256                }
257                let n = self.reader.read(&mut self.io_buf)?;
258                if n == 0 {
259                    self.done = true;
260                    break;
261                }
262                self.io_len = n;
263                self.io_pos = 0;
264            }
265
266            // Process bytes from the current buffer position.
267            while self.io_pos < self.io_len {
268                let byte = self.io_buf[self.io_pos];
269                self.io_pos += 1;
270
271                let h = self.rolling.update(byte);
272                self.chunk_buf.push(byte);
273                let chunk_len = self.chunk_buf.len();
274
275                if chunk_len < self.config.min_chunk {
276                    continue;
277                }
278                let is_boundary = (h & mask) == 0 || chunk_len >= self.config.max_chunk;
279                if is_boundary {
280                    let digest = ChunkDigest {
281                        hash: fnv1a_64(&self.chunk_buf),
282                        len: chunk_len,
283                    };
284                    self.chunk_buf.clear();
285                    return Ok(Some(digest));
286                }
287            }
288            // Current batch exhausted; loop to refill.
289        }
290
291        // EOF reached — emit trailing chunk if any data remains.
292        if self.chunk_buf.is_empty() {
293            return Ok(None);
294        }
295        let digest = ChunkDigest {
296            hash: fnv1a_64(&self.chunk_buf),
297            len: self.chunk_buf.len(),
298        };
299        self.chunk_buf.clear();
300        Ok(Some(digest))
301    }
302}
303
304// ---------------------------------------------------------------------------
305// StreamFingerprint
306// ---------------------------------------------------------------------------
307
308/// A file-level fingerprint derived from its content-defined chunk hashes.
309///
310/// The fingerprint is robust against small edits: only the chunks that actually
311/// changed will differ, so the Jaccard similarity between two near-identical
312/// files remains high.
313#[derive(Debug, Clone)]
314pub struct StreamFingerprint {
315    /// Ordered list of chunk digests.
316    pub chunks: Vec<ChunkDigest>,
317    /// Total bytes processed.
318    pub total_bytes: u64,
319}
320
321impl StreamFingerprint {
322    /// Number of chunks in the fingerprint.
323    #[must_use]
324    pub fn chunk_count(&self) -> usize {
325        self.chunks.len()
326    }
327
328    /// Set of unique chunk hashes (for Jaccard computation).
329    #[must_use]
330    pub fn chunk_set(&self) -> HashSet<u64> {
331        self.chunks.iter().map(|c| c.hash).collect()
332    }
333
334    /// Compute the Jaccard similarity between this fingerprint and another.
335    ///
336    /// `J(A,B) = |A ∩ B| / |A ∪ B|`
337    ///
338    /// Returns 1.0 when both fingerprints are identical, 0.0 when completely
339    /// disjoint.
340    #[must_use]
341    pub fn jaccard(&self, other: &Self) -> f64 {
342        let a = self.chunk_set();
343        let b = other.chunk_set();
344        if a.is_empty() && b.is_empty() {
345            return 1.0;
346        }
347        let intersection = a.intersection(&b).count();
348        let union = a.union(&b).count();
349        if union == 0 {
350            return 1.0;
351        }
352        intersection as f64 / union as f64
353    }
354}
355
356// ---------------------------------------------------------------------------
357// StreamDedupIndex
358// ---------------------------------------------------------------------------
359
360/// Index of stream fingerprints for near-duplicate detection.
361///
362/// Files are added by name via [`ingest`](Self::ingest); duplicates are
363/// retrieved via [`find_duplicates`](Self::find_duplicates).
364#[derive(Debug)]
365pub struct StreamDedupIndex {
366    config: StreamChunkerConfig,
367    entries: HashMap<String, StreamFingerprint>,
368}
369
370impl StreamDedupIndex {
371    /// Create a new, empty index with the given chunker config.
372    #[must_use]
373    pub fn new(config: StreamChunkerConfig) -> Self {
374        Self {
375            config,
376            entries: HashMap::new(),
377        }
378    }
379
380    /// Ingest a stream and store its fingerprint under `name`.
381    ///
382    /// Returns the computed [`StreamFingerprint`].
383    ///
384    /// # Errors
385    ///
386    /// Propagates `io::Error` from `reader`.
387    pub fn ingest<R: Read>(&mut self, name: &str, reader: R) -> io::Result<StreamFingerprint> {
388        let chunker = StreamChunker::new(reader, self.config.clone());
389        let chunks = chunker.collect_all()?;
390        let total_bytes: u64 = chunks.iter().map(|c| c.len as u64).sum();
391        let fp = StreamFingerprint {
392            chunks,
393            total_bytes,
394        };
395        self.entries.insert(name.to_string(), fp.clone());
396        Ok(fp)
397    }
398
399    /// Number of indexed entries.
400    #[must_use]
401    pub fn len(&self) -> usize {
402        self.entries.len()
403    }
404
405    /// Return `true` if the index is empty.
406    #[must_use]
407    pub fn is_empty(&self) -> bool {
408        self.entries.is_empty()
409    }
410
411    /// Compute the Jaccard similarity between two fingerprints.
412    #[must_use]
413    pub fn jaccard_similarity(&self, a: &StreamFingerprint, b: &StreamFingerprint) -> f64 {
414        a.jaccard(b)
415    }
416
417    /// Find all pairs of indexed entries whose Jaccard similarity exceeds
418    /// `threshold`.
419    ///
420    /// Returns a list of `(name_a, name_b, similarity)` tuples sorted by
421    /// descending similarity.
422    #[must_use]
423    pub fn find_duplicates(&self, threshold: f64) -> Vec<(String, String, f64)> {
424        let names: Vec<&String> = self.entries.keys().collect();
425        let n = names.len();
426        let mut pairs = Vec::new();
427
428        for i in 0..n {
429            for j in (i + 1)..n {
430                let fp_a = &self.entries[names[i]];
431                let fp_b = &self.entries[names[j]];
432                let sim = fp_a.jaccard(fp_b);
433                if sim >= threshold {
434                    pairs.push((names[i].clone(), names[j].clone(), sim));
435                }
436            }
437        }
438
439        pairs.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
440        pairs
441    }
442
443    /// Retrieve the fingerprint stored under `name`, if any.
444    #[must_use]
445    pub fn get(&self, name: &str) -> Option<&StreamFingerprint> {
446        self.entries.get(name)
447    }
448}
449
450// ---------------------------------------------------------------------------
451// Tests
452// ---------------------------------------------------------------------------
453
454#[cfg(test)]
455mod tests {
456    use super::*;
457    use std::io::Cursor;
458
459    fn small_config() -> StreamChunkerConfig {
460        StreamChunkerConfig {
461            min_chunk: 64,
462            max_chunk: 512,
463            window_size: 16,
464            mask_bits: 6, // average ≈ 64 bytes
465        }
466    }
467
468    #[test]
469    fn test_config_default_is_valid() {
470        assert!(StreamChunkerConfig::default().is_valid());
471    }
472
473    #[test]
474    fn test_config_small_is_valid() {
475        assert!(small_config().is_valid());
476    }
477
478    #[test]
479    fn test_boundary_mask() {
480        let cfg = StreamChunkerConfig {
481            mask_bits: 8,
482            ..Default::default()
483        };
484        assert_eq!(cfg.boundary_mask(), 0xFF);
485    }
486
487    #[test]
488    fn test_empty_stream_produces_no_chunks() {
489        let cfg = small_config();
490        let chunker = StreamChunker::new(Cursor::new(b""), cfg);
491        let chunks = chunker.collect_all().expect("io should not fail");
492        assert!(chunks.is_empty());
493    }
494
495    #[test]
496    fn test_small_data_single_chunk() {
497        // Data smaller than min_chunk → emitted as one trailing chunk.
498        let cfg = small_config(); // min_chunk = 64
499        let data = vec![0xABu8; 32]; // 32 bytes < 64
500        let chunker = StreamChunker::new(Cursor::new(data), cfg);
501        let chunks = chunker.collect_all().expect("ok");
502        assert_eq!(chunks.len(), 1);
503        assert_eq!(chunks[0].len, 32);
504    }
505
506    #[test]
507    fn test_large_data_multiple_chunks() {
508        let cfg = small_config();
509        // 8 KiB of repeating data should produce several chunks.
510        let data = vec![0x5Au8; 8192];
511        let chunker = StreamChunker::new(Cursor::new(data.clone()), cfg);
512        let chunks = chunker.collect_all().expect("ok");
513        // Total bytes must equal data length.
514        let total: usize = chunks.iter().map(|c| c.len).sum();
515        assert_eq!(total, 8192);
516    }
517
518    #[test]
519    fn test_deterministic_chunking() {
520        let cfg = small_config();
521        let data: Vec<u8> = (0..4096_u16).map(|i| (i % 251) as u8).collect();
522        let c1 = StreamChunker::new(Cursor::new(data.clone()), cfg.clone())
523            .collect_all()
524            .expect("ok");
525        let c2 = StreamChunker::new(Cursor::new(data), cfg)
526            .collect_all()
527            .expect("ok");
528        assert_eq!(c1, c2, "chunking must be deterministic");
529    }
530
531    #[test]
532    fn test_identical_streams_jaccard_one() {
533        let cfg = small_config();
534        let data = vec![0x7Fu8; 4096];
535        let mut index = StreamDedupIndex::new(cfg);
536        let fp1 = index.ingest("a", Cursor::new(data.clone())).expect("ok");
537        let fp2 = index.ingest("b", Cursor::new(data)).expect("ok");
538        let sim = index.jaccard_similarity(&fp1, &fp2);
539        assert!(
540            (sim - 1.0).abs() < 1e-9,
541            "identical streams must have Jaccard = 1.0, got {sim}"
542        );
543    }
544
545    #[test]
546    fn test_completely_different_streams_jaccard_near_zero() {
547        let cfg = small_config();
548        let data_a = vec![0x00u8; 4096];
549        let data_b = vec![0xFFu8; 4096];
550        let mut index = StreamDedupIndex::new(cfg);
551        let fp_a = index.ingest("a", Cursor::new(data_a)).expect("ok");
552        let fp_b = index.ingest("b", Cursor::new(data_b)).expect("ok");
553        let sim = index.jaccard_similarity(&fp_a, &fp_b);
554        // Chunks should be different → low similarity.
555        assert!(
556            sim < 0.5,
557            "different data should have low Jaccard, got {sim}"
558        );
559    }
560
561    #[test]
562    fn test_find_duplicates_returns_pairs_above_threshold() {
563        let cfg = small_config();
564        let data = vec![0xCCu8; 2048];
565        let mut index = StreamDedupIndex::new(cfg);
566        index.ingest("x", Cursor::new(data.clone())).expect("ok");
567        index.ingest("y", Cursor::new(data)).expect("ok");
568
569        let pairs = index.find_duplicates(0.9);
570        assert!(!pairs.is_empty());
571        let (ref na, ref nb, sim) = pairs[0];
572        // names may be in any order
573        assert!(
574            (na == "x" || na == "y") && (nb == "x" || nb == "y") && na != nb,
575            "pair names should be x and y"
576        );
577        assert!(sim >= 0.9);
578    }
579
580    #[test]
581    fn test_find_duplicates_no_pairs_above_high_threshold() {
582        let cfg = small_config();
583        let mut index = StreamDedupIndex::new(cfg);
584        index
585            .ingest("p", Cursor::new(vec![0x11u8; 2048]))
586            .expect("ok");
587        index
588            .ingest("q", Cursor::new(vec![0x22u8; 2048]))
589            .expect("ok");
590        // Very high threshold; different data won't meet it.
591        let pairs = index.find_duplicates(0.99);
592        assert!(
593            pairs.is_empty() || pairs.iter().all(|(_, _, s)| *s >= 0.99),
594            "all returned pairs must meet the threshold"
595        );
596    }
597
598    #[test]
599    fn test_index_len_and_is_empty() {
600        let mut index = StreamDedupIndex::new(small_config());
601        assert!(index.is_empty());
602        index
603            .ingest("file", Cursor::new(vec![1u8; 100]))
604            .expect("ok");
605        assert_eq!(index.len(), 1);
606        assert!(!index.is_empty());
607    }
608
609    #[test]
610    fn test_get_fingerprint_after_ingest() {
611        let mut index = StreamDedupIndex::new(small_config());
612        index
613            .ingest("myfile", Cursor::new(vec![42u8; 512]))
614            .expect("ok");
615        let fp = index.get("myfile");
616        assert!(fp.is_some());
617        assert!(fp.unwrap().chunk_count() >= 1);
618    }
619
620    #[test]
621    fn test_fnv1a_deterministic() {
622        let h1 = fnv1a_64(b"hello world");
623        let h2 = fnv1a_64(b"hello world");
624        assert_eq!(h1, h2);
625    }
626
627    #[test]
628    fn test_total_bytes_matches_data_length() {
629        let cfg = small_config();
630        let data = vec![0x33u8; 3333];
631        let mut index = StreamDedupIndex::new(cfg);
632        let fp = index.ingest("sz", Cursor::new(data)).expect("ok");
633        assert_eq!(fp.total_bytes, 3333);
634    }
635}