bzzz-core 0.1.0

Bzzz core library - Declarative orchestration engine for AI Agents
Documentation
//! Sequence pattern executor
//!
//! Executes workers sequentially, each step waits for the previous to complete.
//! Failure semantics controlled by `SwarmFile.on_failure`:
//! - `FailFast` (default): any step failure immediately terminates the flow
//! - `Continue`: records failure, continues remaining steps, returns aggregated error
//! - `Ignore`: records failure but treats as success, continues remaining steps
//!
//! Supports parameter substitution:
//! - Worker input expressions are resolved before execution
//! - Worker outputs are added to scope for subsequent steps
//!
//! ## CR2: CapabilityOutput
//!
//! After all steps complete, the executor applies expose resolution or output behavior
//! to produce the final capability output.

use async_trait::async_trait;

use crate::{ExecutionMetrics, ExecutionResult, FailureBehavior, FlowPattern, RunError, RunId, RunStatus};

use super::{build_capability_output, execute_worker, PatternContext, PatternExecutor};

/// Sequence pattern executor
pub struct SequenceExecutor;

impl SequenceExecutor {
    /// Create a new sequence executor
    pub fn new() -> Self {
        SequenceExecutor
    }
}

impl Default for SequenceExecutor {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl PatternExecutor for SequenceExecutor {
    fn name(&self) -> &'static str {
        "sequence"
    }

    async fn execute(
        &self,
        ctx: &PatternContext,
        runtime: &dyn crate::RuntimeAdapter,
        cancel: &crate::CancellationToken,
    ) -> Result<ExecutionResult, RunError> {
        let steps = match &ctx.swarm.flow {
            FlowPattern::Sequence { steps } => steps,
            _ => {
                return Err(RunError::PatternError {
                    pattern: "sequence".into(),
                    step: "flow".into(),
                    message: "SequenceExecutor requires Sequence pattern in flow".into(),
                })
            }
        };

        if steps.is_empty() {
            return Ok(ExecutionResult {
                run_id: RunId::new(),
                status: RunStatus::Completed,
                artifacts: vec![],
                error: None,
                metrics: ExecutionMetrics::default(),
                output: None,
            });
        }

        // Execute each step in sequence
        let mut artifacts = vec![];
        let mut current_ctx = ctx.clone();
        let on_failure = ctx.swarm.on_failure;
        // Tracks failed step names for Continue/Ignore summary
        let mut failed_steps: Vec<String> = vec![];

        for (idx, step_name) in steps.iter().enumerate() {
            current_ctx.state.current_step = idx;

            // Check cancellation
            if cancel.is_cancelled().await {
                return Ok(ExecutionResult {
                    run_id: RunId::new(),
                    status: RunStatus::Cancelled,
                    artifacts,
                    error: Some(RunError::Cancelled {
                        reason: "Execution cancelled".into(),
                    }),
                    metrics: ExecutionMetrics::default(),
                    output: None,
                });
            }

            // Get worker
            let worker =
                current_ctx
                    .get_worker(step_name)
                    .ok_or_else(|| RunError::PatternError {
                        pattern: "sequence".into(),
                        step: step_name.clone(),
                        message: format!("Worker '{}' not found in swarm", step_name),
                    })?;

            // Execute worker
            let result = execute_worker(
                worker,
                runtime,
                &current_ctx.runtime_ctx,
                &current_ctx.scope,
                cancel,
            )
            .await?;

            match result.status {
                RunStatus::Completed => {
                    artifacts.extend(result.artifacts);

                    // Add step output to scope for subsequent steps
                    if let Some(output) = &result.output {
                        current_ctx.add_step_output(step_name, output.clone());
                    }
                }
                RunStatus::Failed => {
                    match on_failure {
                        FailureBehavior::FailFast => {
                            return Ok(ExecutionResult {
                                run_id: RunId::new(),
                                status: RunStatus::Failed,
                                artifacts,
                                error: result.error,
                                metrics: result.metrics,
                                output: None,
                            });
                        }
                        FailureBehavior::Continue | FailureBehavior::Ignore => {
                            // Record failure and continue with next step
                            failed_steps.push(step_name.clone());
                        }
                    }
                }
                RunStatus::Cancelled => {
                    return Ok(ExecutionResult {
                        run_id: RunId::new(),
                        status: RunStatus::Cancelled,
                        artifacts,
                        error: Some(RunError::Cancelled {
                            reason: "Execution cancelled".into(),
                        }),
                        metrics: result.metrics,
                        output: None,
                    });
                }
                _ => {
                    // Other statuses not expected after wait
                    return Ok(ExecutionResult {
                        run_id: RunId::new(),
                        status: RunStatus::Failed,
                        artifacts,
                        error: Some(RunError::RuntimeError {
                            message: format!("Unexpected status after wait: {:?}", result.status),
                        }),
                        metrics: result.metrics,
                        output: None,
                    });
                }
            }
        }

        // Determine final status based on on_failure behavior
        let (final_status, final_error) = if failed_steps.is_empty() {
            (RunStatus::Completed, None)
        } else {
            match on_failure {
                FailureBehavior::Continue => (
                    RunStatus::Failed,
                    Some(RunError::PatternError {
                        pattern: "sequence".into(),
                        step: "summary".into(),
                        message: format!(
                            "{} worker(s) failed: {}",
                            failed_steps.len(),
                            failed_steps.join(", ")
                        ),
                    }),
                ),
                // Ignore: treat failures as success
                FailureBehavior::Ignore => (RunStatus::Completed, None),
                // FailFast already returned early above
                FailureBehavior::FailFast => unreachable!(),
            }
        };

        // All steps processed - build capability output with expose resolution
        let result = ExecutionResult {
            run_id: RunId::new(),
            status: final_status,
            artifacts,
            error: final_error,
            metrics: ExecutionMetrics::default(),
            output: None,
        };

        // CR2: Apply expose resolution or output behavior
        Ok(build_capability_output(
            result,
            &ctx.swarm,
            &current_ctx.scope,
        ))
    }

    async fn on_failure(
        &self,
        _ctx: &mut PatternContext,
        _runtime: &dyn crate::RuntimeAdapter,
        _failed_worker: &str,
        _error: &RunError,
    ) -> Result<bool, RunError> {
        // Sequence pattern: failure stops execution (fail_fast)
        Ok(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CancellationToken, ExecutionContext, FailureBehavior, RuntimeKind, Scope, SwarmFile, Worker};
    use serde_json::json;
    use std::io::Write;

    #[test]
    fn test_sequence_executor_name() {
        let executor = SequenceExecutor::new();
        assert_eq!(executor.name(), "sequence");
    }

    #[tokio::test]
    async fn test_sequence_executor_wrong_pattern() {
        let executor = SequenceExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Parallel {
                branches: vec![],
                fail_fast: false,
            },
        );
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await;
        assert!(result.is_err());
    }

    #[test]
    fn test_scope_propagation() {
        // Test that scope correctly propagates step outputs
        let mut scope = Scope::empty();
        scope.add_step_output("step1".to_string(), json!({ "result": "data" }));

        // Verify step output is in scope
        assert!(scope.steps.contains_key("step1"));
        assert_eq!(scope.steps["step1"].output["result"], "data");

        // Verify it's accessible for template resolution
        let scope_json = scope.to_json();
        assert_eq!(scope_json["steps"]["step1"]["output"]["result"], "data");
    }

    /// AC2: Continue mode — failure is recorded but subsequent steps continue executing
    #[tokio::test]
    async fn test_sequence_on_failure_continue() {
        let executor = SequenceExecutor::new();

        let temp_dir = std::env::temp_dir().join("bzzz-seq-continue-test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let failing_spec_path = temp_dir.join("failing.yaml");
        let mut file = std::fs::File::create(&failing_spec_path).unwrap();
        writeln!(file, "apiVersion: v1").unwrap();
        writeln!(file, "id: failing-agent").unwrap();
        writeln!(file, "runtime:").unwrap();
        writeln!(file, "  kind: Local").unwrap();
        writeln!(file, "  config:").unwrap();
        writeln!(file, "    command: /usr/bin/false").unwrap();
        drop(file);

        let ok_spec_path = temp_dir.join("ok.yaml");
        let mut file = std::fs::File::create(&ok_spec_path).unwrap();
        writeln!(file, "apiVersion: v1").unwrap();
        writeln!(file, "id: ok-agent").unwrap();
        writeln!(file, "runtime:").unwrap();
        writeln!(file, "  kind: Local").unwrap();
        writeln!(file, "  config:").unwrap();
        writeln!(file, "    command: /usr/bin/true").unwrap();
        drop(file);

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Sequence {
                steps: vec!["failing".into(), "ok".into()],
            },
        )
        .with_worker(Worker::new(
            "failing",
            failing_spec_path.to_string_lossy().to_string(),
        ))
        .with_worker(Worker::new(
            "ok",
            ok_spec_path.to_string_lossy().to_string(),
        ))
        .with_failure_behavior(FailureBehavior::Continue);

        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await
            .unwrap();

        std::fs::remove_dir_all(&temp_dir).ok();

        // Continue: all steps ran, final status is Failed with summary error
        assert_eq!(result.status, RunStatus::Failed);
        assert!(result.error.is_some());
        if let Some(RunError::PatternError { message, .. }) = &result.error {
            assert!(message.contains("failing"));
        }
    }

    /// AC3: Ignore mode — failure is recorded but treated as success
    #[tokio::test]
    async fn test_sequence_on_failure_ignore() {
        let executor = SequenceExecutor::new();

        let temp_dir = std::env::temp_dir().join("bzzz-seq-ignore-test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let failing_spec_path = temp_dir.join("failing.yaml");
        let mut file = std::fs::File::create(&failing_spec_path).unwrap();
        writeln!(file, "apiVersion: v1").unwrap();
        writeln!(file, "id: failing-agent").unwrap();
        writeln!(file, "runtime:").unwrap();
        writeln!(file, "  kind: Local").unwrap();
        writeln!(file, "  config:").unwrap();
        writeln!(file, "    command: /usr/bin/false").unwrap();
        drop(file);

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Sequence {
                steps: vec!["failing".into()],
            },
        )
        .with_worker(Worker::new(
            "failing",
            failing_spec_path.to_string_lossy().to_string(),
        ))
        .with_failure_behavior(FailureBehavior::Ignore);

        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await
            .unwrap();

        std::fs::remove_dir_all(&temp_dir).ok();

        // Ignore: failure is treated as success
        assert_eq!(result.status, RunStatus::Completed);
        assert!(result.error.is_none());
    }
}