use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::Value;
use crate::{
create_runtime,
template::{resolve_expose, resolve_worker_input},
CancellationToken, ExecutionContext, ExecutionHandle, ExecutionResult, RunError,
RuntimeAdapter, RuntimeKind, Scope, SwarmFile, Worker,
};
#[derive(Debug, Clone)]
pub struct PatternContext {
pub swarm: SwarmFile,
pub runtime_ctx: ExecutionContext,
pub handles: HashMap<String, ExecutionHandle>,
pub state: PatternState,
pub scope: Scope,
}
#[derive(Debug, Clone, Default)]
pub struct PatternState {
pub current_step: usize,
pub completed: Vec<String>,
pub failed: Vec<String>,
pub iteration: u32,
pub custom: HashMap<String, String>,
}
impl PatternContext {
pub fn new(swarm: SwarmFile, runtime_ctx: ExecutionContext) -> Self {
PatternContext {
swarm,
runtime_ctx,
handles: HashMap::new(),
state: PatternState::default(),
scope: Scope::empty(),
}
}
pub fn with_input(swarm: SwarmFile, runtime_ctx: ExecutionContext, input: Value) -> Self {
PatternContext {
swarm,
runtime_ctx,
handles: HashMap::new(),
state: PatternState::default(),
scope: Scope::with_input(input),
}
}
pub fn get_worker(&self, name: &str) -> Option<&Worker> {
self.swarm.workers.iter().find(|w| w.name == name)
}
pub fn add_step_output(&mut self, worker_name: &str, output: Value) {
self.scope.add_step_output(worker_name.to_string(), output);
}
}
#[async_trait]
pub trait PatternExecutor: Send + Sync {
fn name(&self) -> &'static str;
async fn execute(
&self,
ctx: &PatternContext,
runtime: &dyn RuntimeAdapter,
cancel: &CancellationToken,
) -> Result<ExecutionResult, RunError>;
async fn execute_with_arc(
&self,
ctx: &PatternContext,
runtime: Arc<dyn RuntimeAdapter>,
cancel: &CancellationToken,
) -> Result<ExecutionResult, RunError> {
self.execute(ctx, runtime.as_ref(), cancel).await
}
async fn on_failure(
&self,
ctx: &mut PatternContext,
runtime: &dyn RuntimeAdapter,
failed_worker: &str,
error: &RunError,
) -> Result<bool, RunError>;
}
async fn execute_a2a_worker(
worker: &Worker,
url: &str,
input_value: Option<serde_json::Value>,
) -> Result<ExecutionResult, RunError> {
let a2a_runtime = crate::runtime::A2ARuntime::new();
let a2a_spec = crate::AgentSpec::new(worker.name.clone(), crate::RuntimeKind::Http);
let a2a_ctx = a2a_runtime.create(&a2a_spec).await?;
let mut run = crate::Run::new(
crate::RunTarget::A2AAgent {
url: url.to_string(),
},
crate::RuntimeKind::Http,
);
if let Some(v) = input_value {
run = run.with_input(v);
}
let handle = a2a_runtime.execute(&a2a_ctx, &run).await?;
a2a_runtime.wait(&handle).await
}
pub async fn execute_worker(
worker: &Worker,
default_runtime: &dyn RuntimeAdapter,
ctx: &ExecutionContext,
scope: &Scope,
cancel: &CancellationToken,
) -> Result<ExecutionResult, RunError> {
if cancel.is_cancelled().await {
return Err(RunError::Cancelled {
reason: "Execution cancelled".into(),
});
}
let input_value = if !worker.input.is_empty() {
let resolved_input =
resolve_worker_input(&worker.input, scope).map_err(|e| e.to_run_error())?;
let v = serde_json::to_value(&resolved_input).map_err(|e| RunError::InvalidConfig {
message: format!("Failed to serialize input: {}", e),
})?;
Some(v)
} else {
None
};
let core = execute_worker_core(worker, default_runtime, ctx, input_value);
match worker.timeout {
Some(duration) => {
match tokio::time::timeout(duration, core).await {
Ok(result) => result,
Err(_elapsed) => Err(RunError::WorkerTimeout {
worker: worker.name.clone(),
after: duration,
}),
}
}
None => core.await,
}
}
async fn execute_worker_core(
worker: &Worker,
default_runtime: &dyn RuntimeAdapter,
ctx: &ExecutionContext,
input_value: Option<Value>,
) -> Result<ExecutionResult, RunError> {
if let Some(a2a_url) = &worker.a2a {
return execute_a2a_worker(worker, a2a_url, input_value).await;
}
let runtime_kind = worker.runtime.unwrap_or_else(|| default_runtime.kind());
let runtime: Arc<dyn RuntimeAdapter> = if worker.runtime.is_some() {
create_runtime(runtime_kind)?
} else {
return execute_with_runtime(worker, default_runtime, ctx, runtime_kind, input_value).await;
};
execute_with_runtime(worker, runtime.as_ref(), ctx, runtime_kind, input_value).await
}
pub async fn execute_worker_with_arc(
worker: &Worker,
runtime: Arc<dyn RuntimeAdapter>,
ctx: &ExecutionContext,
scope: &Scope,
cancel: &CancellationToken,
) -> Result<ExecutionResult, RunError> {
if cancel.is_cancelled().await {
return Err(RunError::Cancelled {
reason: "Execution cancelled".into(),
});
}
let input_value = if !worker.input.is_empty() {
let resolved_input =
resolve_worker_input(&worker.input, scope).map_err(|e| e.to_run_error())?;
let v = serde_json::to_value(&resolved_input).map_err(|e| RunError::InvalidConfig {
message: format!("Failed to serialize input: {}", e),
})?;
Some(v)
} else {
None
};
let worker_name = worker.name.clone();
let core = execute_worker_with_arc_core(worker, runtime, ctx, input_value);
match worker.timeout {
Some(duration) => {
match tokio::time::timeout(duration, core).await {
Ok(result) => result,
Err(_elapsed) => Err(RunError::WorkerTimeout {
worker: worker_name,
after: duration,
}),
}
}
None => core.await,
}
}
async fn execute_worker_with_arc_core(
worker: &Worker,
runtime: Arc<dyn RuntimeAdapter>,
ctx: &ExecutionContext,
input_value: Option<Value>,
) -> Result<ExecutionResult, RunError> {
if let Some(a2a_url) = &worker.a2a {
return execute_a2a_worker(worker, a2a_url, input_value).await;
}
let runtime_kind = worker.runtime.unwrap_or_else(|| runtime.kind());
let actual_runtime: Arc<dyn RuntimeAdapter> = if worker.runtime.is_some() {
create_runtime(runtime_kind)?
} else {
runtime
};
let mut run = crate::Run::new(
crate::RunTarget::Agent {
spec_path: worker.spec.clone().unwrap_or_default(),
},
runtime_kind,
);
if let Some(v) = input_value {
run = run.with_input(v);
}
let handle = actual_runtime.execute(ctx, &run).await?;
actual_runtime.wait(&handle).await
}
async fn execute_with_runtime(
worker: &Worker,
runtime: &dyn RuntimeAdapter,
ctx: &ExecutionContext,
runtime_kind: RuntimeKind,
input: Option<Value>,
) -> Result<ExecutionResult, RunError> {
let _spec = crate::AgentSpec::new(worker.name.clone(), runtime_kind);
let mut run = crate::Run::new(
crate::RunTarget::Agent {
spec_path: worker.spec.clone().unwrap_or_default(),
},
runtime_kind,
);
if let Some(v) = input {
run = run.with_input(v);
}
let handle = runtime.execute(ctx, &run).await?;
runtime.wait(&handle).await
}
pub fn apply_expose_resolution(
swarm: &SwarmFile,
scope: &Scope,
) -> Result<Option<Value>, RunError> {
if swarm.expose.is_empty() {
return Ok(None);
}
let output = resolve_expose(&swarm.expose, scope).map_err(|e| RunError::PatternError {
pattern: "expose".into(),
step: "resolution".into(),
message: e.to_string(),
})?;
Ok(Some(output))
}
pub fn apply_output_behavior(swarm: &SwarmFile, scope: &Scope) -> Option<Value> {
use crate::OutputBehavior;
if !swarm.expose.is_empty() {
return apply_expose_resolution(swarm, scope).unwrap_or(None);
}
let effective_output = match (&swarm.output, &swarm.flow) {
(OutputBehavior::Last, crate::FlowPattern::Parallel { .. }) => OutputBehavior::All,
(other, _) => *other,
};
match effective_output {
OutputBehavior::Last => {
let steps = match &swarm.flow {
crate::FlowPattern::Sequence { steps } => steps,
_ => return None,
};
steps
.last()
.and_then(|step_name| scope.steps.get(step_name).map(|s| s.output.clone()))
}
OutputBehavior::All => {
let outputs: serde_json::Map<String, Value> = scope
.steps
.iter()
.map(|(name, output)| (name.clone(), output.output.clone()))
.collect();
if outputs.is_empty() {
None
} else {
Some(Value::Object(outputs))
}
}
OutputBehavior::Aggregate => {
None
}
}
}
pub fn build_capability_output(
result: ExecutionResult,
swarm: &SwarmFile,
scope: &Scope,
) -> ExecutionResult {
let output = if !swarm.expose.is_empty() {
apply_expose_resolution(swarm, scope).unwrap_or(None)
} else {
apply_output_behavior(swarm, scope)
};
match output {
Some(v) => result.with_exposed_output(v),
None => result,
}
}
pub async fn execute_with_timeout(
executor: &dyn PatternExecutor,
ctx: &PatternContext,
runtime: Arc<dyn RuntimeAdapter>,
cancel: &CancellationToken,
) -> Result<ExecutionResult, RunError> {
match ctx.swarm.timeout {
Some(duration) => {
match tokio::time::timeout(
duration,
executor.execute_with_arc(ctx, runtime, cancel),
)
.await
{
Ok(result) => result,
Err(_elapsed) => {
cancel.cancel().await;
Err(RunError::Timeout { after: duration })
}
}
}
None => executor.execute_with_arc(ctx, runtime, cancel).await,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{FlowPattern, LocalRuntime, Worker};
#[test]
fn test_pattern_context_creation() {
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
let runtime_ctx = ExecutionContext::new("ctx-1", crate::RuntimeKind::Local);
let ctx = PatternContext::new(swarm, runtime_ctx);
assert_eq!(ctx.swarm.id.as_str(), "test");
assert_eq!(ctx.state.current_step, 0);
}
#[test]
fn test_get_worker() {
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] })
.with_worker(Worker::new("w1", "agent.yaml"));
let runtime_ctx = ExecutionContext::new("ctx-1", crate::RuntimeKind::Local);
let ctx = PatternContext::new(swarm, runtime_ctx);
assert!(ctx.get_worker("w1").is_some());
assert!(ctx.get_worker("nonexistent").is_none());
}
#[tokio::test]
async fn test_execute_worker_default_runtime() {
let worker = Worker::new("test-worker", "agent.yaml");
let runtime = LocalRuntime::new();
let ctx = ExecutionContext::new("test", RuntimeKind::Local);
let scope = Scope::empty();
let cancel = CancellationToken::new();
let result = execute_worker(&worker, &runtime, &ctx, &scope, &cancel).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_execute_worker_with_runtime_override() {
let worker = Worker::new("test-worker", "agent.yaml").with_runtime(RuntimeKind::Docker);
let runtime = LocalRuntime::new();
let ctx = ExecutionContext::new("test", RuntimeKind::Local);
let scope = Scope::empty();
let cancel = CancellationToken::new();
let _ = execute_worker(&worker, &runtime, &ctx, &scope, &cancel).await;
}
#[test]
fn test_apply_expose_resolution_empty() {
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
let scope = Scope::empty();
let result = apply_expose_resolution(&swarm, &scope).unwrap();
assert!(result.is_none());
}
#[test]
fn test_apply_expose_resolution_with_mapping() {
use crate::ExposeMapping;
use serde_json::json;
let mut scope = Scope::empty();
scope.add_step_output(
"parser".to_string(),
json!({ "items": ["a", "b"], "count": 2 }),
);
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["parser".into()],
},
)
.with_expose(ExposeMapping::new("results", "steps.parser.output.items"))
.with_expose(ExposeMapping::new("total", "steps.parser.output.count"));
let result = apply_expose_resolution(&swarm, &scope).unwrap();
assert!(result.is_some());
let output = result.unwrap();
assert_eq!(output["results"], json!(["a", "b"]));
assert_eq!(output["total"], 2);
}
#[test]
fn test_apply_output_behavior_last() {
use serde_json::json;
let mut scope = Scope::empty();
scope.add_step_output("step1".to_string(), json!({ "a": 1 }));
scope.add_step_output("step2".to_string(), json!({ "b": 2 }));
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["step1".into(), "step2".into()],
},
);
let result = apply_output_behavior(&swarm, &scope);
assert!(result.is_some());
let output = result.unwrap();
assert_eq!(output["b"], 2);
}
#[test]
fn test_apply_output_behavior_all() {
use crate::OutputBehavior;
use serde_json::json;
let mut scope = Scope::empty();
scope.add_step_output("step1".to_string(), json!({ "a": 1 }));
scope.add_step_output("step2".to_string(), json!({ "b": 2 }));
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["step1".into(), "step2".into()],
},
)
.with_output_behavior(OutputBehavior::All);
let result = apply_output_behavior(&swarm, &scope);
assert!(result.is_some());
let output = result.unwrap();
assert_eq!(output["step1"]["a"], 1);
assert_eq!(output["step2"]["b"], 2);
}
#[test]
fn test_build_capability_output_with_expose() {
use crate::{ExposeMapping, RunId, RunStatus};
use serde_json::json;
let mut scope = Scope::empty();
scope.add_step_output("worker".to_string(), json!({ "value": 42 }));
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["worker".into()],
},
)
.with_expose(ExposeMapping::new("result", "steps.worker.output.value"));
let result = ExecutionResult::new(RunId::new(), RunStatus::Completed);
let result = build_capability_output(result, &swarm, &scope);
assert!(result.output.is_some());
assert_eq!(result.output.unwrap()["result"], 42);
}
#[test]
fn test_build_capability_output_without_expose() {
use crate::{RunId, RunStatus};
use serde_json::json;
let mut scope = Scope::empty();
scope.add_step_output("step1".to_string(), json!({ "a": 1 }));
scope.add_step_output("step2".to_string(), json!({ "b": 2 }));
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["step1".into(), "step2".into()],
},
);
let result = ExecutionResult::new(RunId::new(), RunStatus::Completed);
let result = build_capability_output(result, &swarm, &scope);
assert!(result.output.is_some());
assert_eq!(result.output.unwrap()["b"], 2);
}
#[tokio::test]
async fn test_execute_with_timeout_none_completes() {
use crate::{create_runtime, RunStatus};
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["w1".into()],
},
)
.with_worker(Worker::new("w1", "agent.yaml"));
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let executor = crate::SequenceExecutor::new();
let runtime = create_runtime(RuntimeKind::Local).unwrap();
let result = execute_with_timeout(&executor, &ctx, runtime, &cancel)
.await
.unwrap();
assert_eq!(result.status, RunStatus::Completed);
}
#[tokio::test]
async fn test_execute_with_generous_timeout_completes() {
use crate::{create_runtime, RunStatus};
use std::time::Duration;
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["w1".into()],
},
)
.with_worker(Worker::new("w1", "agent.yaml"))
.with_timeout(Duration::from_secs(60));
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let executor = crate::SequenceExecutor::new();
let runtime = create_runtime(RuntimeKind::Local).unwrap();
let result = execute_with_timeout(&executor, &ctx, runtime, &cancel)
.await
.unwrap();
assert_eq!(result.status, RunStatus::Completed);
}
#[tokio::test]
async fn test_execute_with_timeout_fires() {
use crate::{create_runtime, RunError};
use std::io::Write;
use std::time::Duration;
let temp_dir = std::env::temp_dir().join("bzzz-timeout-fires-test");
std::fs::create_dir_all(&temp_dir).unwrap();
let slow_spec_path = temp_dir.join("slow.yaml");
let mut f = std::fs::File::create(&slow_spec_path).unwrap();
writeln!(f, "apiVersion: v1").unwrap();
writeln!(f, "id: slow-agent").unwrap();
writeln!(f, "runtime:").unwrap();
writeln!(f, " kind: Local").unwrap();
writeln!(f, " config:").unwrap();
writeln!(f, " command: sleep 30").unwrap();
drop(f);
let timeout_dur = Duration::from_millis(50);
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["slow".into()],
},
)
.with_worker(Worker::new(
"slow",
slow_spec_path.to_string_lossy().to_string(),
))
.with_timeout(timeout_dur);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let executor = crate::SequenceExecutor::new();
let runtime = create_runtime(RuntimeKind::Local).unwrap();
let result = execute_with_timeout(&executor, &ctx, runtime, &cancel).await;
std::fs::remove_dir_all(&temp_dir).ok();
assert!(result.is_err());
match result.unwrap_err() {
RunError::Timeout { after } => assert_eq!(after, timeout_dur),
other => panic!("Expected RunError::Timeout, got: {:?}", other),
}
}
#[tokio::test]
async fn test_execute_with_timeout_cancels_token() {
use crate::{create_runtime, RunError};
use std::io::Write;
use std::time::Duration;
let temp_dir = std::env::temp_dir().join("bzzz-timeout-cancel-test");
std::fs::create_dir_all(&temp_dir).unwrap();
let slow_spec_path = temp_dir.join("slow.yaml");
let mut f = std::fs::File::create(&slow_spec_path).unwrap();
writeln!(f, "apiVersion: v1").unwrap();
writeln!(f, "id: slow-agent").unwrap();
writeln!(f, "runtime:").unwrap();
writeln!(f, " kind: Local").unwrap();
writeln!(f, " config:").unwrap();
writeln!(f, " command: sleep 30").unwrap();
drop(f);
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["slow".into()],
},
)
.with_worker(Worker::new(
"slow",
slow_spec_path.to_string_lossy().to_string(),
))
.with_timeout(Duration::from_millis(50));
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let executor = crate::SequenceExecutor::new();
let runtime = create_runtime(RuntimeKind::Local).unwrap();
let result = execute_with_timeout(&executor, &ctx, runtime, &cancel).await;
std::fs::remove_dir_all(&temp_dir).ok();
assert!(matches!(result, Err(RunError::Timeout { .. })));
assert!(cancel.is_cancelled().await);
}
}