use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::AgentSpec;
use car_engine::ToolExecutor;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::instrument;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DelegationRecord {
pub specialist: String,
pub subtask: String,
pub result: String,
pub success: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DelegatorResult {
pub task: String,
pub final_answer: String,
pub delegations: Vec<DelegationRecord>,
}
pub struct Delegator {
pub main: AgentSpec,
pub specialists: HashMap<String, AgentSpec>,
}
impl Delegator {
pub fn new(main: AgentSpec, specialists: HashMap<String, AgentSpec>) -> Self {
Self { main, specialists }
}
#[instrument(name = "multi.delegator", skip_all)]
pub async fn run(
&self,
task: &str,
runner: &Arc<dyn AgentRunner>,
infra: &SharedInfra,
) -> Result<DelegatorResult, MultiError> {
let delegations = Arc::new(Mutex::new(Vec::<DelegationRecord>::new()));
let rt = infra.make_runtime();
rt.register_tool("delegate").await;
for tool in &self.main.tools {
rt.register_tool(tool).await;
}
let executor = Arc::new(DelegatingExecutor {
specialists: self.specialists.clone(),
runner: Arc::clone(runner),
infra_state: Arc::clone(&infra.state),
infra_log: Arc::clone(&infra.log),
infra_policies: Arc::clone(&infra.policies),
delegations: Arc::clone(&delegations),
});
rt.set_executor(executor).await;
let mailbox = Mailbox::default();
let output = runner
.run(&self.main, task, &rt, &mailbox)
.await
.map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;
let delegations = delegations.lock().await.clone();
Ok(DelegatorResult {
task: task.to_string(),
final_answer: output.answer,
delegations,
})
}
}
struct DelegatingExecutor {
specialists: HashMap<String, AgentSpec>,
runner: Arc<dyn AgentRunner>,
infra_state: Arc<car_state::StateStore>,
infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
delegations: Arc<Mutex<Vec<DelegationRecord>>>,
}
#[async_trait::async_trait]
impl ToolExecutor for DelegatingExecutor {
async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
if tool != "delegate" {
return Err(format!("unknown tool: {}", tool));
}
let specialist_name = params
.get("specialist")
.and_then(|v| v.as_str())
.ok_or("delegate requires 'specialist' parameter")?;
let subtask = params
.get("subtask")
.and_then(|v| v.as_str())
.ok_or("delegate requires 'subtask' parameter")?;
let spec = match self.specialists.get(specialist_name) {
Some(s) => s.clone(),
None => {
let available: Vec<&str> = self.specialists.keys().map(|s| s.as_str()).collect();
let msg = format!(
"Unknown specialist '{}'. Available: {}",
specialist_name,
available.join(", ")
);
self.delegations.lock().await.push(DelegationRecord {
specialist: specialist_name.to_string(),
subtask: subtask.to_string(),
result: msg.clone(),
success: false,
});
return Ok(Value::String(msg));
}
};
let infra = SharedInfra {
state: Arc::clone(&self.infra_state),
log: Arc::clone(&self.infra_log),
policies: Arc::clone(&self.infra_policies),
};
let rt = infra.make_runtime();
for tool_name in &spec.tools {
rt.register_tool(tool_name).await;
}
let mailbox = Mailbox::default();
let result = self.runner.run(&spec, subtask, &rt, &mailbox).await;
match result {
Ok(output) => {
self.delegations.lock().await.push(DelegationRecord {
specialist: specialist_name.to_string(),
subtask: subtask.to_string(),
result: output.answer.clone(),
success: true,
});
Ok(Value::String(output.answer))
}
Err(e) => {
let msg = format!("specialist '{}' failed: {}", specialist_name, e);
self.delegations.lock().await.push(DelegationRecord {
specialist: specialist_name.to_string(),
subtask: subtask.to_string(),
result: msg.clone(),
success: false,
});
Ok(Value::String(msg))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{AgentOutput, AgentSpec};
use car_engine::Runtime;
struct SimpleRunner;
#[async_trait::async_trait]
impl crate::runner::AgentRunner for SimpleRunner {
async fn run(
&self,
spec: &AgentSpec,
task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
Ok(AgentOutput {
name: spec.name.clone(),
answer: format!("{} handled: {}", spec.name, &task[..task.len().min(50)]),
turns: 1,
tool_calls: 0,
duration_ms: 5.0,
error: None,
outcome: None,
tokens: None,
})
}
}
#[tokio::test]
async fn test_delegator_basic() {
let main = AgentSpec::new("lead", "You manage the project");
let mut specialists = HashMap::new();
specialists.insert(
"researcher".to_string(),
AgentSpec::new("researcher", "Find information"),
);
let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(SimpleRunner);
let infra = SharedInfra::new();
let result = Delegator::new(main, specialists)
.run("build a CLI tool", &runner, &infra)
.await
.unwrap();
assert!(!result.final_answer.is_empty());
assert!(result.final_answer.contains("lead"));
}
}