// car-multi 0.32.0
//
// Multi-agent coordination patterns for Common Agent Runtime
//! Shared infrastructure — creates Runtimes that share state, log, and policies.

use crate::budget::{BudgetError, BudgetLimits, CoordinationBudget};
use crate::concurrency::ConcurrencyControl;
use crate::types::AgentOutput;
use car_engine::Runtime;
use car_eventlog::EventLog;
use car_policy::PolicyEngine;
use car_state::StateStore;
use std::sync::Arc;
use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};

/// Factory for creating Runtime instances with shared state, event log, and policies.
///
/// In a multi-agent system, all agents see the same state store and write to the
/// same event log. Each agent gets its own tool set and executor.
///
/// A [`CoordinationBudget`] is always present (unbounded by default). Patterns
/// gate each spawn through [`begin_agent`](Self::begin_agent) and report spend
/// through [`record_output`](Self::record_output), so budget enforcement is a
/// uniform, always-on code path that costs nothing when no limits are set.
pub struct SharedInfra {
    /// State store handed to every runtime built by
    /// [`make_runtime`](Self::make_runtime), and used as the parent state of
    /// the per-agent context built by
    /// [`make_isolated_runtime`](Self::make_isolated_runtime).
    pub state: Arc<StateStore>,
    /// Event log handed to every runtime built by
    /// [`make_runtime`](Self::make_runtime); it also receives the
    /// `InferenceMetered` events appended by
    /// [`record_output_metered`](Self::record_output_metered).
    pub log: Arc<TokioMutex<EventLog>>,
    /// Policy engine handed to every runtime this infra creates, shared and
    /// isolated alike.
    pub policies: Arc<TokioRwLock<PolicyEngine>>,
    /// Coordination budget consulted by [`begin_agent`](Self::begin_agent) and
    /// charged by [`record_output`](Self::record_output). Both constructors
    /// install an unbounded budget; [`with_budget`](Self::with_budget) and
    /// [`with_shared_budget`](Self::with_shared_budget) replace it.
    pub budget: Arc<CoordinationBudget>,
    /// Opt-in concurrency-anomaly gating for the cross-agent commit barrier
    /// (A5). `None` (the default) leaves coordination behaving exactly as
    /// before; patterns that support gating consult this only when it is set.
    pub concurrency: Option<ConcurrencyControl>,
}

impl SharedInfra {
    pub fn new() -> Self {
        Self {
            state: Arc::new(StateStore::new()),
            log: Arc::new(TokioMutex::new(EventLog::new())),
            policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
            budget: Arc::new(CoordinationBudget::unbounded()),
            concurrency: None,
        }
    }

    /// Build from EXISTING shared state/log/policies — e.g. a daemon session's
    /// `Runtime` parts (`runtime.state`, `runtime.log`, `runtime.policies`), so a
    /// coordination pattern's gate consults the operator's registered policies
    /// and its audit events land in the session's event log, instead of a fresh
    /// empty engine. Budget defaults to unbounded; chain `with_budget` to cap.
    pub fn with_shared(
        state: Arc<StateStore>,
        log: Arc<TokioMutex<EventLog>>,
        policies: Arc<TokioRwLock<PolicyEngine>>,
    ) -> Self {
        Self {
            state,
            log,
            policies,
            budget: Arc::new(CoordinationBudget::unbounded()),
            concurrency: None,
        }
    }

    /// Attach a coordination budget built from the given limits. Patterns run
    /// against this infra will refuse to start agents once a limit is crossed.
    pub fn with_budget(mut self, limits: BudgetLimits) -> Self {
        self.budget = Arc::new(CoordinationBudget::new(limits));
        self
    }

    /// Attach a pre-built (possibly shared) coordination budget.
    pub fn with_shared_budget(mut self, budget: Arc<CoordinationBudget>) -> Self {
        self.budget = budget;
        self
    }

    /// Enable concurrency-anomaly gating (A5) at the cross-agent commit barrier
    /// with the given control. The isolated parallel swarm will instrument its
    /// agents into an `AgentOp` schedule and gate the merge; a detected
    /// causal-cascade aborts the batch, a stale generation rejects the offending
    /// commit, and a write reorder is serialized deterministically.
    pub fn with_concurrency_control(mut self, control: ConcurrencyControl) -> Self {
        self.concurrency = Some(control);
        self
    }

    /// Enable concurrency gating with the default policy (abort on causal
    /// cascade, require-approval on stale generation, auto-remediate reorders).
    pub fn with_concurrency_gating(self) -> Self {
        self.with_concurrency_control(ConcurrencyControl::with_default_policy())
    }

    /// Reserve a budget slot for one agent. `Ok(())` means the agent may run;
    /// `Err` carries why it was denied. Patterns call this immediately before a
    /// spawn and record a [`budget_skipped_output`](crate::budget::budget_skipped_output)
    /// on denial.
    pub fn begin_agent(&self) -> Result<(), BudgetError> {
        self.budget.try_begin_agent()
    }

    /// Record an agent's reported token/cost spend against the budget.
    pub fn record_output(&self, out: &AgentOutput) {
        self.budget.record_output(out);
    }

    /// Record spend against the budget AND emit a per-agent `InferenceMetered`
    /// event (EPIC G / G3) so cost is attributable per agent
    /// (`EventLog::cost_by_agent`) and a run's spend is traceable to the agent
    /// that incurred it. The event stamps `agent` (+ `tools` provenance) in
    /// `data` and the token/cost/latency in the standardized metric keys, so
    /// `events.query {data_matches:{agent}}` also surfaces it. A no-op emit when
    /// the runner reported no token accounting.
    pub async fn record_output_metered(&self, out: &AgentOutput) {
        self.record_output(out);
        let Some(tokens) = &out.tokens else {
            return;
        };
        let mut data: std::collections::HashMap<String, serde_json::Value> =
            std::collections::HashMap::new();
        data.insert(
            "agent".to_string(),
            serde_json::Value::from(out.name.clone()),
        );
        if !out.tools_used.is_empty() {
            data.insert(
                "tools".to_string(),
                serde_json::Value::from(out.tools_used.join(",")),
            );
        }
        let metrics = car_eventlog::Metrics {
            duration_ms: Some(out.duration_ms),
            tokens_in: Some(tokens.input_tokens),
            tokens_out: Some(tokens.output_tokens),
            cost_usd: Some(tokens.cost_usd),
        };
        let mut log = self.log.lock().await;
        log.append_metered(
            car_eventlog::EventKind::InferenceMetered,
            None,
            None,
            data,
            metrics,
        );
    }

    /// Create a Runtime that shares this infra's state, log, and policies.
    ///
    /// Each runtime gets its own tool set, executor, and idempotency cache.
    pub fn make_runtime(&self) -> Runtime {
        Runtime::with_shared(
            Arc::clone(&self.state),
            Arc::clone(&self.log),
            Arc::clone(&self.policies),
        )
    }

    /// Create a Runtime with per-agent isolated state overlay.
    /// Writes go to a local StateStore; reads fall through to shared state.
    /// Call `AgentContext::merge_to_parent()` after the agent completes.
    pub fn make_isolated_runtime(
        &self,
        agent_name: &str,
    ) -> (Runtime, crate::task_context::AgentContext) {
        let ctx = crate::task_context::AgentContext::new(agent_name, Arc::clone(&self.state));
        let rt = Runtime::with_shared(
            Arc::clone(&ctx.local_state),
            Arc::clone(&ctx.local_log),
            Arc::clone(&self.policies),
        );
        (rt, ctx)
    }
}

impl Default for SharedInfra {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn with_shared_reuses_the_exact_provided_arcs() {
        // `with_shared` must hand back the very same instances it was given,
        // never copies: the gate consults this policy engine, so rules added
        // through `policy.register` have to apply to foreman merges, and gate
        // events have to land in the session's own log.
        let shared_state = Arc::new(StateStore::new());
        let shared_log = Arc::new(TokioMutex::new(EventLog::new()));
        let shared_policies = Arc::new(TokioRwLock::new(PolicyEngine::new()));

        let infra = SharedInfra::with_shared(
            shared_state.clone(),
            shared_log.clone(),
            shared_policies.clone(),
        );

        // Pointer identity, not value equality, is the property under test.
        assert!(Arc::ptr_eq(&shared_state, &infra.state));
        assert!(Arc::ptr_eq(&shared_log, &infra.log));
        assert!(Arc::ptr_eq(&shared_policies, &infra.policies));
    }
}