// swarm-engine-core 0.1.6
//
// Core types and orchestration for SwarmEngine.
// Documentation
//! Event transform trait and implementations.

use super::WatchEvent;
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Trait for event transformers (filter/map).
///
/// An implementor receives one event at a time and either maps it to an
/// output event (`Some`) or drops it (`None`). Because `transform` takes
/// `&mut self`, an implementor may keep state between calls.
///
/// The trait itself requires `Send`, as do both associated types and the
/// future returned by `transform`.
pub trait EventTransform: Send {
    /// Input event type.
    type Input: Send;
    /// Output event type.
    type Output: Send;

    /// Transform an event. Returns None to filter out the event.
    ///
    /// The method is declared with an explicit `impl Future + Send` return
    /// type rather than as `async fn`, which makes the `Send` bound on the
    /// returned future part of the trait contract. The implementations in
    /// this file satisfy it by writing the method as `async fn`.
    fn transform(
        &mut self,
        event: Self::Input,
    ) -> impl std::future::Future<Output = Option<Self::Output>> + Send;
}

/// Identity transform: every event is forwarded unchanged and nothing is
/// ever filtered out.
pub struct PassthroughTransform;

impl EventTransform for PassthroughTransform {
    type Input = WatchEvent;
    type Output = WatchEvent;

    /// Hands the incoming event straight back, always wrapped in `Some`.
    async fn transform(&mut self, event: WatchEvent) -> Option<WatchEvent> {
        let forwarded = event;
        Some(forwarded)
    }
}

/// Debounce transform - filters rapid duplicate events.
///
/// For every key the transform remembers the instant of the most recent
/// event that was let through. A later event for the same key is dropped
/// while less than `delay` has elapsed since that instant. Dropped events do
/// not refresh the stored instant, so a continuous stream of events for one
/// key still lets one event through per `delay`.
///
/// Nothing in this type removes entries on its own: the map gains one entry
/// per distinct key until [`DebounceTransform::cleanup`] is called.
pub struct DebounceTransform {
    /// Minimum time between two passed events for the same key.
    delay: Duration,
    /// Key -> instant of the last event that passed for that key.
    last_seen: HashMap<String, Instant>,
}

impl DebounceTransform {
    /// Create a new debounce transform.
    ///
    /// # Arguments
    /// * `delay` - Minimum time between events for the same scenario
    pub fn new(delay: Duration) -> Self {
        Self {
            delay,
            last_seen: HashMap::new(),
        }
    }

    /// Check if enough time has passed since last event for this key.
    ///
    /// Returns `true` (and records the current instant for `key`) when the
    /// key has not been seen before or when at least `delay` has elapsed
    /// since the last passed event. Returns `false` otherwise, leaving the
    /// stored instant untouched.
    fn should_pass(&mut self, key: &str) -> bool {
        let now = Instant::now();

        // Known key: decide and refresh the timestamp in place. This avoids
        // allocating a new `String` and hashing twice on every passed event.
        if let Some(last) = self.last_seen.get_mut(key) {
            if now.duration_since(*last) < self.delay {
                return false;
            }
            *last = now;
            return true;
        }

        // First event for this key: the only path that needs an owned key.
        self.last_seen.insert(key.to_owned(), now);
        true
    }

    /// Clean up old entries to prevent memory leak.
    ///
    /// Keeps only the entries whose last passed event is younger than
    /// `max_age`; everything else is removed from the map.
    ///
    /// # Arguments
    /// * `max_age` - Entries at least this old are discarded
    pub fn cleanup(&mut self, max_age: Duration) {
        let now = Instant::now();
        self.last_seen
            .retain(|_, last| now.duration_since(*last) < max_age);
    }
}

impl EventTransform for DebounceTransform {
    type Input = WatchEvent;
    type Output = WatchEvent;

    /// Forwards the event unless another event for the same scenario was
    /// let through too recently, in which case it is dropped and a debug
    /// line is logged.
    async fn transform(&mut self, event: WatchEvent) -> Option<WatchEvent> {
        // Guard clause: handle the suppressed case first and bail out.
        if !self.should_pass(&event.scenario) {
            tracing::debug!(scenario = %event.scenario, "Event debounced");
            return None;
        }

        Some(event)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The passthrough transform returns the event it was given.
    #[tokio::test]
    async fn test_passthrough() {
        let input = WatchEvent::new("test".into());
        let mut passthrough = PassthroughTransform;

        let output = passthrough.transform(input.clone()).await;

        let forwarded = output.expect("passthrough must never filter an event");
        assert_eq!(forwarded.scenario, "test");
    }

    /// The very first event for a scenario is always let through.
    #[tokio::test]
    async fn test_debounce_allows_first() {
        let mut debounce = DebounceTransform::new(Duration::from_secs(5));

        let outcome = debounce.transform(WatchEvent::new("test".into())).await;

        assert!(outcome.is_some());
    }

    /// A second event for the same scenario inside the delay is dropped.
    #[tokio::test]
    async fn test_debounce_filters_rapid_duplicate() {
        let mut debounce = DebounceTransform::new(Duration::from_secs(5));

        let first = debounce.transform(WatchEvent::new("test".into())).await;
        assert!(first.is_some());

        // Sent right away, well inside the five second window.
        let second = debounce.transform(WatchEvent::new("test".into())).await;
        assert!(second.is_none());
    }

    /// Debouncing is tracked per scenario, so distinct scenarios do not
    /// suppress each other.
    #[tokio::test]
    async fn test_debounce_allows_different_scenarios() {
        let mut debounce = DebounceTransform::new(Duration::from_secs(5));

        let for_a = debounce
            .transform(WatchEvent::new("scenario_a".into()))
            .await;
        assert!(for_a.is_some());

        // A different key has no recorded timestamp yet.
        let for_b = debounce
            .transform(WatchEvent::new("scenario_b".into()))
            .await;
        assert!(for_b.is_some());
    }

    /// Entries older than the requested maximum age are removed.
    #[test]
    fn test_cleanup() {
        let mut debounce = DebounceTransform::new(Duration::from_millis(10));
        let inserted_at = Instant::now();
        debounce.last_seen.insert("old".to_string(), inserted_at);

        // Let the entry age past the 15 ms threshold used below.
        std::thread::sleep(Duration::from_millis(20));

        debounce.cleanup(Duration::from_millis(15));
        assert!(debounce.last_seen.is_empty());
    }
}