car-multi 0.9.0

Multi-agent coordination patterns for Common Agent Runtime
Documentation
//! Delegator — main agent spawns specialist sub-agents mid-run via a tool.
//!
//! The delegator's ToolExecutor intercepts `delegate` tool calls and spawns
//! specialist agents. This maps naturally to the callback-based runtime.
//!
//! ```text
//! main agent calls delegate("researcher", "find data about X")
//!   → DelegatingExecutor intercepts
//!   → spawns researcher agent
//!   → returns researcher's answer as tool result
//! ```

use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::AgentSpec;
use car_engine::ToolExecutor;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::instrument;

/// One entry in the delegation log: a single `delegate` tool call and its outcome.
///
/// An entry is pushed for every `delegate` call that names both a specialist
/// and a subtask, whether the specialist ran successfully, failed, or was not
/// found among the configured specialists.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DelegationRecord {
    /// Specialist name exactly as given in the `specialist` parameter of the call.
    pub specialist: String,
    /// Subtask text exactly as given in the `subtask` parameter of the call.
    pub subtask: String,
    /// The specialist's answer on success; otherwise the error message that
    /// was returned to the main agent as the tool result.
    pub result: String,
    /// `true` only when the specialist was found and its run returned `Ok`.
    pub success: bool,
}

/// Outcome of a complete [`Delegator::run`] invocation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DelegatorResult {
    /// The task string that was handed to the main agent.
    pub task: String,
    /// The `answer` field of the main agent's output.
    pub final_answer: String,
    /// Snapshot of the delegation log taken after the main agent finished,
    /// in the order the entries were pushed.
    pub delegations: Vec<DelegationRecord>,
}

/// Coordination pattern in which a main agent hands subtasks to named
/// specialist agents through a `delegate` tool call.
pub struct Delegator {
    /// Spec of the main agent that receives the top-level task.
    pub main: AgentSpec,
    /// Specialist specs, keyed by the name the main agent must pass as the
    /// `specialist` parameter of a `delegate` call.
    pub specialists: HashMap<String, AgentSpec>,
}

impl Delegator {
    pub fn new(main: AgentSpec, specialists: HashMap<String, AgentSpec>) -> Self {
        Self { main, specialists }
    }

    #[instrument(name = "multi.delegator", skip_all)]
    pub async fn run(
        &self,
        task: &str,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<DelegatorResult, MultiError> {
        let delegations = Arc::new(Mutex::new(Vec::<DelegationRecord>::new()));

        // Create a runtime for the main agent with shared infra
        let rt = infra.make_runtime();

        // Register the delegate tool + main agent's tools
        rt.register_tool("delegate").await;
        for tool in &self.main.tools {
            rt.register_tool(tool).await;
        }

        // Set up the delegating executor
        let executor = Arc::new(DelegatingExecutor {
            specialists: self.specialists.clone(),
            runner: Arc::clone(runner),
            infra_state: Arc::clone(&infra.state),
            infra_log: Arc::clone(&infra.log),
            infra_policies: Arc::clone(&infra.policies),
            delegations: Arc::clone(&delegations),
        });
        rt.set_executor(executor).await;

        // Run the main agent
        let mailbox = Mailbox::default();
        let output = runner
            .run(&self.main, task, &rt, &mailbox)
            .await
            .map_err(|e| MultiError::AgentFailed(self.main.name.clone(), e.to_string()))?;

        let delegations = delegations.lock().await.clone();

        Ok(DelegatorResult {
            task: task.to_string(),
            final_answer: output.answer,
            delegations,
        })
    }
}

/// A ToolExecutor that intercepts `delegate` calls and spawns specialist agents.
///
/// It holds its own handles to the pieces of the shared infrastructure so that
/// a `SharedInfra` can be reassembled, and a new runtime made from it, for
/// every specialist run.
struct DelegatingExecutor {
    /// Specialist specs, looked up by the `specialist` parameter of a call.
    specialists: HashMap<String, AgentSpec>,
    /// Runner used to execute the selected specialist.
    runner: Arc<dyn AgentRunner>,
    /// Shared state store handle, passed on to each specialist's runtime.
    infra_state: Arc<car_state::StateStore>,
    /// Shared event log handle, passed on to each specialist's runtime.
    infra_log: Arc<tokio::sync::Mutex<car_eventlog::EventLog>>,
    /// Shared policy engine handle, passed on to each specialist's runtime.
    infra_policies: Arc<tokio::sync::RwLock<car_policy::PolicyEngine>>,
    /// Delegation log; one record is appended per handled `delegate` call.
    delegations: Arc<Mutex<Vec<DelegationRecord>>>,
}

impl DelegatingExecutor {
    /// Appends one entry to the shared delegation log.
    async fn record(&self, specialist: &str, subtask: &str, result: &str, success: bool) {
        self.delegations.lock().await.push(DelegationRecord {
            specialist: specialist.to_string(),
            subtask: subtask.to_string(),
            result: result.to_string(),
            success,
        });
    }
}

#[async_trait::async_trait]
impl ToolExecutor for DelegatingExecutor {
    /// Handles a `delegate` tool call by running the named specialist on the
    /// given subtask and returning its answer as the tool result.
    ///
    /// `params` must be a JSON object with string fields `specialist` and
    /// `subtask`.
    ///
    /// # Errors
    ///
    /// Returns `Err` when `tool` is anything other than `"delegate"`, or when
    /// either required parameter is missing or not a string. Nothing is
    /// written to the delegation log in those cases.
    ///
    /// An unknown specialist name and a failed specialist run are *not*
    /// reported as `Err`: both are logged with `success: false` and the
    /// explanatory message is returned as an `Ok` string value.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        if tool != "delegate" {
            return Err(format!("unknown tool: {}", tool));
        }

        let specialist_name = params
            .get("specialist")
            .and_then(|v| v.as_str())
            .ok_or("delegate requires 'specialist' parameter")?;

        let subtask = params
            .get("subtask")
            .and_then(|v| v.as_str())
            .ok_or("delegate requires 'subtask' parameter")?;

        // Borrow the spec instead of cloning it: the runner only needs a
        // reference and `self` outlives the call.
        let spec = match self.specialists.get(specialist_name) {
            Some(spec) => spec,
            None => {
                // HashMap key order is arbitrary; sort so the same
                // configuration always yields the same message.
                let mut available: Vec<&str> =
                    self.specialists.keys().map(|s| s.as_str()).collect();
                available.sort_unstable();
                let msg = format!(
                    "Unknown specialist '{}'. Available: {}",
                    specialist_name,
                    available.join(", ")
                );
                self.record(specialist_name, subtask, &msg, false).await;
                return Ok(Value::String(msg));
            }
        };

        // Create a runtime with shared infra for the specialist
        // NOTE(review): tools are registered on this runtime but no executor
        // is set on it here — confirm that `make_runtime` supplies whatever
        // the specialist needs to actually execute them.
        let infra = SharedInfra {
            state: Arc::clone(&self.infra_state),
            log: Arc::clone(&self.infra_log),
            policies: Arc::clone(&self.infra_policies),
        };
        let rt = infra.make_runtime();
        for tool_name in &spec.tools {
            rt.register_tool(tool_name).await;
        }

        let mailbox = Mailbox::default();
        let (answer, success) = match self.runner.run(spec, subtask, &rt, &mailbox).await {
            Ok(output) => (output.answer, true),
            Err(e) => (
                format!("specialist '{}' failed: {}", specialist_name, e),
                false,
            ),
        };

        // Success and failure are both logged and both handed back as the
        // tool result.
        self.record(specialist_name, subtask, &answer, success).await;
        Ok(Value::String(answer))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;

    /// Returns at most `max_bytes` leading bytes of `s`, backing off to the
    /// nearest earlier char boundary so the slice can never panic.
    fn truncate_on_char_boundary(s: &str, max_bytes: usize) -> &str {
        let mut end = s.len().min(max_bytes);
        // Offset 0 is always a boundary, so this loop terminates.
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }

    /// Runner stub that never calls any tool: it answers immediately with the
    /// agent's name and a truncated echo of the task.
    struct SimpleRunner;

    #[async_trait::async_trait]
    impl crate::runner::AgentRunner for SimpleRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            Ok(AgentOutput {
                name: spec.name.clone(),
                answer: format!(
                    "{} handled: {}",
                    spec.name,
                    truncate_on_char_boundary(task, 50)
                ),
                turns: 1,
                tool_calls: 0,
                duration_ms: 5.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    /// The main agent's answer is surfaced as the final answer; with a runner
    /// that never calls `delegate`, the delegation log stays empty.
    #[tokio::test]
    async fn test_delegator_basic() {
        let main = AgentSpec::new("lead", "You manage the project");
        let mut specialists = HashMap::new();
        specialists.insert(
            "researcher".to_string(),
            AgentSpec::new("researcher", "Find information"),
        );

        let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(SimpleRunner);
        let infra = SharedInfra::new();

        let result = Delegator::new(main, specialists)
            .run("build a CLI tool", &runner, &infra)
            .await
            .unwrap();

        assert!(!result.final_answer.is_empty());
        assert!(result.final_answer.contains("lead"));
        assert_eq!(result.task, "build a CLI tool");
        assert!(result.delegations.is_empty());
    }
}