enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! Sequential Flow - Execute callables one after another
//!
//! The output of each step becomes the input to the next.

use crate::callable::Callable;
use std::sync::Arc;

/// Sequential execution flow: a named, ordered list of callables.
///
/// Each step is held behind an [`Arc`], so pushing a step into the flow
/// never requires cloning the callable itself.
///
/// The type parameter is deliberately left unbounded on the struct; the
/// `Callable` requirement lives on the `impl` block that actually needs it,
/// so code that merely names or stores a `SequentialFlow<C>` does not have
/// to repeat the bound.
pub struct SequentialFlow<C> {
    /// Ordered list of callables to execute, first to last
    steps: Vec<Arc<C>>,
    /// Flow name
    name: String,
}

impl<C: Callable> SequentialFlow<C> {
    /// Create a new, empty sequential flow with the given name.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            steps: Vec::new(),
            name: name.into(),
        }
    }

    /// Append a single step to the end of the flow.
    ///
    /// This is a consuming builder method: it takes the flow by value and
    /// returns the updated flow, so the return value must be kept.
    #[must_use = "builder methods consume the flow and return the updated one"]
    pub fn add_step(mut self, callable: Arc<C>) -> Self {
        self.steps.push(callable);
        self
    }

    /// Append several steps to the end of the flow, preserving their order.
    ///
    /// Like [`add_step`](Self::add_step), this consumes the flow and returns
    /// the updated one.
    #[must_use = "builder methods consume the flow and return the updated one"]
    pub fn with_steps(mut self, callables: Vec<Arc<C>>) -> Self {
        self.steps.extend(callables);
        self
    }

    /// Execute the flow, feeding each step's output into the next step.
    ///
    /// The first step receives `input`; the value returned is the output of
    /// the last step. A flow with no steps returns a copy of `input`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a step. Execution stops at that
    /// step, so later steps are not run.
    pub async fn execute(&self, input: &str) -> anyhow::Result<String> {
        let mut current_output = input.to_string();

        for step in &self.steps {
            // `?` short-circuits: a failing step aborts the rest of the chain.
            current_output = step.run(&current_output).await?;
        }

        Ok(current_output)
    }

    /// Get the flow name
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Get the number of steps currently in the flow
    pub fn len(&self) -> usize {
        self.steps.len()
    }

    /// Check whether the flow has no steps
    pub fn is_empty(&self) -> bool {
        self.steps.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    /// Mock callable for testing - transforms input in a predictable way
    struct MockCallable {
        name: String,
        transform: Box<dyn Fn(&str) -> String + Send + Sync>,
    }

    impl MockCallable {
        /// Build a mock that applies `transform` to whatever input it is given.
        fn new(name: &str, transform: impl Fn(&str) -> String + Send + Sync + 'static) -> Self {
            Self {
                name: name.to_string(),
                transform: Box::new(transform),
            }
        }

        /// Mock that upper-cases its input.
        fn uppercase(name: &str) -> Self {
            Self::new(name, |s| s.to_uppercase())
        }

        /// Mock that appends `suffix` to its input.
        fn append(name: &str, suffix: &'static str) -> Self {
            Self::new(name, move |s| format!("{}{}", s, suffix))
        }

        /// Mock that prepends `prefix` to its input.
        fn prepend(name: &str, prefix: &'static str) -> Self {
            Self::new(name, move |s| format!("{}{}", prefix, s))
        }
    }

    #[async_trait]
    impl Callable for MockCallable {
        fn name(&self) -> &str {
            &self.name
        }

        async fn run(&self, input: &str) -> anyhow::Result<String> {
            Ok((self.transform)(input))
        }
    }

    #[tokio::test]
    async fn test_sequential_empty() {
        let flow: SequentialFlow<MockCallable> = SequentialFlow::new("empty");
        assert!(flow.is_empty());
        assert_eq!(flow.len(), 0);

        let result = flow.execute("input").await.unwrap();
        assert_eq!(result, "input"); // No steps = input passes through
    }

    #[tokio::test]
    async fn test_sequential_single_step() {
        let step = Arc::new(MockCallable::uppercase("upper"));
        let flow = SequentialFlow::new("single").add_step(step);

        assert_eq!(flow.len(), 1);
        assert!(!flow.is_empty());
        assert_eq!(flow.name(), "single");

        let result = flow.execute("hello").await.unwrap();
        assert_eq!(result, "HELLO");
    }

    #[tokio::test]
    async fn test_sequential_multiple_steps() {
        let flow = SequentialFlow::new("chain")
            .add_step(Arc::new(MockCallable::uppercase("step1")))
            .add_step(Arc::new(MockCallable::append("step2", "!")))
            .add_step(Arc::new(MockCallable::prepend("step3", ">> ")));

        assert_eq!(flow.len(), 3);

        let result = flow.execute("hello").await.unwrap();
        assert_eq!(result, ">> HELLO!");
    }

    #[tokio::test]
    async fn test_sequential_with_steps() {
        let steps = vec![
            Arc::new(MockCallable::uppercase("s1")),
            Arc::new(MockCallable::append("s2", "_done")),
        ];
        let flow = SequentialFlow::new("batch").with_steps(steps);

        assert_eq!(flow.len(), 2);
        let result = flow.execute("test").await.unwrap();
        assert_eq!(result, "TEST_done");
    }

    #[tokio::test]
    async fn test_sequential_error_propagation() {
        /// Callable that counts its own invocations and fails once that
        /// count (before the current call) is at least `fail_on_call`;
        /// `fail_on_call == 0` therefore fails on the very first call.
        struct FailingCallable {
            name: String,
            fail_on_call: usize,
            call_count: AtomicUsize,
        }

        impl FailingCallable {
            fn new(name: &str, fail_on: usize) -> Self {
                Self {
                    name: name.to_string(),
                    fail_on_call: fail_on,
                    call_count: AtomicUsize::new(0),
                }
            }

            /// Number of times `run` has been invoked so far.
            fn calls(&self) -> usize {
                self.call_count.load(Ordering::SeqCst)
            }
        }

        #[async_trait]
        impl Callable for FailingCallable {
            fn name(&self) -> &str {
                &self.name
            }

            async fn run(&self, input: &str) -> anyhow::Result<String> {
                let n = self.call_count.fetch_add(1, Ordering::SeqCst);
                if n >= self.fail_on_call {
                    anyhow::bail!("Intentional failure at step {}", n)
                }
                Ok(input.to_uppercase())
            }
        }

        // Keep handles to every step so the call counts can be inspected
        // after the flow has run.
        let first = Arc::new(FailingCallable::new("step1", 10)); // Won't fail
        let second = Arc::new(FailingCallable::new("step2", 0)); // Fails immediately
        let third = Arc::new(FailingCallable::new("step3", 10)); // Must never be reached

        let flow = SequentialFlow::new("with_error")
            .add_step(Arc::clone(&first))
            .add_step(Arc::clone(&second))
            .add_step(Arc::clone(&third));

        let err = flow
            .execute("hello")
            .await
            .expect_err("the second step should fail the whole flow");
        assert!(err.to_string().contains("Intentional failure"));

        // The failure must short-circuit the chain: steps before the failure
        // ran exactly once, the step after it did not run at all.
        assert_eq!(first.calls(), 1);
        assert_eq!(second.calls(), 1);
        assert_eq!(third.calls(), 0);
    }

    #[tokio::test]
    async fn test_sequential_preserves_order() {
        /// Callable that records its own name in a shared log when run.
        ///
        /// Logging the step's identity (rather than a shared counter value)
        /// is what makes the assertion below sensitive to execution order.
        struct OrderTracker {
            name: String,
            log: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait]
        impl Callable for OrderTracker {
            fn name(&self) -> &str {
                &self.name
            }

            async fn run(&self, input: &str) -> anyhow::Result<String> {
                self.log.lock().unwrap().push(self.name.clone());
                Ok(input.to_string())
            }
        }

        let execution_log = Arc::new(Mutex::new(Vec::new()));
        let tracker = |name: &str| {
            Arc::new(OrderTracker {
                name: name.to_string(),
                log: Arc::clone(&execution_log),
            })
        };

        let flow = SequentialFlow::new("ordered")
            .add_step(tracker("first"))
            .add_step(tracker("second"))
            .add_step(tracker("third"));

        flow.execute("test").await.unwrap();

        let recorded = execution_log.lock().unwrap();
        assert_eq!(*recorded, ["first", "second", "third"]);
    }
}