1use crate::common::CdcEvent;
30use std::collections::hash_map::DefaultHasher;
31use std::collections::{HashMap, VecDeque};
32use std::hash::{Hash, Hasher};
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35use tokio::sync::RwLock;
36use tracing::debug;
37
/// Tuning knobs for [`Deduplicator`].
#[derive(Debug, Clone)]
pub struct DeduplicatorConfig {
    /// Maximum number of keys held in the exact cache. When full, the
    /// oldest-inserted key is evicted first.
    pub lru_capacity: usize,
    /// Size of the Bloom-filter pre-check in bits; `0` disables the filter.
    pub bloom_size_bits: usize,
    /// Number of seeded hash probes per key in the Bloom filter.
    pub bloom_hash_count: usize,
    /// How long a marked key continues to count as "seen".
    pub ttl: Duration,
    /// How the deduplication key is derived from each event.
    pub key_strategy: KeyStrategy,
}
52
53impl Default for DeduplicatorConfig {
54 fn default() -> Self {
55 Self {
56 lru_capacity: 100_000,
57 bloom_size_bits: 1_000_000, bloom_hash_count: 7,
59 ttl: Duration::from_secs(3600), key_strategy: KeyStrategy::TableAndPrimaryKey,
61 }
62 }
63}
64
65impl DeduplicatorConfig {
66 pub fn exact() -> Self {
68 Self {
69 lru_capacity: 1_000_000,
70 bloom_size_bits: 0, ttl: Duration::from_secs(7200),
72 ..Default::default()
73 }
74 }
75
76 pub fn compact() -> Self {
78 Self {
79 lru_capacity: 10_000,
80 bloom_size_bits: 10_000_000, bloom_hash_count: 10,
82 ttl: Duration::from_secs(1800),
83 ..Default::default()
84 }
85 }
86}
87
/// How a deduplication key is derived from a [`CdcEvent`]. See
/// `KeyStrategy::extract_key` for the exact key formats.
#[derive(Debug, Clone)]
pub enum KeyStrategy {
    /// `schema:table:<pk>:<op>`, where `<pk>` is the row's `id` value or, if
    /// there is no `id`, a hash of the row's columns.
    TableAndPrimaryKey,
    /// Hash of schema, table, operation and the row's columns.
    TableAndAllColumns,
    /// `database:table:timestamp`. Events on the same table that share a
    /// timestamp are treated as duplicates regardless of row contents.
    TransactionPosition,
    /// Caller-supplied key function. This is a plain `fn` pointer, so
    /// capturing closures cannot be stored here.
    Custom(fn(&CdcEvent) -> String),
}
100
101impl KeyStrategy {
102 pub fn extract_key(&self, event: &CdcEvent) -> String {
104 match self {
105 KeyStrategy::TableAndPrimaryKey => {
106 let pk = event
108 .after
109 .as_ref()
110 .and_then(|m| m.get("id"))
111 .map(|v| v.to_string())
112 .unwrap_or_else(|| {
113 event
115 .after
116 .as_ref()
117 .map(|m| {
118 let mut h = DefaultHasher::new();
119 if let Some(obj) = m.as_object() {
120 for (k, v) in obj {
121 k.hash(&mut h);
122 v.to_string().hash(&mut h);
123 }
124 }
125 h.finish().to_string()
126 })
127 .unwrap_or_default()
128 });
129 format!("{}:{}:{}:{:?}", event.schema, event.table, pk, event.op)
130 }
131 KeyStrategy::TableAndAllColumns => {
132 let mut hasher = DefaultHasher::new();
133 event.schema.hash(&mut hasher);
134 event.table.hash(&mut hasher);
135 format!("{:?}", event.op).hash(&mut hasher);
136 if let Some(after) = &event.after {
137 if let Some(obj) = after.as_object() {
138 for (k, v) in obj {
139 k.hash(&mut hasher);
140 v.to_string().hash(&mut hasher);
141 }
142 }
143 }
144 hasher.finish().to_string()
145 }
146 KeyStrategy::TransactionPosition => {
147 format!("{}:{}:{}", event.database, event.table, event.timestamp)
149 }
150 KeyStrategy::Custom(f) => f(event),
151 }
152 }
153}
154
/// Fixed-size Bloom filter over string keys, backed by a packed `u64` bit array.
///
/// `contains` never returns `false` for a key that was inserted (since the
/// last `clear`). It may return `true` for a key that was never inserted.
struct BloomFilter {
    /// Packed bits: bit `i` lives in word `i / 64` at offset `i % 64`.
    bits: Vec<u64>,
    /// Number of addressable bits. Probe positions are reduced modulo this,
    /// so it must be non-zero whenever `insert`/`contains` are called.
    size_bits: usize,
    /// Number of seeded hash probes per key.
    hash_count: usize,
}

impl BloomFilter {
    /// Creates an all-zero filter with `size_bits` bits and `hash_count` probes.
    fn new(size_bits: usize, hash_count: usize) -> Self {
        let words = size_bits.div_ceil(64);
        BloomFilter {
            bits: vec![0; words],
            size_bits,
            hash_count,
        }
    }

    /// Maps the `seed`-th probe for `key` to its (word index, bit mask) pair.
    fn locate(&self, key: &str, seed: usize) -> (usize, u64) {
        let bit = self.hash(key, seed);
        (bit / 64, 1u64 << (bit % 64))
    }

    /// Sets every probe bit for `key`.
    fn insert(&mut self, key: &str) {
        for seed in 0..self.hash_count {
            let (word, mask) = self.locate(key, seed);
            self.bits[word] |= mask;
        }
    }

    /// Returns `true` only if every probe bit for `key` is set. With zero
    /// probes this is vacuously `true`.
    fn contains(&self, key: &str) -> bool {
        (0..self.hash_count).all(|seed| {
            let (word, mask) = self.locate(key, seed);
            (self.bits[word] & mask) != 0
        })
    }

    /// Bit position of probe `seed` for `key`, in `0..size_bits`.
    fn hash(&self, key: &str, seed: usize) -> usize {
        let mut state = DefaultHasher::new();
        key.hash(&mut state);
        seed.hash(&mut state);
        state.finish() as usize % self.size_bits
    }

    /// Resets every bit to zero without releasing the backing storage.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}
206
/// Per-key bookkeeping stored in the exact cache.
struct LruEntry {
    /// When the key was last marked as seen; compared against the configured TTL.
    seen_at: Instant,
    /// How many times the key has been marked as seen.
    /// NOTE(review): written but never read in the code shown here.
    count: u32,
}
212
/// Event deduplicator that combines an optional Bloom-filter pre-check with a
/// TTL-bounded exact cache. Both structures sit behind `tokio::sync::RwLock`s,
/// so all mutating methods take `&self`.
pub struct Deduplicator {
    /// Configuration captured at construction; never mutated afterwards.
    config: DeduplicatorConfig,
    /// Exact cache of recently seen keys plus its eviction queue.
    lru: RwLock<LruState>,
    /// Bloom filter; `None` when `config.bloom_size_bits == 0`.
    bloom: RwLock<Option<BloomFilter>>,
    /// Running counters, exposed as a snapshot through `stats()`.
    stats: DeduplicatorStats,
}
220
/// Mutable state of the exact cache, guarded by a single lock.
struct LruState {
    /// Key -> last-seen metadata.
    cache: HashMap<String, LruEntry>,
    /// Keys in first-insertion order; the front is evicted first. Refreshing an
    /// existing key does not move it, so eviction is FIFO rather than true LRU.
    order: VecDeque<String>,
    /// When the last TTL sweep ran.
    last_cleanup: Instant,
}
226
/// Monotonic counters describing deduplicator activity.
///
/// Counters are updated with relaxed atomics. A snapshot taken while other
/// tasks are running is therefore not guaranteed to be mutually consistent
/// across fields.
#[derive(Debug, Default)]
pub struct DeduplicatorStats {
    /// Events submitted for a duplicate check.
    pub events_checked: AtomicU64,
    /// Checks that found a live (non-expired) cache entry.
    pub duplicates_found: AtomicU64,
    /// Checks where the Bloom filter answered "maybe present" but the cache had
    /// no live entry.
    pub bloom_false_positives: AtomicU64,
    /// Cache lookups that hit a live entry.
    pub lru_hits: AtomicU64,
    /// Checks that did not find a live entry, including Bloom-filter rejections.
    pub lru_misses: AtomicU64,
    /// Completed TTL sweeps.
    pub cleanups: AtomicU64,
}

impl DeduplicatorStats {
    /// Creates a set of counters, all starting at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Copies the current counter values into a plain snapshot.
    pub fn snapshot(&self) -> DeduplicatorStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        DeduplicatorStatsSnapshot {
            events_checked: read(&self.events_checked),
            duplicates_found: read(&self.duplicates_found),
            bloom_false_positives: read(&self.bloom_false_positives),
            lru_hits: read(&self.lru_hits),
            lru_misses: read(&self.lru_misses),
            cleanups: read(&self.cleanups),
        }
    }
}

/// Point-in-time copy of [`DeduplicatorStats`].
#[derive(Debug, Clone)]
pub struct DeduplicatorStatsSnapshot {
    pub events_checked: u64,
    pub duplicates_found: u64,
    pub bloom_false_positives: u64,
    pub lru_hits: u64,
    pub lru_misses: u64,
    pub cleanups: u64,
}

impl DeduplicatorStatsSnapshot {
    /// Fraction of checked events that were duplicates; `0.0` when nothing was checked.
    pub fn duplicate_rate(&self) -> f64 {
        if self.events_checked == 0 {
            return 0.0;
        }
        self.duplicates_found as f64 / self.events_checked as f64
    }

    /// Fraction of Bloom-filter "maybe present" answers that turned out to be wrong.
    ///
    /// Every "maybe present" answer ends in exactly one of two outcomes:
    /// - a live cache hit, counted in `duplicates_found`;
    /// - a cache miss, counted in `bloom_false_positives`.
    ///
    /// Their sum is therefore the filter's total positives. Returns `0.0` when
    /// there were none.
    pub fn bloom_fp_rate(&self) -> f64 {
        let total_positives = self
            .duplicates_found
            .saturating_add(self.bloom_false_positives);
        if total_positives == 0 {
            return 0.0;
        }
        self.bloom_false_positives as f64 / total_positives as f64
    }
}
287
288impl Deduplicator {
289 pub fn new(config: DeduplicatorConfig) -> Self {
291 let bloom = if config.bloom_size_bits > 0 {
292 Some(BloomFilter::new(
293 config.bloom_size_bits,
294 config.bloom_hash_count,
295 ))
296 } else {
297 None
298 };
299
300 Self {
301 lru: RwLock::new(LruState {
302 cache: HashMap::with_capacity(config.lru_capacity),
303 order: VecDeque::with_capacity(config.lru_capacity),
304 last_cleanup: Instant::now(),
305 }),
306 bloom: RwLock::new(bloom),
307 stats: DeduplicatorStats::new(),
308 config,
309 }
310 }
311
312 pub async fn is_duplicate(&self, event: &CdcEvent) -> bool {
314 self.stats.events_checked.fetch_add(1, Ordering::Relaxed);
315 let key = self.config.key_strategy.extract_key(event);
316
317 if let Some(bloom) = self.bloom.read().await.as_ref() {
319 if !bloom.contains(&key) {
320 self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
321 return false; }
323 }
324
325 let lru = self.lru.read().await;
327 if let Some(entry) = lru.cache.get(&key) {
328 if entry.seen_at.elapsed() < self.config.ttl {
329 self.stats.lru_hits.fetch_add(1, Ordering::Relaxed);
330 self.stats.duplicates_found.fetch_add(1, Ordering::Relaxed);
331 return true;
332 }
333 }
334
335 if self.config.bloom_size_bits > 0 {
337 self.stats
338 .bloom_false_positives
339 .fetch_add(1, Ordering::Relaxed);
340 }
341 self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
342 false
343 }
344
345 pub async fn mark_seen(&self, event: &CdcEvent) {
347 let key = self.config.key_strategy.extract_key(event);
348
349 if let Some(bloom) = self.bloom.write().await.as_mut() {
351 bloom.insert(&key);
352 }
353
354 let mut lru = self.lru.write().await;
356
357 if let Some(entry) = lru.cache.get_mut(&key) {
359 entry.seen_at = Instant::now();
360 entry.count += 1;
361 } else {
362 while lru.cache.len() >= self.config.lru_capacity {
364 if let Some(old_key) = lru.order.pop_front() {
365 lru.cache.remove(&old_key);
366 }
367 }
368
369 lru.cache.insert(
370 key.clone(),
371 LruEntry {
372 seen_at: Instant::now(),
373 count: 1,
374 },
375 );
376 lru.order.push_back(key);
377 }
378
379 if lru.last_cleanup.elapsed() > Duration::from_secs(60) {
381 drop(lru);
382 self.cleanup().await;
383 }
384 }
385
386 pub async fn check_and_mark(&self, event: &CdcEvent) -> bool {
388 if self.is_duplicate(event).await {
389 return true;
390 }
391 self.mark_seen(event).await;
392 false
393 }
394
395 pub async fn filter_duplicates(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
397 let mut unique = Vec::with_capacity(events.len());
398 for event in events {
399 if !self.check_and_mark(&event).await {
400 unique.push(event);
401 }
402 }
403 unique
404 }
405
406 pub async fn cleanup(&self) {
408 let mut lru = self.lru.write().await;
409 let now = Instant::now();
410
411 lru.cache
413 .retain(|_, entry| entry.seen_at.elapsed() < self.config.ttl);
414
415 let valid_keys: Vec<_> = lru
417 .order
418 .iter()
419 .filter(|key| lru.cache.contains_key(*key))
420 .cloned()
421 .collect();
422 lru.order = std::collections::VecDeque::from(valid_keys);
423
424 lru.last_cleanup = now;
425 self.stats.cleanups.fetch_add(1, Ordering::Relaxed);
426
427 debug!(
428 "Deduplicator cleanup: {} entries remaining",
429 lru.cache.len()
430 );
431 }
432
433 pub async fn clear(&self) {
435 let mut lru = self.lru.write().await;
436 lru.cache.clear();
437 lru.order.clear();
438
439 if let Some(bloom) = self.bloom.write().await.as_mut() {
440 bloom.clear();
441 }
442 }
443
444 pub fn stats(&self) -> DeduplicatorStatsSnapshot {
446 self.stats.snapshot()
447 }
448
449 pub async fn cache_size(&self) -> usize {
451 self.lru.read().await.cache.len()
452 }
453}
454
455pub mod keys {
457 use super::*;
458
459 pub fn from_columns(columns: Vec<String>) -> impl Fn(&CdcEvent) -> String + Send + Sync {
461 move |event| {
462 let values: Vec<String> = columns
463 .iter()
464 .filter_map(|col| {
465 event
466 .after
467 .as_ref()
468 .and_then(|m| m.get(col))
469 .map(|v| v.to_string())
470 })
471 .collect();
472 format!("{}:{}:{}", event.schema, event.table, values.join(":"))
473 }
474 }
475
476 pub fn with_operation<F>(base: F) -> impl Fn(&CdcEvent) -> String + Send + Sync
478 where
479 F: Fn(&CdcEvent) -> String + Send + Sync,
480 {
481 move |event| format!("{}:{:?}", base(event), event.op)
482 }
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CdcOp;

    /// Builds an event for `testdb.public.<table>` whose `after` image is
    /// `{ "id": id }`, timestamped with `chrono::Utc::now().timestamp()`.
    fn make_event(table: &str, id: i64, op: CdcOp) -> CdcEvent {
        CdcEvent {
            source_type: "postgres".to_string(),
            database: "testdb".to_string(),
            schema: "public".to_string(),
            table: table.to_string(),
            op,
            before: None,
            after: Some(serde_json::json!({ "id": id })),
            timestamp: chrono::Utc::now().timestamp(),
            transaction: None,
        }
    }

    #[tokio::test]
    async fn test_deduplicator_basic() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        let event = make_event("users", 1, CdcOp::Insert);

        // Not a duplicate until it has been marked.
        assert!(!dedup.is_duplicate(&event).await);

        dedup.mark_seen(&event).await;

        assert!(dedup.is_duplicate(&event).await);
    }

    #[tokio::test]
    async fn test_deduplicator_different_ids() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        let event1 = make_event("users", 1, CdcOp::Insert);
        let event2 = make_event("users", 2, CdcOp::Insert);

        dedup.mark_seen(&event1).await;

        assert!(dedup.is_duplicate(&event1).await);
        assert!(!dedup.is_duplicate(&event2).await);
    }

    #[tokio::test]
    async fn test_deduplicator_different_ops() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        let insert = make_event("users", 1, CdcOp::Insert);
        let update = make_event("users", 1, CdcOp::Update);

        dedup.mark_seen(&insert).await;

        assert!(dedup.is_duplicate(&insert).await);
        // Same row, different operation => different key.
        assert!(!dedup.is_duplicate(&update).await);
    }

    #[tokio::test]
    async fn test_check_and_mark() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        let event = make_event("users", 1, CdcOp::Insert);

        // First sighting: reported as new and recorded.
        assert!(!dedup.check_and_mark(&event).await);

        // Second sighting: reported as a duplicate.
        assert!(dedup.check_and_mark(&event).await);
    }

    #[tokio::test]
    async fn test_filter_duplicates() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        // Ids 1 and 2 each appear twice; id 3 once.
        let events = vec![
            make_event("users", 1, CdcOp::Insert),
            make_event("users", 1, CdcOp::Insert),
            make_event("users", 2, CdcOp::Insert),
            make_event("users", 2, CdcOp::Insert),
            make_event("users", 3, CdcOp::Insert),
        ];

        let unique = dedup.filter_duplicates(events).await;
        assert_eq!(unique.len(), 3);
    }

    #[tokio::test]
    async fn test_deduplicator_stats() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        // One new event followed by two repeats.
        let event = make_event("users", 1, CdcOp::Insert);
        dedup.check_and_mark(&event).await;
        dedup.check_and_mark(&event).await;
        dedup.check_and_mark(&event).await;

        let stats = dedup.stats();
        assert_eq!(stats.events_checked, 3);
        assert_eq!(stats.duplicates_found, 2);
    }

    #[tokio::test]
    async fn test_deduplicator_clear() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());

        let event = make_event("users", 1, CdcOp::Insert);
        dedup.mark_seen(&event).await;
        assert!(dedup.is_duplicate(&event).await);

        // After `clear` the event is forgotten.
        dedup.clear().await;
        assert!(!dedup.is_duplicate(&event).await);
    }

    #[tokio::test]
    async fn test_key_strategy_transaction() {
        let config = DeduplicatorConfig {
            key_strategy: KeyStrategy::TransactionPosition,
            ..Default::default()
        };
        let dedup = Deduplicator::new(config);

        let mut event1 = make_event("users", 1, CdcOp::Insert);
        event1.database = "db1".to_string();
        event1.timestamp = 1000;

        let mut event2 = make_event("users", 2, CdcOp::Insert);
        event2.database = "db1".to_string();
        event2.timestamp = 1000;

        dedup.mark_seen(&event1).await;
        // Same database/table/timestamp => same key, even though the ids differ.
        assert!(dedup.is_duplicate(&event2).await);
    }

    #[tokio::test]
    async fn test_bloom_filter() {
        let config = DeduplicatorConfig {
            lru_capacity: 100,
            bloom_size_bits: 10_000,
            bloom_hash_count: 5,
            ..Default::default()
        };
        let dedup = Deduplicator::new(config);

        // Fill below cache capacity so nothing is evicted.
        for i in 0..50 {
            let event = make_event("users", i, CdcOp::Insert);
            dedup.mark_seen(&event).await;
        }

        for i in 0..50 {
            let event = make_event("users", i, CdcOp::Insert);
            assert!(dedup.is_duplicate(&event).await);
        }

        // Never marked. Even if the Bloom filter gives a false positive, the
        // exact cache rejects it.
        let new_event = make_event("users", 999, CdcOp::Insert);
        assert!(!dedup.is_duplicate(&new_event).await);
    }

    #[tokio::test]
    async fn test_lru_eviction() {
        let config = DeduplicatorConfig {
            lru_capacity: 10,
            bloom_size_bits: 0,
            ..Default::default()
        };
        let dedup = Deduplicator::new(config);

        // Twice the capacity: keys 0..10 are evicted as 10..20 arrive.
        for i in 0..20 {
            let event = make_event("users", i, CdcOp::Insert);
            dedup.mark_seen(&event).await;
        }

        assert_eq!(dedup.cache_size().await, 10);

        // The most recent key is still cached.
        let recent = make_event("users", 19, CdcOp::Insert);
        assert!(dedup.is_duplicate(&recent).await);

        // The oldest key has been evicted.
        let old = make_event("users", 0, CdcOp::Insert);
        assert!(!dedup.is_duplicate(&old).await);
    }

    #[test]
    fn test_duplicate_rate() {
        let stats = DeduplicatorStatsSnapshot {
            events_checked: 100,
            duplicates_found: 25,
            bloom_false_positives: 5,
            lru_hits: 20,
            lru_misses: 80,
            cleanups: 1,
        };

        // 25 duplicates out of 100 checks.
        assert!((stats.duplicate_rate() - 0.25).abs() < 0.001);
    }

    #[test]
    fn test_config_presets() {
        let exact = DeduplicatorConfig::exact();
        assert_eq!(exact.bloom_size_bits, 0);
        assert_eq!(exact.lru_capacity, 1_000_000);

        let compact = DeduplicatorConfig::compact();
        assert_eq!(compact.bloom_size_bits, 10_000_000);
        assert_eq!(compact.lru_capacity, 10_000);
    }
}