use async_trait::async_trait;
use crate::template::{ExpressionResolver, HandlebarsResolver};
use crate::{ExecutionMetrics, ExecutionResult, FailureBehavior, FlowPattern, RunError, RunId, RunStatus};
use super::{build_capability_output, execute_worker, PatternContext, PatternExecutor};
pub struct LoopExecutor;
impl Default for LoopExecutor {
fn default() -> Self {
Self::new()
}
}
impl LoopExecutor {
pub fn new() -> Self {
LoopExecutor
}
fn resolve_iterable(&self, over: &str, ctx: &PatternContext) -> Result<Vec<String>, RunError> {
let resolver = HandlebarsResolver::new();
let resolved = resolver
.resolve(over, &ctx.scope)
.map_err(|e| e.to_run_error())?;
if resolved.starts_with('[') && resolved.ends_with(']') {
if let Ok(serde_json::Value::Array(arr)) =
serde_json::from_str::<serde_json::Value>(&resolved)
{
return Ok(arr.iter().map(|v| v.to_string()).collect());
}
}
let resolved_str = resolved.trim();
if resolved_str.contains("..") {
let parts: Vec<&str> = resolved_str.split("..").collect();
if parts.len() == 2 {
let start: usize =
parts[0]
.trim()
.parse()
.map_err(|_| RunError::InvalidConfig {
message: format!("Invalid range start: {}", parts[0]),
})?;
let end: usize = parts[1]
.trim()
.parse()
.map_err(|_| RunError::InvalidConfig {
message: format!("Invalid range end: {}", parts[1]),
})?;
return Ok((start..end).map(|i| i.to_string()).collect());
}
}
if let Ok(count) = resolved_str.parse::<u32>() {
return Ok((0..count).map(|i| i.to_string()).collect());
}
Ok(resolved_str
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect())
}
}
#[async_trait]
impl PatternExecutor for LoopExecutor {
fn name(&self) -> &'static str {
"loop"
}
async fn execute(
&self,
ctx: &PatternContext,
runtime: &dyn crate::RuntimeAdapter,
cancel: &crate::CancellationToken,
) -> Result<ExecutionResult, RunError> {
let (over, do_worker, max_iterations) = match &ctx.swarm.flow {
FlowPattern::Loop {
over,
do_,
max_iterations,
} => (over.clone(), do_.clone(), *max_iterations),
_ => {
return Err(RunError::PatternError {
pattern: "loop".into(),
step: "flow".into(),
message: "LoopExecutor requires Loop pattern in flow".into(),
})
}
};
let items = self.resolve_iterable(&over, ctx)?;
if items.is_empty() {
return Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Completed,
artifacts: vec![],
error: None,
metrics: ExecutionMetrics::default(),
output: None,
});
}
let worker = ctx
.get_worker(&do_worker)
.ok_or_else(|| RunError::PatternError {
pattern: "loop".into(),
step: do_worker.clone(),
message: format!("Worker '{}' not found in swarm for loop body", do_worker),
})?;
let mut artifacts = vec![];
let mut metrics = ExecutionMetrics::default();
let mut current_ctx = ctx.clone();
let on_failure = ctx.swarm.on_failure;
let mut failed_iterations: Vec<usize> = vec![];
for (iteration_count, item) in items.iter().enumerate() {
if max_iterations > 0 && iteration_count >= max_iterations as usize {
break;
}
if cancel.is_cancelled().await {
return Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Cancelled,
artifacts,
error: Some(RunError::Cancelled {
reason: "Execution cancelled".into(),
}),
metrics,
output: None,
});
}
current_ctx.scope.sys.with_iteration(
iteration_count as u32,
serde_json::Value::String(item.clone()),
);
let result = execute_worker(
worker,
runtime,
¤t_ctx.runtime_ctx,
¤t_ctx.scope,
cancel,
)
.await?;
match result.status {
RunStatus::Completed => {
artifacts.extend(result.artifacts);
metrics.wall_time_ms += result.metrics.wall_time_ms;
metrics.retries += result.metrics.retries;
if let Some(output) = &result.output {
current_ctx.add_step_output(&do_worker, output.clone());
}
}
RunStatus::Failed => {
match on_failure {
FailureBehavior::FailFast => {
return Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Failed,
artifacts,
error: result.error,
metrics,
output: None,
});
}
FailureBehavior::Continue | FailureBehavior::Ignore => {
failed_iterations.push(iteration_count);
metrics.wall_time_ms += result.metrics.wall_time_ms;
metrics.retries += result.metrics.retries;
}
}
}
RunStatus::Cancelled => {
return Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Cancelled,
artifacts,
error: Some(RunError::Cancelled {
reason: "Execution cancelled".into(),
}),
metrics,
output: None,
});
}
_ => {}
}
}
let (final_status, final_error) = if failed_iterations.is_empty() {
(RunStatus::Completed, None)
} else {
match on_failure {
FailureBehavior::Continue => (
RunStatus::Failed,
Some(RunError::PatternError {
pattern: "loop".into(),
step: "summary".into(),
message: format!(
"{} iteration(s) failed: {}",
failed_iterations.len(),
failed_iterations
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(", ")
),
}),
),
FailureBehavior::Ignore => (RunStatus::Completed, None),
FailureBehavior::FailFast => unreachable!(),
}
};
let result = ExecutionResult {
run_id: RunId::new(),
status: final_status,
artifacts,
error: final_error,
metrics,
output: None,
};
Ok(build_capability_output(
result,
&ctx.swarm,
¤t_ctx.scope,
))
}
async fn on_failure(
&self,
ctx: &mut PatternContext,
_runtime: &dyn crate::RuntimeAdapter,
_failed_worker: &str,
_error: &RunError,
) -> Result<bool, RunError> {
ctx.state.iteration += 1;
Ok(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::template::Scope;
use crate::{CancellationToken, ExecutionContext, FailureBehavior, FlowPattern, RuntimeKind, SwarmFile, Worker};
use serde_json::json;
use std::io::Write;
#[test]
fn test_loop_executor_name() {
let executor = LoopExecutor::new();
assert_eq!(executor.name(), "loop");
}
#[test]
fn test_resolve_iterable_range() {
let executor = LoopExecutor::new();
let swarm = SwarmFile::new(
"test",
FlowPattern::Loop {
over: "1..5".into(),
do_: "worker".into(),
max_iterations: 0,
},
);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let items = executor.resolve_iterable("1..5", &ctx).unwrap();
assert_eq!(items, vec!["1", "2", "3", "4"]);
}
#[test]
fn test_resolve_iterable_count() {
let executor = LoopExecutor::new();
let swarm = SwarmFile::new(
"test",
FlowPattern::Loop {
over: "3".into(),
do_: "worker".into(),
max_iterations: 0,
},
);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let items = executor.resolve_iterable("3", &ctx).unwrap();
assert_eq!(items, vec!["0", "1", "2"]);
}
#[test]
fn test_resolve_iterable_list() {
let executor = LoopExecutor::new();
let swarm = SwarmFile::new(
"test",
FlowPattern::Loop {
over: "a,b,c".into(),
do_: "worker".into(),
max_iterations: 0,
},
);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let items = executor.resolve_iterable("a,b,c", &ctx).unwrap();
assert_eq!(items, vec!["a", "b", "c"]);
}
#[test]
fn test_resolve_iterable_template() {
use crate::pattern::PatternState;
use crate::template::Scope;
use serde_json::json;
let executor = LoopExecutor::new();
let swarm = SwarmFile::new(
"test",
FlowPattern::Loop {
over: "{{input.items}}".into(),
do_: "worker".into(),
max_iterations: 0,
},
);
let scope = Scope::with_input(json!({ "items": ["x", "y", "z"] }));
let ctx = PatternContext {
swarm,
runtime_ctx: ExecutionContext::new("ctx", RuntimeKind::Local),
handles: std::collections::HashMap::new(),
state: PatternState::default(),
scope,
};
let items = executor.resolve_iterable("{{input.items}}", &ctx).unwrap();
assert_eq!(items.len(), 3);
}
#[tokio::test]
async fn test_loop_executor_wrong_pattern() {
let executor = LoopExecutor::new();
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let result = executor
.execute(&ctx, &crate::LocalRuntime::new(), &cancel)
.await;
assert!(result.is_err());
}
#[test]
fn test_loop_scope_output_last_wins() {
let mut scope = Scope::with_input(json!({}));
scope.add_step_output("printer".to_string(), json!({ "value": "first" }));
let data = scope.to_json();
assert_eq!(data["steps"]["printer"]["output"]["value"], json!("first"));
scope.add_step_output("printer".to_string(), json!({ "value": "final" }));
let data = scope.to_json();
assert_eq!(data["steps"]["printer"]["output"]["value"], json!("final"));
}
#[tokio::test]
async fn test_loop_on_failure_continue() {
let executor = LoopExecutor::new();
let temp_dir = std::env::temp_dir().join("bzzz-loop-continue-test");
std::fs::create_dir_all(&temp_dir).unwrap();
let failing_spec_path = temp_dir.join("failing.yaml");
let mut file = std::fs::File::create(&failing_spec_path).unwrap();
writeln!(file, "apiVersion: v1").unwrap();
writeln!(file, "id: failing-agent").unwrap();
writeln!(file, "runtime:").unwrap();
writeln!(file, " kind: Local").unwrap();
writeln!(file, " config:").unwrap();
writeln!(file, " command: /usr/bin/false").unwrap();
drop(file);
let swarm = SwarmFile::new(
"test",
FlowPattern::Loop {
over: "3".into(),
do_: "failing".into(),
max_iterations: 0,
},
)
.with_worker(Worker::new(
"failing",
failing_spec_path.to_string_lossy().to_string(),
))
.with_failure_behavior(FailureBehavior::Continue);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let result = executor
.execute(&ctx, &crate::LocalRuntime::new(), &cancel)
.await
.unwrap();
std::fs::remove_dir_all(&temp_dir).ok();
assert_eq!(result.status, RunStatus::Failed);
assert!(result.error.is_some());
if let Some(RunError::PatternError { message, .. }) = &result.error {
assert!(message.contains("3 iteration(s) failed"));
}
}
#[tokio::test]
async fn test_loop_on_failure_ignore() {
let executor = LoopExecutor::new();
let temp_dir = std::env::temp_dir().join("bzzz-loop-ignore-test");
std::fs::create_dir_all(&temp_dir).unwrap();
let failing_spec_path = temp_dir.join("failing.yaml");
let mut file = std::fs::File::create(&failing_spec_path).unwrap();
writeln!(file, "apiVersion: v1").unwrap();
writeln!(file, "id: failing-agent").unwrap();
writeln!(file, "runtime:").unwrap();
writeln!(file, " kind: Local").unwrap();
writeln!(file, " config:").unwrap();
writeln!(file, " command: /usr/bin/false").unwrap();
drop(file);
let swarm = SwarmFile::new(
"test",
FlowPattern::Loop {
over: "2".into(),
do_: "failing".into(),
max_iterations: 0,
},
)
.with_worker(Worker::new(
"failing",
failing_spec_path.to_string_lossy().to_string(),
))
.with_failure_behavior(FailureBehavior::Ignore);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let result = executor
.execute(&ctx, &crate::LocalRuntime::new(), &cancel)
.await
.unwrap();
std::fs::remove_dir_all(&temp_dir).ok();
assert_eq!(result.status, RunStatus::Completed);
assert!(result.error.is_none());
}
#[test]
fn test_scope_chain_template_resolution() {
use crate::template::{resolve_worker_input, Scope};
use std::collections::HashMap;
let mut scope = Scope::with_input(json!({}));
scope.add_step_output(
"step_a".to_string(),
json!({ "count": 42, "nested": { "key": "hello" } }),
);
let mut input: HashMap<String, serde_json::Value> = HashMap::new();
input.insert("total".to_string(), json!("{{steps.step_a.output.count}}"));
input.insert(
"msg".to_string(),
json!("{{steps.step_a.output.nested.key}}"),
);
let resolved = resolve_worker_input(&input, &scope).unwrap();
assert_eq!(resolved["total"], json!(42));
assert_eq!(resolved["msg"], json!("hello"));
}
}