// swarm_engine_core/pipeline/transform.rs

1//! Event transform trait and implementations.
2
3use super::WatchEvent;
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
/// A single asynchronous pipeline stage that filters and/or maps events.
///
/// Implementors must be [`Send`], and the future returned by
/// [`transform`](EventTransform::transform) must be `Send` as well.
pub trait EventTransform: Send {
    /// Type of event this stage consumes.
    type Input: Send;
    /// Type of event this stage produces.
    type Output: Send;

    /// Processes one event.
    ///
    /// Resolves to `Some(output)` to forward an event, or to `None` to drop
    /// (filter out) the input.
    fn transform(
        &mut self,
        event: Self::Input,
    ) -> impl std::future::Future<Output = Option<Self::Output>> + Send;
}
20
/// Passthrough transform (no-op): forwards every event unchanged.
///
/// The type is a stateless unit struct, so it is trivially copyable and can
/// be constructed with [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct PassthroughTransform;
23
24impl EventTransform for PassthroughTransform {
25    type Input = WatchEvent;
26    type Output = WatchEvent;
27
28    async fn transform(&mut self, event: Self::Input) -> Option<Self::Output> {
29        Some(event)
30    }
31}
32
/// Debounce transform - filters rapid duplicate events.
///
/// For each key, an event passes only if at least `delay` has elapsed since
/// the last event with that key passed. Events that are dropped do not reset
/// the timer.
///
/// Entries are never evicted automatically; call
/// [`DebounceTransform::cleanup`] periodically to bound memory use.
#[derive(Debug)]
pub struct DebounceTransform {
    /// Minimum spacing between two passed events that share a key.
    delay: Duration,
    /// Instant at which an event last passed, per key.
    last_seen: HashMap<String, Instant>,
}

impl DebounceTransform {
    /// Create a new debounce transform.
    ///
    /// # Arguments
    /// * `delay` - Minimum time between events for the same scenario
    pub fn new(delay: Duration) -> Self {
        Self {
            delay,
            last_seen: HashMap::new(),
        }
    }

    /// Check if enough time has passed since the last passed event for `key`.
    ///
    /// Returns `true` (and records the current instant for `key`) when the
    /// key is new or its last passed event is at least `delay` old; returns
    /// `false` without touching the stored instant otherwise.
    fn should_pass(&mut self, key: &str) -> bool {
        let now = Instant::now();

        // Known key: update the stored instant in place so that no new
        // `String` has to be allocated for the map key.
        if let Some(last) = self.last_seen.get_mut(key) {
            if now.duration_since(*last) < self.delay {
                return false;
            }
            *last = now;
            return true;
        }

        // First event for this key: this is the only path that allocates.
        self.last_seen.insert(key.to_owned(), now);
        true
    }

    /// Clean up old entries to prevent memory leak.
    ///
    /// Removes every entry whose last passed event is at least `max_age` old.
    /// Use a `max_age` no smaller than the debounce delay: once an entry is
    /// removed, the next event for that key passes immediately.
    pub fn cleanup(&mut self, max_age: Duration) {
        let now = Instant::now();
        self.last_seen
            .retain(|_, last| now.duration_since(*last) < max_age);
    }
}
72
73impl EventTransform for DebounceTransform {
74    type Input = WatchEvent;
75    type Output = WatchEvent;
76
77    async fn transform(&mut self, event: Self::Input) -> Option<Self::Output> {
78        if self.should_pass(&event.scenario) {
79            Some(event)
80        } else {
81            tracing::debug!(scenario = %event.scenario, "Event debounced");
82            None
83        }
84    }
85}
86
/// Unit tests for the passthrough and debounce transforms.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The passthrough stage forwards the event it was given.
    #[tokio::test]
    async fn test_passthrough() {
        let mut transform = PassthroughTransform;
        // The event is moved into the transform; no clone is needed.
        let result = transform.transform(WatchEvent::new("test".into())).await;
        let event = result.expect("passthrough must not filter events");
        assert_eq!(event.scenario, "test");
    }

    /// The first event for a scenario always passes.
    #[tokio::test]
    async fn test_debounce_allows_first() {
        let mut transform = DebounceTransform::new(Duration::from_secs(5));
        let event = WatchEvent::new("test".into());
        let result = transform.transform(event).await;
        assert!(result.is_some());
    }

    /// A repeat inside the delay window is dropped.
    #[tokio::test]
    async fn test_debounce_filters_rapid_duplicate() {
        let mut transform = DebounceTransform::new(Duration::from_secs(5));

        let event1 = WatchEvent::new("test".into());
        let result1 = transform.transform(event1).await;
        assert!(result1.is_some());

        // Immediate second event should be filtered
        let event2 = WatchEvent::new("test".into());
        let result2 = transform.transform(event2).await;
        assert!(result2.is_none());
    }

    /// Debouncing is tracked per scenario, not globally.
    #[tokio::test]
    async fn test_debounce_allows_different_scenarios() {
        let mut transform = DebounceTransform::new(Duration::from_secs(5));

        let event1 = WatchEvent::new("scenario_a".into());
        let result1 = transform.transform(event1).await;
        assert!(result1.is_some());

        // Different scenario should pass
        let event2 = WatchEvent::new("scenario_b".into());
        let result2 = transform.transform(event2).await;
        assert!(result2.is_some());
    }

    /// Once the delay has elapsed, the same scenario passes again.
    #[tokio::test]
    async fn test_debounce_allows_after_delay() {
        // A zero delay has always elapsed, so this needs no sleeping.
        let mut transform = DebounceTransform::new(Duration::ZERO);

        let first = transform.transform(WatchEvent::new("test".into())).await;
        assert!(first.is_some());

        let second = transform.transform(WatchEvent::new("test".into())).await;
        assert!(second.is_some());
    }

    /// `cleanup` keeps entries younger than `max_age` and evicts older ones.
    #[test]
    fn test_cleanup() {
        let mut transform = DebounceTransform::new(Duration::from_millis(10));
        transform
            .last_seen
            .insert("old".to_string(), Instant::now());

        // Wait a bit
        std::thread::sleep(Duration::from_millis(20));

        // A generous max_age must keep the entry; this guards against a
        // cleanup that simply clears the whole map.
        transform.cleanup(Duration::from_secs(3600));
        assert!(transform.last_seen.contains_key("old"));

        // The entry is now older than 15ms and must be evicted.
        transform.cleanup(Duration::from_millis(15));
        assert!(transform.last_seen.is_empty());
    }
}