1#![allow(dead_code)]
46#![allow(clippy::cast_precision_loss)]
47#![allow(clippy::cast_possible_truncation)]
48
49use std::collections::{HashMap, HashSet};
50use std::io::{self, Read};
51
/// Size in bytes of the buffer handed to each `read` call on the source stream.
const BUF_SIZE: usize = 65_536;

/// Parameters controlling where [`StreamChunker`] places chunk boundaries.
#[derive(Debug, Clone)]
pub struct StreamChunkerConfig {
    /// Smallest chunk that will be cut; boundaries are ignored before this length.
    pub min_chunk: usize,
    /// Largest chunk length; a boundary is forced once a chunk reaches it.
    pub max_chunk: usize,
    /// Number of trailing bytes covered by the rolling hash.
    pub window_size: usize,
    /// Number of low rolling-hash bits that must all be zero to cut a chunk.
    pub mask_bits: u32,
}

impl Default for StreamChunkerConfig {
    /// 4 KiB minimum, 128 KiB maximum, 48-byte window, 12-bit boundary mask.
    fn default() -> Self {
        Self {
            min_chunk: 4_096,
            max_chunk: 131_072,
            window_size: 48,
            mask_bits: 12,
        }
    }
}

impl StreamChunkerConfig {
    /// Returns a mask with the low `mask_bits` bits set.
    ///
    /// A rolling-hash value `h` is a boundary candidate when `h & mask == 0`.
    /// `mask_bits == 0` yields `0` (every position qualifies); `mask_bits >= 64`
    /// yields `u64::MAX` instead of overflowing the shift.
    #[must_use]
    pub fn boundary_mask(&self) -> u64 {
        match 1u64.checked_shl(self.mask_bits) {
            Some(bit) => bit - 1,
            // The shift would be >= 64: every bit of the hash is in the mask.
            None => u64::MAX,
        }
    }

    /// Reports whether the parameters are usable.
    ///
    /// Requires positive `min_chunk` and `window_size`, `max_chunk >= min_chunk`,
    /// and `mask_bits` in `1..32`.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        self.min_chunk > 0
            && self.max_chunk >= self.min_chunk
            && self.window_size > 0
            && (1..32).contains(&self.mask_bits)
    }
}
103
/// Digest identifying a single chunk: a hash of its bytes plus its length.
///
/// As produced by `StreamChunker`, `hash` is the 64-bit FNV-1a hash of the
/// chunk's bytes.
///
/// NOTE: the derived `Debug` and `Hash` impls visit fields in declaration
/// order, so reordering the fields changes their output.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkDigest {
    /// Hash of the chunk's bytes.
    pub hash: u64,
    /// Chunk length in bytes.
    pub len: usize,
}
116
/// Computes the 64-bit FNV-1a hash of `data`.
fn fnv1a_64(data: &[u8]) -> u64 {
    const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0100_0000_01b3;
    data.iter()
        .fold(OFFSET, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}

/// Cyclic-polynomial ("buzhash") rolling hash over the last `window_size` bytes.
///
/// Each byte value maps to a pseudo-random 64-bit entry in `table`. On every
/// update the running value is rotated left by one and the incoming byte's
/// entry is XORed in. Once the window is full, the outgoing byte's entry,
/// rotated by the window size, is XORed back out. After at least
/// `window_size` updates the result depends only on the bytes currently in the
/// window, not on their absolute position in the stream.
struct RollingHash {
    /// Per-byte pseudo-random values, derived deterministically from FNV-1a.
    table: [u64; 256],
    /// Ring buffer holding the most recent `window_size` bytes.
    window: Vec<u8>,
    /// Window length; always at least 1.
    window_size: usize,
    /// Next slot of `window` to overwrite (the oldest byte once the window is full).
    head: usize,
    /// Current hash value.
    value: u64,
    /// Number of bytes fed so far (saturating).
    count: usize,
}

impl RollingHash {
    /// Creates an empty rolling hash over a window of `window_size` bytes.
    ///
    /// A `window_size` of 0 is treated as 1. An empty ring buffer would
    /// otherwise panic on the first update, both on indexing and on `% 0`.
    fn new(window_size: usize) -> Self {
        let window_size = window_size.max(1);
        let mut table = [0u64; 256];
        for (i, slot) in table.iter_mut().enumerate() {
            // `i` < 256, so the cast to `u8` is lossless.
            *slot = fnv1a_64(&[i as u8, 0x5A, 0xA5]);
        }
        Self {
            table,
            window: vec![0u8; window_size],
            window_size,
            head: 0,
            value: 0,
            count: 0,
        }
    }

    /// Pushes `byte` into the window and returns the updated hash value.
    fn update(&mut self, byte: u8) -> u64 {
        let outgoing = self.window[self.head];
        self.window[self.head] = byte;
        self.head = (self.head + 1) % self.window_size;

        self.value = self.value.rotate_left(1) ^ self.table[usize::from(byte)];
        // Only evict once the window has really been filled. During warm-up the
        // slot holds a placeholder zero that was never hashed in, and XORing its
        // entry out would leave a residue that depends on stream position.
        if self.count >= self.window_size {
            // `rotate_left` works modulo 64, so only `window_size % 64` matters.
            let rot = (self.window_size % 64) as u32;
            self.value ^= self.table[usize::from(outgoing)].rotate_left(rot);
        }
        self.count = self.count.saturating_add(1);
        self.value
    }
}
176
177pub struct StreamChunker<R: Read> {
189 reader: R,
190 config: StreamChunkerConfig,
191 rolling: RollingHash,
192 io_buf: Vec<u8>,
194 io_len: usize,
196 io_pos: usize,
198 chunk_buf: Vec<u8>,
200 done: bool,
202}
203
204impl<R: Read> StreamChunker<R> {
205 #[must_use]
207 pub fn new(reader: R, config: StreamChunkerConfig) -> Self {
208 let window_size = config.window_size;
209 Self {
210 reader,
211 config,
212 rolling: RollingHash::new(window_size),
213 io_buf: vec![0u8; BUF_SIZE],
214 io_len: 0,
215 io_pos: 0,
216 chunk_buf: Vec::with_capacity(8_192),
217 done: false,
218 }
219 }
220
221 pub fn collect_all(mut self) -> io::Result<Vec<ChunkDigest>> {
227 let mut out = Vec::new();
228 loop {
229 match self.next_chunk() {
230 Ok(Some(d)) => out.push(d),
231 Ok(None) => break,
232 Err(e) => return Err(e),
233 }
234 }
235 Ok(out)
236 }
237
238 pub fn next_chunk(&mut self) -> io::Result<Option<ChunkDigest>> {
246 if self.done && self.io_pos >= self.io_len {
247 return Ok(None);
248 }
249 let mask = self.config.boundary_mask();
250
251 loop {
252 if self.io_pos >= self.io_len {
254 if self.done {
255 break;
256 }
257 let n = self.reader.read(&mut self.io_buf)?;
258 if n == 0 {
259 self.done = true;
260 break;
261 }
262 self.io_len = n;
263 self.io_pos = 0;
264 }
265
266 while self.io_pos < self.io_len {
268 let byte = self.io_buf[self.io_pos];
269 self.io_pos += 1;
270
271 let h = self.rolling.update(byte);
272 self.chunk_buf.push(byte);
273 let chunk_len = self.chunk_buf.len();
274
275 if chunk_len < self.config.min_chunk {
276 continue;
277 }
278 let is_boundary = (h & mask) == 0 || chunk_len >= self.config.max_chunk;
279 if is_boundary {
280 let digest = ChunkDigest {
281 hash: fnv1a_64(&self.chunk_buf),
282 len: chunk_len,
283 };
284 self.chunk_buf.clear();
285 return Ok(Some(digest));
286 }
287 }
288 }
290
291 if self.chunk_buf.is_empty() {
293 return Ok(None);
294 }
295 let digest = ChunkDigest {
296 hash: fnv1a_64(&self.chunk_buf),
297 len: self.chunk_buf.len(),
298 };
299 self.chunk_buf.clear();
300 Ok(Some(digest))
301 }
302}
303
304#[derive(Debug, Clone)]
314pub struct StreamFingerprint {
315 pub chunks: Vec<ChunkDigest>,
317 pub total_bytes: u64,
319}
320
321impl StreamFingerprint {
322 #[must_use]
324 pub fn chunk_count(&self) -> usize {
325 self.chunks.len()
326 }
327
328 #[must_use]
330 pub fn chunk_set(&self) -> HashSet<u64> {
331 self.chunks.iter().map(|c| c.hash).collect()
332 }
333
334 #[must_use]
341 pub fn jaccard(&self, other: &Self) -> f64 {
342 let a = self.chunk_set();
343 let b = other.chunk_set();
344 if a.is_empty() && b.is_empty() {
345 return 1.0;
346 }
347 let intersection = a.intersection(&b).count();
348 let union = a.union(&b).count();
349 if union == 0 {
350 return 1.0;
351 }
352 intersection as f64 / union as f64
353 }
354}
355
356#[derive(Debug)]
365pub struct StreamDedupIndex {
366 config: StreamChunkerConfig,
367 entries: HashMap<String, StreamFingerprint>,
368}
369
370impl StreamDedupIndex {
371 #[must_use]
373 pub fn new(config: StreamChunkerConfig) -> Self {
374 Self {
375 config,
376 entries: HashMap::new(),
377 }
378 }
379
380 pub fn ingest<R: Read>(&mut self, name: &str, reader: R) -> io::Result<StreamFingerprint> {
388 let chunker = StreamChunker::new(reader, self.config.clone());
389 let chunks = chunker.collect_all()?;
390 let total_bytes: u64 = chunks.iter().map(|c| c.len as u64).sum();
391 let fp = StreamFingerprint {
392 chunks,
393 total_bytes,
394 };
395 self.entries.insert(name.to_string(), fp.clone());
396 Ok(fp)
397 }
398
399 #[must_use]
401 pub fn len(&self) -> usize {
402 self.entries.len()
403 }
404
405 #[must_use]
407 pub fn is_empty(&self) -> bool {
408 self.entries.is_empty()
409 }
410
411 #[must_use]
413 pub fn jaccard_similarity(&self, a: &StreamFingerprint, b: &StreamFingerprint) -> f64 {
414 a.jaccard(b)
415 }
416
417 #[must_use]
423 pub fn find_duplicates(&self, threshold: f64) -> Vec<(String, String, f64)> {
424 let names: Vec<&String> = self.entries.keys().collect();
425 let n = names.len();
426 let mut pairs = Vec::new();
427
428 for i in 0..n {
429 for j in (i + 1)..n {
430 let fp_a = &self.entries[names[i]];
431 let fp_b = &self.entries[names[j]];
432 let sim = fp_a.jaccard(fp_b);
433 if sim >= threshold {
434 pairs.push((names[i].clone(), names[j].clone(), sim));
435 }
436 }
437 }
438
439 pairs.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
440 pairs
441 }
442
443 #[must_use]
445 pub fn get(&self, name: &str) -> Option<&StreamFingerprint> {
446 self.entries.get(name)
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{self, Cursor, Read};

    /// Small parameters so that inputs of a few KiB already span many chunks.
    fn small_config() -> StreamChunkerConfig {
        StreamChunkerConfig {
            min_chunk: 64,
            max_chunk: 512,
            window_size: 16,
            mask_bits: 6,
        }
    }

    /// Deterministic, non-repetitive test bytes.
    fn pseudo_random_bytes(n: u32) -> Vec<u8> {
        (0..n)
            .map(|i| (i.wrapping_mul(2_654_435_761) >> 24) as u8)
            .collect()
    }

    /// Reader that yields at most one byte per `read` call.
    struct OneByteReader<'a>(&'a [u8]);

    impl Read for OneByteReader<'_> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            match (self.0.split_first(), buf.first_mut()) {
                (Some((&byte, rest)), Some(slot)) => {
                    *slot = byte;
                    self.0 = rest;
                    Ok(1)
                }
                _ => Ok(0),
            }
        }
    }

    #[test]
    fn test_config_default_is_valid() {
        assert!(StreamChunkerConfig::default().is_valid());
    }

    #[test]
    fn test_config_small_is_valid() {
        assert!(small_config().is_valid());
    }

    #[test]
    fn test_boundary_mask() {
        let cfg = StreamChunkerConfig {
            mask_bits: 8,
            ..Default::default()
        };
        assert_eq!(cfg.boundary_mask(), 0xFF);
    }

    #[test]
    fn test_empty_stream_produces_no_chunks() {
        let chunker = StreamChunker::new(Cursor::new(b""), small_config());
        let chunks = chunker.collect_all().expect("io should not fail");
        assert!(chunks.is_empty());
    }

    #[test]
    fn test_small_data_single_chunk() {
        // Shorter than `min_chunk`, so the whole input is one trailing chunk.
        let data = vec![0xABu8; 32];
        let chunks = StreamChunker::new(Cursor::new(data), small_config())
            .collect_all()
            .expect("ok");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len, 32);
    }

    #[test]
    fn test_large_data_multiple_chunks() {
        let cfg = small_config();
        let max = cfg.max_chunk;
        let data = vec![0x5Au8; 8192];
        let chunks = StreamChunker::new(Cursor::new(data), cfg)
            .collect_all()
            .expect("ok");
        let total: usize = chunks.iter().map(|c| c.len).sum();
        assert_eq!(total, 8192);
        // No chunk may exceed `max_chunk`, so at least 8192 / max chunks are needed.
        assert!(chunks.len() >= 8192 / max, "got {} chunks", chunks.len());
        assert!(chunks.iter().all(|c| c.len <= max));
    }

    #[test]
    fn test_chunk_sizes_respect_bounds() {
        let cfg = small_config();
        let (min, max) = (cfg.min_chunk, cfg.max_chunk);
        let chunks = StreamChunker::new(Cursor::new(pseudo_random_bytes(5_000)), cfg)
            .collect_all()
            .expect("ok");
        let (last, body) = chunks.split_last().expect("non-empty input yields chunks");
        assert!(body.iter().all(|c| (min..=max).contains(&c.len)));
        assert!(last.len >= 1 && last.len <= max);
        assert_eq!(chunks.iter().map(|c| c.len).sum::<usize>(), 5_000);
    }

    #[test]
    fn test_chunking_independent_of_read_size() {
        let cfg = small_config();
        let data = pseudo_random_bytes(3_000);
        let whole = StreamChunker::new(Cursor::new(data.clone()), cfg.clone())
            .collect_all()
            .expect("ok");
        let trickled = StreamChunker::new(OneByteReader(&data), cfg)
            .collect_all()
            .expect("ok");
        assert_eq!(whole, trickled, "boundaries must not depend on read sizes");
    }

    #[test]
    fn test_deterministic_chunking() {
        let cfg = small_config();
        let data: Vec<u8> = (0..4096_u16).map(|i| (i % 251) as u8).collect();
        let c1 = StreamChunker::new(Cursor::new(data.clone()), cfg.clone())
            .collect_all()
            .expect("ok");
        let c2 = StreamChunker::new(Cursor::new(data), cfg)
            .collect_all()
            .expect("ok");
        assert_eq!(c1, c2, "chunking must be deterministic");
    }

    #[test]
    fn test_identical_streams_jaccard_one() {
        let data = vec![0x7Fu8; 4096];
        let mut index = StreamDedupIndex::new(small_config());
        let fp1 = index.ingest("a", Cursor::new(data.clone())).expect("ok");
        let fp2 = index.ingest("b", Cursor::new(data)).expect("ok");
        let sim = index.jaccard_similarity(&fp1, &fp2);
        assert!(
            (sim - 1.0).abs() < 1e-9,
            "identical streams must have Jaccard = 1.0, got {sim}"
        );
    }

    #[test]
    fn test_completely_different_streams_jaccard_near_zero() {
        let mut index = StreamDedupIndex::new(small_config());
        let fp_a = index.ingest("a", Cursor::new(vec![0x00u8; 4096])).expect("ok");
        let fp_b = index.ingest("b", Cursor::new(vec![0xFFu8; 4096])).expect("ok");
        let sim = index.jaccard_similarity(&fp_a, &fp_b);
        // The streams share no chunk content at all.
        assert!(
            sim.abs() < 1e-9,
            "different data should have low Jaccard, got {sim}"
        );
    }

    #[test]
    fn test_find_duplicates_returns_pairs_above_threshold() {
        let data = vec![0xCCu8; 2048];
        let mut index = StreamDedupIndex::new(small_config());
        index.ingest("x", Cursor::new(data.clone())).expect("ok");
        index.ingest("y", Cursor::new(data)).expect("ok");

        let pairs = index.find_duplicates(0.9);
        assert_eq!(pairs.len(), 1, "two entries form exactly one pair");
        let (ref na, ref nb, sim) = pairs[0];
        assert!(
            (na == "x" || na == "y") && (nb == "x" || nb == "y") && na != nb,
            "pair names should be x and y"
        );
        assert!(sim >= 0.9);
    }

    #[test]
    fn test_find_duplicates_no_pairs_above_high_threshold() {
        let mut index = StreamDedupIndex::new(small_config());
        index
            .ingest("p", Cursor::new(vec![0x11u8; 2048]))
            .expect("ok");
        index
            .ingest("q", Cursor::new(vec![0x22u8; 2048]))
            .expect("ok");
        let pairs = index.find_duplicates(0.99);
        // The streams share no chunk content, so no pair can reach the threshold.
        assert!(pairs.is_empty(), "unexpected pairs: {pairs:?}");
    }

    #[test]
    fn test_index_len_and_is_empty() {
        let mut index = StreamDedupIndex::new(small_config());
        assert!(index.is_empty());
        index
            .ingest("file", Cursor::new(vec![1u8; 100]))
            .expect("ok");
        assert_eq!(index.len(), 1);
        assert!(!index.is_empty());
    }

    #[test]
    fn test_get_fingerprint_after_ingest() {
        let mut index = StreamDedupIndex::new(small_config());
        index
            .ingest("myfile", Cursor::new(vec![42u8; 512]))
            .expect("ok");
        let fp = index
            .get("myfile")
            .expect("fingerprint should be stored under its name");
        assert!(fp.chunk_count() >= 1);
        assert!(index.get("missing").is_none());
    }

    #[test]
    fn test_fnv1a_deterministic() {
        assert_eq!(fnv1a_64(b"hello world"), fnv1a_64(b"hello world"));
        // Reference FNV-1a-64 vectors.
        assert_eq!(fnv1a_64(b""), 0xcbf2_9ce4_8422_2325);
        assert_eq!(fnv1a_64(b"a"), 0xaf63_dc4c_8601_ec8c);
    }

    #[test]
    fn test_total_bytes_matches_data_length() {
        let mut index = StreamDedupIndex::new(small_config());
        let fp = index
            .ingest("sz", Cursor::new(vec![0x33u8; 3333]))
            .expect("ok");
        assert_eq!(fp.total_bytes, 3333);
    }
}