bzzz-core 0.1.0

Bzzz core library - Declarative orchestration engine for AI Agents
Documentation
//! Alongside pattern executor
//!
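//! Runs the main worker in the foreground and the side workers concurrently in the
//! background, all through the real runtime.
//! Failure semantics: a side-worker failure never affects the main outcome, while a main
//! failure or cancellation cancels the side workers. When the main worker completes, the
//! executor waits for every side worker and merges the artifacts they produced.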
//!
//! ## CR2: CapabilityOutput
//!
//! After main worker completes, applies expose resolution or output behavior
//! to produce the final capability output.

use std::sync::Arc;

use async_trait::async_trait;
use tokio::task::JoinSet;

use crate::{ExecutionMetrics, ExecutionResult, FlowPattern, RunError, RunId, RunStatus};

use super::{build_capability_output, execute_worker_with_arc, PatternContext, PatternExecutor};

/// Alongside pattern executor.
///
/// Stateless: runs one main worker plus any number of side workers concurrently,
/// with the main worker alone deciding the outcome.
#[derive(Debug, Clone, Copy)]
pub struct AlongsideExecutor;

impl AlongsideExecutor {
    /// Create a new alongside executor
    pub fn new() -> Self {
        AlongsideExecutor
    }
}

impl Default for AlongsideExecutor {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl PatternExecutor for AlongsideExecutor {
    fn name(&self) -> &'static str {
        "alongside"
    }

    async fn execute(
        &self,
        _ctx: &PatternContext,
        _runtime: &dyn crate::RuntimeAdapter,
        _cancel: &crate::CancellationToken,
    ) -> Result<ExecutionResult, RunError> {
        // AlongsideExecutor requires Arc runtime for concurrent side worker execution
        Err(RunError::RuntimeError {
            message: "AlongsideExecutor requires Arc runtime. Use execute_with_arc() instead."
                .into(),
        })
    }

    async fn execute_with_arc(
        &self,
        ctx: &PatternContext,
        runtime: Arc<dyn crate::RuntimeAdapter>,
        cancel: &crate::CancellationToken,
    ) -> Result<ExecutionResult, RunError> {
        let (main_worker_name, side_worker_names) = match &ctx.swarm.flow {
            FlowPattern::Alongside { main, side } => (main.clone(), side.clone()),
            _ => {
                return Err(RunError::PatternError {
                    pattern: "alongside".into(),
                    step: "flow".into(),
                    message: "AlongsideExecutor requires Alongside pattern in flow".into(),
                })
            }
        };

        // Check cancellation before launching
        if cancel.is_cancelled().await {
            return Ok(ExecutionResult {
                run_id: RunId::new(),
                status: RunStatus::Cancelled,
                artifacts: vec![],
                error: Some(RunError::Cancelled {
                    reason: "Execution cancelled".into(),
                }),
                metrics: ExecutionMetrics::default(),
                output: None,
            });
        }

        // Get main worker
        let main = ctx
            .get_worker(&main_worker_name)
            .ok_or_else(|| RunError::PatternError {
                pattern: "alongside".into(),
                step: main_worker_name.clone(),
                message: format!("Main worker '{}' not found in swarm", main_worker_name),
            })?
            .clone();

        // Spawn side workers via real runtime in background (parallel with main)
        let mut side_tasks: JoinSet<Option<ExecutionResult>> = JoinSet::new();

        for side_name in side_worker_names.iter() {
            let side_worker = ctx
                .get_worker(side_name)
                .ok_or_else(|| RunError::PatternError {
                    pattern: "alongside".into(),
                    step: side_name.clone(),
                    message: format!("Side worker '{}' not found in swarm", side_name),
                })?
                .clone();

            let runtime_clone = runtime.clone();
            let runtime_ctx = ctx.runtime_ctx.clone();
            let scope = ctx.scope.clone();
            let cancel_clone = cancel.clone();

            side_tasks.spawn(async move {
                if cancel_clone.is_cancelled().await {
                    return None;
                }
                // Execute side worker through real runtime
                execute_worker_with_arc(
                    &side_worker,
                    runtime_clone,
                    &runtime_ctx,
                    &scope,
                    &cancel_clone,
                )
                .await
                .ok()
            });
        }

        // Execute main worker via real runtime (runs concurrently with side workers above)
        let main_result =
            execute_worker_with_arc(&main, runtime, &ctx.runtime_ctx, &ctx.scope, cancel).await?;

        // Main result determines outcome
        match main_result.status {
            RunStatus::Completed => {
                // Collect side results (side failures are ignored per alongside semantics)
                let mut all_artifacts = main_result.artifacts.clone();

                while let Some(side_result) = side_tasks.join_next().await {
                    if let Ok(Some(result)) = side_result {
                        all_artifacts.extend(result.artifacts);
                    }
                }

                // CR2: Build scope with main output for expose resolution
                let mut final_scope = ctx.scope.clone();
                if let Some(output) = &main_result.output {
                    final_scope.add_step_output(main_worker_name.clone(), output.clone());
                }

                let result = ExecutionResult {
                    run_id: RunId::new(),
                    status: RunStatus::Completed,
                    artifacts: all_artifacts,
                    error: None,
                    metrics: main_result.metrics,
                    output: None,
                };

                Ok(build_capability_output(result, &ctx.swarm, &final_scope))
            }
            RunStatus::Failed => {
                // Main failed — cancel and discard side workers
                cancel.cancel().await;
                side_tasks.abort_all();

                Ok(ExecutionResult {
                    run_id: RunId::new(),
                    status: RunStatus::Failed,
                    artifacts: main_result.artifacts,
                    error: main_result.error,
                    metrics: main_result.metrics,
                    output: None,
                })
            }
            RunStatus::Cancelled => {
                // Main cancelled — cancel and discard side workers
                cancel.cancel().await;
                side_tasks.abort_all();

                Ok(ExecutionResult {
                    run_id: RunId::new(),
                    status: RunStatus::Cancelled,
                    artifacts: vec![],
                    error: Some(RunError::Cancelled {
                        reason: "Execution cancelled".into(),
                    }),
                    metrics: main_result.metrics,
                    output: None,
                })
            }
            _ => {
                cancel.cancel().await;
                side_tasks.abort_all();

                Ok(ExecutionResult {
                    run_id: RunId::new(),
                    status: main_result.status,
                    artifacts: main_result.artifacts,
                    error: main_result.error,
                    metrics: main_result.metrics,
                    output: None,
                })
            }
        }
    }

    async fn on_failure(
        &self,
        ctx: &mut PatternContext,
        _runtime: &dyn crate::RuntimeAdapter,
        failed_worker: &str,
        _error: &RunError,
    ) -> Result<bool, RunError> {
        let main = match &ctx.swarm.flow {
            FlowPattern::Alongside { main, .. } => main,
            _ => return Ok(false),
        };

        // If main failed, stop everything
        if failed_worker == main {
            return Ok(false);
        }

        // Side worker failure: continue (doesn't affect main)
        ctx.state.failed.push(failed_worker.to_string());
        Ok(true)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CancellationToken, ExecutionContext, FlowPattern, RuntimeKind, SwarmFile, Worker};

    /// Builds a pattern context for `swarm` targeting the local runtime.
    fn local_ctx(swarm: SwarmFile) -> PatternContext {
        PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local))
    }

    #[test]
    fn test_alongside_executor_name() {
        let executor = AlongsideExecutor::new();
        assert_eq!(executor.name(), "alongside");
    }

    /// execute() must return an error directing callers to use execute_with_arc()
    #[tokio::test]
    async fn test_alongside_execute_requires_arc() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec![],
            },
        );
        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await;

        assert!(result.is_err());
        match result.unwrap_err() {
            RunError::RuntimeError { message } => {
                assert!(
                    message.contains("execute_with_arc"),
                    "Error message should mention execute_with_arc, got: {}",
                    message
                );
            }
            other => panic!("Expected RuntimeError, got: {:?}", other),
        }
    }

    /// A non-Alongside flow is rejected with a PatternError on the "flow" step
    #[tokio::test]
    async fn test_alongside_executor_wrong_pattern() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();

        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        match executor.execute_with_arc(&ctx, runtime, &cancel).await {
            Err(RunError::PatternError { step, .. }) => assert_eq!(step, "flow"),
            Err(other) => panic!("Expected PatternError, got: {:?}", other),
            Ok(_) => panic!("Expected PatternError, got Ok"),
        }
    }

    /// A main worker missing from the swarm is reported as a PatternError on that step
    #[tokio::test]
    async fn test_alongside_missing_main_worker() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec![],
            },
        );
        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();

        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        match executor.execute_with_arc(&ctx, runtime, &cancel).await {
            Err(RunError::PatternError { step, .. }) => assert_eq!(step, "main"),
            Err(other) => panic!("Expected PatternError, got: {:?}", other),
            Ok(_) => panic!("Expected PatternError, got Ok"),
        }
    }

    /// A side worker missing from the swarm is reported as a PatternError on that step
    #[tokio::test]
    async fn test_alongside_missing_side_worker() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["ghost".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"));
        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();

        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        match executor.execute_with_arc(&ctx, runtime, &cancel).await {
            Err(RunError::PatternError { step, .. }) => assert_eq!(step, "ghost"),
            Err(other) => panic!("Expected PatternError, got: {:?}", other),
            Ok(_) => panic!("Expected PatternError, got Ok"),
        }
    }

    /// AC2 + AC3: Main and side workers execute through real runtime in parallel
    #[tokio::test]
    async fn test_alongside_executor_real_execution() {
        let executor = AlongsideExecutor::new();

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["side1".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new("side1", "agent.yaml"));

        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();

        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;

        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
    }

    /// AC4 (alongside semantics): Side worker failure does not affect main outcome
    #[tokio::test]
    async fn test_alongside_side_failure_does_not_affect_main() {
        const FAILING_SPEC: &str = concat!(
            "apiVersion: v1\n",
            "id: failing-agent\n",
            "runtime:\n",
            "  kind: Local\n",
            "  config:\n",
            "    command: /usr/bin/false\n"
        );

        let executor = AlongsideExecutor::new();

        // Per-process directory so concurrent test runs cannot clobber each other's spec.
        let temp_dir = std::env::temp_dir().join(format!(
            "bzzz-alongside-side-fail-test-{}",
            std::process::id()
        ));
        std::fs::create_dir_all(&temp_dir).unwrap();

        let failing_spec_path = temp_dir.join("failing.yaml");
        std::fs::write(&failing_spec_path, FAILING_SPEC).unwrap();

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["failing-side".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new(
            "failing-side",
            failing_spec_path.to_string_lossy().to_string(),
        ));

        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();

        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;

        // Clean up before asserting so a failure does not leave the directory behind.
        std::fs::remove_dir_all(&temp_dir).ok();

        // Main succeeded, side failed — overall must be Completed (alongside semantics)
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
    }

    /// Cancellation before execution returns Cancelled status
    #[tokio::test]
    async fn test_alongside_cancellation_before_start() {
        let executor = AlongsideExecutor::new();

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["side1".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new("side1", "agent.yaml"));

        let ctx = local_ctx(swarm);
        let cancel = CancellationToken::new();
        cancel.cancel().await;

        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;

        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.status, RunStatus::Cancelled);
        assert!(matches!(result.error, Some(RunError::Cancelled { .. })));
        assert!(result.artifacts.is_empty());
    }
}