use std::collections::HashMap;
use async_trait::async_trait;
use serde_json::Value;
use crate::{
template::{resolve_path_value, ExpressionResolver, HandlebarsResolver},
ExecutionMetrics, ExecutionResult, FlowPattern, RunError, RunId, RunStatus, RunTarget, Scope,
SwarmFile, Worker,
};
use super::{build_capability_output, execute_worker, PatternContext, PatternExecutor};
const DEFAULT_MAX_NESTING_DEPTH: u32 = crate::MAX_NESTING_DEPTH;
pub struct DelegateExecutor {
max_depth: u32,
}
impl Default for DelegateExecutor {
fn default() -> Self {
Self::new()
}
}
impl DelegateExecutor {
pub fn new() -> Self {
DelegateExecutor {
max_depth: DEFAULT_MAX_NESTING_DEPTH,
}
}
pub fn with_max_depth(max_depth: u32) -> Self {
DelegateExecutor { max_depth }
}
fn resolve_worker_expr(&self, expr: &str, scope: &Scope) -> Result<String, RunError> {
let resolver = HandlebarsResolver::new();
let resolved = resolver
.resolve(expr, scope)
.map_err(|e| RunError::PatternError {
pattern: "delegate".into(),
step: "worker_expr".into(),
message: format!("Failed to resolve worker_expr '{}': {}", expr, e),
})?;
let worker_name = resolved.trim();
if worker_name.is_empty() {
return Err(RunError::PatternError {
pattern: "delegate".into(),
step: "worker_expr".into(),
message: format!("Resolved worker_expr '{}' is empty", expr),
});
}
Ok(worker_name.to_string())
}
fn find_worker<'a>(&self, worker_name: &str, swarm: &'a SwarmFile) -> Option<&'a Worker> {
swarm.workers.iter().find(|w| w.name == worker_name)
}
fn build_selection_trace(
&self,
expr: &str,
resolved_name: &str,
found: bool,
fallback_used: Option<&str>,
) -> String {
let status = if found {
"selected"
} else if fallback_used.is_some() {
"fallback"
} else {
"failed"
};
let fallback_str = fallback_used
.map(|f| format!(" fallback={}", f))
.unwrap_or_default();
format!(
"expr={} resolved={} status={}{}",
expr, resolved_name, status, fallback_str
)
}
fn load_delegate_swarm(
&self,
path: &std::path::PathBuf,
ctx: &PatternContext,
) -> Result<SwarmFile, RunError> {
let default_path = std::path::PathBuf::from(".");
let base_dir = ctx
.swarm
.file_path
.as_ref()
.and_then(|p| p.parent())
.unwrap_or(&default_path);
let full_path = base_dir.join(path);
SwarmFile::from_yaml_file(&full_path)
}
fn build_nested_scope(
&self,
parent_scope: &Scope,
input_mapping: &HashMap<String, Value>,
) -> Result<Scope, RunError> {
let mut nested_scope = Scope::empty();
let mut nested_input = serde_json::Map::new();
for (key, value) in input_mapping {
let resolver = HandlebarsResolver::new();
let resolved = resolver.resolve_value(value, parent_scope).map_err(|e| {
RunError::PatternError {
pattern: "delegate".into(),
step: "input_mapping".into(),
message: format!("Failed to resolve input_mapping '{}': {}", key, e),
}
})?;
nested_input.insert(key.clone(), resolved);
}
nested_scope.input = Value::Object(nested_input);
nested_scope.steps = parent_scope.steps.clone();
nested_scope.env = parent_scope.env.clone();
nested_scope.sys = crate::template::SystemVariables {
run_id: parent_scope.sys.run_id.clone(),
step_id: format!("delegate-{}", parent_scope.sys.step_id),
timestamp: chrono::Utc::now().to_rfc3339(),
swarm_id: parent_scope.sys.swarm_id.clone(),
iteration_index: parent_scope.sys.iteration_index,
iteration_value: parent_scope.sys.iteration_value.clone(),
};
Ok(nested_scope)
}
fn apply_output_mapping(
&self,
nested_output: &Value,
parent_scope: &Scope,
output_mapping: &HashMap<String, Value>,
) -> Result<Value, RunError> {
let mut mapping_scope = parent_scope.clone();
mapping_scope.add_step_output("nested".to_string(), nested_output.clone());
let mut result = serde_json::Map::new();
for (key, value) in output_mapping {
let resolved = match value {
Value::String(s)
if s.contains("{{") && s.starts_with("{{") && s.ends_with("}}") =>
{
resolve_path_value(s, &mapping_scope).map_err(|e| RunError::PatternError {
pattern: "delegate".into(),
step: "output_mapping".into(),
message: format!("Failed to resolve output_mapping '{}': {}", key, e),
})?
}
_ => {
let resolver = HandlebarsResolver::new();
resolver.resolve_value(value, &mapping_scope).map_err(|e| {
RunError::PatternError {
pattern: "delegate".into(),
step: "output_mapping".into(),
message: format!("Failed to resolve output_mapping '{}': {}", key, e),
}
})?
}
};
result.insert(key.clone(), resolved);
}
Ok(Value::Object(result))
}
fn check_nesting_depth(&self, current_depth: u32) -> Result<(), RunError> {
if current_depth >= self.max_depth {
return Err(RunError::PatternError {
pattern: "delegate".into(),
step: "depth_check".into(),
message: format!(
"Nesting depth {} exceeds maximum allowed {}",
current_depth, self.max_depth
),
});
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn execute_swarm_delegate(
&self,
ctx: &PatternContext,
runtime: &dyn crate::RuntimeAdapter,
_cancel: &crate::CancellationToken,
swarm_path: &std::path::PathBuf,
input_mapping: &HashMap<String, Value>,
output_mapping: &HashMap<String, Value>,
failure_inherit: bool,
) -> Result<ExecutionResult, RunError> {
let _ = failure_inherit;
let current_depth = ctx
.state
.custom
.get("nesting_depth")
.and_then(|d| d.parse::<u32>().ok())
.unwrap_or(0);
self.check_nesting_depth(current_depth)?;
let delegate_swarm = self.load_delegate_swarm(swarm_path, ctx)?;
let nested_scope = self.build_nested_scope(&ctx.scope, input_mapping)?;
let delegate_ctx = crate::ExecutionContext::new(
format!("delegate-{}", swarm_path.display()),
runtime.kind(),
);
let mut nested_pattern_ctx = PatternContext::new(delegate_swarm, delegate_ctx);
nested_pattern_ctx.scope = nested_scope;
nested_pattern_ctx
.state
.custom
.insert("nesting_depth".to_string(), (current_depth + 1).to_string());
let run = crate::Run::new(
RunTarget::Swarm {
swarmfile_path: swarm_path.clone(),
},
runtime.kind(),
);
let handle = runtime
.execute(&nested_pattern_ctx.runtime_ctx, &run)
.await?;
let result = runtime.wait(&handle).await?;
let mut final_scope = ctx.scope.clone();
if !output_mapping.is_empty() {
if let Some(ref output) = result.output {
let mapped_output =
self.apply_output_mapping(output, &ctx.scope, output_mapping)?;
final_scope.add_step_output("delegate".to_string(), mapped_output);
}
} else if let Some(output) = &result.output {
final_scope.add_step_output("delegate".to_string(), output.clone());
}
let exec_result = ExecutionResult {
run_id: RunId::new(),
status: result.status,
artifacts: result.artifacts,
error: result.error,
metrics: result.metrics,
output: None,
};
Ok(build_capability_output(
exec_result,
&ctx.swarm,
&final_scope,
))
}
#[allow(clippy::too_many_arguments)]
async fn execute_worker_selection(
&self,
ctx: &PatternContext,
runtime: &dyn crate::RuntimeAdapter,
cancel: &crate::CancellationToken,
worker_expr: &str,
fallback: Option<&str>,
input_mapping: &HashMap<String, Value>,
output_mapping: &HashMap<String, Value>,
failure_inherit: bool,
) -> Result<ExecutionResult, RunError> {
let _ = failure_inherit;
let resolved_name = self.resolve_worker_expr(worker_expr, &ctx.scope)?;
let worker = self.find_worker(&resolved_name, &ctx.swarm);
let (selected_worker, selection_trace) = match worker {
Some(w) => {
let trace = self.build_selection_trace(worker_expr, &resolved_name, true, None);
(w, trace)
}
None => {
match fallback {
Some(fallback_name) => {
match self.find_worker(fallback_name, &ctx.swarm) {
Some(fw) => {
let trace = self.build_selection_trace(
worker_expr,
&resolved_name,
false,
Some(fallback_name),
);
(fw, trace)
}
None => {
let trace = self.build_selection_trace(
worker_expr,
&resolved_name,
false,
Some(fallback_name),
);
return Err(RunError::PatternError {
pattern: "delegate".into(),
step: "worker_selection".into(),
message: format!(
"Worker '{}' not found and fallback '{}' also not found. Trace: {}",
resolved_name, fallback_name, trace
),
});
}
}
}
None => {
let trace =
self.build_selection_trace(worker_expr, &resolved_name, false, None);
return Err(RunError::PatternError {
pattern: "delegate".into(),
step: "worker_selection".into(),
message: format!(
"Worker '{}' not found and no fallback provided. Trace: {}",
resolved_name, trace
),
});
}
}
}
};
let worker_scope = if !input_mapping.is_empty() {
self.build_nested_scope(&ctx.scope, input_mapping)?
} else {
ctx.scope.clone()
};
let result = execute_worker(
selected_worker,
runtime,
&ctx.runtime_ctx,
&worker_scope,
cancel,
)
.await?;
let mut final_scope = ctx.scope.clone();
if !output_mapping.is_empty() {
if let Some(ref output) = result.output {
let mapped_output =
self.apply_output_mapping(output, &ctx.scope, output_mapping)?;
final_scope.add_step_output(selected_worker.name.clone(), mapped_output);
}
} else if let Some(output) = &result.output {
final_scope.add_step_output(selected_worker.name.clone(), output.clone());
}
let metrics = ExecutionMetrics {
wall_time_ms: result.metrics.wall_time_ms,
cpu_time_ms: result.metrics.cpu_time_ms,
peak_memory_bytes: result.metrics.peak_memory_bytes,
retries: result.metrics.retries,
selection_trace: Some(selection_trace),
};
let exec_result = ExecutionResult {
run_id: RunId::new(),
status: result.status,
artifacts: result.artifacts,
error: result.error,
metrics,
output: None,
};
let final_result = build_capability_output(exec_result, &ctx.swarm, &final_scope);
Ok(final_result)
}
}
#[async_trait]
impl PatternExecutor for DelegateExecutor {
fn name(&self) -> &'static str {
"delegate"
}
async fn execute(
&self,
ctx: &PatternContext,
runtime: &dyn crate::RuntimeAdapter,
cancel: &crate::CancellationToken,
) -> Result<ExecutionResult, RunError> {
let (swarm_path, worker_expr, fallback, input_mapping, output_mapping, failure_inherit) =
match &ctx.swarm.flow {
FlowPattern::Delegate {
swarm,
worker_expr,
fallback,
input_mapping,
output_mapping,
failure_inherit,
} => (
swarm.clone(),
worker_expr.clone(),
fallback.clone(),
input_mapping.clone(),
output_mapping.clone(),
*failure_inherit,
),
_ => {
return Err(RunError::PatternError {
pattern: "delegate".into(),
step: "flow".into(),
message: "DelegateExecutor requires Delegate pattern in flow".into(),
})
}
};
if cancel.is_cancelled().await {
return Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Cancelled,
artifacts: vec![],
error: Some(RunError::Cancelled {
reason: "Execution cancelled".into(),
}),
metrics: ExecutionMetrics::default(),
output: None,
});
}
if let Some(path) = swarm_path {
self.execute_swarm_delegate(
ctx,
runtime,
cancel,
&path,
&input_mapping,
&output_mapping,
failure_inherit,
)
.await
} else if let Some(expr) = worker_expr {
self.execute_worker_selection(
ctx,
runtime,
cancel,
&expr,
fallback.as_deref(),
&input_mapping,
&output_mapping,
failure_inherit,
)
.await
} else {
Err(RunError::PatternError {
pattern: "delegate".into(),
step: "config".into(),
message:
"Delegate pattern requires either 'swarm' path or 'worker_expr' expression"
.into(),
})
}
}
async fn on_failure(
&self,
ctx: &mut PatternContext,
_runtime: &dyn crate::RuntimeAdapter,
_failed_worker: &str,
_error: &RunError,
) -> Result<bool, RunError> {
let failure_inherit = match &ctx.swarm.flow {
FlowPattern::Delegate {
failure_inherit, ..
} => *failure_inherit,
_ => true,
};
if failure_inherit {
Ok(false)
} else {
ctx.state
.custom
.insert("delegate_failed".to_string(), "true".to_string());
Ok(true)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{CancellationToken, ExecutionContext, FlowPattern, RuntimeKind, SwarmFile};
use serde_json::json;
#[test]
fn test_delegate_executor_name() {
let executor = DelegateExecutor::new();
assert_eq!(executor.name(), "delegate");
}
#[test]
fn test_delegate_executor_with_custom_max_depth() {
let executor = DelegateExecutor::with_max_depth(5);
assert_eq!(executor.max_depth, 5);
}
#[test]
fn test_build_nested_scope_basic() {
let executor = DelegateExecutor::new();
let parent_scope = Scope::with_input(json!({ "query": "test", "limit": 10 }));
let input_mapping = HashMap::from([(
"search".to_string(),
Value::String("{{input.query}}".to_string()),
)]);
let nested_scope = executor
.build_nested_scope(&parent_scope, &input_mapping)
.unwrap();
assert_eq!(nested_scope.input["search"], "test");
assert!(nested_scope.input.get("query").is_none());
}
#[test]
fn test_build_nested_scope_with_step_output() {
let executor = DelegateExecutor::new();
let mut parent_scope = Scope::with_input(json!({ "user_id": "123" }));
parent_scope.add_step_output("fetcher".to_string(), json!({ "data": "fetched_value" }));
let input_mapping = HashMap::from([(
"data".to_string(),
Value::String("{{steps.fetcher.output.data}}".to_string()),
)]);
let nested_scope = executor
.build_nested_scope(&parent_scope, &input_mapping)
.unwrap();
assert_eq!(nested_scope.input["data"], "fetched_value");
assert!(nested_scope.steps.contains_key("fetcher"));
}
#[test]
fn test_apply_output_mapping_basic() {
let executor = DelegateExecutor::new();
let parent_scope = Scope::empty();
let nested_output = json!({ "results": "value1", "total": 42 });
let output_mapping = HashMap::from([
(
"items".to_string(),
Value::String("{{steps.nested.output.results}}".to_string()),
),
(
"count".to_string(),
Value::String("{{steps.nested.output.total}}".to_string()),
),
]);
let mapped = executor
.apply_output_mapping(&nested_output, &parent_scope, &output_mapping)
.unwrap();
assert_eq!(mapped["items"], "value1");
assert_eq!(mapped["count"], 42);
}
#[test]
fn test_apply_output_mapping_with_object() {
let executor = DelegateExecutor::new();
let parent_scope = Scope::empty();
let nested_output = json!({ "data": { "name": "test", "value": 123 } });
let output_mapping = HashMap::from([(
"result".to_string(),
Value::String("{{steps.nested.output.data.name}}".to_string()),
)]);
let mapped = executor
.apply_output_mapping(&nested_output, &parent_scope, &output_mapping)
.unwrap();
assert_eq!(mapped["result"], "test");
}
#[test]
fn test_check_nesting_depth_within_limit() {
let executor = DelegateExecutor::with_max_depth(5);
assert!(executor.check_nesting_depth(0).is_ok());
assert!(executor.check_nesting_depth(4).is_ok());
}
#[test]
fn test_check_nesting_depth_exceeds_limit() {
let executor = DelegateExecutor::with_max_depth(5);
assert!(executor.check_nesting_depth(5).is_err());
assert!(executor.check_nesting_depth(10).is_err());
}
#[tokio::test]
async fn test_delegate_executor_wrong_pattern() {
let executor = DelegateExecutor::new();
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let result = executor
.execute(&ctx, &crate::LocalRuntime::new(), &cancel)
.await;
assert!(result.is_err());
}
#[test]
fn test_delegate_pattern_with_mappings() {
let swarm = SwarmFile::new(
"test",
FlowPattern::Delegate {
swarm: Some(std::path::PathBuf::from("sub.yaml")),
worker_expr: None,
fallback: None,
input_mapping: HashMap::from([(
"query".to_string(),
Value::String("{{input.search}}".to_string()),
)]),
output_mapping: HashMap::from([(
"result".to_string(),
Value::String("{{steps.nested.output.data}}".to_string()),
)]),
failure_inherit: true,
},
);
assert!(matches!(swarm.flow, FlowPattern::Delegate { .. }));
}
#[test]
fn test_delegate_pattern_failure_inherit_false() {
let swarm = SwarmFile::new(
"test",
FlowPattern::Delegate {
swarm: Some(std::path::PathBuf::from("sub.yaml")),
worker_expr: None,
fallback: None,
input_mapping: HashMap::new(),
output_mapping: HashMap::new(),
failure_inherit: false,
},
);
if let FlowPattern::Delegate {
failure_inherit, ..
} = &swarm.flow
{
assert!(!*failure_inherit);
}
}
#[test]
fn test_resolve_worker_expr_basic() {
let executor = DelegateExecutor::new();
let scope = Scope::with_input(json!({ "processor_type": "image_processor" }));
let result = executor
.resolve_worker_expr("{{input.processor_type}}", &scope)
.unwrap();
assert_eq!(result, "image_processor");
}
#[test]
fn test_resolve_worker_expr_nested() {
let executor = DelegateExecutor::new();
let scope = Scope::with_input(json!({ "config": { "worker": "heavy_processor" } }));
let result = executor
.resolve_worker_expr("{{input.config.worker}}", &scope)
.unwrap();
assert_eq!(result, "heavy_processor");
}
#[test]
fn test_resolve_worker_expr_empty_fails() {
let executor = DelegateExecutor::new();
let scope = Scope::with_input(json!({ "processor_type": "" }));
let result = executor.resolve_worker_expr("{{input.processor_type}}", &scope);
assert!(result.is_err());
}
#[test]
fn test_resolve_worker_expr_missing_fails() {
let executor = DelegateExecutor::new();
let scope = Scope::empty();
let result = executor.resolve_worker_expr("{{input.nonexistent}}", &scope);
assert!(result.is_err());
}
#[test]
fn test_find_worker_exists() {
let executor = DelegateExecutor::new();
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] })
.with_worker(Worker::new("processor_a", "agent.yaml"))
.with_worker(Worker::new("processor_b", "agent.yaml"));
let worker = executor.find_worker("processor_a", &swarm);
assert!(worker.is_some());
assert_eq!(worker.unwrap().name, "processor_a");
}
#[test]
fn test_find_worker_not_exists() {
let executor = DelegateExecutor::new();
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] })
.with_worker(Worker::new("processor_a", "agent.yaml"));
let worker = executor.find_worker("nonexistent", &swarm);
assert!(worker.is_none());
}
#[test]
fn test_build_selection_trace_selected() {
let executor = DelegateExecutor::new();
let trace = executor.build_selection_trace("{{input.type}}", "processor_a", true, None);
assert!(trace.contains("expr={{input.type}}"));
assert!(trace.contains("resolved=processor_a"));
assert!(trace.contains("status=selected"));
assert!(!trace.contains("fallback"));
}
#[test]
fn test_build_selection_trace_fallback() {
let executor = DelegateExecutor::new();
let trace = executor.build_selection_trace(
"{{input.type}}",
"unknown",
false,
Some("default_worker"),
);
assert!(trace.contains("status=fallback"));
assert!(trace.contains("fallback=default_worker"));
}
#[test]
fn test_build_selection_trace_failed() {
let executor = DelegateExecutor::new();
let trace = executor.build_selection_trace("{{input.type}}", "unknown", false, None);
assert!(trace.contains("status=failed"));
}
#[test]
fn test_delegate_pattern_with_worker_expr() {
let swarm = SwarmFile::new(
"test",
FlowPattern::Delegate {
swarm: None,
worker_expr: Some("{{input.processor_type}}".to_string()),
fallback: Some("default_worker".to_string()),
input_mapping: HashMap::new(),
output_mapping: HashMap::new(),
failure_inherit: true,
},
);
if let FlowPattern::Delegate {
worker_expr,
fallback,
..
} = &swarm.flow
{
assert_eq!(worker_expr, &Some("{{input.processor_type}}".to_string()));
assert_eq!(fallback, &Some("default_worker".to_string()));
} else {
panic!("Expected Delegate pattern");
}
}
#[test]
fn test_delegate_pattern_swarm_takes_precedence() {
let swarm = SwarmFile::new(
"test",
FlowPattern::Delegate {
swarm: Some(std::path::PathBuf::from("sub.yaml")),
worker_expr: Some("{{input.worker}}".to_string()),
fallback: None,
input_mapping: HashMap::new(),
output_mapping: HashMap::new(),
failure_inherit: true,
},
);
assert!(matches!(swarm.flow, FlowPattern::Delegate { .. }));
}
#[tokio::test]
async fn test_delegate_no_swarm_no_worker_expr_fails() {
let executor = DelegateExecutor::new();
let swarm = SwarmFile::new(
"test",
FlowPattern::Delegate {
swarm: None,
worker_expr: None,
fallback: None,
input_mapping: HashMap::new(),
output_mapping: HashMap::new(),
failure_inherit: true,
},
);
let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
let cancel = CancellationToken::new();
let result = executor
.execute(&ctx, &crate::LocalRuntime::new(), &cancel)
.await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err
.to_string()
.contains("requires either 'swarm' path or 'worker_expr'"));
}
}