swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Pipeline abstraction for event-driven processing.
//!
//! Provides Source/Transform/Sink pattern for composable, testable pipelines.
//!
//! # Architecture
//!
//! ```text
//! Source (Pull) → Transform → Sink
//!      ↑                        │
//!      └─── backpressure ───────┘
//! ```
//!
//! # Example
//!
//! ```ignore
//! let source = LocalFileWatcherSource::new(watch_dir)?;
//! let transform = DebounceTransform::new(Duration::from_secs(5));
//! let sink = LearningSink::new(store, 20);
//!
//! let mut pipeline = Pipeline::new(source, transform, sink);
//! pipeline.run().await?;
//! ```

mod sink;
mod source;
mod transform;

pub use sink::{EventSink, LearningSink};
pub use source::{EventSource, LocalFileWatcherSource, WatchEvent};
pub use transform::{DebounceTransform, EventTransform, PassthroughTransform};

use crate::error::SwarmError;

/// Pipeline executor combining Source, Transform, and Sink.
pub struct Pipeline<S, T, K> {
    source: S,
    transform: T,
    sink: K,
}

impl<S, T, K> Pipeline<S, T, K>
where
    S: EventSource,
    T: EventTransform<Input = S::Event, Output = S::Event>,
    K: EventSink<Event = S::Event>,
{
    /// Create a new pipeline.
    pub fn new(source: S, transform: T, sink: K) -> Self {
        Self {
            source,
            transform,
            sink,
        }
    }

    /// Process one event (useful for testing).
    pub async fn process_one(&mut self) -> Result<bool, SwarmError> {
        let event = match self.source.next().await {
            Some(e) => e,
            None => return Ok(false),
        };

        if let Some(transformed) = self.transform.transform(event).await {
            self.sink.process(transformed).await?;
        }

        Ok(true)
    }

    /// Run the pipeline until source exhausted or shutdown.
    pub async fn run(&mut self) -> Result<(), SwarmError> {
        loop {
            if !self.process_one().await? {
                break;
            }
        }
        Ok(())
    }

    /// Get reference to sink (for inspection in tests).
    pub fn sink(&self) -> &K {
        &self.sink
    }

    /// Get mutable reference to sink.
    pub fn sink_mut(&mut self) -> &mut K {
        &mut self.sink
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::time::Duration;

    /// Mock Source for testing.
    struct MockSource {
        events: VecDeque<WatchEvent>,
    }

    impl MockSource {
        fn new(events: Vec<WatchEvent>) -> Self {
            Self {
                events: events.into(),
            }
        }
    }

    impl EventSource for MockSource {
        type Event = WatchEvent;

        async fn next(&mut self) -> Option<Self::Event> {
            self.events.pop_front()
        }
    }

    /// Recording Sink for testing.
    struct RecordingSink {
        processed: Vec<WatchEvent>,
    }

    impl RecordingSink {
        fn new() -> Self {
            Self { processed: vec![] }
        }
    }

    impl EventSink for RecordingSink {
        type Event = WatchEvent;

        async fn process(&mut self, event: Self::Event) -> Result<(), SwarmError> {
            self.processed.push(event);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_pipeline_passthrough() {
        let source = MockSource::new(vec![
            WatchEvent::new("scenario_a".into()),
            WatchEvent::new("scenario_b".into()),
        ]);
        let transform = PassthroughTransform;
        let sink = RecordingSink::new();

        let mut pipeline = Pipeline::new(source, transform, sink);
        pipeline.run().await.unwrap();

        assert_eq!(pipeline.sink().processed.len(), 2);
        assert_eq!(pipeline.sink().processed[0].scenario, "scenario_a");
        assert_eq!(pipeline.sink().processed[1].scenario, "scenario_b");
    }

    #[tokio::test]
    async fn test_pipeline_with_debounce() {
        let source = MockSource::new(vec![
            WatchEvent::new("test".into()),
            WatchEvent::new("test".into()), // duplicate - should be filtered
            WatchEvent::new("other".into()),
        ]);
        let transform = DebounceTransform::new(Duration::from_secs(5));
        let sink = RecordingSink::new();

        let mut pipeline = Pipeline::new(source, transform, sink);
        pipeline.run().await.unwrap();

        // "test" appears twice but debounce filters the second one
        assert_eq!(pipeline.sink().processed.len(), 2);
        assert_eq!(pipeline.sink().processed[0].scenario, "test");
        assert_eq!(pipeline.sink().processed[1].scenario, "other");
    }
}