use async_trait::async_trait;
use crate::common::message::Message;
use crate::error::Result;
use crate::transform::Transform;
use crate::transform::step::Step;
/// An ordered list of [`Step`]s grouped under a string identifier.
///
/// The [`Transform`] implementation for this type feeds a message through
/// the steps one after another, in the order they are stored.
pub struct TransformPipeline {
/// Identifier returned by `Transform::id` and attached to log events as `transform_id`.
id: String,
/// Steps applied in order; each step receives the output of the previous one.
steps: Vec<Box<dyn Step>>,
}
impl TransformPipeline {
    /// Builds a pipeline from an identifier and the steps to run.
    ///
    /// `id` accepts anything convertible into a `String`. `steps` is stored
    /// as given; its order is the order in which the steps are executed.
    pub fn new(id: impl Into<String>, steps: Vec<Box<dyn Step>>) -> Self {
        let id = id.into();
        Self { id, steps }
    }
}
#[async_trait]
impl Transform for TransformPipeline {
    /// Returns the identifier this pipeline was constructed with.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Runs `msg` through every step in order.
    ///
    /// Returns a one-element vector holding the final message when every step
    /// yields `Some`. As soon as a step yields `None` the remaining steps are
    /// skipped and an empty vector is returned.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by a step.
    async fn process(&self, msg: Message) -> Result<Vec<Message>> {
        tracing::debug!(
            transform_id = %self.id,
            "Pipeline processing message"
        );

        let mut message = msg;
        for step in self.steps.iter() {
            let kind = step.step_type();
            tracing::trace!(
                transform_id = %self.id,
                step = %kind,
                "Executing step"
            );

            // Happy path: the step produced a message, hand it to the next step.
            if let Some(next) = step.process(message)? {
                message = next;
                continue;
            }

            // The step yielded `None`: the message is dropped and nothing is emitted.
            tracing::debug!(
                transform_id = %self.id,
                step = %kind,
                "Message filtered out by step"
            );
            return Ok(Vec::new());
        }

        tracing::debug!(
            transform_id = %self.id,
            "Pipeline finished processing"
        );
        Ok(vec![message])
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `TransformPipeline`, driven by small in-module `Step` stubs.

    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Step that forwards every message unchanged.
    struct PassThroughStep;

    impl Step for PassThroughStep {
        fn step_type(&self) -> &'static str {
            "passthrough"
        }

        fn process(&self, msg: Message) -> Result<Option<Message>> {
            Ok(Some(msg))
        }
    }

    /// Step that filters out messages whose payload contains `"drop": true`.
    struct DropIfTrueStep;

    impl Step for DropIfTrueStep {
        fn step_type(&self) -> &'static str {
            "drop_if_true"
        }

        fn process(&self, msg: Message) -> Result<Option<Message>> {
            if msg.payload.get("drop") == Some(&json!(true)) {
                return Ok(None);
            }
            Ok(Some(msg))
        }
    }

    /// Pass-through step that counts how often it was invoked, so a test can
    /// observe whether the pipeline actually reached it.
    struct CountingStep {
        calls: Arc<AtomicUsize>,
    }

    impl Step for CountingStep {
        fn step_type(&self) -> &'static str {
            "counting"
        }

        fn process(&self, msg: Message) -> Result<Option<Message>> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(Some(msg))
        }
    }

    #[tokio::test]
    async fn test_pipeline_passes_through_all_steps() {
        let pipeline = TransformPipeline::new(
            "test",
            vec![Box::new(PassThroughStep), Box::new(PassThroughStep)],
        );

        let msg = Message::new("test", json!({"value": 42}));
        let result = pipeline.process(msg).await.unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].payload["value"], 42);
    }

    #[tokio::test]
    async fn test_pipeline_stops_on_filter() {
        let pipeline = TransformPipeline::new(
            "test",
            vec![Box::new(DropIfTrueStep), Box::new(PassThroughStep)],
        );

        // A message flagged for dropping produces no output.
        let msg = Message::new("test", json!({"drop": true}));
        let result = pipeline.process(msg).await.unwrap();
        assert!(result.is_empty());

        // A message that is not flagged travels through the whole pipeline.
        let msg = Message::new("test", json!({"drop": false, "data": "hello"}));
        let result = pipeline.process(msg).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].payload["data"], "hello");
    }

    #[tokio::test]
    async fn test_pipeline_skips_remaining_steps_after_filter() {
        let calls = Arc::new(AtomicUsize::new(0));
        let pipeline = TransformPipeline::new(
            "test",
            vec![
                Box::new(DropIfTrueStep),
                Box::new(CountingStep {
                    calls: Arc::clone(&calls),
                }),
            ],
        );

        // The filtered message must never reach the step behind the filter.
        let msg = Message::new("test", json!({"drop": true}));
        let result = pipeline.process(msg).await.unwrap();
        assert!(result.is_empty());
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        // A message that survives the filter reaches it exactly once.
        let msg = Message::new("test", json!({"drop": false}));
        let result = pipeline.process(msg).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_empty_pipeline_returns_message_unchanged() {
        let pipeline = TransformPipeline::new("test", Vec::new());

        let msg = Message::new("test", json!({"value": 42}));
        let result = pipeline.process(msg).await.unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].payload["value"], 42);
    }

    #[test]
    fn test_id_returns_configured_id() {
        let pipeline = TransformPipeline::new("my-pipeline", Vec::new());
        assert_eq!(pipeline.id(), "my-pipeline");
    }
}