// Source: laminar_core/sink/dedup.rs

//! Deduplication store for idempotent sinks
2
3#![allow(clippy::cast_possible_truncation)]
4
5use std::collections::VecDeque;
6use std::hash::{Hash, Hasher};
7
8use fxhash::FxHashSet;
9
/// Unique identifier for a record.
///
/// Used to track which records have been written to prevent duplicates.
/// Two IDs compare equal exactly when their byte sequences are equal, and the
/// derived `Hash` lets IDs be stored in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RecordId {
    /// The ID bytes (a caller-supplied identifier or a hash of the data).
    bytes: Vec<u8>,
}
18
19impl RecordId {
20    /// Create a record ID from bytes
21    #[must_use]
22    pub fn from_bytes(bytes: &[u8]) -> Self {
23        Self {
24            bytes: bytes.to_vec(),
25        }
26    }
27
28    /// Create a record ID from a string
29    #[must_use]
30    pub fn from_string(s: &str) -> Self {
31        Self {
32            bytes: s.as_bytes().to_vec(),
33        }
34    }
35
36    /// Create a record ID from a numeric value
37    #[must_use]
38    pub fn from_u64(value: u64) -> Self {
39        Self {
40            bytes: value.to_le_bytes().to_vec(),
41        }
42    }
43
44    /// Create a record ID by hashing data
45    ///
46    /// Uses `FxHash` for fast, deterministic hashing.
47    #[must_use]
48    pub fn from_hash(data: &[u8]) -> Self {
49        use fxhash::FxHasher;
50        let mut hasher = FxHasher::default();
51        data.hash(&mut hasher);
52        let hash = hasher.finish();
53        Self::from_u64(hash)
54    }
55
56    /// Create a composite ID from multiple fields
57    #[must_use]
58    pub fn composite(parts: &[&[u8]]) -> Self {
59        let mut bytes = Vec::new();
60        for part in parts {
61            bytes.extend_from_slice(&(part.len() as u32).to_le_bytes());
62            bytes.extend_from_slice(part);
63        }
64        Self::from_hash(&bytes)
65    }
66
67    /// Get the raw bytes
68    #[must_use]
69    pub fn as_bytes(&self) -> &[u8] {
70        &self.bytes
71    }
72}
73
74/// Trait for deduplication stores
75pub trait DeduplicationStore: Send {
76    /// Check if a record ID is new (not seen before)
77    fn is_new(&self, id: &RecordId) -> bool;
78
79    /// Mark a record ID as seen
80    fn mark_seen(&mut self, id: RecordId);
81
82    /// Mark multiple record IDs as seen
83    fn mark_seen_batch(&mut self, ids: impl IntoIterator<Item = RecordId>) {
84        for id in ids {
85            self.mark_seen(id);
86        }
87    }
88
89    /// Filter a batch to only include new (unseen) IDs
90    fn filter_new<'a>(&self, ids: impl IntoIterator<Item = &'a RecordId>) -> Vec<&'a RecordId> {
91        ids.into_iter().filter(|id| self.is_new(id)).collect()
92    }
93
94    /// Get the number of tracked IDs
95    fn len(&self) -> usize;
96
97    /// Check if the store is empty
98    fn is_empty(&self) -> bool {
99        self.len() == 0
100    }
101
102    /// Clear all tracked IDs
103    fn clear(&mut self);
104
105    /// Serialize the store for checkpointing
106    fn to_bytes(&self) -> Vec<u8>;
107
108    /// Restore from serialized bytes
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if deserialization fails.
113    fn restore(&mut self, bytes: &[u8]) -> Result<(), String>;
114}
115
116/// In-memory deduplication store with bounded capacity
117///
118/// Uses FIFO eviction when capacity is exceeded - oldest entries
119/// are removed first. This is appropriate because:
120/// 1. Older records are less likely to be seen again
121/// 2. Bounded memory usage
122/// 3. Fast O(1) lookup and insertion
123pub struct InMemoryDedup {
124    /// Set of seen record IDs for O(1) lookup
125    seen: FxHashSet<RecordId>,
126
127    /// Queue of IDs in insertion order for FIFO eviction
128    order: VecDeque<RecordId>,
129
130    /// Maximum number of entries to keep
131    capacity: usize,
132}
133
134impl InMemoryDedup {
135    /// Create a new in-memory dedup store with the given capacity
136    #[must_use]
137    pub fn new(capacity: usize) -> Self {
138        Self {
139            seen: FxHashSet::default(),
140            order: VecDeque::with_capacity(capacity.min(10000)),
141            capacity,
142        }
143    }
144
145    /// Get the current capacity
146    #[must_use]
147    pub fn capacity(&self) -> usize {
148        self.capacity
149    }
150
151    /// Evict oldest entries if over capacity
152    fn evict_if_needed(&mut self) {
153        while self.order.len() >= self.capacity {
154            if let Some(old_id) = self.order.pop_front() {
155                self.seen.remove(&old_id);
156            }
157        }
158    }
159}
160
161impl DeduplicationStore for InMemoryDedup {
162    fn is_new(&self, id: &RecordId) -> bool {
163        !self.seen.contains(id)
164    }
165
166    fn mark_seen(&mut self, id: RecordId) {
167        if self.seen.insert(id.clone()) {
168            // Only add to order if it's actually new
169            self.order.push_back(id);
170            self.evict_if_needed();
171        }
172    }
173
174    fn len(&self) -> usize {
175        self.seen.len()
176    }
177
178    fn clear(&mut self) {
179        self.seen.clear();
180        self.order.clear();
181    }
182
183    fn to_bytes(&self) -> Vec<u8> {
184        let mut bytes = Vec::new();
185
186        // Version
187        bytes.push(1u8);
188
189        // Capacity
190        bytes.extend_from_slice(&(self.capacity as u64).to_le_bytes());
191
192        // Number of entries
193        bytes.extend_from_slice(&(self.order.len() as u32).to_le_bytes());
194
195        // Each entry (in order)
196        for id in &self.order {
197            let id_bytes = id.as_bytes();
198            bytes.extend_from_slice(&(id_bytes.len() as u32).to_le_bytes());
199            bytes.extend_from_slice(id_bytes);
200        }
201
202        bytes
203    }
204
205    fn restore(&mut self, bytes: &[u8]) -> Result<(), String> {
206        if bytes.is_empty() {
207            return Err("Empty dedup data".to_string());
208        }
209
210        let mut pos = 0;
211
212        // Version
213        let version = bytes[pos];
214        pos += 1;
215        if version != 1 {
216            return Err(format!("Unsupported dedup version: {version}"));
217        }
218
219        // Capacity
220        if pos + 8 > bytes.len() {
221            return Err("Unexpected end of data (capacity)".to_string());
222        }
223        let capacity = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap()) as usize;
224        pos += 8;
225        self.capacity = capacity;
226
227        // Number of entries
228        if pos + 4 > bytes.len() {
229            return Err("Unexpected end of data (count)".to_string());
230        }
231        let num_entries = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
232        pos += 4;
233
234        // Clear and restore
235        self.clear();
236
237        for _ in 0..num_entries {
238            if pos + 4 > bytes.len() {
239                return Err("Unexpected end of data (entry length)".to_string());
240            }
241            let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
242            pos += 4;
243
244            if pos + len > bytes.len() {
245                return Err("Unexpected end of data (entry data)".to_string());
246            }
247            let id = RecordId::from_bytes(&bytes[pos..pos + len]);
248            pos += len;
249
250            self.seen.insert(id.clone());
251            self.order.push_back(id);
252        }
253
254        Ok(())
255    }
256}
257
/// Bloom filter-based deduplication for very high throughput
///
/// Uses probabilistic deduplication: an ID that was marked as seen is always
/// reported as seen (no false negatives), but an ID that was never added may
/// also be reported as seen (a false positive). A false positive makes a
/// genuinely new record look like a duplicate, so a caller that skips
/// non-new records will drop it. Use this store only where occasionally
/// skipping a new record is acceptable.
#[allow(dead_code)] // Public API for Phase 3 connector implementations
pub struct BloomFilterDedup {
    /// Bloom filter bits, packed 64 per word
    bits: Vec<u64>,

    /// Number of hash functions (bit positions probed per ID)
    num_hashes: usize,

    /// Number of bits addressed in `bits`
    num_bits: usize,

    /// Approximate count of elements (number of insertions performed)
    count: usize,
}
277
278#[allow(dead_code)] // Public API for Phase 3 connector implementations
279impl BloomFilterDedup {
280    /// Create a new bloom filter dedup with target false positive rate
281    ///
282    /// # Arguments
283    ///
284    /// * `expected_elements` - Expected number of elements to track
285    /// * `false_positive_rate` - Target false positive rate (0.0 to 1.0)
286    #[must_use]
287    #[allow(
288        clippy::cast_sign_loss,
289        clippy::cast_precision_loss,
290        clippy::cast_possible_truncation
291    )]
292    pub fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
293        // Calculate optimal size
294        // m = -n * ln(p) / (ln(2)^2)
295        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
296        let num_bits =
297            (-(expected_elements as f64) * false_positive_rate.ln() / ln2_squared).ceil() as usize;
298        let num_bits = num_bits.max(64); // Minimum 64 bits
299
300        // Calculate optimal number of hash functions
301        // k = (m/n) * ln(2)
302        let num_hashes =
303            ((num_bits as f64 / expected_elements as f64) * std::f64::consts::LN_2).ceil() as usize;
304        let num_hashes = num_hashes.clamp(1, 16);
305
306        let num_words = num_bits.div_ceil(64);
307
308        Self {
309            bits: vec![0u64; num_words],
310            num_hashes,
311            num_bits,
312            count: 0,
313        }
314    }
315
316    /// Get indices for a record ID
317    fn get_indices(&self, id: &RecordId) -> Vec<usize> {
318        use fxhash::FxHasher;
319
320        let mut indices = Vec::with_capacity(self.num_hashes);
321
322        // Use double hashing: h(i) = h1 + i * h2
323        let mut hasher1 = FxHasher::default();
324        id.bytes.hash(&mut hasher1);
325        let h1 = hasher1.finish() as usize;
326
327        let mut hasher2 = FxHasher::default();
328        hasher1.finish().hash(&mut hasher2);
329        let h2 = hasher2.finish() as usize;
330
331        for i in 0..self.num_hashes {
332            let index = (h1.wrapping_add(i.wrapping_mul(h2))) % self.num_bits;
333            indices.push(index);
334        }
335
336        indices
337    }
338
339    /// Check if the filter might contain the ID
340    fn might_contain(&self, id: &RecordId) -> bool {
341        let indices = self.get_indices(id);
342        for idx in indices {
343            let word = idx / 64;
344            let bit = idx % 64;
345            if self.bits[word] & (1u64 << bit) == 0 {
346                return false;
347            }
348        }
349        true
350    }
351
352    /// Add an ID to the filter
353    fn add(&mut self, id: &RecordId) {
354        let indices = self.get_indices(id);
355        for idx in indices {
356            let word = idx / 64;
357            let bit = idx % 64;
358            self.bits[word] |= 1u64 << bit;
359        }
360        self.count += 1;
361    }
362
363    /// Get the approximate false positive rate
364    #[must_use]
365    #[allow(clippy::cast_precision_loss)]
366    pub fn false_positive_rate(&self) -> f64 {
367        // FPR = (1 - e^(-kn/m))^k
368        let k = self.num_hashes as f64;
369        let n = self.count as f64;
370        let m = self.num_bits as f64;
371        (1.0 - (-k * n / m).exp()).powf(k)
372    }
373}
374
375impl DeduplicationStore for BloomFilterDedup {
376    fn is_new(&self, id: &RecordId) -> bool {
377        !self.might_contain(id)
378    }
379
380    fn mark_seen(&mut self, id: RecordId) {
381        self.add(&id);
382    }
383
384    fn len(&self) -> usize {
385        self.count
386    }
387
388    fn clear(&mut self) {
389        self.bits.fill(0);
390        self.count = 0;
391    }
392
393    fn to_bytes(&self) -> Vec<u8> {
394        let mut bytes = Vec::new();
395
396        // Version
397        bytes.push(2u8); // Version 2 for bloom filter
398
399        // Parameters
400        bytes.extend_from_slice(&(self.num_bits as u64).to_le_bytes());
401        bytes.extend_from_slice(&(self.num_hashes as u32).to_le_bytes());
402        bytes.extend_from_slice(&(self.count as u64).to_le_bytes());
403
404        // Bits
405        bytes.extend_from_slice(&(self.bits.len() as u32).to_le_bytes());
406        for word in &self.bits {
407            bytes.extend_from_slice(&word.to_le_bytes());
408        }
409
410        bytes
411    }
412
413    fn restore(&mut self, bytes: &[u8]) -> Result<(), String> {
414        if bytes.is_empty() {
415            return Err("Empty bloom filter data".to_string());
416        }
417
418        let mut pos = 0;
419
420        // Version
421        let version = bytes[pos];
422        pos += 1;
423        if version != 2 {
424            return Err(format!("Unsupported bloom filter version: {version}"));
425        }
426
427        // Parameters
428        if pos + 20 > bytes.len() {
429            return Err("Unexpected end of data (parameters)".to_string());
430        }
431        self.num_bits = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap()) as usize;
432        pos += 8;
433        self.num_hashes = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
434        pos += 4;
435        self.count = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap()) as usize;
436        pos += 8;
437
438        // Bits
439        if pos + 4 > bytes.len() {
440            return Err("Unexpected end of data (bits length)".to_string());
441        }
442        let num_words = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
443        pos += 4;
444
445        if pos + num_words * 8 > bytes.len() {
446            return Err("Unexpected end of data (bits)".to_string());
447        }
448
449        self.bits = Vec::with_capacity(num_words);
450        for _ in 0..num_words {
451            let word = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
452            self.bits.push(word);
453            pos += 8;
454        }
455
456        Ok(())
457    }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    // --- RecordId construction -------------------------------------------

    #[test]
    fn test_record_id_from_bytes() {
        let id = RecordId::from_bytes(b"test-id");
        assert_eq!(id.as_bytes(), b"test-id");
    }

    #[test]
    fn test_record_id_from_u64() {
        let id = RecordId::from_u64(12345);
        assert_eq!(id.as_bytes().len(), 8);
        // The value is stored little-endian.
        assert_eq!(id.as_bytes(), &12345u64.to_le_bytes()[..]);
    }

    #[test]
    fn test_record_id_from_hash() {
        let id1 = RecordId::from_hash(b"some data");
        let id2 = RecordId::from_hash(b"some data");
        let id3 = RecordId::from_hash(b"other data");

        assert_eq!(id1, id2); // Same data = same hash
        assert_ne!(id1, id3); // Different data = different hash
    }

    #[test]
    fn test_record_id_composite() {
        let id1 = RecordId::composite(&[b"table", b"key1"]);
        let id2 = RecordId::composite(&[b"table", b"key1"]);
        let id3 = RecordId::composite(&[b"table", b"key2"]);

        assert_eq!(id1, id2);
        assert_ne!(id1, id3);
    }

    // --- InMemoryDedup ----------------------------------------------------

    #[test]
    fn test_in_memory_dedup_basic() {
        let mut dedup = InMemoryDedup::new(1000);

        let id = RecordId::from_string("test");
        assert!(dedup.is_new(&id));

        dedup.mark_seen(id.clone());
        assert!(!dedup.is_new(&id));

        assert_eq!(dedup.len(), 1);
    }

    #[test]
    fn test_in_memory_dedup_capacity() {
        let mut dedup = InMemoryDedup::new(3);

        for i in 0..5 {
            let id = RecordId::from_u64(i);
            dedup.mark_seen(id);
        }

        // Should have evicted oldest entries
        assert!(dedup.len() <= 3);
        assert!(dedup.is_new(&RecordId::from_u64(0)));

        // Most recent should still be present
        assert!(!dedup.is_new(&RecordId::from_u64(4)));
    }

    #[test]
    fn test_in_memory_dedup_serialization() {
        let mut dedup = InMemoryDedup::new(100);
        dedup.mark_seen(RecordId::from_string("id1"));
        dedup.mark_seen(RecordId::from_string("id2"));

        let bytes = dedup.to_bytes();

        let mut restored = InMemoryDedup::new(1); // Different initial capacity
        restored.restore(&bytes).unwrap();

        // Restoring replaces the capacity as well as the tracked IDs.
        assert_eq!(restored.capacity(), 100);
        assert_eq!(restored.len(), 2);
        assert!(!restored.is_new(&RecordId::from_string("id1")));
        assert!(!restored.is_new(&RecordId::from_string("id2")));
    }

    #[test]
    fn test_in_memory_dedup_filter_new() {
        let mut dedup = InMemoryDedup::new(100);
        dedup.mark_seen(RecordId::from_u64(1));
        dedup.mark_seen(RecordId::from_u64(3));

        let ids = [
            RecordId::from_u64(1),
            RecordId::from_u64(2),
            RecordId::from_u64(3),
            RecordId::from_u64(4),
        ];

        let new_ids: Vec<_> = dedup.filter_new(ids.iter()).into_iter().cloned().collect();
        assert_eq!(new_ids.len(), 2);
        assert!(new_ids.contains(&RecordId::from_u64(2)));
        assert!(new_ids.contains(&RecordId::from_u64(4)));
    }

    // --- BloomFilterDedup -------------------------------------------------

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilterDedup::new(1000, 0.01);

        let id = RecordId::from_string("test");
        assert!(bloom.is_new(&id));

        bloom.mark_seen(id.clone());
        assert!(!bloom.is_new(&id));
    }

    #[test]
    fn test_bloom_filter_no_false_negatives() {
        let mut bloom = BloomFilterDedup::new(100, 0.01);

        // Add many IDs
        for i in 0..100 {
            let id = RecordId::from_u64(i);
            bloom.mark_seen(id);
        }

        // All should be found (no false negatives)
        for i in 0..100 {
            let id = RecordId::from_u64(i);
            assert!(!bloom.is_new(&id), "False negative for id {i}");
        }
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilterDedup::new(100, 0.01);
        bloom.mark_seen(RecordId::from_string("id1"));
        bloom.mark_seen(RecordId::from_string("id2"));

        let bytes = bloom.to_bytes();

        let mut restored = BloomFilterDedup::new(10, 0.1); // Different params
        restored.restore(&bytes).unwrap();

        assert!(!restored.is_new(&RecordId::from_string("id1")));
        assert!(!restored.is_new(&RecordId::from_string("id2")));
        assert_eq!(restored.len(), 2);
    }

    // --- Malformed checkpoints --------------------------------------------

    #[test]
    fn test_restore_rejects_malformed_input() {
        let mut dedup = InMemoryDedup::new(10);
        assert!(dedup.restore(&[]).is_err());
        assert!(dedup.restore(&[9]).is_err()); // Unknown version byte

        let mut bloom = BloomFilterDedup::new(10, 0.1);
        assert!(bloom.restore(&[]).is_err());
        assert!(bloom.restore(&[9]).is_err()); // Unknown version byte
    }
}
603}