Skip to main content

swarm_engine_core/pipeline/
mod.rs

1//! Pipeline abstraction for event-driven processing.
2//!
3//! Provides Source/Transform/Sink pattern for composable, testable pipelines.
4//!
5//! # Architecture
6//!
7//! ```text
8//! Source (Pull) → Transform → Sink
9//!      ↑                        │
10//!      └─── backpressure ───────┘
11//! ```
12//!
13//! # Example
14//!
15//! ```ignore
16//! let source = LocalFileWatcherSource::new(watch_dir)?;
17//! let transform = DebounceTransform::new(Duration::from_secs(5));
18//! let sink = LearningSink::new(store, 20);
19//!
20//! let mut pipeline = Pipeline::new(source, transform, sink);
21//! pipeline.run().await?;
22//! ```
23
24mod sink;
25mod source;
26mod transform;
27
28pub use sink::{EventSink, LearningSink};
29pub use source::{EventSource, LocalFileWatcherSource, WatchEvent};
30pub use transform::{DebounceTransform, EventTransform, PassthroughTransform};
31
32use crate::error::SwarmError;
33
34/// Pipeline executor combining Source, Transform, and Sink.
35pub struct Pipeline<S, T, K> {
36    source: S,
37    transform: T,
38    sink: K,
39}
40
41impl<S, T, K> Pipeline<S, T, K>
42where
43    S: EventSource,
44    T: EventTransform<Input = S::Event, Output = S::Event>,
45    K: EventSink<Event = S::Event>,
46{
47    /// Create a new pipeline.
48    pub fn new(source: S, transform: T, sink: K) -> Self {
49        Self {
50            source,
51            transform,
52            sink,
53        }
54    }
55
56    /// Process one event (useful for testing).
57    pub async fn process_one(&mut self) -> Result<bool, SwarmError> {
58        let event = match self.source.next().await {
59            Some(e) => e,
60            None => return Ok(false),
61        };
62
63        if let Some(transformed) = self.transform.transform(event).await {
64            self.sink.process(transformed).await?;
65        }
66
67        Ok(true)
68    }
69
70    /// Run the pipeline until source exhausted or shutdown.
71    pub async fn run(&mut self) -> Result<(), SwarmError> {
72        loop {
73            if !self.process_one().await? {
74                break;
75            }
76        }
77        Ok(())
78    }
79
80    /// Get reference to sink (for inspection in tests).
81    pub fn sink(&self) -> &K {
82        &self.sink
83    }
84
85    /// Get mutable reference to sink.
86    pub fn sink_mut(&mut self) -> &mut K {
87        &mut self.sink
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94    use std::collections::VecDeque;
95    use std::time::Duration;
96
97    /// Mock Source for testing.
98    struct MockSource {
99        events: VecDeque<WatchEvent>,
100    }
101
102    impl MockSource {
103        fn new(events: Vec<WatchEvent>) -> Self {
104            Self {
105                events: events.into(),
106            }
107        }
108    }
109
110    impl EventSource for MockSource {
111        type Event = WatchEvent;
112
113        async fn next(&mut self) -> Option<Self::Event> {
114            self.events.pop_front()
115        }
116    }
117
118    /// Recording Sink for testing.
119    struct RecordingSink {
120        processed: Vec<WatchEvent>,
121    }
122
123    impl RecordingSink {
124        fn new() -> Self {
125            Self { processed: vec![] }
126        }
127    }
128
129    impl EventSink for RecordingSink {
130        type Event = WatchEvent;
131
132        async fn process(&mut self, event: Self::Event) -> Result<(), SwarmError> {
133            self.processed.push(event);
134            Ok(())
135        }
136    }
137
138    #[tokio::test]
139    async fn test_pipeline_passthrough() {
140        let source = MockSource::new(vec![
141            WatchEvent::new("scenario_a".into()),
142            WatchEvent::new("scenario_b".into()),
143        ]);
144        let transform = PassthroughTransform;
145        let sink = RecordingSink::new();
146
147        let mut pipeline = Pipeline::new(source, transform, sink);
148        pipeline.run().await.unwrap();
149
150        assert_eq!(pipeline.sink().processed.len(), 2);
151        assert_eq!(pipeline.sink().processed[0].scenario, "scenario_a");
152        assert_eq!(pipeline.sink().processed[1].scenario, "scenario_b");
153    }
154
155    #[tokio::test]
156    async fn test_pipeline_with_debounce() {
157        let source = MockSource::new(vec![
158            WatchEvent::new("test".into()),
159            WatchEvent::new("test".into()), // duplicate - should be filtered
160            WatchEvent::new("other".into()),
161        ]);
162        let transform = DebounceTransform::new(Duration::from_secs(5));
163        let sink = RecordingSink::new();
164
165        let mut pipeline = Pipeline::new(source, transform, sink);
166        pipeline.run().await.unwrap();
167
168        // "test" appears twice but debounce filters the second one
169        assert_eq!(pipeline.sink().processed.len(), 2);
170        assert_eq!(pipeline.sink().processed[0].scenario, "test");
171        assert_eq!(pipeline.sink().processed[1].scenario, "other");
172    }
173}