use async_trait::async_trait;
use crate::{ExecutionMetrics, ExecutionResult, FailureBehavior, FlowPattern, RunError, RunId, RunStatus};
use super::{build_capability_output, execute_worker, PatternContext, PatternExecutor};
/// Executor for swarms whose flow is a `Sequence` pattern.
///
/// The type is a field-less unit struct: it holds no configuration or state,
/// so every value of it is interchangeable.
pub struct SequenceExecutor;

impl SequenceExecutor {
    /// Builds a new executor.
    pub fn new() -> Self {
        Self
    }
}

impl Default for SequenceExecutor {
    /// Equivalent to [`SequenceExecutor::new`].
    fn default() -> Self {
        SequenceExecutor::new()
    }
}
#[async_trait]
impl PatternExecutor for SequenceExecutor {
    /// Returns the static pattern name, `"sequence"`.
    fn name(&self) -> &'static str {
        "sequence"
    }

    /// Runs the workers named in the swarm's `Sequence` flow, one at a time and in order.
    ///
    /// For every step the cancellation token is checked first, the worker is looked up by
    /// name, and `execute_worker` is awaited. What happens next depends on the status the
    /// worker reports:
    ///
    /// * `Completed` - its artifacts are collected and its output (if any) is recorded via
    ///   `add_step_output` on the working context.
    /// * `Failed` - handled according to `ctx.swarm.on_failure`: `FailFast` returns a
    ///   `Failed` result immediately; `Continue` and `Ignore` remember the step name and
    ///   carry on with the next step.
    /// * `Cancelled` - returns a `Cancelled` result immediately.
    /// * anything else - returns a `Failed` result carrying a `RuntimeError`.
    ///
    /// After the last step, `Continue` with at least one failed step yields a `Failed`
    /// result whose error lists the failed workers; `Ignore` (or no failures at all) yields
    /// `Completed`. That final result is passed through `build_capability_output`.
    ///
    /// Every returned `ExecutionResult` carries a freshly created `RunId`.
    ///
    /// # Errors
    ///
    /// * `RunError::PatternError` if the swarm's flow is not `FlowPattern::Sequence`.
    /// * `RunError::PatternError` if a step names a worker that `get_worker` cannot find.
    /// * Any `Err` returned by `execute_worker` is propagated as-is. Note that this bypasses
    ///   the `on_failure` policy, which only applies to results with `RunStatus::Failed`.
    async fn execute(
        &self,
        ctx: &PatternContext,
        runtime: &dyn crate::RuntimeAdapter,
        cancel: &crate::CancellationToken,
    ) -> Result<ExecutionResult, RunError> {
        let steps = match &ctx.swarm.flow {
            FlowPattern::Sequence { steps } => steps,
            _ => {
                return Err(RunError::PatternError {
                    pattern: "sequence".into(),
                    step: "flow".into(),
                    message: "SequenceExecutor requires Sequence pattern in flow".into(),
                })
            }
        };

        // Nothing to run: report success without touching the runtime.
        if steps.is_empty() {
            return Ok(ExecutionResult {
                run_id: RunId::new(),
                status: RunStatus::Completed,
                artifacts: vec![],
                error: None,
                metrics: ExecutionMetrics::default(),
                output: None,
            });
        }

        let on_failure = ctx.swarm.on_failure;
        let mut artifacts = vec![];
        let mut failed_steps: Vec<String> = vec![];
        // The caller's context is borrowed immutably, so progress and step outputs are
        // tracked on a private copy; its `scope` is what each worker is handed.
        let mut current_ctx = ctx.clone();

        for (idx, step_name) in steps.iter().enumerate() {
            current_ctx.state.current_step = idx;

            // Check for cancellation before starting each step.
            if cancel.is_cancelled().await {
                return Ok(ExecutionResult {
                    run_id: RunId::new(),
                    status: RunStatus::Cancelled,
                    artifacts,
                    error: Some(RunError::Cancelled {
                        reason: "Execution cancelled".into(),
                    }),
                    metrics: ExecutionMetrics::default(),
                    output: None,
                });
            }

            let worker = current_ctx
                .get_worker(step_name)
                .ok_or_else(|| RunError::PatternError {
                    pattern: "sequence".into(),
                    step: step_name.clone(),
                    message: format!("Worker '{}' not found in swarm", step_name),
                })?;

            let result = execute_worker(
                worker,
                runtime,
                &current_ctx.runtime_ctx,
                &current_ctx.scope,
                cancel,
            )
            .await?;

            match result.status {
                RunStatus::Completed => {
                    artifacts.extend(result.artifacts);
                    // `result` is owned and not used again in this arm, so the output can
                    // be moved into the context instead of cloned.
                    if let Some(output) = result.output {
                        current_ctx.add_step_output(step_name, output);
                    }
                }
                RunStatus::Failed => match on_failure {
                    FailureBehavior::FailFast => {
                        return Ok(ExecutionResult {
                            run_id: RunId::new(),
                            status: RunStatus::Failed,
                            artifacts,
                            error: result.error,
                            metrics: result.metrics,
                            output: None,
                        });
                    }
                    FailureBehavior::Continue | FailureBehavior::Ignore => {
                        failed_steps.push(step_name.clone());
                    }
                },
                RunStatus::Cancelled => {
                    return Ok(ExecutionResult {
                        run_id: RunId::new(),
                        status: RunStatus::Cancelled,
                        artifacts,
                        error: Some(RunError::Cancelled {
                            reason: "Execution cancelled".into(),
                        }),
                        metrics: result.metrics,
                        output: None,
                    });
                }
                _ => {
                    return Ok(ExecutionResult {
                        run_id: RunId::new(),
                        status: RunStatus::Failed,
                        artifacts,
                        error: Some(RunError::RuntimeError {
                            message: format!("Unexpected status after wait: {:?}", result.status),
                        }),
                        metrics: result.metrics,
                        output: None,
                    });
                }
            }
        }

        // `FailFast` returns from inside the loop on the first failure, so it cannot arrive
        // here with recorded failures. It is grouped with `Continue` so that, should that
        // invariant ever break, the run is reported as failed rather than panicking.
        let (final_status, final_error) = match on_failure {
            _ if failed_steps.is_empty() => (RunStatus::Completed, None),
            FailureBehavior::Ignore => (RunStatus::Completed, None),
            FailureBehavior::Continue | FailureBehavior::FailFast => (
                RunStatus::Failed,
                Some(RunError::PatternError {
                    pattern: "sequence".into(),
                    step: "summary".into(),
                    message: format!(
                        "{} worker(s) failed: {}",
                        failed_steps.len(),
                        failed_steps.join(", ")
                    ),
                }),
            ),
        };

        let result = ExecutionResult {
            run_id: RunId::new(),
            status: final_status,
            artifacts,
            error: final_error,
            metrics: ExecutionMetrics::default(),
            output: None,
        };

        Ok(build_capability_output(
            result,
            &ctx.swarm,
            &current_ctx.scope,
        ))
    }

    /// Failure hook; this executor ignores all of its arguments and always returns
    /// `Ok(false)`.
    ///
    /// NOTE(review): `false` presumably tells the caller that no recovery was attempted -
    /// confirm against the `PatternExecutor` trait documentation.
    async fn on_failure(
        &self,
        _ctx: &mut PatternContext,
        _runtime: &dyn crate::RuntimeAdapter,
        _failed_worker: &str,
        _error: &RunError,
    ) -> Result<bool, RunError> {
        Ok(false)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        CancellationToken, ExecutionContext, FailureBehavior, RuntimeKind, Scope, SwarmFile, Worker,
    };
    use serde_json::json;
    use std::io::Write;
    use std::path::PathBuf;

    /// Scratch directory for agent spec files used by a single test.
    ///
    /// The directory name includes the current process id so that concurrent test
    /// processes sharing the system temp dir do not overwrite or delete each other's
    /// files. The directory is removed when the guard is dropped, which also happens
    /// when an assertion or `unwrap` panics.
    struct SpecDir {
        root: PathBuf,
    }

    impl SpecDir {
        /// Creates `<temp>/<label>-<pid>`.
        fn create(label: &str) -> Self {
            let root = std::env::temp_dir().join(format!("{}-{}", label, std::process::id()));
            std::fs::create_dir_all(&root).unwrap();
            SpecDir { root }
        }

        /// Writes an agent spec that runs `command` on the `Local` runtime and returns the
        /// path of the written file as a `String`.
        ///
        /// NOTE(review): `command` is emitted at the same indentation as `config`, so YAML
        /// reads it as a sibling key of `config` rather than a child - confirm this is the
        /// layout the spec loader expects.
        fn write_local_spec(&self, file_name: &str, agent_id: &str, command: &str) -> String {
            let path = self.root.join(file_name);
            let mut file = std::fs::File::create(&path).unwrap();
            writeln!(file, "apiVersion: v1").unwrap();
            writeln!(file, "id: {}", agent_id).unwrap();
            writeln!(file, "runtime:").unwrap();
            writeln!(file, " kind: Local").unwrap();
            writeln!(file, " config:").unwrap();
            writeln!(file, " command: {}", command).unwrap();
            // Close the file before any worker tries to read it.
            drop(file);
            path.to_string_lossy().to_string()
        }
    }

    impl Drop for SpecDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp dir must not fail the test.
            std::fs::remove_dir_all(&self.root).ok();
        }
    }

    /// The executor reports the pattern name "sequence".
    #[test]
    fn test_sequence_executor_name() {
        let executor = SequenceExecutor::new();
        assert_eq!(executor.name(), "sequence");
    }

    /// A swarm whose flow is not `Sequence` is rejected with an error.
    #[tokio::test]
    async fn test_sequence_executor_wrong_pattern() {
        let executor = SequenceExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Parallel {
                branches: vec![],
                fail_fast: false,
            },
        );
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await;

        assert!(result.is_err());
    }

    /// Step outputs added to a `Scope` are visible both on the struct and in its JSON form.
    #[test]
    fn test_scope_propagation() {
        let mut scope = Scope::empty();
        scope.add_step_output("step1".to_string(), json!({ "result": "data" }));

        assert!(scope.steps.contains_key("step1"));
        assert_eq!(scope.steps["step1"].output["result"], "data");

        let scope_json = scope.to_json();
        assert_eq!(scope_json["steps"]["step1"]["output"]["result"], "data");
    }

    /// With `Continue`, a failing step does not stop the sequence, but the overall run is
    /// reported as failed with a summary naming the failed worker.
    #[tokio::test]
    async fn test_sequence_on_failure_continue() {
        let executor = SequenceExecutor::new();
        let specs = SpecDir::create("bzzz-seq-continue-test");
        let failing_spec =
            specs.write_local_spec("failing.yaml", "failing-agent", "/usr/bin/false");
        let ok_spec = specs.write_local_spec("ok.yaml", "ok-agent", "/usr/bin/true");

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Sequence {
                steps: vec!["failing".into(), "ok".into()],
            },
        )
        .with_worker(Worker::new("failing", failing_spec))
        .with_worker(Worker::new("ok", ok_spec))
        .with_failure_behavior(FailureBehavior::Continue);
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await
            .unwrap();

        assert_eq!(result.status, RunStatus::Failed);
        // Match instead of `if let` so that an unexpected error variant fails the test
        // rather than silently skipping the message check.
        match &result.error {
            Some(RunError::PatternError { message, .. }) => {
                assert!(message.contains("failing"));
            }
            other => panic!("expected a PatternError summary, got {:?}", other),
        }
    }

    /// With `Ignore`, a failing step is swallowed and the run completes without an error.
    #[tokio::test]
    async fn test_sequence_on_failure_ignore() {
        let executor = SequenceExecutor::new();
        let specs = SpecDir::create("bzzz-seq-ignore-test");
        let failing_spec =
            specs.write_local_spec("failing.yaml", "failing-agent", "/usr/bin/false");

        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Sequence {
                steps: vec!["failing".into()],
            },
        )
        .with_worker(Worker::new("failing", failing_spec))
        .with_failure_behavior(FailureBehavior::Ignore);
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();

        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await
            .unwrap();

        assert_eq!(result.status, RunStatus::Completed);
        assert!(result.error.is_none());
    }
}