Skip to main content

rivven_cdc/common/
dedup.rs

1//! # CDC Event Deduplication
2//!
3//! Idempotent event processing with deduplication.
4//!
5//! ## Features
6//!
7//! - **Bloom Filter**: Fast probabilistic duplicate check
8//! - **LRU Cache**: Exact duplicate detection for recent events
9//! - **Composite Keys**: Dedup by table + PK or custom key
10//! - **Time-Based Expiry**: Auto-cleanup of old entries
11//!
12//! ## Usage
13//!
14//! ```ignore
15//! use rivven_cdc::common::{Deduplicator, DeduplicatorConfig, CdcEvent};
16//!
17//! let config = DeduplicatorConfig::default();
18//! let dedup = Deduplicator::new(config);
19//!
20//! // Check if event is duplicate
21//! if dedup.is_duplicate(&event).await {
22//!     continue; // Skip duplicate
23//! }
24//!
25//! // Mark as seen
26//! dedup.mark_seen(&event).await;
27//! ```
28
29use crate::common::CdcEvent;
30use std::collections::hash_map::DefaultHasher;
31use std::collections::{HashMap, VecDeque};
32use std::hash::{Hash, Hasher};
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35use tokio::sync::RwLock;
36use tracing::debug;
37
/// Configuration for the deduplicator.
#[derive(Debug, Clone)]
pub struct DeduplicatorConfig {
    /// Maximum number of recent event keys to track (exact dedup).
    /// When the cache is full, the oldest-inserted key is evicted first.
    pub lru_capacity: usize,
    /// Bloom filter size in bits (probabilistic dedup).
    /// A value of 0 disables the bloom filter entirely.
    pub bloom_size_bits: usize,
    /// Number of hash functions (probes) used by the bloom filter.
    pub bloom_hash_count: usize,
    /// Time-to-live for seen events; entries older than this stop counting
    /// as duplicates and are purged during periodic cleanup.
    pub ttl: Duration,
    /// Key extraction strategy used to derive a dedup key from each event.
    pub key_strategy: KeyStrategy,
}
52
53impl Default for DeduplicatorConfig {
54    fn default() -> Self {
55        Self {
56            lru_capacity: 100_000,
57            bloom_size_bits: 1_000_000, // ~1MB
58            bloom_hash_count: 7,
59            ttl: Duration::from_secs(3600), // 1 hour
60            key_strategy: KeyStrategy::TableAndPrimaryKey,
61        }
62    }
63}
64
65impl DeduplicatorConfig {
66    /// High-memory config for exact deduplication.
67    pub fn exact() -> Self {
68        Self {
69            lru_capacity: 1_000_000,
70            bloom_size_bits: 0, // No bloom filter
71            ttl: Duration::from_secs(7200),
72            ..Default::default()
73        }
74    }
75
76    /// Low-memory config using bloom filter.
77    pub fn compact() -> Self {
78        Self {
79            lru_capacity: 10_000,
80            bloom_size_bits: 10_000_000, // ~10MB
81            bloom_hash_count: 10,
82            ttl: Duration::from_secs(1800),
83            ..Default::default()
84        }
85    }
86}
87
/// Strategy for extracting deduplication keys from events.
#[derive(Debug, Clone)]
pub enum KeyStrategy {
    /// Key = schema + table + primary key ("id" column) + operation.
    TableAndPrimaryKey,
    /// Key = hash of schema + table + operation + all column values.
    TableAndAllColumns,
    /// Key = database + table + event timestamp. This approximates
    /// transaction position; no transaction ID or LSN is consulted.
    TransactionPosition,
    /// Custom key extractor supplied as a plain function pointer.
    Custom(fn(&CdcEvent) -> String),
}
100
101impl KeyStrategy {
102    /// Extract key from event.
103    pub fn extract_key(&self, event: &CdcEvent) -> String {
104        match self {
105            KeyStrategy::TableAndPrimaryKey => {
106                // Use table + id field if present
107                let pk = event
108                    .after
109                    .as_ref()
110                    .and_then(|m| m.get("id"))
111                    .map(|v| v.to_string())
112                    .unwrap_or_else(|| {
113                        // Fall back to hash of all values
114                        event
115                            .after
116                            .as_ref()
117                            .map(|m| {
118                                let mut h = DefaultHasher::new();
119                                if let Some(obj) = m.as_object() {
120                                    for (k, v) in obj {
121                                        k.hash(&mut h);
122                                        v.to_string().hash(&mut h);
123                                    }
124                                }
125                                h.finish().to_string()
126                            })
127                            .unwrap_or_default()
128                    });
129                format!("{}:{}:{}:{:?}", event.schema, event.table, pk, event.op)
130            }
131            KeyStrategy::TableAndAllColumns => {
132                let mut hasher = DefaultHasher::new();
133                event.schema.hash(&mut hasher);
134                event.table.hash(&mut hasher);
135                format!("{:?}", event.op).hash(&mut hasher);
136                if let Some(after) = &event.after {
137                    if let Some(obj) = after.as_object() {
138                        for (k, v) in obj {
139                            k.hash(&mut hasher);
140                            v.to_string().hash(&mut hasher);
141                        }
142                    }
143                }
144                hasher.finish().to_string()
145            }
146            KeyStrategy::TransactionPosition => {
147                // Use database and timestamp as a fallback for transaction position
148                format!("{}:{}:{}", event.database, event.table, event.timestamp)
149            }
150            KeyStrategy::Custom(f) => f(event),
151        }
152    }
153}
154
/// Minimal bloom filter used as a fast, probabilistic "seen before?" check.
///
/// A negative answer is definitive; a positive answer may be a false
/// positive and must be confirmed against the exact LRU cache.
struct BloomFilter {
    /// Bit array packed into 64-bit words.
    bits: Vec<u64>,
    /// Logical number of bits; bit indices are reduced modulo this value.
    size_bits: usize,
    /// Number of hash probes per key.
    hash_count: usize,
}

impl BloomFilter {
    /// Allocate a zeroed filter with `size_bits` bits and `hash_count` probes.
    fn new(size_bits: usize, hash_count: usize) -> Self {
        BloomFilter {
            bits: vec![0u64; size_bits.div_ceil(64)],
            size_bits,
            hash_count,
        }
    }

    /// Set every probe bit for `key`.
    fn insert(&mut self, key: &str) {
        for seed in 0..self.hash_count {
            let idx = self.hash(key, seed);
            self.bits[idx / 64] |= 1u64 << (idx % 64);
        }
    }

    /// True when every probe bit for `key` is set (possibly a false
    /// positive); false when at least one is clear (definitely unseen).
    fn contains(&self, key: &str) -> bool {
        (0..self.hash_count).all(|seed| {
            let idx = self.hash(key, seed);
            self.bits[idx / 64] & (1u64 << (idx % 64)) != 0
        })
    }

    /// Map (key, seed) to a bit index in `[0, size_bits)`.
    fn hash(&self, key: &str, seed: usize) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        seed.hash(&mut hasher);
        (hasher.finish() as usize) % self.size_bits
    }

    /// Reset every bit to zero.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}
206
/// Entry in the LRU cache.
struct LruEntry {
    // When the key was last marked seen; compared against the configured TTL.
    seen_at: Instant,
    // How many times this key has been marked seen.
    count: u32,
}
212
/// Deduplicator with LRU cache and optional bloom filter.
///
/// The bloom filter (when enabled) cheaply answers "definitely unseen";
/// positive answers are confirmed against the exact LRU cache.
pub struct Deduplicator {
    config: DeduplicatorConfig,
    // Exact recent-key state, guarded by an async RwLock.
    lru: RwLock<LruState>,
    // Probabilistic pre-filter; `None` when `bloom_size_bits == 0`.
    bloom: RwLock<Option<BloomFilter>>,
    stats: DeduplicatorStats,
}
220
/// Interior state of the exact dedup cache (held behind one RwLock).
struct LruState {
    // Exact key -> entry map.
    cache: HashMap<String, LruEntry>,
    // Keys in insertion order; the front of the queue is evicted first.
    order: VecDeque<String>,
    // Last time expired entries were purged.
    last_cleanup: Instant,
}
226
/// Statistics for deduplication (lock-free atomic counters).
#[derive(Debug, Default)]
pub struct DeduplicatorStats {
    // Total events passed through a duplicate check.
    pub events_checked: AtomicU64,
    // Events confirmed as duplicates by the exact cache.
    pub duplicates_found: AtomicU64,
    // Bloom hits not confirmed by the exact cache (upper bound on FPs).
    pub bloom_false_positives: AtomicU64,
    // Exact-cache hits within the TTL.
    pub lru_hits: AtomicU64,
    // Checks that found no live entry in the exact cache.
    pub lru_misses: AtomicU64,
    // Number of periodic cleanup passes performed.
    pub cleanups: AtomicU64,
}
237
238impl DeduplicatorStats {
239    pub fn new() -> Self {
240        Self::default()
241    }
242
243    pub fn snapshot(&self) -> DeduplicatorStatsSnapshot {
244        DeduplicatorStatsSnapshot {
245            events_checked: self.events_checked.load(Ordering::Relaxed),
246            duplicates_found: self.duplicates_found.load(Ordering::Relaxed),
247            bloom_false_positives: self.bloom_false_positives.load(Ordering::Relaxed),
248            lru_hits: self.lru_hits.load(Ordering::Relaxed),
249            lru_misses: self.lru_misses.load(Ordering::Relaxed),
250            cleanups: self.cleanups.load(Ordering::Relaxed),
251        }
252    }
253}
254
/// Snapshot of deduplicator statistics.
#[derive(Debug, Clone)]
pub struct DeduplicatorStatsSnapshot {
    pub events_checked: u64,
    pub duplicates_found: u64,
    pub bloom_false_positives: u64,
    pub lru_hits: u64,
    pub lru_misses: u64,
    pub cleanups: u64,
}

impl DeduplicatorStatsSnapshot {
    /// Fraction of checked events that were duplicates (0.0 - 1.0).
    /// Returns 0.0 when no events have been checked.
    pub fn duplicate_rate(&self) -> f64 {
        match self.events_checked {
            0 => 0.0,
            checked => self.duplicates_found as f64 / checked as f64,
        }
    }

    /// Fraction of bloom-filter positives that were false positives.
    /// Returns 0.0 when the filter produced no positives at all.
    pub fn bloom_fp_rate(&self) -> f64 {
        let true_hits = self
            .duplicates_found
            .saturating_sub(self.bloom_false_positives);
        let all_hits = true_hits + self.bloom_false_positives;
        if all_hits == 0 {
            0.0
        } else {
            self.bloom_false_positives as f64 / all_hits as f64
        }
    }
}
287
288impl Deduplicator {
289    /// Create a new deduplicator.
290    pub fn new(config: DeduplicatorConfig) -> Self {
291        let bloom = if config.bloom_size_bits > 0 {
292            Some(BloomFilter::new(
293                config.bloom_size_bits,
294                config.bloom_hash_count,
295            ))
296        } else {
297            None
298        };
299
300        Self {
301            lru: RwLock::new(LruState {
302                cache: HashMap::with_capacity(config.lru_capacity),
303                order: VecDeque::with_capacity(config.lru_capacity),
304                last_cleanup: Instant::now(),
305            }),
306            bloom: RwLock::new(bloom),
307            stats: DeduplicatorStats::new(),
308            config,
309        }
310    }
311
312    /// Check if an event is a duplicate.
313    pub async fn is_duplicate(&self, event: &CdcEvent) -> bool {
314        self.stats.events_checked.fetch_add(1, Ordering::Relaxed);
315        let key = self.config.key_strategy.extract_key(event);
316
317        // Check bloom filter first (fast path)
318        if let Some(bloom) = self.bloom.read().await.as_ref() {
319            if !bloom.contains(&key) {
320                self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
321                return false; // Definitely not seen
322            }
323        }
324
325        // Check LRU cache (exact)
326        let lru = self.lru.read().await;
327        if let Some(entry) = lru.cache.get(&key) {
328            if entry.seen_at.elapsed() < self.config.ttl {
329                self.stats.lru_hits.fetch_add(1, Ordering::Relaxed);
330                self.stats.duplicates_found.fetch_add(1, Ordering::Relaxed);
331                return true;
332            }
333        }
334
335        // Bloom said yes, but LRU says no - false positive
336        if self.config.bloom_size_bits > 0 {
337            self.stats
338                .bloom_false_positives
339                .fetch_add(1, Ordering::Relaxed);
340        }
341        self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
342        false
343    }
344
345    /// Mark an event as seen.
346    pub async fn mark_seen(&self, event: &CdcEvent) {
347        let key = self.config.key_strategy.extract_key(event);
348
349        // Add to bloom filter
350        if let Some(bloom) = self.bloom.write().await.as_mut() {
351            bloom.insert(&key);
352        }
353
354        // Add to LRU cache
355        let mut lru = self.lru.write().await;
356
357        // Update existing or insert new
358        if let Some(entry) = lru.cache.get_mut(&key) {
359            entry.seen_at = Instant::now();
360            entry.count += 1;
361        } else {
362            // Evict if at capacity
363            while lru.cache.len() >= self.config.lru_capacity {
364                if let Some(old_key) = lru.order.pop_front() {
365                    lru.cache.remove(&old_key);
366                }
367            }
368
369            lru.cache.insert(
370                key.clone(),
371                LruEntry {
372                    seen_at: Instant::now(),
373                    count: 1,
374                },
375            );
376            lru.order.push_back(key);
377        }
378
379        // Periodic cleanup
380        if lru.last_cleanup.elapsed() > Duration::from_secs(60) {
381            drop(lru);
382            self.cleanup().await;
383        }
384    }
385
386    /// Check and mark in one operation.
387    pub async fn check_and_mark(&self, event: &CdcEvent) -> bool {
388        if self.is_duplicate(event).await {
389            return true;
390        }
391        self.mark_seen(event).await;
392        false
393    }
394
395    /// Process events, filtering out duplicates.
396    pub async fn filter_duplicates(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
397        let mut unique = Vec::with_capacity(events.len());
398        for event in events {
399            if !self.check_and_mark(&event).await {
400                unique.push(event);
401            }
402        }
403        unique
404    }
405
406    /// Clean up expired entries.
407    pub async fn cleanup(&self) {
408        let mut lru = self.lru.write().await;
409        let now = Instant::now();
410
411        // Remove expired entries
412        lru.cache
413            .retain(|_, entry| entry.seen_at.elapsed() < self.config.ttl);
414
415        // Rebuild order queue - collect keys first to avoid borrow conflict
416        let valid_keys: Vec<_> = lru
417            .order
418            .iter()
419            .filter(|key| lru.cache.contains_key(*key))
420            .cloned()
421            .collect();
422        lru.order = std::collections::VecDeque::from(valid_keys);
423
424        lru.last_cleanup = now;
425        self.stats.cleanups.fetch_add(1, Ordering::Relaxed);
426
427        debug!(
428            "Deduplicator cleanup: {} entries remaining",
429            lru.cache.len()
430        );
431    }
432
433    /// Clear all state.
434    pub async fn clear(&self) {
435        let mut lru = self.lru.write().await;
436        lru.cache.clear();
437        lru.order.clear();
438
439        if let Some(bloom) = self.bloom.write().await.as_mut() {
440            bloom.clear();
441        }
442    }
443
444    /// Get statistics.
445    pub fn stats(&self) -> DeduplicatorStatsSnapshot {
446        self.stats.snapshot()
447    }
448
449    /// Get current cache size.
450    pub async fn cache_size(&self) -> usize {
451        self.lru.read().await.cache.len()
452    }
453}
454
455/// Deduplication key generator for common patterns.
456pub mod keys {
457    use super::*;
458
459    /// Generate key from specific columns.
460    pub fn from_columns(columns: Vec<String>) -> impl Fn(&CdcEvent) -> String + Send + Sync {
461        move |event| {
462            let values: Vec<String> = columns
463                .iter()
464                .filter_map(|col| {
465                    event
466                        .after
467                        .as_ref()
468                        .and_then(|m| m.get(col))
469                        .map(|v| v.to_string())
470                })
471                .collect();
472            format!("{}:{}:{}", event.schema, event.table, values.join(":"))
473        }
474    }
475
476    /// Generate key including operation type.
477    pub fn with_operation<F>(base: F) -> impl Fn(&CdcEvent) -> String + Send + Sync
478    where
479        F: Fn(&CdcEvent) -> String + Send + Sync,
480    {
481        move |event| format!("{}:{:?}", base(event), event.op)
482    }
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CdcOp;

    /// Construct a minimal CDC event for `table` with a single `id` column.
    fn make_event(table: &str, id: i64, op: CdcOp) -> CdcEvent {
        CdcEvent {
            source_type: "postgres".to_string(),
            database: "testdb".to_string(),
            schema: "public".to_string(),
            table: table.to_string(),
            op,
            before: None,
            after: Some(serde_json::json!({ "id": id })),
            timestamp: chrono::Utc::now().timestamp(),
            transaction: None,
        }
    }

    #[tokio::test]
    async fn test_deduplicator_basic() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let ev = make_event("users", 1, CdcOp::Insert);

        // Unseen on the first check; duplicate once marked.
        assert!(!d.is_duplicate(&ev).await);
        d.mark_seen(&ev).await;
        assert!(d.is_duplicate(&ev).await);
    }

    #[tokio::test]
    async fn test_deduplicator_different_ids() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let first = make_event("users", 1, CdcOp::Insert);
        let second = make_event("users", 2, CdcOp::Insert);

        d.mark_seen(&first).await;

        // Only the marked id registers as a duplicate.
        assert!(d.is_duplicate(&first).await);
        assert!(!d.is_duplicate(&second).await);
    }

    #[tokio::test]
    async fn test_deduplicator_different_ops() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let ins = make_event("users", 1, CdcOp::Insert);
        let upd = make_event("users", 1, CdcOp::Update);

        d.mark_seen(&ins).await;

        // The op is part of the key, so an update of the same row is distinct.
        assert!(d.is_duplicate(&ins).await);
        assert!(!d.is_duplicate(&upd).await);
    }

    #[tokio::test]
    async fn test_check_and_mark() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let ev = make_event("users", 1, CdcOp::Insert);

        assert!(!d.check_and_mark(&ev).await); // first sighting marks it
        assert!(d.check_and_mark(&ev).await); // second sighting is a dup
    }

    #[tokio::test]
    async fn test_filter_duplicates() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let batch = vec![
            make_event("users", 1, CdcOp::Insert),
            make_event("users", 1, CdcOp::Insert), // dup of the first
            make_event("users", 2, CdcOp::Insert),
            make_event("users", 2, CdcOp::Insert), // dup of the third
            make_event("users", 3, CdcOp::Insert),
        ];

        // Three distinct ids survive.
        assert_eq!(d.filter_duplicates(batch).await.len(), 3);
    }

    #[tokio::test]
    async fn test_deduplicator_stats() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let ev = make_event("users", 1, CdcOp::Insert);

        for _ in 0..3 {
            d.check_and_mark(&ev).await;
        }

        let snap = d.stats();
        assert_eq!(snap.events_checked, 3);
        assert_eq!(snap.duplicates_found, 2);
    }

    #[tokio::test]
    async fn test_deduplicator_clear() {
        let d = Deduplicator::new(DeduplicatorConfig::default());
        let ev = make_event("users", 1, CdcOp::Insert);

        d.mark_seen(&ev).await;
        assert!(d.is_duplicate(&ev).await);

        // Clearing forgets everything.
        d.clear().await;
        assert!(!d.is_duplicate(&ev).await);
    }

    #[tokio::test]
    async fn test_key_strategy_transaction() {
        let d = Deduplicator::new(DeduplicatorConfig {
            key_strategy: KeyStrategy::TransactionPosition,
            ..Default::default()
        });

        let mut a = make_event("users", 1, CdcOp::Insert);
        a.database = "db1".to_string();
        a.timestamp = 1000;

        let mut b = make_event("users", 2, CdcOp::Insert);
        b.database = "db1".to_string();
        b.timestamp = 1000;

        // Matching database + table + timestamp collapses into one key.
        d.mark_seen(&a).await;
        assert!(d.is_duplicate(&b).await);
    }

    #[tokio::test]
    async fn test_bloom_filter() {
        let d = Deduplicator::new(DeduplicatorConfig {
            lru_capacity: 100,
            bloom_size_bits: 10_000,
            bloom_hash_count: 5,
            ..Default::default()
        });

        // Mark a batch of events, then confirm each is detected.
        for id in 0..50 {
            d.mark_seen(&make_event("users", id, CdcOp::Insert)).await;
        }
        for id in 0..50 {
            assert!(d.is_duplicate(&make_event("users", id, CdcOp::Insert)).await);
        }

        // An unmarked id must not register as a duplicate.
        assert!(!d.is_duplicate(&make_event("users", 999, CdcOp::Insert)).await);
    }

    #[tokio::test]
    async fn test_lru_eviction() {
        let d = Deduplicator::new(DeduplicatorConfig {
            lru_capacity: 10,
            bloom_size_bits: 0, // exact cache only
            ..Default::default()
        });

        // Insert twice the capacity.
        for id in 0..20 {
            d.mark_seen(&make_event("users", id, CdcOp::Insert)).await;
        }

        // The cache stays at capacity.
        assert_eq!(d.cache_size().await, 10);

        // The newest key survives; the oldest was evicted.
        assert!(d.is_duplicate(&make_event("users", 19, CdcOp::Insert)).await);
        assert!(!d.is_duplicate(&make_event("users", 0, CdcOp::Insert)).await);
    }

    #[test]
    fn test_duplicate_rate() {
        let snap = DeduplicatorStatsSnapshot {
            events_checked: 100,
            duplicates_found: 25,
            bloom_false_positives: 5,
            lru_hits: 20,
            lru_misses: 80,
            cleanups: 1,
        };

        assert!((snap.duplicate_rate() - 0.25).abs() < 0.001);
    }

    #[test]
    fn test_config_presets() {
        let exact = DeduplicatorConfig::exact();
        let compact = DeduplicatorConfig::compact();

        assert_eq!(exact.bloom_size_bits, 0);
        assert_eq!(exact.lru_capacity, 1_000_000);
        assert_eq!(compact.bloom_size_bits, 10_000_000);
        assert_eq!(compact.lru_capacity, 10_000);
    }
}