use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
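/// Default time-to-live for a dedup entry: one hour.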
pub const DEFAULT_TTL: Duration = Duration::from_secs(3600);
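/// Deduplication key for an outbound notification, combining the tenant, the
/// lead, the notification kind, and a one-minute time bucket.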
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DedupKey(String);
impl DedupKey {
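    /// Builds a key from a tenant id, lead id, notification kind, and an event
    /// timestamp in epoch milliseconds. The timestamp is collapsed to a
    /// one-minute bucket, so events for the same lead and kind that fall in the
    /// same bucket map to the same key.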
pub fn new(tenant_id: &str, lead_id: &str, kind: &str, at_ms: i64) -> Self {
let bucket = at_ms / 60_000;
Self(format!("{tenant_id}:{lead_id}:{kind}:{bucket}"))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
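/// Tracks recently sent notifications so repeats within the TTL are suppressed.
///
/// The default backend is an in-memory `HashMap` behind a `Mutex`; with the
/// `dedup-sled` feature enabled, a persistent sled database can be used instead
/// so deduplication survives process restarts.
///
/// A minimal usage sketch (assumes the types are in scope; the tests below are
/// the compiled reference):
///
/// ```ignore
/// let cache = DedupCache::new();
/// let key = DedupKey::new("acme", "lead-42", "lead_created", 1_700_000_000_000);
/// assert!(!cache.is_duplicate(&key)); // first sighting: recorded, allowed
/// assert!(cache.is_duplicate(&key));  // same key within the TTL: suppressed
/// ```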
#[derive(Debug)]
pub struct DedupCache {
backend: Backend,
ttl: Duration,
}
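/// Storage backend for [`DedupCache`]: a process-local map by default, or a
/// sled database when the `dedup-sled` feature is enabled.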
#[derive(Debug)]
enum Backend {
Memory(Mutex<HashMap<DedupKey, Instant>>),
#[cfg(feature = "dedup-sled")]
Sled(sled::Db),
}
impl Default for DedupCache {
fn default() -> Self {
Self::with_ttl(DEFAULT_TTL)
}
}
impl DedupCache {
pub fn new() -> Self {
Self::default()
}
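    /// Creates an in-memory cache whose entries expire after `ttl`.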
pub fn with_ttl(ttl: Duration) -> Self {
Self {
backend: Backend::Memory(Mutex::new(HashMap::new())),
ttl,
}
}
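    /// Opens (or creates) a sled database at `path` and uses it as the backend,
    /// so deduplication state persists across restarts.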
#[cfg(feature = "dedup-sled")]
pub fn with_sled(
path: impl AsRef<std::path::Path>,
ttl: Duration,
) -> Result<Self, sled::Error> {
let db = sled::open(path)?;
Ok(Self {
backend: Backend::Sled(db),
ttl,
})
}
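    /// Returns `true` if `key` has already been seen within the TTL; otherwise
    /// records it and returns `false`. The memory backend evicts expired entries
    /// on every call; the sled backend overwrites stale rows lazily and fails
    /// open on storage errors.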
pub fn is_duplicate(&self, key: &DedupKey) -> bool {
match &self.backend {
Backend::Memory(inner) => {
let now = Instant::now();
let mut map = inner.lock().expect("notification dedup mutex poisoned");
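                // Sweep out entries older than the TTL before looking up the key.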
map.retain(|_, t| now.duration_since(*t) < self.ttl);
match map.get(key) {
Some(_) => true,
None => {
map.insert(key.clone(), now);
false
}
}
}
#[cfg(feature = "dedup-sled")]
Backend::Sled(db) => sled_is_duplicate(db, key, self.ttl),
}
}
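    /// Number of entries currently held by the backend. Expired entries are
    /// evicted lazily (the memory backend sweeps on each `is_duplicate` call,
    /// the sled backend only overwrites a stale row when its key is seen again),
    /// so the count may include stale rows.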
pub fn len(&self) -> usize {
match &self.backend {
Backend::Memory(inner) => inner.lock().map(|m| m.len()).unwrap_or(0),
#[cfg(feature = "dedup-sled")]
Backend::Sled(db) => db.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
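/// Checks and records `key` in the sled backend. Each row stores the last-seen
/// time as 8 little-endian bytes of unix epoch milliseconds; a missing or short
/// value decodes as 0 and is therefore treated as expired. Read errors are
/// logged and treated as "not a duplicate" so a storage fault never blocks a
/// notification.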
#[cfg(feature = "dedup-sled")]
fn sled_is_duplicate(db: &sled::Db, key: &DedupKey, ttl: Duration) -> bool {
use std::time::{SystemTime, UNIX_EPOCH};
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let ttl_ms = ttl.as_millis() as u64;
let key_bytes = key.as_str().as_bytes();
match db.get(key_bytes) {
Ok(Some(value)) => {
let stamp_ms = if value.len() >= 8 {
let mut buf = [0u8; 8];
buf.copy_from_slice(&value[..8]);
u64::from_le_bytes(buf)
} else {
0
};
if now_ms.saturating_sub(stamp_ms) < ttl_ms {
true
} else {
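                // Stale row: overwrite the timestamp and let this event through.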
let _ = db.insert(key_bytes, &now_ms.to_le_bytes());
let _ = db.flush();
false
}
}
Ok(None) => {
let _ = db.insert(key_bytes, &now_ms.to_le_bytes());
let _ = db.flush();
false
}
Err(e) => {
tracing::warn!(
target: "marketing.notification_dedup",
error = %e, key = %key.as_str(),
"sled read failed (failing open — duplicate may pass through)"
);
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
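    // Helper producing a key with fixed tenant, kind, and timestamp; only the lead id varies.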
fn k(lead: &str) -> DedupKey {
DedupKey::new("acme", lead, "lead_created", 1_700_000_000_000)
}
#[test]
fn first_call_returns_false_records_entry() {
let cache = DedupCache::new();
assert!(!cache.is_duplicate(&k("l-1")));
assert_eq!(cache.len(), 1);
}
#[test]
fn second_call_with_same_key_returns_true() {
let cache = DedupCache::new();
assert!(!cache.is_duplicate(&k("l-1")));
assert!(cache.is_duplicate(&k("l-1")));
assert_eq!(cache.len(), 1);
}
#[test]
fn distinct_lead_ids_are_independent() {
let cache = DedupCache::new();
assert!(!cache.is_duplicate(&k("l-1")));
assert!(!cache.is_duplicate(&k("l-2")));
assert!(cache.is_duplicate(&k("l-1")));
assert!(cache.is_duplicate(&k("l-2")));
assert_eq!(cache.len(), 2);
}
#[test]
fn distinct_kinds_are_independent() {
let cache = DedupCache::new();
let k_created = DedupKey::new("acme", "l-1", "lead_created", 0);
let k_replied = DedupKey::new("acme", "l-1", "lead_replied", 0);
assert!(!cache.is_duplicate(&k_created));
assert!(!cache.is_duplicate(&k_replied));
assert!(cache.is_duplicate(&k_created));
assert!(cache.is_duplicate(&k_replied));
}
#[test]
fn distinct_minute_buckets_are_independent() {
let cache = DedupCache::new();
let k_a = DedupKey::new("acme", "l-1", "lead_created", 0);
let k_b = DedupKey::new("acme", "l-1", "lead_created", 5 * 60_000);
assert!(!cache.is_duplicate(&k_a));
assert!(!cache.is_duplicate(&k_b));
assert_eq!(cache.len(), 2);
}
#[test]
fn cross_tenant_keys_are_independent() {
let cache = DedupCache::new();
let acme = DedupKey::new("acme", "l-1", "lead_created", 0);
let globex = DedupKey::new("globex", "l-1", "lead_created", 0);
assert!(!cache.is_duplicate(&acme));
assert!(!cache.is_duplicate(&globex));
assert!(cache.is_duplicate(&acme));
assert!(cache.is_duplicate(&globex));
}
#[test]
fn ttl_expired_entries_are_evicted_on_next_call() {
let cache = DedupCache::with_ttl(Duration::from_millis(50));
assert!(!cache.is_duplicate(&k("l-1")));
assert_eq!(cache.len(), 1);
std::thread::sleep(Duration::from_millis(60));
assert!(!cache.is_duplicate(&k("l-1")));
assert_eq!(cache.len(), 1, "stale entry should be replaced, not stacked");
}
#[test]
fn dedup_key_minute_bucket_collapses_30s_apart_events() {
let a = DedupKey::new("acme", "l-1", "lead_created", 0);
let b = DedupKey::new("acme", "l-1", "lead_created", 30 * 1000);
assert_eq!(a, b);
}
#[cfg(feature = "dedup-sled")]
mod sled_backend {
use super::*;
fn fresh_sled_cache(ttl: Duration) -> (DedupCache, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let cache = DedupCache::with_sled(dir.path(), ttl).unwrap();
(cache, dir)
}
#[test]
fn first_call_returns_false_records_entry() {
let (cache, _tmp) = fresh_sled_cache(Duration::from_secs(60));
assert!(!cache.is_duplicate(&k("l-1")));
assert_eq!(cache.len(), 1);
}
#[test]
fn second_call_with_same_key_returns_true() {
let (cache, _tmp) = fresh_sled_cache(Duration::from_secs(60));
assert!(!cache.is_duplicate(&k("l-1")));
assert!(cache.is_duplicate(&k("l-1")));
}
#[test]
fn distinct_keys_independent() {
let (cache, _tmp) = fresh_sled_cache(Duration::from_secs(60));
assert!(!cache.is_duplicate(&k("l-1")));
assert!(!cache.is_duplicate(&k("l-2")));
assert!(cache.is_duplicate(&k("l-1")));
assert!(cache.is_duplicate(&k("l-2")));
}
#[test]
fn ttl_expired_entry_treated_as_fresh() {
let (cache, _tmp) = fresh_sled_cache(Duration::from_millis(50));
assert!(!cache.is_duplicate(&k("l-1")));
std::thread::sleep(Duration::from_millis(60));
assert!(!cache.is_duplicate(&k("l-1")));
}
#[test]
fn cross_restart_dedup_survives_reopen() {
let dir = tempfile::tempdir().unwrap();
{
let cache = DedupCache::with_sled(dir.path(), Duration::from_secs(3600)).unwrap();
assert!(!cache.is_duplicate(&k("l-1")));
}
let cache = DedupCache::with_sled(dir.path(), Duration::from_secs(3600)).unwrap();
assert!(
cache.is_duplicate(&k("l-1")),
"sled-backed cache must persist across restarts"
);
}
#[test]
fn cross_restart_eviction_after_ttl() {
let dir = tempfile::tempdir().unwrap();
{
let cache = DedupCache::with_sled(dir.path(), Duration::from_millis(50)).unwrap();
assert!(!cache.is_duplicate(&k("l-1")));
}
std::thread::sleep(Duration::from_millis(60));
let cache = DedupCache::with_sled(dir.path(), Duration::from_millis(50)).unwrap();
assert!(
!cache.is_duplicate(&k("l-1")),
"stale row must be treated as fresh after restart"
);
}
#[test]
fn sled_len_reports_persisted_rows() {
let (cache, _tmp) = fresh_sled_cache(Duration::from_secs(60));
for lead in &["l-1", "l-2", "l-3"] {
cache.is_duplicate(&k(lead));
}
assert_eq!(cache.len(), 3);
}
#[test]
fn cross_tenant_keys_independent_in_sled() {
let (cache, _tmp) = fresh_sled_cache(Duration::from_secs(60));
let acme = DedupKey::new("acme", "l-1", "lead_created", 0);
let globex = DedupKey::new("globex", "l-1", "lead_created", 0);
assert!(!cache.is_duplicate(&acme));
assert!(!cache.is_duplicate(&globex));
assert!(cache.is_duplicate(&acme));
assert!(cache.is_duplicate(&globex));
}
}
}