car-multi 0.9.0

Multi-agent coordination patterns for Common Agent Runtime
Documentation
//! Fleet — independent agents with shared knowledge graph.
//!
//! Unlike other patterns:
//! - **Not Swarm**: agents work on *different* problems, not the same one
//! - **Not Pipeline**: no ordering between agents
//! - **Not Supervisor**: no review loop
//!
//! Agents run fully independently but share a `car-memgine` knowledge graph.
//! Each agent can publish facts that others discover via spreading activation.
//! This maps to Hydra's "heads" concept: parallel missions with explicit
//! knowledge sharing.

use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::{AgentOutput, AgentSpec};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tracing::instrument;

/// Aggregated result of one [`Fleet::run`] invocation.
///
/// `run` pushes exactly one [`AgentOutput`] per spawned agent, in the order
/// the agents were listed, so `outputs.len() == succeeded + failed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FleetResult {
    /// One output per agent, in agent order. Agents whose runner returned an
    /// error, or whose task could not be joined, are represented by a
    /// placeholder output whose `error` field carries the failure message.
    pub outputs: Vec<AgentOutput>,
    /// Wall-clock time of the whole run in milliseconds, measured from the
    /// start of `run` until every agent has been collected (whole-millisecond
    /// resolution).
    pub duration_ms: f64,
    /// Number of outputs for which `AgentOutput::succeeded()` returned `true`.
    pub succeeded: usize,
    /// Number of agents counted as failed: outputs that did not report
    /// success, runner errors, and task join errors.
    pub failed: usize,
}

/// Fleet orchestrator — runs independent agents concurrently with shared knowledge.
///
/// Each agent gets its own Runtime (via SharedInfra) but they share state,
/// event log, and policies. Agents communicate through the shared state
/// (and optionally via Mailbox for real-time messaging).
///
/// The task text handed to each agent's runner is the string stored under the
/// `"task"` key of that agent's `metadata`; when the key is absent or not a
/// string, an empty task is passed.
///
/// # Example
/// ```ignore
/// let fleet = Fleet::new(vec![
///     AgentSpec::new("researcher", "Research competitor pricing"),
///     AgentSpec::new("analyst", "Analyze market trends"),
/// ]);
/// let result = fleet.run(&runner, &infra).await?;
/// ```
pub struct Fleet {
    /// Agent specifications — each runs independently in its own spawned task.
    pub agents: Vec<AgentSpec>,
    /// Optional timeout per agent (seconds). `None` means agents may run for
    /// as long as they need. When the timeout elapses, the agent is reported
    /// with an output whose `error` is `"timeout"` rather than aborting the
    /// whole fleet.
    pub agent_timeout_secs: Option<u64>,
}

impl Fleet {
    pub fn new(agents: Vec<AgentSpec>) -> Self {
        Self {
            agents,
            agent_timeout_secs: None,
        }
    }

    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
        self.agent_timeout_secs = Some(timeout_secs);
        self
    }

    /// Run all agents concurrently. Returns when all complete (or timeout).
    #[instrument(name = "multi.fleet", skip_all)]
    pub async fn run(
        &self,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<FleetResult, MultiError> {
        let start = Instant::now();
        let mailbox = Arc::new(Mailbox::default());

        // Spawn all agents concurrently
        let mut handles = Vec::new();
        for spec in &self.agents {
            let runner = Arc::clone(runner);
            let rt = infra.make_runtime();
            let spec = spec.clone();
            let mailbox = Arc::clone(&mailbox);
            let timeout = self.agent_timeout_secs;

            // Register with mailbox for optional inter-agent messaging
            let _rx = mailbox.register(&spec.name).await;

            let handle = tokio::spawn(async move {
                let task = spec
                    .metadata
                    .get("task")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();

                let result = if let Some(secs) = timeout {
                    match tokio::time::timeout(
                        tokio::time::Duration::from_secs(secs),
                        runner.run(&spec, &task, &rt, &mailbox),
                    )
                    .await
                    {
                        Ok(result) => result,
                        Err(_) => Ok(AgentOutput {
                            name: spec.name.clone(),
                            answer: format!("Timed out after {}s", secs),
                            turns: 0,
                            tool_calls: 0,
                            duration_ms: (secs * 1000) as f64,
                            error: Some("timeout".to_string()),
                            outcome: None,
                            tokens: None,
                        }),
                    }
                } else {
                    runner.run(&spec, &task, &rt, &mailbox).await
                };

                // Unregister from mailbox
                mailbox.unregister(&spec.name).await;

                result
            });

            handles.push(handle);
        }

        // Collect results
        let mut outputs = Vec::new();
        let mut succeeded = 0;
        let mut failed = 0;

        for handle in handles {
            match handle.await {
                Ok(Ok(output)) => {
                    if output.succeeded() {
                        succeeded += 1;
                    } else {
                        failed += 1;
                    }
                    outputs.push(output);
                }
                Ok(Err(e)) => {
                    failed += 1;
                    outputs.push(AgentOutput {
                        name: "unknown".to_string(),
                        answer: String::new(),
                        turns: 0,
                        tool_calls: 0,
                        duration_ms: 0.0,
                        error: Some(e.to_string()),
                        outcome: None,
                        tokens: None,
                    });
                }
                Err(e) => {
                    failed += 1;
                    outputs.push(AgentOutput {
                        name: "unknown".to_string(),
                        answer: String::new(),
                        turns: 0,
                        tool_calls: 0,
                        duration_ms: 0.0,
                        error: Some(format!("Task join error: {}", e)),
                        outcome: None,
                        tokens: None,
                    });
                }
            }
        }

        Ok(FleetResult {
            outputs,
            duration_ms: start.elapsed().as_millis() as f64,
            succeeded,
            failed,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentSpec;

    /// The builder keeps every supplied spec and records the timeout.
    #[test]
    fn test_fleet_construction() {
        let specs = vec![
            AgentSpec::new("a", "Agent A system prompt"),
            AgentSpec::new("b", "Agent B system prompt"),
        ];

        let built = Fleet::new(specs).with_timeout(60);

        assert_eq!(built.agents.len(), 2);
        assert_eq!(built.agent_timeout_secs, Some(60));
    }
}