1use crate::Error;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::task::AbortHandle;
7use tokio::time::sleep;
8
9const DEDUP_TTL: Duration = Duration::from_secs(300);
11
12#[async_trait]
18pub trait DeduplicationStore: Send + Sync {
19 async fn check_and_insert(&self, wamid: &str) -> Result<bool, Error>;
24}
25
26pub struct InMemoryDeduplicationStore {
31 inner: Arc<DashMap<String, AbortHandle>>,
32}
33
34impl InMemoryDeduplicationStore {
35 pub fn new() -> Self {
37 Self {
38 inner: Arc::new(DashMap::new()),
39 }
40 }
41}
42
43impl Default for InMemoryDeduplicationStore {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49#[async_trait]
50impl DeduplicationStore for InMemoryDeduplicationStore {
51 async fn check_and_insert(&self, wamid: &str) -> Result<bool, Error> {
52 if self.inner.contains_key(wamid) {
54 tracing::debug!("duplicate wamid: {wamid}");
55 return Ok(true);
56 }
57
58 let store = Arc::clone(&self.inner);
60 let key_owned = wamid.to_string();
61
62 let abort_handle = tokio::spawn(async move {
63 sleep(DEDUP_TTL).await;
64 store.remove(&key_owned);
65 })
66 .abort_handle();
67
68 self.inner.insert(wamid.to_string(), abort_handle);
69 Ok(false)
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76
77 #[tokio::test]
78 async fn dedup_first_insert() {
79 let store = InMemoryDeduplicationStore::new();
80 let result = store.check_and_insert("wamid.123").await.unwrap();
81 assert!(!result, "first insert must return false (not duplicate)");
82 }
83
84 #[tokio::test]
85 async fn dedup_duplicate() {
86 let store = InMemoryDeduplicationStore::new();
87 store.check_and_insert("wamid.abc").await.unwrap();
88 let result = store.check_and_insert("wamid.abc").await.unwrap();
89 assert!(
90 result,
91 "second insert of same wamid must return true (duplicate)"
92 );
93 }
94
95 #[tokio::test]
96 async fn dedup_different_wamids() {
97 let store = InMemoryDeduplicationStore::new();
98 let r1 = store.check_and_insert("wamid.001").await.unwrap();
99 let r2 = store.check_and_insert("wamid.002").await.unwrap();
100 assert!(!r1, "first wamid must not be duplicate");
101 assert!(!r2, "second different wamid must not be duplicate");
102 }
103
104 #[tokio::test(start_paused = true)]
105 async fn dedup_ttl_expiry() {
106 let store = InMemoryDeduplicationStore::new();
107 store.check_and_insert("wamid.ttl").await.unwrap();
108
109 tokio::task::yield_now().await;
111
112 let is_dup = store.check_and_insert("wamid.ttl").await.unwrap();
114 assert!(is_dup, "should still be a duplicate before TTL");
115
116 tokio::task::yield_now().await;
118
119 tokio::time::advance(Duration::from_secs(301)).await;
121
122 for _ in 0..5 {
124 tokio::task::yield_now().await;
125 }
126
127 let after_expiry = store.check_and_insert("wamid.ttl").await.unwrap();
129 assert!(
130 !after_expiry,
131 "after TTL expiry must return false (not duplicate)"
132 );
133 }
134}