Skip to main content

oximedia_dedup/
rolling_hash.rs

1#![allow(dead_code)]
2
3//! Rolling hash for content-defined chunking in media deduplication.
4//!
5//! This module implements the Buzhash and Rabin-style rolling hash algorithms
6//! used for content-defined chunking (CDC). CDC splits a byte stream at
7//! boundaries determined by the content itself, which ensures that
8//! insertions or deletions in one part of the stream do not shift all
9//! subsequent chunk boundaries.
10//!
11//! # Key Types
12//!
13//! - [`BuzHash`] - Buzhash rolling hash with configurable window
14//! - [`ChunkBoundary`] - A detected chunk boundary with offset and hash
15//! - [`ChunkerConfig`] - Configuration for content-defined chunking
16//! - [`ContentChunker`] - Splits a byte stream into content-defined chunks
17//! - [`RollingHashStream`] - Streaming Rabin-fingerprint iterator over `Read` sources
18
19use std::collections::VecDeque;
20use std::io::{self, Read};
21
/// Parameters controlling where [`ContentChunker`] places chunk boundaries.
///
/// No boundary is placed before `min_chunk` bytes have accumulated, and one is
/// forced once `max_chunk` bytes have accumulated. In between, a boundary is
/// declared whenever the rolling hash satisfies `(hash & mask) == 0` (see
/// [`ChunkerConfig::boundary_mask`]).
#[derive(Debug, Clone)]
pub struct ChunkerConfig {
    /// Smallest chunk emitted by `feed`, in bytes (the trailing chunk from `finish` may be shorter).
    pub min_chunk: usize,
    /// Chunk length, in bytes, at which a boundary is forced regardless of the hash.
    pub max_chunk: usize,
    /// Intended average chunk size in bytes; the chunking loop itself is driven by `mask_bits`.
    pub target_chunk: usize,
    /// Number of bytes covered by the rolling-hash window.
    pub window_size: usize,
    /// Number of low-order hash bits that must all be zero to declare a boundary.
    pub mask_bits: u32,
}
37
38impl Default for ChunkerConfig {
39    fn default() -> Self {
40        Self {
41            min_chunk: 2048,
42            max_chunk: 65536,
43            target_chunk: 8192,
44            window_size: 48,
45            mask_bits: 13, // 2^13 = 8192 average
46        }
47    }
48}
49
50impl ChunkerConfig {
51    /// Create a config for small media chunks (e.g. subtitle segments).
52    #[must_use]
53    pub fn small() -> Self {
54        Self {
55            min_chunk: 512,
56            max_chunk: 8192,
57            target_chunk: 2048,
58            window_size: 32,
59            mask_bits: 11,
60        }
61    }
62
63    /// Create a config for large media chunks (e.g. video segments).
64    #[must_use]
65    pub fn large() -> Self {
66        Self {
67            min_chunk: 16384,
68            max_chunk: 524_288,
69            target_chunk: 65536,
70            window_size: 64,
71            mask_bits: 16,
72        }
73    }
74
75    /// Compute the chunk boundary mask from `mask_bits`.
76    #[must_use]
77    pub fn boundary_mask(&self) -> u64 {
78        (1u64 << self.mask_bits) - 1
79    }
80
81    /// Validate the configuration.
82    #[must_use]
83    pub fn is_valid(&self) -> bool {
84        self.min_chunk > 0
85            && self.max_chunk >= self.min_chunk
86            && self.target_chunk >= self.min_chunk
87            && self.target_chunk <= self.max_chunk
88            && self.window_size > 0
89            && self.mask_bits > 0
90            && self.mask_bits <= 32
91    }
92}
93
/// Buzhash substitution table: one pseudo-random 64-bit word per byte value.
///
/// Filled at compile time by an xorshift64 generator (shift triple 13/7/17)
/// seeded with `0x5555_5555_5555_5555`. Because it is a `const`, the table,
/// and therefore every hash produced by [`BuzHash`], is identical across builds.
const BUZHASH_TABLE: [u64; 256] = {
    /// One xorshift64 step.
    const fn xorshift(mut x: u64) -> u64 {
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        x
    }

    let mut entries = [0u64; 256];
    let mut seed = 0x5555_5555_5555_5555_u64;
    let mut idx = 0;
    while idx < 256 {
        seed = xorshift(seed);
        entries[idx] = seed;
        idx += 1;
    }
    entries
};
109
/// Buzhash (cyclic-polynomial) rolling hash over a fixed-size byte window.
///
/// Bytes are pushed one at a time with [`BuzHash::update`]. Each update costs
/// O(1) regardless of window size: the byte leaving the window is cancelled
/// out of the hash rather than the whole window being rehashed.
#[derive(Clone)]
pub struct BuzHash {
    /// Hash of the bytes fed so far that are still inside the window.
    hash: u64,
    /// Ring buffer holding the most recent `window_size` bytes.
    window: Vec<u8>,
    /// Index in `window` that the next incoming byte will overwrite.
    window_pos: usize,
    /// Length of the window in bytes.
    window_size: usize,
    /// Warm-up counter: bytes fed so far, saturating at `window_size`.
    count: usize,
}
127
128impl BuzHash {
129    /// Create a new Buzhash with the given window size.
130    #[must_use]
131    pub fn new(window_size: usize) -> Self {
132        Self {
133            hash: 0,
134            window: vec![0u8; window_size],
135            window_pos: 0,
136            window_size,
137            count: 0,
138        }
139    }
140
141    /// Feed a single byte and return the updated hash.
142    pub fn update(&mut self, byte: u8) -> u64 {
143        let out_byte = self.window[self.window_pos];
144        self.window[self.window_pos] = byte;
145        self.window_pos = (self.window_pos + 1) % self.window_size;
146
147        // Rotate left by 1
148        self.hash = self.hash.rotate_left(1);
149        // XOR in the new byte
150        self.hash ^= BUZHASH_TABLE[byte as usize];
151
152        if self.count >= self.window_size {
153            // XOR out the old byte (rotated by window_size)
154            self.hash ^= BUZHASH_TABLE[out_byte as usize].rotate_left(self.window_size as u32);
155        } else {
156            self.count += 1;
157        }
158
159        self.hash
160    }
161
162    /// Return the current hash value.
163    #[must_use]
164    pub fn value(&self) -> u64 {
165        self.hash
166    }
167
168    /// Return how many bytes have been fed.
169    #[must_use]
170    pub fn count(&self) -> usize {
171        self.count
172    }
173
174    /// Reset the hash state.
175    pub fn reset(&mut self) {
176        self.hash = 0;
177        self.window.fill(0);
178        self.window_pos = 0;
179        self.count = 0;
180    }
181}
182
183impl std::fmt::Debug for BuzHash {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("BuzHash")
186            .field("hash", &format_args!("0x{:016x}", self.hash))
187            .field("window_size", &self.window_size)
188            .field("count", &self.count)
189            .finish()
190    }
191}
192
/// One chunk boundary produced by [`ContentChunker`].
///
/// The chunk it terminates spans the stream bytes
/// `offset - chunk_len .. offset`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkBoundary {
    /// Stream offset just past the last byte of the chunk.
    pub offset: usize,
    /// Rolling-hash value observed when the boundary was placed.
    pub hash: u64,
    /// Number of bytes in the chunk that ends here.
    pub chunk_len: usize,
}
203
204/// Splits a byte stream into content-defined chunks.
205pub struct ContentChunker {
206    /// Configuration.
207    config: ChunkerConfig,
208    /// Rolling hash.
209    hasher: BuzHash,
210    /// Current position in the overall stream.
211    position: usize,
212    /// Position of the last boundary.
213    last_boundary: usize,
214    /// Detected boundaries.
215    boundaries: Vec<ChunkBoundary>,
216}
217
218impl ContentChunker {
219    /// Create a new chunker with the given configuration.
220    #[must_use]
221    pub fn new(config: ChunkerConfig) -> Self {
222        let hasher = BuzHash::new(config.window_size);
223        Self {
224            config,
225            hasher,
226            position: 0,
227            last_boundary: 0,
228            boundaries: Vec::new(),
229        }
230    }
231
232    /// Create a chunker with default configuration.
233    #[must_use]
234    pub fn with_defaults() -> Self {
235        Self::new(ChunkerConfig::default())
236    }
237
238    /// Feed a chunk of data and detect boundaries within it.
239    ///
240    /// Returns the boundaries found in this batch.
241    pub fn feed(&mut self, data: &[u8]) -> Vec<ChunkBoundary> {
242        let mask = self.config.boundary_mask();
243        let mut found = Vec::new();
244
245        for &byte in data {
246            let h = self.hasher.update(byte);
247            self.position += 1;
248
249            let chunk_len = self.position - self.last_boundary;
250
251            // Enforce minimum chunk size
252            if chunk_len < self.config.min_chunk {
253                continue;
254            }
255
256            // Check for boundary or max chunk size reached
257            let is_boundary = (h & mask) == 0 || chunk_len >= self.config.max_chunk;
258
259            if is_boundary {
260                let boundary = ChunkBoundary {
261                    offset: self.position,
262                    hash: h,
263                    chunk_len,
264                };
265                found.push(boundary.clone());
266                self.boundaries.push(boundary);
267                self.last_boundary = self.position;
268            }
269        }
270
271        found
272    }
273
274    /// Finalise the chunker, emitting a boundary for any trailing data.
275    pub fn finish(&mut self) -> Option<ChunkBoundary> {
276        let chunk_len = self.position - self.last_boundary;
277        if chunk_len > 0 {
278            let boundary = ChunkBoundary {
279                offset: self.position,
280                hash: self.hasher.value(),
281                chunk_len,
282            };
283            self.boundaries.push(boundary.clone());
284            self.last_boundary = self.position;
285            Some(boundary)
286        } else {
287            None
288        }
289    }
290
291    /// Return all detected boundaries so far.
292    #[must_use]
293    pub fn boundaries(&self) -> &[ChunkBoundary] {
294        &self.boundaries
295    }
296
297    /// Return the current stream position.
298    #[must_use]
299    pub fn position(&self) -> usize {
300        self.position
301    }
302
303    /// Reset the chunker state.
304    pub fn reset(&mut self) {
305        self.hasher.reset();
306        self.position = 0;
307        self.last_boundary = 0;
308        self.boundaries.clear();
309    }
310}
311
312/// Convenience function: chunk a complete byte slice.
313#[must_use]
314pub fn chunk_bytes(data: &[u8], config: ChunkerConfig) -> Vec<ChunkBoundary> {
315    let mut chunker = ContentChunker::new(config);
316    let mut all = chunker.feed(data);
317    if let Some(last) = chunker.finish() {
318        all.push(last);
319    }
320    all
321}
322
323// ── Rabin-fingerprint streaming iterator ─────────────────────────────────────
324
/// Size of the heap read buffer used by [`RollingHashStream`] (64 KiB).
const CHUNK_SIZE: usize = 64 * 1024;
327
/// Multiplier for the polynomial rolling hash used by [`RollingHashStream`].
///
/// It is odd, so multiplication by it is a bijection on `u64` under wrapping
/// arithmetic and no input bits are discarded.
const RABIN_BASE: u64 = 0x08d3_b1b9_adfa_bc4d;
330
331/// A streaming Rabin-fingerprint rolling hash over an arbitrary [`Read`] source.
332///
333/// Each call to `next()` advances the window by one byte and yields
334/// `(byte_offset, hash)` where `byte_offset` is the 0-based index of the
335/// leading byte of the current window.
336///
337/// The hash is computed as:
338/// ```text
339/// hash = (hash * BASE + byte_in) XOR (BASE^window_size * byte_out)
340/// ```
341/// with `BASE^window_size` pre-computed at construction time.
342pub struct RollingHashStream<R: Read> {
343    /// Wrapped reader.
344    inner: R,
345    /// Sliding window of the last `window_size` bytes.
346    window: VecDeque<u8>,
347    /// Window size in bytes.
348    window_size: usize,
349    /// Current hash value.
350    hash: u64,
351    /// Current byte offset (index of leading byte of window).
352    pos: u64,
353    /// Pre-computed `BASE^window_size` for O(1) removal of oldest byte.
354    pow_table: u64,
355    /// Read buffer (heap-allocated to avoid large stack arrays).
356    buf: Box<[u8]>,
357    /// Valid bytes in `buf`.
358    buf_len: usize,
359    /// Cursor inside `buf`.
360    buf_pos: usize,
361    /// Whether the underlying reader is exhausted.
362    eof: bool,
363    /// Whether the iterator has been fully consumed (including EOF state).
364    done: bool,
365}
366
367impl<R: Read> RollingHashStream<R> {
368    /// Create a new streaming rolling hash with the given window size.
369    ///
370    /// The window is "warmed up" by the first `window_size` bytes fed, after
371    /// which each subsequent byte produces a yielded `(offset, hash)` pair.
372    #[must_use]
373    pub fn new(inner: R, window_size: usize) -> Self {
374        let window_size = window_size.max(1);
375        // Pre-compute BASE^window_size mod 2^64.
376        let pow_table = (0..window_size).fold(1u64, |acc, _| acc.wrapping_mul(RABIN_BASE));
377        Self {
378            inner,
379            window: VecDeque::with_capacity(window_size),
380            window_size,
381            hash: 0,
382            pos: 0,
383            pow_table,
384            buf: vec![0u8; CHUNK_SIZE].into_boxed_slice(),
385            buf_len: 0,
386            buf_pos: 0,
387            eof: false,
388            done: false,
389        }
390    }
391
392    /// Read the next byte from the buffered inner reader.
393    ///
394    /// Returns `Ok(Some(byte))`, `Ok(None)` at EOF, or `Err(io::Error)`.
395    fn read_byte(&mut self) -> io::Result<Option<u8>> {
396        if self.buf_pos >= self.buf_len {
397            if self.eof {
398                return Ok(None);
399            }
400            let n = self.inner.read(&mut self.buf[..])?;
401            if n == 0 {
402                self.eof = true;
403                return Ok(None);
404            }
405            self.buf_len = n;
406            self.buf_pos = 0;
407        }
408        let byte = self.buf[self.buf_pos];
409        self.buf_pos += 1;
410        Ok(Some(byte))
411    }
412}
413
414impl<R: Read> Iterator for RollingHashStream<R> {
415    /// `(byte_offset_of_window_start, rolling_hash)`.
416    type Item = io::Result<(u64, u64)>;
417
418    fn next(&mut self) -> Option<Self::Item> {
419        if self.done {
420            return None;
421        }
422        // Feed bytes until the window is full and we can yield.
423        loop {
424            let byte = match self.read_byte() {
425                Ok(Some(b)) => b,
426                Ok(None) => {
427                    self.done = true;
428                    return None;
429                }
430                Err(e) => {
431                    self.done = true;
432                    return Some(Err(e));
433                }
434            };
435
436            // Remove the oldest byte from the window (if full).
437            let byte_out = if self.window.len() == self.window_size {
438                self.window.pop_front()
439            } else {
440                None
441            };
442
443            self.window.push_back(byte);
444
445            // Update Rabin hash:
446            //   hash = hash * BASE + byte_in
447            //   hash ^= pow(BASE, window_size) * byte_out  (if window was full)
448            self.hash = self
449                .hash
450                .wrapping_mul(RABIN_BASE)
451                .wrapping_add(u64::from(byte));
452            if let Some(out) = byte_out {
453                self.hash ^= self.pow_table.wrapping_mul(u64::from(out));
454            }
455
456            self.pos += 1;
457
458            // Only yield once the window is fully filled.
459            if self.window.len() == self.window_size {
460                let window_start = self.pos - self.window_size as u64;
461                return Some(Ok((window_start, self.hash)));
462            }
463        }
464    }
465}
466
467/// Compute rolling hashes over a byte slice using the same Rabin formula as
468/// [`RollingHashStream`], for comparison and testing.
469///
470/// Returns one `(offset, hash)` per position once the window is filled.
471#[must_use]
472pub fn rolling_hash_slice(data: &[u8], window_size: usize) -> Vec<(u64, u64)> {
473    let window_size = window_size.max(1);
474    let pow_table = (0..window_size).fold(1u64, |acc, _| acc.wrapping_mul(RABIN_BASE));
475    let mut window: VecDeque<u8> = VecDeque::with_capacity(window_size);
476    let mut hash: u64 = 0;
477    let mut results = Vec::with_capacity(data.len().saturating_sub(window_size) + 1);
478
479    for (i, &byte) in data.iter().enumerate() {
480        let byte_out = if window.len() == window_size {
481            window.pop_front()
482        } else {
483            None
484        };
485        window.push_back(byte);
486        hash = hash.wrapping_mul(RABIN_BASE).wrapping_add(u64::from(byte));
487        if let Some(out) = byte_out {
488            hash ^= pow_table.wrapping_mul(u64::from(out));
489        }
490        if window.len() == window_size {
491            let offset = (i + 1 - window_size) as u64;
492            results.push((offset, hash));
493        }
494    }
495    results
496}
497
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_chunker_config_default() {
        let cfg = ChunkerConfig::default();
        assert_eq!(cfg.min_chunk, 2048);
        assert_eq!(cfg.max_chunk, 65536);
        assert_eq!(cfg.target_chunk, 8192);
        assert!(cfg.is_valid());
    }

    #[test]
    fn test_chunker_config_small() {
        let cfg = ChunkerConfig::small();
        assert_eq!(cfg.min_chunk, 512);
        assert!(cfg.is_valid());
    }

    #[test]
    fn test_chunker_config_large() {
        let cfg = ChunkerConfig::large();
        assert_eq!(cfg.min_chunk, 16384);
        assert!(cfg.is_valid());
    }

    #[test]
    fn test_chunker_config_boundary_mask() {
        let cfg = ChunkerConfig::default(); // mask_bits = 13
        assert_eq!(cfg.boundary_mask(), (1 << 13) - 1);
    }

    #[test]
    fn test_buzhash_new() {
        let h = BuzHash::new(32);
        assert_eq!(h.value(), 0);
        assert_eq!(h.count(), 0);
    }

    #[test]
    fn test_buzhash_deterministic() {
        let mut h1 = BuzHash::new(16);
        let mut h2 = BuzHash::new(16);
        for b in b"identical input" {
            h1.update(*b);
            h2.update(*b);
        }
        assert_eq!(h1.value(), h2.value());
    }

    #[test]
    fn test_buzhash_different_input() {
        let mut h1 = BuzHash::new(16);
        let mut h2 = BuzHash::new(16);
        for b in b"input A" {
            h1.update(*b);
        }
        for b in b"input B" {
            h2.update(*b);
        }
        assert_ne!(h1.value(), h2.value());
    }

    #[test]
    fn test_buzhash_depends_only_on_window() {
        // After warm-up the hash must depend only on the last `window_size`
        // bytes. Content-defined chunking relies on this property.
        let mut with_prefix = BuzHash::new(8);
        for b in b"unrelated prefix bytes|window!" {
            with_prefix.update(*b);
        }
        let mut fresh = BuzHash::new(8);
        for b in b"|window!" {
            fresh.update(*b);
        }
        assert_eq!(with_prefix.value(), fresh.value());
    }

    #[test]
    fn test_buzhash_reset() {
        let mut h = BuzHash::new(8);
        for b in b"some data" {
            h.update(*b);
        }
        assert_ne!(h.value(), 0);
        h.reset();
        assert_eq!(h.value(), 0);
        assert_eq!(h.count(), 0);
    }

    #[test]
    fn test_content_chunker_small_input() {
        // Input smaller than min_chunk => only finish() produces boundary
        let config = ChunkerConfig {
            min_chunk: 100,
            max_chunk: 1000,
            target_chunk: 500,
            window_size: 8,
            mask_bits: 3,
        };
        let mut chunker = ContentChunker::new(config);
        let data = vec![0x42u8; 50];
        let during = chunker.feed(&data);
        assert!(during.is_empty()); // too small for any boundary
        let last = chunker.finish();
        assert!(last.is_some());
        assert_eq!(last.expect("operation should succeed").chunk_len, 50);
    }

    #[test]
    fn test_content_chunker_max_chunk() {
        // Ensure max_chunk is enforced even if hash never triggers
        let config = ChunkerConfig {
            min_chunk: 4,
            max_chunk: 16,
            target_chunk: 8,
            window_size: 4,
            mask_bits: 30, // extremely unlikely to trigger via hash
        };
        let mut chunker = ContentChunker::new(config);
        let data = vec![0u8; 100];
        let boundaries = chunker.feed(&data);
        // Should get boundaries at multiples of 16 (max_chunk)
        assert!(!boundaries.is_empty());
        for b in &boundaries {
            assert!(b.chunk_len <= 16);
        }
    }

    #[test]
    fn test_content_chunker_split_feed_matches_single_feed() {
        // Boundary placement must not depend on how the input is split across
        // `feed` calls: hasher state carries over between calls.
        let data: Vec<u8> = (0u32..20_000)
            .map(|i| (i.wrapping_mul(2_654_435_761) >> 13) as u8)
            .collect();
        let config = ChunkerConfig::small();
        let whole = chunk_bytes(&data, config.clone());

        let mut chunker = ContentChunker::new(config);
        let mut pieces = Vec::new();
        for part in data.chunks(777) {
            pieces.extend(chunker.feed(part));
        }
        pieces.extend(chunker.finish());

        assert_eq!(whole, pieces);
        let total: usize = whole.iter().map(|b| b.chunk_len).sum();
        assert_eq!(total, data.len());
    }

    #[test]
    fn test_chunk_bytes_convenience() {
        let data = vec![0xABu8; 200];
        let config = ChunkerConfig {
            min_chunk: 10,
            max_chunk: 50,
            target_chunk: 30,
            window_size: 4,
            mask_bits: 30,
        };
        let boundaries = chunk_bytes(&data, config);
        assert!(!boundaries.is_empty());

        // Sum of chunk lengths should equal data length
        let total: usize = boundaries.iter().map(|b| b.chunk_len).sum();
        assert_eq!(total, 200);
    }

    #[test]
    fn test_chunk_bytes_empty_input() {
        assert!(chunk_bytes(&[], ChunkerConfig::default()).is_empty());
    }

    #[test]
    fn test_content_chunker_reset() {
        let mut chunker = ContentChunker::with_defaults();
        chunker.feed(&vec![1u8; 100_000]);
        assert!(chunker.position() > 0);
        chunker.reset();
        assert_eq!(chunker.position(), 0);
        assert!(chunker.boundaries().is_empty());
    }

    #[test]
    fn test_chunk_boundary_equality() {
        let a = ChunkBoundary {
            offset: 100,
            hash: 42,
            chunk_len: 50,
        };
        let b = ChunkBoundary {
            offset: 100,
            hash: 42,
            chunk_len: 50,
        };
        assert_eq!(a, b);
    }

    // ── RollingHashStream tests ───────────────────────────────────────────────

    #[test]
    fn test_rolling_hash_stream_matches_slice() {
        // Feed the same data through both APIs and assert identical results.
        let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();
        let window_size = 32;

        // Slice-based reference.
        let expected = rolling_hash_slice(&data, window_size);

        // Stream-based.
        let cursor = std::io::Cursor::new(&data);
        let stream = RollingHashStream::new(cursor, window_size);
        let actual: Vec<(u64, u64)> = stream
            .map(|r| r.expect("stream should not error"))
            .collect();

        assert_eq!(
            expected.len(),
            actual.len(),
            "number of hash pairs must match"
        );
        for (i, (exp, got)) in expected.iter().zip(actual.iter()).enumerate() {
            assert_eq!(
                exp, got,
                "hash mismatch at position {i}: expected {exp:?}, got {got:?}"
            );
        }
    }

    #[test]
    fn test_rolling_hash_stream_short_input() {
        // Input shorter than the window never fills it, so nothing is yielded.
        let data = [1u8, 2, 3];
        assert!(rolling_hash_slice(&data, 8).is_empty());
        let stream = RollingHashStream::new(std::io::Cursor::new(&data[..]), 8);
        assert_eq!(stream.count(), 0);
    }

    #[test]
    fn test_rolling_hash_stream_large_data() {
        // Stream 4 MB of synthetic data and verify no panic and reasonable count.
        const MB4: usize = 4 * 1024 * 1024;
        let data: Vec<u8> = (0u8..=255).cycle().take(MB4).collect();
        let window_size = 64;

        let cursor = std::io::Cursor::new(&data);
        let stream = RollingHashStream::new(cursor, window_size);
        let mut count = 0usize;
        for item in stream {
            let _ = item.expect("stream should not error");
            count += 1;
        }

        // Expect exactly MB4 - window_size + 1 hash pairs.
        let expected_count = MB4 - window_size + 1;
        assert_eq!(
            count, expected_count,
            "expected {expected_count} hash pairs, got {count}"
        );
    }
}