Skip to main content

ferro_whatsapp/
dedup.rs

1use crate::Error;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::task::AbortHandle;
7use tokio::time::sleep;
8
/// Lifetime of a deduplication entry: five minutes.
///
/// Intended to cover all Meta retry windows.
const DEDUP_TTL: Duration = Duration::from_secs(5 * 60);
11
12/// Pluggable store for wamid-based message deduplication.
13///
14/// Implementations must be thread-safe. The `check_and_insert` method is
15/// the single entry point: it returns `true` if the wamid was already seen
16/// (duplicate) and `false` if it is new.
17#[async_trait]
18pub trait DeduplicationStore: Send + Sync {
19    /// Checks whether `wamid` has been seen before and records it if not.
20    ///
21    /// Returns `true` if the wamid is a duplicate (already seen).
22    /// Returns `false` if it is new (first time seen).
23    async fn check_and_insert(&self, wamid: &str) -> Result<bool, Error>;
24}
25
26/// In-memory deduplication store backed by a [`DashMap`].
27///
28/// Each entry carries an [`AbortHandle`] for its 5-minute TTL expiry task.
29/// DashMap guards are **never** held across `.await` points.
30pub struct InMemoryDeduplicationStore {
31    inner: Arc<DashMap<String, AbortHandle>>,
32}
33
34impl InMemoryDeduplicationStore {
35    /// Create a new store.
36    pub fn new() -> Self {
37        Self {
38            inner: Arc::new(DashMap::new()),
39        }
40    }
41}
42
43impl Default for InMemoryDeduplicationStore {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49#[async_trait]
50impl DeduplicationStore for InMemoryDeduplicationStore {
51    async fn check_and_insert(&self, wamid: &str) -> Result<bool, Error> {
52        // Check existence without holding guard across .await
53        if self.inner.contains_key(wamid) {
54            tracing::debug!("duplicate wamid: {wamid}");
55            return Ok(true);
56        }
57
58        // Not a duplicate — insert with TTL timer
59        let store = Arc::clone(&self.inner);
60        let key_owned = wamid.to_string();
61
62        let abort_handle = tokio::spawn(async move {
63            sleep(DEDUP_TTL).await;
64            store.remove(&key_owned);
65        })
66        .abort_handle();
67
68        self.inner.insert(wamid.to_string(), abort_handle);
69        Ok(false)
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    /// A wamid that has never been recorded is reported as new.
    #[tokio::test]
    async fn dedup_first_insert() {
        let dedup = InMemoryDeduplicationStore::new();
        let seen_before = dedup.check_and_insert("wamid.123").await.unwrap();
        assert!(!seen_before, "first insert must return false (not duplicate)");
    }

    /// Recording the same wamid twice reports the second call as a duplicate.
    #[tokio::test]
    async fn dedup_duplicate() {
        let dedup = InMemoryDeduplicationStore::new();
        dedup.check_and_insert("wamid.abc").await.unwrap();

        let seen_before = dedup.check_and_insert("wamid.abc").await.unwrap();
        assert!(
            seen_before,
            "second insert of same wamid must return true (duplicate)"
        );
    }

    /// Distinct wamids do not interfere with each other.
    #[tokio::test]
    async fn dedup_different_wamids() {
        let dedup = InMemoryDeduplicationStore::new();
        let first = dedup.check_and_insert("wamid.001").await.unwrap();
        let second = dedup.check_and_insert("wamid.002").await.unwrap();
        assert!(!first, "first wamid must not be duplicate");
        assert!(!second, "second different wamid must not be duplicate");
    }

    /// An entry is forgotten once the TTL has elapsed (runs on paused time).
    #[tokio::test(start_paused = true)]
    async fn dedup_ttl_expiry() {
        let dedup = InMemoryDeduplicationStore::new();
        dedup.check_and_insert("wamid.ttl").await.unwrap();

        // Give the spawned TTL task a chance to run and register its sleep.
        tokio::task::yield_now().await;

        // Still inside the TTL window: the wamid must be reported as seen.
        let seen_before_ttl = dedup.check_and_insert("wamid.ttl").await.unwrap();
        assert!(seen_before_ttl, "should still be a duplicate before TTL");

        // One more scheduler pass before moving the clock. The duplicate
        // lookup above returned early, so it did not spawn a second timer.
        tokio::task::yield_now().await;

        // Move the paused clock one second past the TTL.
        tokio::time::advance(DEDUP_TTL + Duration::from_secs(1)).await;

        // Let the woken expiry task run to completion.
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        // The entry has been removed, so the wamid counts as new again.
        let seen_after_ttl = dedup.check_and_insert("wamid.ttl").await.unwrap();
        assert!(
            !seen_after_ttl,
            "after TTL expiry must return false (not duplicate)"
        );
    }
}