use crate::common::CdcEvent;
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::debug;
/// Tuning knobs for [`Deduplicator`].
#[derive(Debug, Clone)]
pub struct DeduplicatorConfig {
    /// Maximum number of keys held in the exact cache before eviction.
    pub lru_capacity: usize,
    /// Size of the bloom pre-filter in bits; 0 disables the bloom filter.
    pub bloom_size_bits: usize,
    /// Number of hash probes per bloom insert/lookup.
    pub bloom_hash_count: usize,
    /// How long a seen key still counts as a duplicate.
    pub ttl: Duration,
    /// How the deduplication key is derived from a [`CdcEvent`].
    pub key_strategy: KeyStrategy,
}
impl Default for DeduplicatorConfig {
fn default() -> Self {
Self {
lru_capacity: 100_000,
bloom_size_bits: 1_000_000, bloom_hash_count: 7,
ttl: Duration::from_secs(3600), key_strategy: KeyStrategy::TableAndPrimaryKey,
}
}
}
impl DeduplicatorConfig {
pub fn exact() -> Self {
Self {
lru_capacity: 1_000_000,
bloom_size_bits: 0, ttl: Duration::from_secs(7200),
..Default::default()
}
}
pub fn compact() -> Self {
Self {
lru_capacity: 10_000,
bloom_size_bits: 10_000_000, bloom_hash_count: 10,
ttl: Duration::from_secs(1800),
..Default::default()
}
}
}
/// Strategy for deriving the deduplication key from an event.
#[derive(Debug, Clone)]
pub enum KeyStrategy {
    /// `schema:table:<pk>:<op>`, where `<pk>` is the `"id"` column of the
    /// `after` row if present, otherwise a hash of all `after` columns.
    TableAndPrimaryKey,
    /// Hash of schema, table, op, and every `after` column name/value.
    TableAndAllColumns,
    /// `database:table:timestamp` — coarse: events on the same table with
    /// the same timestamp share a key.
    TransactionPosition,
    /// Caller-supplied extractor (a plain fn pointer, so the enum stays
    /// `Clone` + `Debug`).
    Custom(fn(&CdcEvent) -> String),
}
impl KeyStrategy {
    /// Derive the deduplication key for `event` according to this strategy.
    ///
    /// Fix: row-inspecting strategies now fall back to the `before` image
    /// when `after` is absent (typical for deletes). Previously every
    /// delete on a table produced the same key — empty pk for
    /// `TableAndPrimaryKey`, nothing hashed for `TableAndAllColumns` — so
    /// distinct deletes were wrongly collapsed as duplicates. Events that
    /// carry an `after` image are keyed exactly as before.
    pub fn extract_key(&self, event: &CdcEvent) -> String {
        match self {
            KeyStrategy::TableAndPrimaryKey => {
                // Prefer whichever row image exists: `after` for
                // inserts/updates, `before` for deletes.
                let row = event.after.as_ref().or(event.before.as_ref());
                let pk = row
                    .and_then(|m| m.get("id"))
                    .map(|v| v.to_string())
                    .unwrap_or_else(|| {
                        // No "id" column: hash every column name/value of
                        // the available row image as a synthetic key.
                        row.map(|m| {
                            let mut h = DefaultHasher::new();
                            if let Some(obj) = m.as_object() {
                                for (k, v) in obj {
                                    k.hash(&mut h);
                                    v.to_string().hash(&mut h);
                                }
                            }
                            h.finish().to_string()
                        })
                        .unwrap_or_default()
                    });
                format!("{}:{}:{}:{:?}", event.schema, event.table, pk, event.op)
            }
            KeyStrategy::TableAndAllColumns => {
                let mut hasher = DefaultHasher::new();
                event.schema.hash(&mut hasher);
                event.table.hash(&mut hasher);
                format!("{:?}", event.op).hash(&mut hasher);
                // Hash whichever row image is present (after, else before).
                if let Some(row) = event.after.as_ref().or(event.before.as_ref()) {
                    if let Some(obj) = row.as_object() {
                        for (k, v) in obj {
                            k.hash(&mut hasher);
                            v.to_string().hash(&mut hasher);
                        }
                    }
                }
                hasher.finish().to_string()
            }
            KeyStrategy::TransactionPosition => {
                // NOTE(review): timestamp has second granularity here, so
                // distinct events in the same second on the same table
                // share a key — confirm this is intended for the source.
                format!("{}:{}:{}", event.database, event.table, event.timestamp)
            }
            KeyStrategy::Custom(f) => f(event),
        }
    }
}
/// Fixed-size bloom filter over string keys.
///
/// Probabilistic: `contains` may return false positives but never false
/// negatives. Individual keys cannot be removed; `clear` resets the
/// whole filter.
struct BloomFilter {
    /// Bit array packed into 64-bit words.
    bits: Vec<u64>,
    /// Logical number of bits (modulus for probe indices).
    size_bits: usize,
    /// Number of probe positions per key.
    hash_count: usize,
}

impl BloomFilter {
    /// Create a filter with `size_bits` bits and `hash_count` probes.
    ///
    /// Fix: `size_bits` is clamped to at least 1. Previously `new(0, _)`
    /// built a filter whose every `insert`/`contains` panicked with a
    /// divide-by-zero in `hash`.
    fn new(size_bits: usize, hash_count: usize) -> Self {
        let size_bits = size_bits.max(1);
        Self {
            bits: vec![0u64; size_bits.div_ceil(64)],
            size_bits,
            hash_count,
        }
    }

    /// Set every probe bit for `key`.
    fn insert(&mut self, key: &str) {
        for i in 0..self.hash_count {
            let bit_index = self.hash(key, i);
            self.bits[bit_index / 64] |= 1u64 << (bit_index % 64);
        }
    }

    /// True if every probe bit for `key` is set (may be a false positive).
    fn contains(&self, key: &str) -> bool {
        (0..self.hash_count).all(|i| {
            let bit_index = self.hash(key, i);
            self.bits[bit_index / 64] & (1u64 << (bit_index % 64)) != 0
        })
    }

    /// Probe index for (`key`, `seed`): seeded std hash reduced mod size.
    fn hash(&self, key: &str, seed: usize) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        seed.hash(&mut hasher);
        (hasher.finish() as usize) % self.size_bits
    }

    /// Reset the filter to empty.
    fn clear(&mut self) {
        self.bits.fill(0);
    }
}
/// Value stored per key in the exact cache.
struct LruEntry {
    // Last time the key was marked seen; compared against the TTL.
    seen_at: Instant,
    // Times the key has been marked seen. Currently only written, never
    // read — kept for debugging / future stats.
    count: u32,
}
/// Two-tier CDC event deduplicator: an optional bloom filter acts as a
/// fast "definitely not seen" pre-check in front of an exact, TTL'd,
/// capacity-bounded key cache.
pub struct Deduplicator {
    config: DeduplicatorConfig,
    // Exact cache of recently seen keys.
    lru: RwLock<LruState>,
    // `None` when `config.bloom_size_bits == 0`.
    bloom: RwLock<Option<BloomFilter>>,
    stats: DeduplicatorStats,
}
/// Mutable state behind the cache lock.
struct LruState {
    // key -> entry; bounded by `DeduplicatorConfig::lru_capacity`.
    cache: HashMap<String, LruEntry>,
    // Eviction queue; the front is evicted first. NOTE(review): keys are
    // not moved to the back when refreshed, so eviction is FIFO by first
    // insertion rather than strict LRU.
    order: VecDeque<String>,
    // Last time expired entries were purged.
    last_cleanup: Instant,
}
/// Lock-free counters updated during deduplication (all `Relaxed`).
#[derive(Debug, Default)]
pub struct DeduplicatorStats {
    /// Total events passed to `is_duplicate`.
    pub events_checked: AtomicU64,
    /// Events confirmed as duplicates by a fresh cache hit.
    pub duplicates_found: AtomicU64,
    /// Times the bloom filter answered "maybe" but the cache did not
    /// confirm a fresh entry.
    pub bloom_false_positives: AtomicU64,
    /// Fresh cache hits.
    pub lru_hits: AtomicU64,
    /// Cache misses (including bloom-negative fast-path exits).
    pub lru_misses: AtomicU64,
    /// Number of TTL cleanup passes run.
    pub cleanups: AtomicU64,
}
impl DeduplicatorStats {
pub fn new() -> Self {
Self::default()
}
pub fn snapshot(&self) -> DeduplicatorStatsSnapshot {
DeduplicatorStatsSnapshot {
events_checked: self.events_checked.load(Ordering::Relaxed),
duplicates_found: self.duplicates_found.load(Ordering::Relaxed),
bloom_false_positives: self.bloom_false_positives.load(Ordering::Relaxed),
lru_hits: self.lru_hits.load(Ordering::Relaxed),
lru_misses: self.lru_misses.load(Ordering::Relaxed),
cleanups: self.cleanups.load(Ordering::Relaxed),
}
}
}
/// Plain-value copy of [`DeduplicatorStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct DeduplicatorStatsSnapshot {
    pub events_checked: u64,
    pub duplicates_found: u64,
    pub bloom_false_positives: u64,
    pub lru_hits: u64,
    pub lru_misses: u64,
    pub cleanups: u64,
}

impl DeduplicatorStatsSnapshot {
    /// Fraction of checked events that were duplicates; 0.0 when no
    /// events have been checked.
    pub fn duplicate_rate(&self) -> f64 {
        match self.events_checked {
            0 => 0.0,
            n => self.duplicates_found as f64 / n as f64,
        }
    }

    /// Fraction of bloom "maybe" answers that were false positives; 0.0
    /// when there were no positives at all.
    pub fn bloom_fp_rate(&self) -> f64 {
        let fp = self.bloom_false_positives;
        let tp = self.duplicates_found.saturating_sub(fp);
        let total = tp + fp;
        if total == 0 {
            0.0
        } else {
            fp as f64 / total as f64
        }
    }
}
impl Deduplicator {
    /// Build a deduplicator; the bloom pre-filter is created only when
    /// `config.bloom_size_bits > 0`.
    pub fn new(config: DeduplicatorConfig) -> Self {
        let bloom = (config.bloom_size_bits > 0)
            .then(|| BloomFilter::new(config.bloom_size_bits, config.bloom_hash_count));
        Self {
            lru: RwLock::new(LruState {
                cache: HashMap::with_capacity(config.lru_capacity),
                order: VecDeque::with_capacity(config.lru_capacity),
                last_cleanup: Instant::now(),
            }),
            bloom: RwLock::new(bloom),
            stats: DeduplicatorStats::new(),
            config,
        }
    }

    /// True if `event` was marked seen within the TTL. Read-only: does
    /// not record the event (see `check_and_mark` for check-then-record).
    ///
    /// Fix: a cache entry that exists but is past its TTL is no longer
    /// counted as a bloom false positive — the key genuinely was inserted
    /// into the bloom filter, so counting it inflated the FP stats.
    pub async fn is_duplicate(&self, event: &CdcEvent) -> bool {
        self.stats.events_checked.fetch_add(1, Ordering::Relaxed);
        let key = self.config.key_strategy.extract_key(event);
        // Fast path: a bloom "definitely not present" answer skips the
        // cache lookup entirely.
        if let Some(bloom) = self.bloom.read().await.as_ref() {
            if !bloom.contains(&key) {
                self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
                return false;
            }
        }
        let lru = self.lru.read().await;
        match lru.cache.get(&key) {
            Some(entry) if entry.seen_at.elapsed() < self.config.ttl => {
                self.stats.lru_hits.fetch_add(1, Ordering::Relaxed);
                self.stats.duplicates_found.fetch_add(1, Ordering::Relaxed);
                true
            }
            Some(_) => {
                // Entry present but expired: not a duplicate, and not a
                // bloom false positive either. The stale entry is removed
                // by the periodic `cleanup`, not here (read lock only).
                self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
                false
            }
            None => {
                // Bloom said "maybe" but the cache has no entry. Counted
                // as a bloom false positive, though a genuinely-seen key
                // that was evicted from the cache is indistinguishable.
                if self.config.bloom_size_bits > 0 {
                    self.stats
                        .bloom_false_positives
                        .fetch_add(1, Ordering::Relaxed);
                }
                self.stats.lru_misses.fetch_add(1, Ordering::Relaxed);
                false
            }
        }
    }

    /// Record `event` as seen: insert its key into the bloom filter (if
    /// enabled) and refresh/insert its cache entry, evicting the
    /// front-of-queue keys when over capacity. Runs a TTL cleanup at most
    /// once per minute.
    ///
    /// NOTE(review): refreshing an existing key does not move it to the
    /// back of `order`, so eviction is FIFO by first insertion rather
    /// than strict LRU. Expired bloom bits are never reclaimed until
    /// `clear` — false positives accumulate over long runs; confirm this
    /// is acceptable for the deployment.
    pub async fn mark_seen(&self, event: &CdcEvent) {
        let key = self.config.key_strategy.extract_key(event);
        if let Some(bloom) = self.bloom.write().await.as_mut() {
            bloom.insert(&key);
        }
        let mut lru = self.lru.write().await;
        if let Some(entry) = lru.cache.get_mut(&key) {
            entry.seen_at = Instant::now();
            entry.count += 1;
        } else {
            while lru.cache.len() >= self.config.lru_capacity {
                // `order` mirrors `cache`, so pop_front should always
                // succeed; break defensively rather than spin forever if
                // that invariant is ever broken.
                let Some(old_key) = lru.order.pop_front() else { break };
                lru.cache.remove(&old_key);
            }
            lru.cache.insert(
                key.clone(),
                LruEntry {
                    seen_at: Instant::now(),
                    count: 1,
                },
            );
            lru.order.push_back(key);
        }
        if lru.last_cleanup.elapsed() > Duration::from_secs(60) {
            drop(lru); // release before cleanup retakes the write lock
            self.cleanup().await;
        }
    }

    /// Check-then-record convenience: returns true for a duplicate,
    /// otherwise records the event and returns false.
    ///
    /// NOTE(review): not atomic — the locks are released between the
    /// check and the mark, so two concurrent calls with the same key can
    /// both return false. Fine for best-effort dedup; confirm before
    /// relying on it for exactly-once semantics.
    pub async fn check_and_mark(&self, event: &CdcEvent) -> bool {
        if self.is_duplicate(event).await {
            true
        } else {
            self.mark_seen(event).await;
            false
        }
    }

    /// Keep only the first occurrence of each key in `events`, recording
    /// every retained event as seen.
    pub async fn filter_duplicates(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
        let mut unique = Vec::with_capacity(events.len());
        for event in events {
            if !self.check_and_mark(&event).await {
                unique.push(event);
            }
        }
        unique
    }

    /// Drop cache entries older than the TTL and rebuild the eviction
    /// queue to contain only the surviving keys (in their old order).
    /// The bloom filter is intentionally left untouched.
    pub async fn cleanup(&self) {
        let mut lru = self.lru.write().await;
        let ttl = self.config.ttl;
        lru.cache.retain(|_, entry| entry.seen_at.elapsed() < ttl);
        let surviving: VecDeque<String> = lru
            .order
            .iter()
            .filter(|key| lru.cache.contains_key(*key))
            .cloned()
            .collect();
        lru.order = surviving;
        lru.last_cleanup = Instant::now();
        self.stats.cleanups.fetch_add(1, Ordering::Relaxed);
        debug!(
            "Deduplicator cleanup: {} entries remaining",
            lru.cache.len()
        );
    }

    /// Forget everything: empties the cache and resets the bloom filter.
    pub async fn clear(&self) {
        let mut lru = self.lru.write().await;
        lru.cache.clear();
        lru.order.clear();
        drop(lru); // hold only one lock at a time
        if let Some(bloom) = self.bloom.write().await.as_mut() {
            bloom.clear();
        }
    }

    /// Snapshot of the running counters.
    pub fn stats(&self) -> DeduplicatorStatsSnapshot {
        self.stats.snapshot()
    }

    /// Current number of keys in the exact cache.
    pub async fn cache_size(&self) -> usize {
        self.lru.read().await.cache.len()
    }
}
/// Helpers for building `KeyStrategy::Custom` extractors.
pub mod keys {
    use super::*;

    /// Key extractor over a chosen set of columns, producing
    /// `schema:table:v1:v2:...`. Columns absent from the `after` row are
    /// silently skipped.
    pub fn from_columns(columns: Vec<String>) -> impl Fn(&CdcEvent) -> String + Send + Sync {
        move |event| {
            let mut values = Vec::with_capacity(columns.len());
            for col in &columns {
                if let Some(v) = event.after.as_ref().and_then(|m| m.get(col)) {
                    values.push(v.to_string());
                }
            }
            format!("{}:{}:{}", event.schema, event.table, values.join(":"))
        }
    }

    /// Wrap an extractor so the operation type is appended to its key,
    /// making e.g. an insert and an update of the same row distinct.
    pub fn with_operation<F>(base: F) -> impl Fn(&CdcEvent) -> String + Send + Sync
    where
        F: Fn(&CdcEvent) -> String + Send + Sync,
    {
        move |event| format!("{}:{:?}", base(event), event.op)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CdcOp;

    // Minimal event fixture: the `after` row holds a single `id` column,
    // `before` is always None.
    fn make_event(table: &str, id: i64, op: CdcOp) -> CdcEvent {
        CdcEvent {
            source_type: "postgres".to_string(),
            database: "testdb".to_string(),
            schema: "public".to_string(),
            table: table.to_string(),
            op,
            before: None,
            after: Some(serde_json::json!({ "id": id })),
            timestamp: chrono::Utc::now().timestamp(),
            transaction: None,
        }
    }

    // Unseen event is not a duplicate; after mark_seen it is.
    #[tokio::test]
    async fn test_deduplicator_basic() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let event = make_event("users", 1, CdcOp::Insert);
        assert!(!dedup.is_duplicate(&event).await);
        dedup.mark_seen(&event).await;
        assert!(dedup.is_duplicate(&event).await);
    }

    // Different primary keys must produce different dedup keys.
    #[tokio::test]
    async fn test_deduplicator_different_ids() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let event1 = make_event("users", 1, CdcOp::Insert);
        let event2 = make_event("users", 2, CdcOp::Insert);
        dedup.mark_seen(&event1).await;
        assert!(dedup.is_duplicate(&event1).await);
        assert!(!dedup.is_duplicate(&event2).await);
    }

    // The op is part of the key: an Update of a seen Insert is new.
    #[tokio::test]
    async fn test_deduplicator_different_ops() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let insert = make_event("users", 1, CdcOp::Insert);
        let update = make_event("users", 1, CdcOp::Update);
        dedup.mark_seen(&insert).await;
        assert!(dedup.is_duplicate(&insert).await);
        assert!(!dedup.is_duplicate(&update).await);
    }

    // First check records the event, second check reports the duplicate.
    #[tokio::test]
    async fn test_check_and_mark() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let event = make_event("users", 1, CdcOp::Insert);
        assert!(!dedup.check_and_mark(&event).await);
        assert!(dedup.check_and_mark(&event).await);
    }

    // Five events with three distinct ids collapse to three.
    #[tokio::test]
    async fn test_filter_duplicates() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let events = vec![
            make_event("users", 1, CdcOp::Insert),
            make_event("users", 1, CdcOp::Insert),
            make_event("users", 2, CdcOp::Insert),
            make_event("users", 2, CdcOp::Insert),
            make_event("users", 3, CdcOp::Insert),
        ];
        let unique = dedup.filter_duplicates(events).await;
        assert_eq!(unique.len(), 3);
    }

    // Counters: 3 checks, of which 2 were duplicates.
    #[tokio::test]
    async fn test_deduplicator_stats() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let event = make_event("users", 1, CdcOp::Insert);
        dedup.check_and_mark(&event).await;
        dedup.check_and_mark(&event).await;
        dedup.check_and_mark(&event).await;
        let stats = dedup.stats();
        assert_eq!(stats.events_checked, 3);
        assert_eq!(stats.duplicates_found, 2);
    }

    // clear() forgets previously seen events.
    #[tokio::test]
    async fn test_deduplicator_clear() {
        let dedup = Deduplicator::new(DeduplicatorConfig::default());
        let event = make_event("users", 1, CdcOp::Insert);
        dedup.mark_seen(&event).await;
        assert!(dedup.is_duplicate(&event).await);
        dedup.clear().await;
        assert!(!dedup.is_duplicate(&event).await);
    }

    // TransactionPosition ignores row data: two events with different ids
    // but the same database/table/timestamp share a key.
    #[tokio::test]
    async fn test_key_strategy_transaction() {
        let config = DeduplicatorConfig {
            key_strategy: KeyStrategy::TransactionPosition,
            ..Default::default()
        };
        let dedup = Deduplicator::new(config);
        let mut event1 = make_event("users", 1, CdcOp::Insert);
        event1.database = "db1".to_string();
        event1.timestamp = 1000;
        let mut event2 = make_event("users", 2, CdcOp::Insert);
        event2.database = "db1".to_string();
        event2.timestamp = 1000;
        dedup.mark_seen(&event1).await;
        assert!(dedup.is_duplicate(&event2).await);
    }

    // Bloom-enabled path: all seen keys are found; the final assertion
    // relies on id 999 not being a bloom false positive (deterministic
    // with the std DefaultHasher at these parameters).
    #[tokio::test]
    async fn test_bloom_filter() {
        let config = DeduplicatorConfig {
            lru_capacity: 100,
            bloom_size_bits: 10_000,
            bloom_hash_count: 5,
            ..Default::default()
        };
        let dedup = Deduplicator::new(config);
        for i in 0..50 {
            let event = make_event("users", i, CdcOp::Insert);
            dedup.mark_seen(&event).await;
        }
        for i in 0..50 {
            let event = make_event("users", i, CdcOp::Insert);
            assert!(dedup.is_duplicate(&event).await);
        }
        let new_event = make_event("users", 999, CdcOp::Insert);
        assert!(!dedup.is_duplicate(&new_event).await);
    }

    // With capacity 10 and 20 inserts, the oldest keys are evicted and
    // the most recent survive. Bloom disabled so the cache is exact.
    #[tokio::test]
    async fn test_lru_eviction() {
        let config = DeduplicatorConfig {
            lru_capacity: 10,
            bloom_size_bits: 0,
            ..Default::default()
        };
        let dedup = Deduplicator::new(config);
        for i in 0..20 {
            let event = make_event("users", i, CdcOp::Insert);
            dedup.mark_seen(&event).await;
        }
        assert_eq!(dedup.cache_size().await, 10);
        let recent = make_event("users", 19, CdcOp::Insert);
        assert!(dedup.is_duplicate(&recent).await);
        let old = make_event("users", 0, CdcOp::Insert);
        assert!(!dedup.is_duplicate(&old).await);
    }

    // 25 duplicates out of 100 checked -> rate 0.25.
    #[test]
    fn test_duplicate_rate() {
        let stats = DeduplicatorStatsSnapshot {
            events_checked: 100,
            duplicates_found: 25,
            bloom_false_positives: 5,
            lru_hits: 20,
            lru_misses: 80,
            cleanups: 1,
        };
        assert!((stats.duplicate_rate() - 0.25).abs() < 0.001);
    }

    // Preset constructors expose the documented sizing.
    #[test]
    fn test_config_presets() {
        let exact = DeduplicatorConfig::exact();
        assert_eq!(exact.bloom_size_bits, 0);
        assert_eq!(exact.lru_capacity, 1_000_000);
        let compact = DeduplicatorConfig::compact();
        assert_eq!(compact.bloom_size_bits, 10_000_000);
        assert_eq!(compact.lru_capacity, 10_000);
    }
}