swarm_engine_core/pipeline/
transform.rs1use super::WatchEvent;
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
/// One asynchronous step of an event pipeline.
///
/// A transform receives an [`Self::Input`] and resolves to either an
/// [`Self::Output`] or `None`. Implementors in this file use `None` to signal
/// that the event was dropped (see `DebounceTransform`).
///
/// The trait requires `Send`, as do both associated types and the returned
/// future, so a transform and its in-flight work can be moved between threads.
pub trait EventTransform: Send {
    /// Type of event accepted by this step.
    type Input: Send;

    /// Type of event produced by this step.
    type Output: Send;

    /// Processes a single `event`.
    ///
    /// Takes `&mut self`, so implementations may keep state between calls.
    /// The future resolves to `Some(output)` when an event should continue
    /// downstream and to `None` when nothing is emitted for this input.
    fn transform(
        &mut self,
        event: Self::Input,
    ) -> impl std::future::Future<Output = Option<Self::Output>> + Send;
}
20
21pub struct PassthroughTransform;
23
24impl EventTransform for PassthroughTransform {
25 type Input = WatchEvent;
26 type Output = WatchEvent;
27
28 async fn transform(&mut self, event: Self::Input) -> Option<Self::Output> {
29 Some(event)
30 }
31}
32
/// Suppresses events whose key was already let through less than `delay` ago.
///
/// Only events that pass refresh the stored timestamp; suppressed events leave
/// it untouched, so the window is measured from the last event that got
/// through, not from the last event that arrived.
///
/// The key map only shrinks when [`DebounceTransform::cleanup`] is called, so
/// long-running users with many distinct keys should call it periodically.
#[derive(Debug)]
pub struct DebounceTransform {
    /// Minimum time that must elapse between two passed events with the same key.
    delay: Duration,
    /// For each key, the instant at which an event with that key last passed.
    last_seen: HashMap<String, Instant>,
}

impl DebounceTransform {
    /// Creates a transform with an empty key map and the given debounce window.
    pub fn new(delay: Duration) -> Self {
        Self {
            delay,
            last_seen: HashMap::new(),
        }
    }

    /// Decides whether an event identified by `key` may pass right now.
    ///
    /// Returns `true` (and records the current instant for `key`) when the key
    /// has not been seen before or its last pass is at least `delay` old.
    /// Returns `false` otherwise, leaving the recorded instant unchanged.
    fn should_pass(&mut self, key: &str) -> bool {
        let now = Instant::now();

        match self.last_seen.get_mut(key) {
            // Still inside the window: suppress and keep the old timestamp.
            Some(last) if now.duration_since(*last) < self.delay => false,
            // Known key whose window has expired: refresh in place, which
            // avoids a second hash lookup and a fresh `String` allocation.
            Some(last) => {
                *last = now;
                true
            }
            // First event for this key: this is the only path that allocates.
            None => {
                self.last_seen.insert(key.to_owned(), now);
                true
            }
        }
    }

    /// Forgets every key whose last pass is `max_age` old or older.
    ///
    /// Keys strictly younger than `max_age` are kept. A forgotten key is
    /// treated as never seen, so its next event passes immediately.
    pub fn cleanup(&mut self, max_age: Duration) {
        let now = Instant::now();
        self.last_seen
            .retain(|_, last| now.duration_since(*last) < max_age);
    }
}
72
73impl EventTransform for DebounceTransform {
74 type Input = WatchEvent;
75 type Output = WatchEvent;
76
77 async fn transform(&mut self, event: Self::Input) -> Option<Self::Output> {
78 if self.should_pass(&event.scenario) {
79 Some(event)
80 } else {
81 tracing::debug!(scenario = %event.scenario, "Event debounced");
82 None
83 }
84 }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The passthrough transform must hand back the event it was given.
    #[tokio::test]
    async fn test_passthrough() {
        let mut transform = PassthroughTransform;
        let event = WatchEvent::new("test".into());

        // The event is not used afterwards, so it is moved in instead of cloned.
        let forwarded = transform
            .transform(event)
            .await
            .expect("passthrough must forward every event");
        assert_eq!(forwarded.scenario, "test");
    }

    /// The very first event for a scenario is never suppressed.
    #[tokio::test]
    async fn test_debounce_allows_first() {
        let mut transform = DebounceTransform::new(Duration::from_secs(5));
        let event = WatchEvent::new("test".into());

        let result = transform.transform(event).await;
        assert!(result.is_some());
    }

    /// A second event for the same scenario inside the window is dropped.
    #[tokio::test]
    async fn test_debounce_filters_rapid_duplicate() {
        let mut transform = DebounceTransform::new(Duration::from_secs(5));

        let first = WatchEvent::new("test".into());
        assert!(transform.transform(first).await.is_some());

        // Sent immediately afterwards, well inside the 5 s window.
        let second = WatchEvent::new("test".into());
        assert!(transform.transform(second).await.is_none());
    }

    /// Debouncing is tracked per scenario, so distinct scenarios do not interfere.
    #[tokio::test]
    async fn test_debounce_allows_different_scenarios() {
        let mut transform = DebounceTransform::new(Duration::from_secs(5));

        let first = WatchEvent::new("scenario_a".into());
        assert!(transform.transform(first).await.is_some());

        // Different key, so the window opened by `scenario_a` does not apply.
        let second = WatchEvent::new("scenario_b".into());
        assert!(transform.transform(second).await.is_some());
    }

    /// `cleanup` removes entries that are at least `max_age` old.
    #[test]
    fn test_cleanup() {
        let mut transform = DebounceTransform::new(Duration::from_millis(10));
        transform
            .last_seen
            .insert("old".to_string(), Instant::now());

        // Let the entry age past the 15 ms threshold used below.
        std::thread::sleep(Duration::from_millis(20));

        transform.cleanup(Duration::from_millis(15));
        assert!(transform.last_seen.is_empty());
    }
}