1use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::AgentSpec;
18use car_engine::ToolExecutor;
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21use std::collections::HashMap;
22use std::sync::Arc;
23use tokio::sync::Mutex;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct DelegationRecord {
27 pub specialist: String,
28 pub subtask: String,
29 pub result: String,
30 pub success: bool,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DelegatorResult {
35 pub task: String,
36 pub final_answer: String,
37 pub delegations: Vec<DelegationRecord>,
38}
39
40pub struct Delegator {
41 pub main: AgentSpec,
42 pub specialists: HashMap<String, AgentSpec>,
43}
44
45impl Delegator {
46 pub fn new(main: AgentSpec, specialists: HashMap<String, AgentSpec>) -> Self {
47 Self { main, specialists }
48 }
49
50 pub async fn run(
51 &self,
52 task: &str,
53 runner: &Arc<dyn AgentRunner>,
54 infra: &SharedInfra,
55 ) -> Result<DelegatorResult, MultiError> {
56 let delegations = Arc::new(Mutex::new(Vec::<DelegationRecord>::new()));
57
58 let rt = infra.make_runtime();
60
61 rt.register_tool("delegate").await;
63 for tool in &self.main.tools {
64 rt.register_tool(tool).await;
65 }
66
67 let executor = Arc::new(DelegatingExecutor {
69 specialists: self.specialists.clone(),
70 runner: Arc::clone(runner),
71 infra_state: Arc::clone(&infra.state),
72 infra_log: Arc::clone(&infra.log),
73 infra_policies: Arc::clone(&infra.policies),
74 delegations: Arc::clone(&delegations),
75 });
76 rt.set_executor(executor).await;
77
78 let mailbox = Mailbox::default();
80 let output = runner
81 .run(&self.main, task, &rt, &mailbox)
82 .await
83 .map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;
84
85 let delegations = delegations.lock().await.clone();
86
87 Ok(DelegatorResult {
88 task: task.to_string(),
89 final_answer: output.answer,
90 delegations,
91 })
92 }
93}
94
95struct DelegatingExecutor {
97 specialists: HashMap<String, AgentSpec>,
98 runner: Arc<dyn AgentRunner>,
99 infra_state: Arc<car_state::StateStore>,
100 infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
101 infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
102 delegations: Arc<Mutex<Vec<DelegationRecord>>>,
103}
104
105#[async_trait::async_trait]
106impl ToolExecutor for DelegatingExecutor {
107 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
108 if tool != "delegate" {
109 return Err(format!("unknown tool: {}", tool));
110 }
111
112 let specialist_name = params
113 .get("specialist")
114 .and_then(|v| v.as_str())
115 .ok_or("delegate requires 'specialist' parameter")?;
116
117 let subtask = params
118 .get("subtask")
119 .and_then(|v| v.as_str())
120 .ok_or("delegate requires 'subtask' parameter")?;
121
122 let spec = match self.specialists.get(specialist_name) {
123 Some(s) => s.clone(),
124 None => {
125 let available: Vec<&str> = self.specialists.keys().map(|s| s.as_str()).collect();
126 let msg = format!(
127 "Unknown specialist '{}'. Available: {}",
128 specialist_name,
129 available.join(", ")
130 );
131 self.delegations.lock().await.push(DelegationRecord {
132 specialist: specialist_name.to_string(),
133 subtask: subtask.to_string(),
134 result: msg.clone(),
135 success: false,
136 });
137 return Ok(Value::String(msg));
138 }
139 };
140
141 let infra = SharedInfra {
143 state: Arc::clone(&self.infra_state),
144 log: Arc::clone(&self.infra_log),
145 policies: Arc::clone(&self.infra_policies),
146 };
147 let rt = infra.make_runtime();
148 for tool_name in &spec.tools {
149 rt.register_tool(tool_name).await;
150 }
151
152 let mailbox = Mailbox::default();
153 let result = self.runner.run(&spec, subtask, &rt, &mailbox).await;
154
155 match result {
156 Ok(output) => {
157 self.delegations.lock().await.push(DelegationRecord {
158 specialist: specialist_name.to_string(),
159 subtask: subtask.to_string(),
160 result: output.answer.clone(),
161 success: true,
162 });
163 Ok(Value::String(output.answer))
164 }
165 Err(e) => {
166 let msg = format!("specialist '{}' failed: {}", specialist_name, e);
167 self.delegations.lock().await.push(DelegationRecord {
168 specialist: specialist_name.to_string(),
169 subtask: subtask.to_string(),
170 result: msg.clone(),
171 success: false,
172 });
173 Ok(Value::String(msg))
174 }
175 }
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;

    /// Stub runner that never calls a tool and answers with
    /// `"<agent name> handled: <first 50 characters of the task>"`.
    struct SimpleRunner;

    #[async_trait::async_trait]
    impl crate::runner::AgentRunner for SimpleRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            // Truncate by characters rather than bytes: a byte slice such as
            // `task[..50]` panics when byte 50 falls inside a multi-byte
            // UTF-8 character.
            let preview: String = task.chars().take(50).collect();
            Ok(AgentOutput {
                name: spec.name.clone(),
                answer: format!("{} handled: {}", spec.name, preview),
                turns: 1,
                tool_calls: 0,
                duration_ms: 5.0,
                error: None,
            })
        }
    }

    /// Builds an executor over `specialists` backed by [`SimpleRunner`],
    /// returning it together with the delegation log it writes to.
    fn make_executor(
        specialists: HashMap<String, AgentSpec>,
    ) -> (DelegatingExecutor, Arc<Mutex<Vec<DelegationRecord>>>) {
        let infra = SharedInfra::new();
        let delegations = Arc::new(Mutex::new(Vec::new()));
        let executor = DelegatingExecutor {
            specialists,
            runner: Arc::new(SimpleRunner),
            infra_state: Arc::clone(&infra.state),
            infra_log: Arc::clone(&infra.log),
            infra_policies: Arc::clone(&infra.policies),
            delegations: Arc::clone(&delegations),
        };
        (executor, delegations)
    }

    #[tokio::test]
    async fn test_delegator_basic() {
        let main = AgentSpec::new("lead", "You manage the project");
        let mut specialists = HashMap::new();
        specialists.insert(
            "researcher".to_string(),
            AgentSpec::new("researcher", "Find information"),
        );

        let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(SimpleRunner);
        let infra = SharedInfra::new();

        let result = Delegator::new(main, specialists)
            .run("build a CLI tool", &runner, &infra)
            .await
            .unwrap();

        assert!(!result.final_answer.is_empty());
        assert!(result.final_answer.contains("lead"));
        assert_eq!(result.task, "build a CLI tool");
        // SimpleRunner never invokes the `delegate` tool.
        assert!(result.delegations.is_empty());
    }

    #[tokio::test]
    async fn test_delegator_multibyte_task() {
        // 20 three-byte characters: byte offset 50 is not a char boundary.
        let task = "日".repeat(20);
        let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(SimpleRunner);
        let infra = SharedInfra::new();

        let result = Delegator::new(
            AgentSpec::new("lead", "You manage the project"),
            HashMap::new(),
        )
        .run(&task, &runner, &infra)
        .await
        .unwrap();

        assert_eq!(result.final_answer, format!("lead handled: {}", task));
    }

    #[tokio::test]
    async fn test_execute_rejects_other_tools() {
        let (executor, delegations) = make_executor(HashMap::new());

        let err = executor.execute("search", &Value::Null).await.unwrap_err();

        assert_eq!(err, "unknown tool: search");
        assert!(delegations.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_execute_requires_parameters() {
        let (executor, delegations) = make_executor(HashMap::new());

        let err = executor
            .execute("delegate", &serde_json::json!({ "subtask": "x" }))
            .await
            .unwrap_err();

        assert_eq!(err, "delegate requires 'specialist' parameter");
        assert!(delegations.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_execute_unknown_specialist_is_recorded() {
        let (executor, delegations) = make_executor(HashMap::new());

        let out = executor
            .execute(
                "delegate",
                &serde_json::json!({ "specialist": "ghost", "subtask": "anything" }),
            )
            .await
            .unwrap();

        let text = out.as_str().unwrap();
        assert!(text.starts_with("Unknown specialist 'ghost'."));

        let records = delegations.lock().await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].specialist, "ghost");
        assert_eq!(records[0].subtask, "anything");
        assert_eq!(records[0].result, text);
        assert!(!records[0].success);
    }

    #[tokio::test]
    async fn test_execute_runs_specialist_and_records_success() {
        let mut specialists = HashMap::new();
        specialists.insert(
            "researcher".to_string(),
            AgentSpec::new("researcher", "Find information"),
        );
        let (executor, delegations) = make_executor(specialists);

        let out = executor
            .execute(
                "delegate",
                &serde_json::json!({ "specialist": "researcher", "subtask": "find docs" }),
            )
            .await
            .unwrap();

        assert_eq!(out.as_str(), Some("researcher handled: find docs"));

        let records = delegations.lock().await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].specialist, "researcher");
        assert_eq!(records[0].subtask, "find docs");
        assert_eq!(records[0].result, "researcher handled: find docs");
        assert!(records[0].success);
    }
}