this/events/operators/
deduplicate.rs1use crate::config::events::DeduplicateConfig;
13use crate::events::context::FlowContext;
14use crate::events::operators::{OpResult, PipelineOperator};
15use anyhow::{Result, anyhow};
16use async_trait::async_trait;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::RwLock;
21
/// Pipeline operator that drops an event when the value of a configured
/// context variable was already let through within a time window.
#[derive(Debug)]
pub struct DeduplicateOp {
    /// Name of the context variable whose value identifies duplicate events.
    key: String,

    /// How long a key is remembered after it was first let through.
    window: Duration,

    /// Stringified key -> instant at which that key was accepted.
    /// Guarded by an async `RwLock` (write-locked in `execute`) so concurrent
    /// executions check and record keys one at a time.
    seen: Arc<RwLock<HashMap<String, Instant>>>,
}
34
35impl DeduplicateOp {
36 pub fn from_config(config: &DeduplicateConfig) -> Result<Self> {
38 let window = parse_duration(&config.window)?;
39 Ok(Self {
40 key: config.key.clone(),
41 window,
42 seen: Arc::new(RwLock::new(HashMap::new())),
43 })
44 }
45
46 #[cfg(test)]
48 fn with_window(key: &str, window: Duration) -> Self {
49 Self {
50 key: key.to_string(),
51 window,
52 seen: Arc::new(RwLock::new(HashMap::new())),
53 }
54 }
55}
56
57#[async_trait]
58impl PipelineOperator for DeduplicateOp {
59 async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
60 let key_value = ctx
62 .get_var(&self.key)
63 .ok_or_else(|| anyhow!("deduplicate: variable '{}' not found in context", self.key))?
64 .clone();
65
66 let key_str = value_to_string(&key_value);
67 let now = Instant::now();
68
69 let mut seen = self.seen.write().await;
70
71 seen.retain(|_, ts| now.duration_since(*ts) < self.window);
73
74 if seen.contains_key(&key_str) {
76 return Ok(OpResult::Drop);
77 }
78
79 seen.insert(key_str, now);
81 Ok(OpResult::Continue)
82 }
83
84 fn name(&self) -> &str {
85 "deduplicate"
86 }
87}
88
89fn value_to_string(value: &serde_json::Value) -> String {
91 match value {
92 serde_json::Value::String(s) => s.clone(),
93 serde_json::Value::Number(n) => n.to_string(),
94 serde_json::Value::Bool(b) => b.to_string(),
95 serde_json::Value::Null => "null".to_string(),
96 other => other.to_string(),
97 }
98}
99
100pub(crate) fn parse_duration(s: &str) -> Result<Duration> {
102 let s = s.trim();
103
104 if let Some(ms) = s.strip_suffix("ms") {
105 let n: u64 = ms
106 .parse()
107 .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
108 return Ok(Duration::from_millis(n));
109 }
110
111 if let Some(secs) = s.strip_suffix('s') {
112 let n: u64 = secs
113 .parse()
114 .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
115 return Ok(Duration::from_secs(n));
116 }
117
118 if let Some(mins) = s.strip_suffix('m') {
119 let n: u64 = mins
120 .parse()
121 .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
122 return Ok(Duration::from_secs(n * 60));
123 }
124
125 if let Some(hours) = s.strip_suffix('h') {
126 let n: u64 = hours
127 .parse()
128 .map_err(|_| anyhow!("invalid duration: '{}'", s))?;
129 return Ok(Duration::from_secs(n * 3600));
130 }
131
132 Err(anyhow!(
133 "invalid duration '{}': expected format like '5m', '1h', '30s', '100ms'",
134 s
135 ))
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EntityEvent, FrameworkEvent};
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::HashMap as StdHashMap;
    use std::sync::Arc;
    use uuid::Uuid;

    /// `LinkService` stub that exists only so a `FlowContext` can be built.
    /// Every method panics if called; none of these tests are expected to reach it.
    struct MockLinkService;

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(
            &self,
            _link: crate::core::link::LinkEntity,
        ) -> Result<crate::core::link::LinkEntity> {
            unimplemented!()
        }
        async fn get(&self, _id: &Uuid) -> Result<Option<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn list(&self) -> Result<Vec<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn find_by_source(
            &self,
            _: &Uuid,
            _: Option<&str>,
            _: Option<&str>,
        ) -> Result<Vec<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn find_by_target(
            &self,
            _: &Uuid,
            _: Option<&str>,
            _: Option<&str>,
        ) -> Result<Vec<crate::core::link::LinkEntity>> {
            unimplemented!()
        }
        async fn update(
            &self,
            _: &Uuid,
            _: crate::core::link::LinkEntity,
        ) -> Result<crate::core::link::LinkEntity> {
            unimplemented!()
        }
        async fn delete(&self, _: &Uuid) -> Result<()> {
            unimplemented!()
        }
        async fn delete_by_entity(&self, _: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    /// Builds a `FlowContext` for a `user` `EntityEvent::Created` event with the
    /// given id. The tests below assume the context exposes this id as the
    /// `entity_id` variable (TODO confirm against `FlowContext::get_var`).
    fn make_context(entity_id: Uuid) -> FlowContext {
        let event = FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "user".to_string(),
            entity_id,
            data: json!({}),
        });
        FlowContext::new(
            event,
            Arc::new(MockLinkService) as Arc<dyn LinkService>,
            StdHashMap::new(),
        )
    }

    // A key that has never been seen is let through.
    #[tokio::test]
    async fn test_dedup_first_event_passes() {
        let op = DeduplicateOp::with_window("entity_id", Duration::from_secs(60));
        let mut ctx = make_context(Uuid::new_v4());

        let result = op.execute(&mut ctx).await.unwrap();
        assert!(matches!(result, OpResult::Continue));
    }

    // The second event with the same key inside the window is dropped.
    #[tokio::test]
    async fn test_dedup_same_key_in_window_drops() {
        let entity_id = Uuid::new_v4();
        let op = DeduplicateOp::with_window("entity_id", Duration::from_secs(60));

        let mut ctx1 = make_context(entity_id);
        let result1 = op.execute(&mut ctx1).await.unwrap();
        assert!(matches!(result1, OpResult::Continue));

        let mut ctx2 = make_context(entity_id);
        let result2 = op.execute(&mut ctx2).await.unwrap();
        assert!(matches!(result2, OpResult::Drop));
    }

    // Distinct keys do not interfere with each other.
    #[tokio::test]
    async fn test_dedup_different_key_passes() {
        let op = DeduplicateOp::with_window("entity_id", Duration::from_secs(60));

        let mut ctx1 = make_context(Uuid::new_v4());
        let result1 = op.execute(&mut ctx1).await.unwrap();
        assert!(matches!(result1, OpResult::Continue));

        let mut ctx2 = make_context(Uuid::new_v4());
        let result2 = op.execute(&mut ctx2).await.unwrap();
        assert!(matches!(result2, OpResult::Continue));
    }

    // Once the window has elapsed, the same key is accepted again.
    #[tokio::test]
    async fn test_dedup_expired_window_passes_again() {
        let entity_id = Uuid::new_v4();
        let op = DeduplicateOp::with_window("entity_id", Duration::from_millis(50));

        let mut ctx1 = make_context(entity_id);
        let result1 = op.execute(&mut ctx1).await.unwrap();
        assert!(matches!(result1, OpResult::Continue));

        // Sleep past the 50ms window (real time; `Instant` is not mocked here).
        tokio::time::sleep(Duration::from_millis(60)).await;

        let mut ctx2 = make_context(entity_id);
        let result2 = op.execute(&mut ctx2).await.unwrap();
        assert!(matches!(result2, OpResult::Continue));
    }

    // A key variable that is absent from the context is an error, not a pass.
    #[tokio::test]
    async fn test_dedup_missing_key_errors() {
        let op = DeduplicateOp::with_window("nonexistent", Duration::from_secs(60));
        let mut ctx = make_context(Uuid::new_v4());

        let result = op.execute(&mut ctx).await;
        assert!(result.is_err());
    }

    // --- parse_duration ---

    #[test]
    fn test_parse_duration_seconds() {
        assert_eq!(parse_duration("30s").unwrap(), Duration::from_secs(30));
    }

    #[test]
    fn test_parse_duration_minutes() {
        assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
    }

    #[test]
    fn test_parse_duration_hours() {
        assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600));
    }

    #[test]
    fn test_parse_duration_milliseconds() {
        assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
    }

    // Unknown unit suffix and a missing numeric part are both rejected.
    #[test]
    fn test_parse_duration_invalid() {
        assert!(parse_duration("5x").is_err());
        assert!(parse_duration("abc").is_err());
    }
}