// swarm_engine_core/pipeline/mod.rs
mod sink;
25mod source;
26mod transform;
27
28pub use sink::{EventSink, LearningSink};
29pub use source::{EventSource, LocalFileWatcherSource, WatchEvent};
30pub use transform::{DebounceTransform, EventTransform, PassthroughTransform};
31
32use crate::error::SwarmError;
33
/// A three-stage event pipeline: a source `S`, a transform `T`, and a sink `K`.
///
/// The struct itself places no bounds on its type parameters; the trait
/// requirements live on the `impl` block that drives the stages.
pub struct Pipeline<S, T, K> {
    /// Stage that events are pulled from.
    source: S,
    /// Stage applied to each event before it reaches the sink.
    transform: T,
    /// Stage that receives the events the transform lets through.
    sink: K,
}
40
41impl<S, T, K> Pipeline<S, T, K>
42where
43 S: EventSource,
44 T: EventTransform<Input = S::Event, Output = S::Event>,
45 K: EventSink<Event = S::Event>,
46{
47 pub fn new(source: S, transform: T, sink: K) -> Self {
49 Self {
50 source,
51 transform,
52 sink,
53 }
54 }
55
56 pub async fn process_one(&mut self) -> Result<bool, SwarmError> {
58 let event = match self.source.next().await {
59 Some(e) => e,
60 None => return Ok(false),
61 };
62
63 if let Some(transformed) = self.transform.transform(event).await {
64 self.sink.process(transformed).await?;
65 }
66
67 Ok(true)
68 }
69
70 pub async fn run(&mut self) -> Result<(), SwarmError> {
72 loop {
73 if !self.process_one().await? {
74 break;
75 }
76 }
77 Ok(())
78 }
79
80 pub fn sink(&self) -> &K {
82 &self.sink
83 }
84
85 pub fn sink_mut(&mut self) -> &mut K {
87 &mut self.sink
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::time::Duration;

    /// Source that replays a fixed queue of events front-to-back and then
    /// yields `None`.
    struct MockSource {
        events: VecDeque<WatchEvent>,
    }

    impl MockSource {
        fn new(events: Vec<WatchEvent>) -> Self {
            Self {
                events: VecDeque::from(events),
            }
        }
    }

    impl EventSource for MockSource {
        type Event = WatchEvent;

        async fn next(&mut self) -> Option<Self::Event> {
            self.events.pop_front()
        }
    }

    /// Sink that stores every event it is handed, in arrival order.
    struct RecordingSink {
        processed: Vec<WatchEvent>,
    }

    impl RecordingSink {
        fn new() -> Self {
            Self {
                processed: Vec::new(),
            }
        }
    }

    impl EventSink for RecordingSink {
        type Event = WatchEvent;

        async fn process(&mut self, event: Self::Event) -> Result<(), SwarmError> {
            self.processed.push(event);
            Ok(())
        }
    }

    /// Asserts that the sink recorded exactly the `expected` scenarios, in order.
    fn assert_scenarios(sink: &RecordingSink, expected: &[&str]) {
        assert_eq!(sink.processed.len(), expected.len());
        for (event, want) in sink.processed.iter().zip(expected) {
            assert_eq!(event.scenario, *want);
        }
    }

    #[tokio::test]
    async fn test_pipeline_passthrough() {
        let events = vec![
            WatchEvent::new("scenario_a".into()),
            WatchEvent::new("scenario_b".into()),
        ];
        let mut pipeline = Pipeline::new(
            MockSource::new(events),
            PassthroughTransform,
            RecordingSink::new(),
        );

        pipeline.run().await.unwrap();

        // Both events are expected to arrive, in source order.
        assert_scenarios(pipeline.sink(), &["scenario_a", "scenario_b"]);
    }

    #[tokio::test]
    async fn test_pipeline_with_debounce() {
        let events = vec![
            WatchEvent::new("test".into()),
            WatchEvent::new("test".into()),
            WatchEvent::new("other".into()),
        ];
        let mut pipeline = Pipeline::new(
            MockSource::new(events),
            DebounceTransform::new(Duration::from_secs(5)),
            RecordingSink::new(),
        );

        pipeline.run().await.unwrap();

        // Three events go in; only one "test" and the "other" are expected out.
        assert_scenarios(pipeline.sink(), &["test", "other"]);
    }
}