Skip to main content

this/events/operators/
deduplicate.rs

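//! Deduplicate operator — drops events whose key was already seen within a time window
//!
//! Each key is remembered for `window` after the first event that carried it;
//! later events with the same key inside that window are dropped, and dropped
//! duplicates do not extend the window. Expired keys are evicted as new events
//! arrive. The deduplication key is read from a context variable (e.g., `source_id`).
//!
//! ```yaml
//! - deduplicate:
//!     key: source_id
//!     window: 1h
//! ```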
11
12use crate::config::events::DeduplicateConfig;
13use crate::events::context::FlowContext;
14use crate::events::operators::{OpResult, PipelineOperator};
15use anyhow::{Result, anyhow};
16use async_trait::async_trait;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::RwLock;
21
22/// Compiled deduplicate operator
23#[derive(Debug)]
24pub struct DeduplicateOp {
25    /// Field name in context to use as the dedup key
26    key: String,
27
28    /// Sliding window duration
29    window: Duration,
30
31    /// Set of seen keys with their insertion timestamp
32    seen: Arc<RwLock<HashMap<String, Instant>>>,
33}
34
35impl DeduplicateOp {
36    /// Create a DeduplicateOp from a DeduplicateConfig
37    pub fn from_config(config: &DeduplicateConfig) -> Result<Self> {
38        let window = parse_duration(&config.window)?;
39        Ok(Self {
40            key: config.key.clone(),
41            window,
42            seen: Arc::new(RwLock::new(HashMap::new())),
43        })
44    }
45
46    /// Create with a specific window (for testing)
47    #[cfg(test)]
48    fn with_window(key: &str, window: Duration) -> Self {
49        Self {
50            key: key.to_string(),
51            window,
52            seen: Arc::new(RwLock::new(HashMap::new())),
53        }
54    }
55}
56
57#[async_trait]
58impl PipelineOperator for DeduplicateOp {
59    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
60        // Read the dedup key from context
61        let key_value = ctx
62            .get_var(&self.key)
63            .ok_or_else(|| anyhow!("deduplicate: variable '{}' not found in context", self.key))?
64            .clone();
65
66        let key_str = value_to_string(&key_value);
67        let now = Instant::now();
68
69        let mut seen = self.seen.write().await;
70
71        // Clean expired entries
72        seen.retain(|_, ts| now.duration_since(*ts) < self.window);
73
74        // Check if already seen
75        if seen.contains_key(&key_str) {
76            return Ok(OpResult::Drop);
77        }
78
79        // Mark as seen
80        seen.insert(key_str, now);
81        Ok(OpResult::Continue)
82    }
83
84    fn name(&self) -> &str {
85        "deduplicate"
86    }
87}
88
89/// Convert a JSON value to a string key for deduplication
90fn value_to_string(value: &serde_json::Value) -> String {
91    match value {
92        serde_json::Value::String(s) => s.clone(),
93        serde_json::Value::Number(n) => n.to_string(),
94        serde_json::Value::Bool(b) => b.to_string(),
95        serde_json::Value::Null => "null".to_string(),
96        other => other.to_string(),
97    }
98}
99
100/// Parse a duration string like "5m", "1h", "30s", "100ms"
101pub(crate) fn parse_duration(s: &str) -> Result<Duration> {
102    let s = s.trim();
103
104    if let Some(ms) = s.strip_suffix("ms") {
105        let n: u64 = ms
106            .parse()
107            .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
108        return Ok(Duration::from_millis(n));
109    }
110
111    if let Some(secs) = s.strip_suffix('s') {
112        let n: u64 = secs
113            .parse()
114            .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
115        return Ok(Duration::from_secs(n));
116    }
117
118    if let Some(mins) = s.strip_suffix('m') {
119        let n: u64 = mins
120            .parse()
121            .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
122        return Ok(Duration::from_secs(n * 60));
123    }
124
125    if let Some(hours) = s.strip_suffix('h') {
126        let n: u64 = hours
127            .parse()
128            .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
129        return Ok(Duration::from_secs(n * 3600));
130    }
131
132    Err(anyhow!(
133        "invalid duration '{}': expected format like '5m', '1h', '30s', '100ms'",
134        s
135    ))
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EntityEvent, FrameworkEvent};
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::HashMap as StdHashMap;
    use std::sync::Arc;
    use uuid::Uuid;

    /// `LinkService` stub required to build a `FlowContext`. None of these
    /// tests exercise link lookups, so every method is unimplemented.
    struct MockLinkService;

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(
            &self,
            _link: crate::core::link::LinkEntity,
        ) -> Result<crate::core::link::LinkEntity> {
            unimplemented!()
        }
        async fn get(&self, _id: &Uuid) -> Result<Option<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn list(&self) -> Result<Vec<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn find_by_source(
            &self,
            _: &Uuid,
            _: Option<&str>,
            _: Option<&str>,
        ) -> Result<Vec<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn find_by_target(
            &self,
            _: &Uuid,
            _: Option<&str>,
            _: Option<&str>,
        ) -> Result<Vec<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn update(
            &self,
            _: &Uuid,
            _: crate::core::link::LinkEntity,
        ) -> Result<crate::core::link::LinkEntity> {
            unimplemented!()
        }
        async fn delete(&self, _: &Uuid) -> Result<()> {
            unimplemented!()
        }
        async fn delete_by_entity(&self, _: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    /// Build a context for a `user` creation event with the given entity id.
    fn make_context(entity_id: Uuid) -> FlowContext {
        let event = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "user".to_string(),
            entity_id,
            data: json!({}),
        });
        FlowContext::new(
            event,
            Arc::new(MockLinkService) as Arc<dyn LinkService>,
            StdHashMap::new(),
        )
    }

    #[tokio::test]
    async fn test_dedup_first_event_passes() {
        let op = DeduplicateOp::with_window("entity_id", Duration::from_secs(60));
        let mut ctx = make_context(Uuid::new_v4());

        let result = op.execute(&mut ctx).await.unwrap();
        assert!(matches!(result, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_dedup_same_key_in_window_drops() {
        let entity_id = Uuid::new_v4();
        let op = DeduplicateOp::with_window("entity_id", Duration::from_secs(60));

        let mut ctx1 = make_context(entity_id);
        let result1 = op.execute(&mut ctx1).await.unwrap();
        assert!(matches!(result1, OpResult::Continue));

        let mut ctx2 = make_context(entity_id);
        let result2 = op.execute(&mut ctx2).await.unwrap();
        assert!(matches!(result2, OpResult::Drop));
    }

    #[tokio::test]
    async fn test_dedup_different_key_passes() {
        let op = DeduplicateOp::with_window("entity_id", Duration::from_secs(60));

        let mut ctx1 = make_context(Uuid::new_v4());
        let result1 = op.execute(&mut ctx1).await.unwrap();
        assert!(matches!(result1, OpResult::Continue));

        let mut ctx2 = make_context(Uuid::new_v4());
        let result2 = op.execute(&mut ctx2).await.unwrap();
        assert!(matches!(result2, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_dedup_expired_window_passes_again() {
        let entity_id = Uuid::new_v4();
        // Use a very short window
        let op = DeduplicateOp::with_window("entity_id", Duration::from_millis(50));

        let mut ctx1 = make_context(entity_id);
        let result1 = op.execute(&mut ctx1).await.unwrap();
        assert!(matches!(result1, OpResult::Continue));

        // Wait for window to expire
        tokio::time::sleep(Duration::from_millis(60)).await;

        let mut ctx2 = make_context(entity_id);
        let result2 = op.execute(&mut ctx2).await.unwrap();
        assert!(matches!(result2, OpResult::Continue));
    }

    #[tokio::test]
    async fn test_dedup_expired_keys_are_tracked_again() {
        let op = DeduplicateOp::with_window("entity_id", Duration::from_millis(50));
        let ids: Vec<Uuid> = (0..3).map(|_| Uuid::new_v4()).collect();

        for id in &ids {
            let result = op.execute(&mut make_context(*id)).await.unwrap();
            assert!(matches!(result, OpResult::Continue));
        }

        // Let every key expire
        tokio::time::sleep(Duration::from_millis(60)).await;

        for id in &ids {
            // An expired key is accepted again...
            let result = op.execute(&mut make_context(*id)).await.unwrap();
            assert!(matches!(result, OpResult::Continue));
            // ...and is then deduplicated for a fresh window.
            let result = op.execute(&mut make_context(*id)).await.unwrap();
            assert!(matches!(result, OpResult::Drop));
        }
    }

    #[tokio::test]
    async fn test_dedup_missing_key_errors() {
        let op = DeduplicateOp::with_window("nonexistent", Duration::from_secs(60));
        let mut ctx = make_context(Uuid::new_v4());

        let result = op.execute(&mut ctx).await;
        assert!(result.is_err());
    }

    // ── Key rendering tests ──────────────────────────────────────────

    #[test]
    fn test_value_to_string_forms() {
        assert_eq!(value_to_string(&json!("abc")), "abc");
        assert_eq!(value_to_string(&json!(42)), "42");
        assert_eq!(value_to_string(&json!(true)), "true");
        assert_eq!(value_to_string(&json!(null)), "null");
        assert_eq!(value_to_string(&json!([1, "a"])), r#"[1,"a"]"#);
    }

    // ── Duration parsing tests ───────────────────────────────────────

    #[test]
    fn test_parse_duration_seconds() {
        assert_eq!(parse_duration("30s").unwrap(), Duration::from_secs(30));
    }

    #[test]
    fn test_parse_duration_minutes() {
        assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
    }

    #[test]
    fn test_parse_duration_hours() {
        assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600));
    }

    #[test]
    fn test_parse_duration_milliseconds() {
        assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
    }

    #[test]
    fn test_parse_duration_trims_whitespace() {
        assert_eq!(parse_duration("  5m \n").unwrap(), Duration::from_secs(300));
    }

    #[test]
    fn test_parse_duration_invalid() {
        assert!(parse_duration("5x").is_err());
        assert!(parse_duration("abc").is_err());
    }

    #[test]
    fn test_parse_duration_missing_or_negative_number_errors() {
        for input in ["", "ms", "s", "m", "h", "-5s"] {
            assert!(parse_duration(input).is_err(), "expected error for {input:?}");
        }
    }
}