pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! Transform pipeline for multi-step processing
//!
//! A TransformPipeline chains multiple steps together, executing them sequentially.
//! Messages flow through each step; if any step filters out the message, processing stops.

use async_trait::async_trait;

use crate::common::message::Message;
use crate::error::Result;
use crate::transform::Transform;
use crate::transform::step::Step;

/// Transform pipeline that chains multiple steps together.
///
/// Steps run one after another in the order they were supplied; the output of
/// each step becomes the input of the next. If a step filters the message out,
/// the remaining steps are skipped.
pub struct TransformPipeline {
    /// Identifier returned by [`Transform::id`] and attached to every log event.
    id: String,
    /// Steps are executed sequentially. `Step::process` only receives `&self`, so
    /// stateful steps should use interior mutability (e.g. `Mutex`) if they need to
    /// maintain shared state.
    steps: Vec<Box<dyn Step>>,
}

// `dyn Step` is not known to implement `Debug`, so the impl cannot be derived;
// each step is rendered by its `step_type()` name instead.
impl std::fmt::Debug for TransformPipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let step_types: Vec<&'static str> =
            self.steps.iter().map(|step| step.step_type()).collect();
        f.debug_struct("TransformPipeline")
            .field("id", &self.id)
            .field("steps", &step_types)
            .finish()
    }
}

impl TransformPipeline {
    /// Builds a pipeline named `id` that will run `steps` in the given order.
    ///
    /// `id` accepts anything convertible into a `String` (e.g. `&str`). Passing an
    /// empty `steps` vector produces a pipeline that forwards each message as-is.
    pub fn new(id: impl Into<String>, steps: Vec<Box<dyn Step>>) -> Self {
        let owned_id: String = id.into();
        Self {
            id: owned_id,
            steps,
        }
    }
}

#[async_trait]
impl Transform for TransformPipeline {
    fn id(&self) -> &str {
        &self.id
    }

    /// Runs `msg` through every step in order.
    ///
    /// Returns a one-element vector holding the final message when every step
    /// passes it on. Returns an empty vector as soon as a step filters the message
    /// out (returns `Ok(None)`); the remaining steps are not executed.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a step, unchanged. Before propagating
    /// it, the failing step's type and position are logged, because the error
    /// value alone does not say where in the chain it came from.
    async fn process(&self, msg: Message) -> Result<Vec<Message>> {
        tracing::debug!(
            transform_id = %self.id,
            steps = self.steps.len(),
            "Pipeline processing message"
        );

        let mut current = msg;

        for (index, step) in self.steps.iter().enumerate() {
            let step_type = step.step_type();

            tracing::trace!(
                transform_id = %self.id,
                step = %step_type,
                step_index = index,
                "Executing step"
            );

            // Attribute the failure to a concrete step before handing the error
            // back to the caller, which only sees the error value itself.
            let outcome = step.process(current).map_err(|err| {
                tracing::warn!(
                    transform_id = %self.id,
                    step = %step_type,
                    step_index = index,
                    error = %err,
                    "Step failed"
                );
                err
            })?;

            match outcome {
                Some(output) => current = output,
                None => {
                    tracing::debug!(
                        transform_id = %self.id,
                        step = %step_type,
                        step_index = index,
                        "Message filtered out by step"
                    );
                    return Ok(vec![]);
                }
            }
        }

        tracing::debug!(
            transform_id = %self.id,
            "Pipeline finished processing"
        );

        Ok(vec![current])
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// A simple pass-through step for testing
    struct PassThroughStep;

    impl Step for PassThroughStep {
        fn step_type(&self) -> &'static str {
            "passthrough"
        }

        fn process(&self, msg: Message) -> Result<Option<Message>> {
            Ok(Some(msg))
        }
    }

    /// A step that filters out messages with payload containing {"drop": true}
    struct DropIfTrueStep;

    impl Step for DropIfTrueStep {
        fn step_type(&self) -> &'static str {
            "drop_if_true"
        }

        fn process(&self, msg: Message) -> Result<Option<Message>> {
            if msg.payload.get("drop") == Some(&serde_json::json!(true)) {
                return Ok(None);
            }
            Ok(Some(msg))
        }
    }

    /// A pass-through step that counts its invocations, so tests can assert
    /// whether it was actually reached.
    struct CountingStep {
        calls: Arc<AtomicUsize>,
    }

    impl Step for CountingStep {
        fn step_type(&self) -> &'static str {
            "counting"
        }

        fn process(&self, msg: Message) -> Result<Option<Message>> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(Some(msg))
        }
    }

    #[tokio::test]
    async fn test_pipeline_passes_through_all_steps() {
        let pipeline = TransformPipeline::new(
            "test",
            vec![Box::new(PassThroughStep), Box::new(PassThroughStep)],
        );

        let msg = Message::new("test", json!({"value": 42}));
        let result = pipeline.process(msg).await.unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].payload["value"], 42);
    }

    #[tokio::test]
    async fn test_pipeline_stops_on_filter() {
        let pipeline = TransformPipeline::new(
            "test",
            vec![Box::new(DropIfTrueStep), Box::new(PassThroughStep)],
        );

        // Message that should be dropped
        let msg = Message::new("test", json!({"drop": true}));
        let result = pipeline.process(msg).await.unwrap();
        assert!(result.is_empty());

        // Message that should pass
        let msg = Message::new("test", json!({"drop": false, "data": "hello"}));
        let result = pipeline.process(msg).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].payload["data"], "hello");
    }

    #[tokio::test]
    async fn test_pipeline_skips_steps_after_filter() {
        let calls = Arc::new(AtomicUsize::new(0));
        let pipeline = TransformPipeline::new(
            "test",
            vec![
                Box::new(DropIfTrueStep),
                Box::new(CountingStep {
                    calls: Arc::clone(&calls),
                }),
            ],
        );

        // A filtered message must never reach the step after the filter.
        let msg = Message::new("test", json!({"drop": true}));
        let result = pipeline.process(msg).await.unwrap();
        assert!(result.is_empty());
        assert_eq!(calls.load(Ordering::SeqCst), 0);

        // A surviving message reaches it exactly once.
        let msg = Message::new("test", json!({"drop": false}));
        let result = pipeline.process(msg).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_empty_pipeline_returns_message_unchanged() {
        let pipeline = TransformPipeline::new("empty", Vec::new());

        let msg = Message::new("test", json!({"value": 7}));
        let result = pipeline.process(msg).await.unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].payload["value"], 7);
    }

    #[test]
    fn test_pipeline_reports_id() {
        let pipeline = TransformPipeline::new("my-pipeline", Vec::new());
        assert_eq!(pipeline.id(), "my-pipeline");
    }
}