Skip to main content

car_multi/patterns/
delegator.rs

1//! Delegator — main agent spawns specialist sub-agents mid-run via a tool.
2//!
3//! The delegator's ToolExecutor intercepts `delegate` tool calls and spawns
4//! specialist agents. This maps naturally to the callback-based runtime.
5//!
6//! ```text
7//! main agent calls delegate("researcher", "find data about X")
8//!   → DelegatingExecutor intercepts
9//!   → spawns researcher agent
10//!   → returns researcher's answer as tool result
11//! ```
12
13use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::AgentSpec;
18use car_engine::ToolExecutor;
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21use std::collections::HashMap;
22use std::sync::Arc;
23use tokio::sync::Mutex;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct DelegationRecord {
27    pub specialist: String,
28    pub subtask: String,
29    pub result: String,
30    pub success: bool,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DelegatorResult {
35    pub task: String,
36    pub final_answer: String,
37    pub delegations: Vec<DelegationRecord>,
38}
39
40pub struct Delegator {
41    pub main: AgentSpec,
42    pub specialists: HashMap<String, AgentSpec>,
43}
44
45impl Delegator {
46    pub fn new(main: AgentSpec, specialists: HashMap<String, AgentSpec>) -> Self {
47        Self { main, specialists }
48    }
49
50    pub async fn run(
51        &self,
52        task: &str,
53        runner: &Arc<dyn AgentRunner>,
54        infra: &SharedInfra,
55    ) -> Result<DelegatorResult, MultiError> {
56        let delegations = Arc::new(Mutex::new(Vec::<DelegationRecord>::new()));
57
58        // Create a runtime for the main agent with shared infra
59        let rt = infra.make_runtime();
60
61        // Register the delegate tool + main agent's tools
62        rt.register_tool("delegate").await;
63        for tool in &self.main.tools {
64            rt.register_tool(tool).await;
65        }
66
67        // Set up the delegating executor
68        let executor = Arc::new(DelegatingExecutor {
69            specialists: self.specialists.clone(),
70            runner: Arc::clone(runner),
71            infra_state: Arc::clone(&infra.state),
72            infra_log: Arc::clone(&infra.log),
73            infra_policies: Arc::clone(&infra.policies),
74            delegations: Arc::clone(&delegations),
75        });
76        rt.set_executor(executor).await;
77
78        // Run the main agent
79        let mailbox = Mailbox::default();
80        let output = runner
81            .run(&self.main, task, &rt, &mailbox)
82            .await
83            .map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;
84
85        let delegations = delegations.lock().await.clone();
86
87        Ok(DelegatorResult {
88            task: task.to_string(),
89            final_answer: output.answer,
90            delegations,
91        })
92    }
93}
94
95/// A ToolExecutor that intercepts `delegate` calls and spawns specialist agents.
96struct DelegatingExecutor {
97    specialists: HashMap<String, AgentSpec>,
98    runner: Arc<dyn AgentRunner>,
99    infra_state: Arc<car_state::StateStore>,
100    infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
101    infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
102    delegations: Arc<Mutex<Vec<DelegationRecord>>>,
103}
104
105#[async_trait::async_trait]
106impl ToolExecutor for DelegatingExecutor {
107    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
108        if tool != "delegate" {
109            return Err(format!("unknown tool: {}", tool));
110        }
111
112        let specialist_name = params
113            .get("specialist")
114            .and_then(|v| v.as_str())
115            .ok_or("delegate requires 'specialist' parameter")?;
116
117        let subtask = params
118            .get("subtask")
119            .and_then(|v| v.as_str())
120            .ok_or("delegate requires 'subtask' parameter")?;
121
122        let spec = match self.specialists.get(specialist_name) {
123            Some(s) => s.clone(),
124            None => {
125                let available: Vec<&str> = self.specialists.keys().map(|s| s.as_str()).collect();
126                let msg = format!(
127                    "Unknown specialist '{}'. Available: {}",
128                    specialist_name,
129                    available.join(", ")
130                );
131                self.delegations.lock().await.push(DelegationRecord {
132                    specialist: specialist_name.to_string(),
133                    subtask: subtask.to_string(),
134                    result: msg.clone(),
135                    success: false,
136                });
137                return Ok(Value::String(msg));
138            }
139        };
140
141        // Create a runtime with shared infra for the specialist
142        let infra = SharedInfra {
143            state: Arc::clone(&self.infra_state),
144            log: Arc::clone(&self.infra_log),
145            policies: Arc::clone(&self.infra_policies),
146        };
147        let rt = infra.make_runtime();
148        for tool_name in &spec.tools {
149            rt.register_tool(tool_name).await;
150        }
151
152        let mailbox = Mailbox::default();
153        let result = self.runner.run(&spec, subtask, &rt, &mailbox).await;
154
155        match result {
156            Ok(output) => {
157                self.delegations.lock().await.push(DelegationRecord {
158                    specialist: specialist_name.to_string(),
159                    subtask: subtask.to_string(),
160                    result: output.answer.clone(),
161                    success: true,
162                });
163                Ok(Value::String(output.answer))
164            }
165            Err(e) => {
166                let msg = format!("specialist '{}' failed: {}", specialist_name, e);
167                self.delegations.lock().await.push(DelegationRecord {
168                    specialist: specialist_name.to_string(),
169                    subtask: subtask.to_string(),
170                    result: msg.clone(),
171                    success: false,
172                });
173                Ok(Value::String(msg))
174            }
175        }
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182    use crate::types::{AgentOutput, AgentSpec};
183    use car_engine::Runtime;
184
185    struct SimpleRunner;
186
187    #[async_trait::async_trait]
188    impl crate::runner::AgentRunner for SimpleRunner {
189        async fn run(
190            &self,
191            spec: &AgentSpec,
192            task: &str,
193            _runtime: &Runtime,
194            _mailbox: &Mailbox,
195        ) -> Result<AgentOutput, MultiError> {
196            Ok(AgentOutput {
197                name: spec.name.clone(),
198                answer: format!("{} handled: {}", spec.name, &task[..task.len().min(50)]),
199                turns: 1,
200                tool_calls: 0,
201                duration_ms: 5.0,
202                error: None,
203            })
204        }
205    }
206
207    #[tokio::test]
208    async fn test_delegator_basic() {
209        let main = AgentSpec::new("lead", "You manage the project");
210        let mut specialists = HashMap::new();
211        specialists.insert(
212            "researcher".to_string(),
213            AgentSpec::new("researcher", "Find information"),
214        );
215
216        let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(SimpleRunner);
217        let infra = SharedInfra::new();
218
219        let result = Delegator::new(main, specialists)
220            .run("build a CLI tool", &runner, &infra)
221            .await
222            .unwrap();
223
224        assert!(!result.final_answer.is_empty());
225        assert!(result.final_answer.contains("lead"));
226    }
227}