car-multi 0.32.0

Multi-agent coordination patterns for Common Agent Runtime
//! Runtime concurrency-anomaly gating for multi-agent coordination
//! (EPIC A / task A5 — arXiv 2606.17182).
//!
//! `car_verify::concurrency` ships the *pure* detector ([`analyze_concurrency`])
//! and gate ([`gate_concurrency`]) over a schedule of timestamped
//! read → generate → write [`AgentOp`]s, but nothing in the runtime ever
//! *built* that schedule from a live multi-agent run — so the check sat dormant,
//! a library function no coordinator called. This module closes that gap.
//!
//! It instruments a coordination pattern into an [`AgentOp`] schedule and runs
//! the gate at the **cross-agent commit barrier**: the moment isolated agents
//! merge their local write overlays back into the shared parent state. That is
//! the only point where the paper's anomalies can occur in CAR — an isolated
//! agent reads the parent, generates for a while, then commits, and another
//! agent's commit can interleave.
//!
//! ## What the executor does with the gate's verdict
//!
//! [`gate_concurrency`] returns a [`Disposition`] per detected anomaly; the
//! [`ConcurrencyControl::guard`] here maps those onto concrete commit-barrier
//! actions, fail-closed:
//!
//! - **Abort** (default: `L0` causal-cascade) — the whole batch aborts; *no*
//!   agent's writes are merged. A broken causal order can't be repaired by
//!   dropping one writer.
//! - **RequireApproval** (default: `L1` stale-generation) — without a human
//!   approval sink at the library layer, the offending op's commit is
//!   **rejected** (its writes are not merged; its output is marked errored with
//!   the remediation reason). Siblings still commit. This is the safe reading of
//!   a lost update: don't commit a write computed against a value another op has
//!   already overwritten.
//! - **AutoRemediate** (default: `L2` phantom-tool, `L3` reorder) — the batch
//!   proceeds, but survivors are merged in a **deterministic order** (op id
//!   ascending), which realizes the `SerializeWriters` remediation: the same
//!   inputs always produce the same shared-state result instead of a
//!   nondeterministic last-writer-wins race.
//!
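//! Every gate run emits one `AdmissionGateDecision` event (`gate:"concurrency"`,
//! `phase:"commit_barrier"`) to the shared event log. This is the audit trail that
//! makes the check inspectable as live enforcement, mirroring the per-proposal
//! admission gates in `car-engine`. Because both emitters share one event kind,
//! the `phase` field is what distinguishes these commit-barrier decisions from
//! pre-execution admission decisions.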

use car_eventlog::{EventKind, EventLog};
use car_verify::concurrency::{
    analyze as analyze_concurrency, gate_concurrency, AgentOp, ConcurrencyGate,
    ConcurrencyGatePolicy, ConcurrencyReport, Disposition, Remediation,
};
use serde_json::Value;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;

/// Opt-in, runtime concurrency-anomaly enforcement for one coordination group.
///
/// Attaching a `ConcurrencyControl` to [`crate::SharedInfra`] makes the
/// isolated parallel swarm gate its merge barrier. Without one, coordination is
/// unchanged. The feature is purely additive, in line with the
/// empty-default-gate-list stance taken by `car-engine`.
///
/// It holds two things:
/// - the gate [`policy`](ConcurrencyGatePolicy);
/// - a shared monotonic logical clock from which each op's `read_at`/`commit_at`
///   stamps are drawn, so the detector can see which generate windows overlapped.
#[derive(Clone)]
pub struct ConcurrencyControl {
    policy: ConcurrencyGatePolicy,
    clock: Arc<AtomicU64>,
}

impl ConcurrencyControl {
    /// Create a control that enforces `policy`, with its logical clock
    /// starting at zero.
    pub fn new(policy: ConcurrencyGatePolicy) -> Self {
        let clock = Arc::new(AtomicU64::new(0));
        Self { policy, clock }
    }

    /// Create a control using `ConcurrencyGatePolicy::default()`. That policy
    /// aborts on `L0` (causal-cascade), requires approval on `L1`
    /// (stale-generation), and auto-remediates everything else.
    pub fn with_default_policy() -> Self {
        let policy = ConcurrencyGatePolicy::default();
        Self::new(policy)
    }

    /// Borrow the gate policy this control enforces.
    pub fn policy(&self) -> &ConcurrencyGatePolicy {
        &self.policy
    }

    /// Draw the next timestamp from the shared monotonic logical clock.
    ///
    /// Returns the value *before* the increment, so the first call yields `0`.
    /// Clones of this control share the same clock, because it lives behind an
    /// `Arc`.
    ///
    /// Agents stamp `read_at` before running and `commit_at` on finishing.
    /// Agents that ran concurrently therefore end up with overlapping
    /// `[read_at, commit_at]` windows.
    pub fn tick(&self) -> u64 {
        let clock: &AtomicU64 = &self.clock;
        clock.fetch_add(1, Ordering::SeqCst)
    }

    /// Run the detector over `ops` and gate its report under this control's
    /// policy. Then append one audit event to `log` and return the
    /// [`ConcurrencyGuard`] describing what to do at the commit barrier.
    ///
    /// The only side effect is that single log append, which takes `log`'s lock.
    pub async fn guard(
        &self,
        ops: &[AgentOp],
        log: &Arc<TokioMutex<EventLog>>,
    ) -> ConcurrencyGuard {
        let detected = analyze_concurrency(ops);
        let verdict = gate_concurrency(&detected, &self.policy);
        let outcome = ConcurrencyGuard::from_gate(detected, verdict);
        outcome.audit(log).await;
        outcome
    }
}

/// The commit-barrier decision derived from a gated [`ConcurrencyReport`].
///
/// Produced by [`ConcurrencyControl::guard`]. Callers consult
/// [`may_commit`](Self::may_commit) for each op before merging its writes.
#[derive(Debug, Clone)]
pub struct ConcurrencyGuard {
    /// The raw detector report (level + every anomaly found).
    pub report: ConcurrencyReport,
    /// The gate's per-anomaly remediations and dispositions.
    pub gate: ConcurrencyGate,
    /// True when the whole batch must abort. This happens on any `Abort`
    /// disposition, or on an approval-gated anomaly that cannot be attributed
    /// to a single op. No op is committed when set.
    pub abort: bool,
    /// Op ids whose individual commit must be rejected (fail-closed
    /// `RequireApproval`, no approval sink). Their writes are not merged.
    pub rejected_ops: HashSet<String>,
}

impl ConcurrencyGuard {
    /// Fold the gate's per-anomaly dispositions into commit-barrier actions.
    ///
    /// - `Abort`: abort the whole batch.
    /// - `RequireApproval`: reject the remediation's primary op. If no op can
    ///   be identified, fail closed by aborting the batch, rather than
    ///   silently letting an approval-gated anomaly commit.
    /// - `AutoRemediate`: no barrier action; the batch proceeds.
    fn from_gate(report: ConcurrencyReport, gate: ConcurrencyGate) -> Self {
        let mut abort = false;
        let mut rejected_ops = HashSet::new();
        for r in &gate.remediations {
            match r.disposition {
                Disposition::Abort => abort = true,
                Disposition::RequireApproval => match remediation_primary_op(&r.remediation) {
                    Some(op) => {
                        rejected_ops.insert(op);
                    }
                    // Without a target op we cannot hold back "just the
                    // offender". Dropping the disposition would be fail-open,
                    // so escalate instead.
                    None => abort = true,
                },
                Disposition::AutoRemediate => {}
            }
        }
        Self {
            report,
            gate,
            abort,
            rejected_ops,
        }
    }

    /// True when the gate judged the schedule safe (serializable), so there
    /// is nothing to gate.
    pub fn is_clean(&self) -> bool {
        self.gate.safe
    }

    /// Should the op with this id have its writes committed? Returns false
    /// when the whole batch aborts or when this specific op was rejected.
    pub fn may_commit(&self, op_id: &str) -> bool {
        !self.abort && !self.rejected_ops.contains(op_id)
    }

    /// A human-readable reason a specific op was held back, for the errored
    /// output surfaced to the caller. Returns `None` when the op may commit.
    pub fn rejection_reason(&self, op_id: &str) -> Option<String> {
        if self.abort {
            return Some(format!(
                "concurrency gate aborted the batch at consistency level {:?}: {}",
                self.report.level,
                self.anomaly_summary()
            ));
        }
        if self.rejected_ops.contains(op_id) {
            return Some(format!(
                "concurrency gate rejected this commit (lost-update/stale generation) — {}",
                self.anomaly_summary()
            ));
        }
        None
    }

    /// One-line summary of the anomalies found, for logs and errors.
    /// Explanations are joined with `"; "`; an empty report yields
    /// `"no anomalies"`.
    pub fn anomaly_summary(&self) -> String {
        if self.report.anomalies.is_empty() {
            return "no anomalies".to_string();
        }
        self.report
            .anomalies
            .iter()
            .map(|a| a.explanation.clone())
            .collect::<Vec<_>>()
            .join("; ")
    }

    /// The serde-stable decision label for the audit event.
    fn decision_label(&self) -> &'static str {
        if self.abort {
            "reject"
        } else if !self.rejected_ops.is_empty() {
            "needs_approval"
        } else {
            // Clean, or auto-remediated only: the batch proceeds either way.
            // The event's `remediations` field (present only when non-empty)
            // distinguishes the two cases.
            "allow"
        }
    }

    /// Emit the single `AdmissionGateDecision` audit event for this gate run.
    ///
    /// The payload is built before taking `log`'s lock, so the lock is held
    /// only for the append itself.
    async fn audit(&self, log: &Arc<TokioMutex<EventLog>>) {
        let mut data: HashMap<String, Value> = HashMap::new();
        data.insert("gate".to_string(), Value::from("concurrency"));
        // This gate runs at the CROSS-AGENT COMMIT BARRIER, not at
        // pre-execution proposal admission: same event kind, different
        // phase. Consumers that assume pre-action semantics filter on this
        // (neo review: two emitters, two phase semantics, one event name).
        data.insert("phase".to_string(), Value::from("commit_barrier"));
        data.insert("decision".to_string(), Value::from(self.decision_label()));
        data.insert(
            "level".to_string(),
            serde_json::to_value(self.report.level).unwrap_or(Value::Null),
        );
        data.insert("abort".to_string(), Value::from(self.abort));
        if !self.report.anomalies.is_empty() {
            data.insert("reason".to_string(), Value::from(self.anomaly_summary()));
            data.insert(
                "anomalies".to_string(),
                serde_json::to_value(&self.report.anomalies).unwrap_or(Value::Null),
            );
        }
        if !self.rejected_ops.is_empty() {
            // Sorted so the audit payload is deterministic regardless of
            // HashSet iteration order.
            let mut blocked: Vec<String> = self.rejected_ops.iter().cloned().collect();
            blocked.sort();
            data.insert(
                "blocked".to_string(),
                serde_json::to_value(blocked).unwrap_or(Value::Null),
            );
        }
        if !self.gate.remediations.is_empty() {
            data.insert(
                "remediations".to_string(),
                serde_json::to_value(&self.gate.remediations).unwrap_or(Value::Null),
            );
        }
        let mut log = log.lock().await;
        log.append(EventKind::AdmissionGateDecision, None, None, data);
    }
}

/// Identify the op an approval-gated remediation is chiefly about: the one
/// whose commit is held back when no human can be asked.
///
/// The stale or phantom-tool op, or the dependent side of a causal-order
/// violation, is returned directly. For a writer serialization it is the first
/// listed writer. Returns `None` only when that writer list is empty.
fn remediation_primary_op(r: &Remediation) -> Option<String> {
    let primary = match r {
        Remediation::RereadAndRegenerate { op, .. }
        | Remediation::PinToolRegistry { op, .. } => Some(op),
        Remediation::EnforceCausalOrder { dependent, .. } => Some(dependent),
        Remediation::SerializeWriters { ops, .. } => ops.first(),
    };
    primary.cloned()
}

#[cfg(test)]
mod tests {
    use super::*;
    use car_eventlog::EventLog;

    /// An op with just an id and its `[read_at, commit_at]` window; every other
    /// field (read/write sets, dependencies) takes its default.
    fn mk_op(id: &str, read_at: u64, commit_at: u64) -> AgentOp {
        AgentOp {
            id: id.to_owned(),
            read_at,
            commit_at,
            ..AgentOp::default()
        }
    }

    /// A fresh, empty shared event log.
    fn fresh_log() -> Arc<TokioMutex<EventLog>> {
        let inner = EventLog::new();
        Arc::new(TokioMutex::new(inner))
    }

    /// Two unordered writers to `k` with overlapping windows: a reorder
    /// anomaly (L3) that the default policy auto-remediates.
    fn overlapping_writers() -> [AgentOp; 2] {
        [
            AgentOp {
                write_set: vec!["k".into()],
                ..mk_op("a", 0, 3)
            },
            AgentOp {
                write_set: vec!["k".into()],
                ..mk_op("b", 1, 4)
            },
        ]
    }

    #[tokio::test]
    async fn clean_schedule_permits_all_commits() {
        let control = ConcurrencyControl::with_default_policy();
        // Disjoint write sets and non-overlapping windows: serializable.
        let ops = [
            AgentOp {
                write_set: vec!["x".into()],
                ..mk_op("a", 0, 1)
            },
            AgentOp {
                write_set: vec!["y".into()],
                ..mk_op("b", 2, 3)
            },
        ];
        let verdict = control.guard(&ops, &fresh_log()).await;
        assert!(verdict.is_clean());
        assert!(!verdict.abort);
        assert!(verdict.may_commit("a"));
        assert!(verdict.may_commit("b"));
    }

    #[tokio::test]
    async fn causal_cascade_aborts_whole_batch() {
        let control = ConcurrencyControl::with_default_policy();
        // "d" depends on "c" yet commits at 1, before "c" commits at 5: L0.
        let ops = [
            AgentOp {
                write_set: vec!["k".into()],
                ..mk_op("c", 0, 5)
            },
            AgentOp {
                depends_on: vec!["c".into()],
                ..mk_op("d", 0, 1)
            },
        ];
        let verdict = control.guard(&ops, &fresh_log()).await;
        assert!(verdict.abort);
        // An abort holds back every op, not only the offender.
        for id in ["c", "d"] {
            assert!(!verdict.may_commit(id));
        }
        assert!(verdict.rejection_reason("c").is_some());
    }

    #[tokio::test]
    async fn stale_generation_rejects_offending_op_only() {
        let control = ConcurrencyControl::with_default_policy();
        // "a" reads and writes k over [0, 2]; "b" overwrites k at 1, so "a"
        // generated against a stale value (L1).
        let stale_writer = AgentOp {
            read_set: vec!["k".into()],
            write_set: vec!["k".into()],
            ..mk_op("a", 0, 2)
        };
        let overwriter = AgentOp {
            write_set: vec!["k".into()],
            ..mk_op("b", 1, 1)
        };
        let verdict = control
            .guard(&[stale_writer, overwriter], &fresh_log())
            .await;
        assert!(!verdict.abort, "stale generation must not abort the whole batch");
        assert!(!verdict.may_commit("a"), "the stale writer is held back");
        assert!(verdict.may_commit("b"), "the other writer still commits");
        assert!(verdict.rejection_reason("a").is_some());
    }

    #[tokio::test]
    async fn reorder_auto_remediates_all_commit() {
        let control = ConcurrencyControl::with_default_policy();
        let verdict = control.guard(&overlapping_writers(), &fresh_log()).await;
        assert!(!verdict.abort);
        assert!(verdict.rejected_ops.is_empty());
        assert!(verdict.may_commit("a"));
        assert!(verdict.may_commit("b"));
        assert!(!verdict.is_clean(), "an anomaly was still detected and audited");
    }

    #[tokio::test]
    async fn guard_emits_admission_event() {
        let control = ConcurrencyControl::with_default_policy();
        let shared = fresh_log();
        control.guard(&overlapping_writers(), &shared).await;

        let locked = shared.lock().await;
        let events = locked.events();
        assert_eq!(events.len(), 1);
        let only = &events[0];
        assert_eq!(only.kind, EventKind::AdmissionGateDecision);
        assert_eq!(
            only.data.get("gate").and_then(|v| v.as_str()),
            Some("concurrency")
        );
    }
}