mod sink;
mod source;
mod transform;
pub use sink::{EventSink, LearningSink};
pub use source::{EventSource, LocalFileWatcherSource, WatchEvent};
pub use transform::{DebounceTransform, EventTransform, PassthroughTransform};
use crate::error::SwarmError;
/// A three-stage event pipeline that owns a source, a transform, and a sink.
///
/// Events are pulled from `source`, handed to `transform`, and whatever the
/// transform lets through is delivered to `sink`.
///
/// The struct itself places no trait bounds on its type parameters; the
/// bounds live on the `impl` block that drives the pipeline. `Debug` is
/// derived, so a pipeline is printable whenever all three stages are.
#[derive(Debug)]
pub struct Pipeline<S, T, K> {
    /// Stage that events are pulled from.
    source: S,
    /// Stage applied to every event before it reaches the sink.
    transform: T,
    /// Stage that receives each event the transform lets through.
    sink: K,
}
impl<S, T, K> Pipeline<S, T, K>
where
    S: EventSource,
    T: EventTransform<Input = S::Event, Output = S::Event>,
    K: EventSink<Event = S::Event>,
{
    /// Assembles a pipeline from its three stages.
    pub fn new(source: S, transform: T, sink: K) -> Self {
        Self { source, transform, sink }
    }

    /// Pulls a single event from the source and pushes it through the pipeline.
    ///
    /// Returns `Ok(false)` once the source yields `None`, and `Ok(true)`
    /// whenever an event was pulled, including when the transform returned
    /// `None` and nothing was delivered to the sink.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the sink's `process` call.
    pub async fn process_one(&mut self) -> Result<bool, SwarmError> {
        let Some(incoming) = self.source.next().await else {
            // Source is exhausted: tell the caller to stop polling.
            return Ok(false);
        };

        let forwarded = self.transform.transform(incoming).await;
        if let Some(outgoing) = forwarded {
            self.sink.process(outgoing).await?;
        }

        Ok(true)
    }

    /// Repeatedly calls [`Self::process_one`] until the source is exhausted.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by
    /// [`Self::process_one`].
    pub async fn run(&mut self) -> Result<(), SwarmError> {
        // Each iteration handles one event; the loop ends on `Ok(false)`.
        while self.process_one().await? {}
        Ok(())
    }

    /// Shared access to the sink stage.
    pub fn sink(&self) -> &K {
        &self.sink
    }

    /// Exclusive access to the sink stage.
    pub fn sink_mut(&mut self) -> &mut K {
        &mut self.sink
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::time::Duration;

    /// Source that replays a fixed queue of events, then reports exhaustion.
    struct MockSource {
        events: VecDeque<WatchEvent>,
    }

    impl MockSource {
        fn new(events: Vec<WatchEvent>) -> Self {
            Self {
                events: VecDeque::from(events),
            }
        }
    }

    impl EventSource for MockSource {
        type Event = WatchEvent;

        async fn next(&mut self) -> Option<Self::Event> {
            // Front-pop keeps the events in the order they were supplied.
            self.events.pop_front()
        }
    }

    /// Sink that stores every event it receives, in arrival order.
    struct RecordingSink {
        processed: Vec<WatchEvent>,
    }

    impl RecordingSink {
        fn new() -> Self {
            Self {
                processed: Vec::new(),
            }
        }
    }

    impl EventSink for RecordingSink {
        type Event = WatchEvent;

        async fn process(&mut self, event: Self::Event) -> Result<(), SwarmError> {
            self.processed.push(event);
            Ok(())
        }
    }

    /// Asserts that the sink recorded exactly the `expected` scenarios, in order.
    #[track_caller]
    fn assert_recorded(sink: &RecordingSink, expected: &[&str]) {
        assert_eq!(sink.processed.len(), expected.len());
        for (event, want) in sink.processed.iter().zip(expected) {
            assert_eq!(event.scenario, *want);
        }
    }

    #[tokio::test]
    async fn test_pipeline_passthrough() {
        let inputs = vec![
            WatchEvent::new("scenario_a".into()),
            WatchEvent::new("scenario_b".into()),
        ];
        let mut pipeline = Pipeline::new(
            MockSource::new(inputs),
            PassthroughTransform,
            RecordingSink::new(),
        );

        pipeline.run().await.unwrap();

        assert_recorded(pipeline.sink(), &["scenario_a", "scenario_b"]);
    }

    #[tokio::test]
    async fn test_pipeline_with_debounce() {
        let inputs = vec![
            WatchEvent::new("test".into()),
            // Repeat of the previous scenario; the assertion below expects
            // only one "test" event to reach the sink.
            WatchEvent::new("test".into()),
            WatchEvent::new("other".into()),
        ];
        let debounce = DebounceTransform::new(Duration::from_secs(5));
        let mut pipeline = Pipeline::new(MockSource::new(inputs), debounce, RecordingSink::new());

        pipeline.run().await.unwrap();

        assert_recorded(pipeline.sink(), &["test", "other"]);
    }
}