car-multi 0.26.0

Multi-agent coordination patterns for Common Agent Runtime
//! `run_foreman` — the full pipeline as one reusable call.
//!
//! Decompose a goal, then EITHER farm the subtasks out and verify the integrated
//! union, OR — when the plan doesn't decompose (invalid, or no parallelism worth
//! it) — **fall back to a single whole-goal session**.
//!
//! Optionally (off by default; `FarmOutConfig::recover_via_single_session`), when
//! the plan DID decompose but the integrated union was rejected — an integration
//! failure the symbol-footprint planner structurally can't foresee — it
//! **recovers**: first by a regional replan (when blame localizes the failure to
//! a proper subset of the accepted subtasks, it resumes from the clean patches
//! and finishes the goal in one session), otherwise by re-running the whole goal
//! as one session. So a delivery-first caller never just gives up; a
//! cost-sensitive caller leaves recovery off and inspects `delivered()` + the
//! retained `integration` evidence. Recovery costs at least one extra (serial)
//! session on top of the failed parallel spend — hence opt-in. Either recovery is
//! graded by the SAME merge-verify gate, so it can only turn a non-delivery into
//! a *gated* delivery, never weaken soundness.
//!
//! The daemon's `foreman.run` is a thin caller of this, and the B7 eval exercises
//! the exact same path (including the fallback), so what the eval measures is
//! what production does.

use std::future::Future;
use std::path::Path;

use super::harness::{
    integrate_and_verify, regional_replan, run_farm_out, FarmOutConfig, IntegrationResult, Subtask,
    SubtaskOutcome, WorktreeAgent,
};
use super::planner::{decompose, DecomposeResult};
use crate::shared::SharedInfra;

/// How `run_foreman` executed the goal.
///
/// `ForemanRunOutcome::delivered` matches on this to decide where the verdict
/// lives: `integration` for `Parallel`, the first `outcomes` entry for every
/// other mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// The goal decomposed and was farmed out, and the parallel result is what
    /// is being reported. This covers BOTH an accepted integrated union AND a
    /// rejected (or absent) union when recovery was not enabled
    /// (`FarmOutConfig::recover_via_single_session` off) — use
    /// `ForemanRunOutcome::delivered` to tell the two apart.
    Parallel,
    /// The plan didn't decompose (invalid or no real parallelism), so the whole
    /// goal ran as one session.
    SingleSession,
    /// The goal decomposed and was farmed out, but the integrated union was
    /// rejected (an integration failure the planner didn't foresee) — so it
    /// recovered by re-running the whole goal as one session.
    ParallelThenSingleSession,
    /// The integrated union was rejected, but blame localized the failure to a
    /// proper subset of the accepted subtasks — so it recovered by RESUMING from
    /// the clean (non-implicated) patches and completing the goal in one session,
    /// redoing only the failing region while preserving the successful parallel
    /// work. Cheaper than `ParallelThenSingleSession`; tried first.
    RegionalReplan,
}

/// Outcome of a full `run_foreman` pass.
///
/// Use `delivered()` for the overall verdict; the individual fields are kept
/// so callers can inspect the plan and any failure evidence.
#[derive(Debug)]
pub struct ForemanRunOutcome {
    /// The planner's decomposition result, returned as-is (also when it was
    /// invalid and the run fell back to a single session).
    pub plan: DecomposeResult,
    /// Which execution path produced `outcomes` (see `RunMode`).
    pub mode: RunMode,
    /// In `Parallel` mode: the per-subtask outcomes of the farm-out. In every
    /// other mode: the outcome of the single-session / regional attempt, which
    /// is expected to be exactly one entry.
    pub outcomes: Vec<SubtaskOutcome>,
    /// The integrated-union verdict of the parallel attempt. Always `None` in
    /// `SingleSession` mode (no parallel attempt was made). In the other modes
    /// it is `None` when no subtask was accepted or the integration step itself
    /// returned an error; otherwise it holds the verdict — which in the two
    /// recovery modes is the FAILED parallel attempt, retained as evidence.
    pub integration: Option<IntegrationResult>,
}

impl ForemanRunOutcome {
    /// Whether the pipeline produced a result that passed its gate.
    ///
    /// Where the verdict lives depends on `mode`:
    /// * `Parallel` — the integrated union must exist and have integrated
    ///   cleanly.
    /// * every other mode — the first entry of `outcomes` (the fallback or
    ///   recovery session) must be accepted.
    ///
    /// A missing verdict (no integration, or no outcomes) counts as not
    /// delivered.
    pub fn delivered(&self) -> bool {
        match self.mode {
            // Fallback / recovery modes store their single gated attempt as the
            // first (and only expected) outcome.
            RunMode::SingleSession
            | RunMode::ParallelThenSingleSession
            | RunMode::RegionalReplan => {
                matches!(self.outcomes.first(), Some(attempt) if attempt.is_accepted())
            }
            RunMode::Parallel => {
                matches!(&self.integration, Some(verdict) if verdict.integrated_cleanly())
            }
        }
    }
}

/// Run the whole goal as one session and return its gate outcomes. The single
/// worktree IS the whole solution → graded by the GOAL check (union, falling
/// back to per-worktree).
async fn single_session(
    repo_root: &Path,
    goal: &str,
    agent: &dyn WorktreeAgent,
    config: &FarmOutConfig,
    infra: &SharedInfra,
) -> Vec<SubtaskOutcome> {
    let goal_check = config
        .union_verify_command
        .clone()
        .or_else(|| config.verify_command.clone());
    let single_cfg = FarmOutConfig {
        verify_command: goal_check,
        allowed_tools: config.allowed_tools.clone(),
        mcp_endpoint: config.mcp_endpoint.clone(),
        ..Default::default()
    };
    let whole = Subtask::files_only("__single_session__", goal.to_string(), Vec::new());
    let outcomes = run_farm_out(repo_root, &[whole], agent, &single_cfg, infra)
        .await
        .outcomes;
    // Exactly one subtask in → exactly one outcome out; `delivered()` reads
    // `outcomes.first()` on the strength of this invariant.
    debug_assert_eq!(outcomes.len(), 1);
    outcomes
}

/// Run the full Foreman pipeline: plan, farm out, integrate and (optionally)
/// recover.
///
/// * `generate` drives the planner (`decompose`); `max_attempts` is forwarded to
///   it unchanged.
/// * `agent` executes each farmed-out subtask, the regional replan, or the
///   single session.
/// * `config.verify_command` is the per-worktree regression check and
///   `config.union_verify_command` the integrated-union goal check; a single
///   session is graded by the goal check (union, falling back to per-worktree).
///
/// The `mode` of the returned outcome records which path ran:
/// * `SingleSession` — the plan was invalid or preferred a single session.
/// * `Parallel` — the plan was farmed out and either the union integrated
///   cleanly or `config.recover_via_single_session` is off.
/// * `RegionalReplan` — recovery resumed from the non-implicated patches and
///   that attempt was accepted.
/// * `ParallelThenSingleSession` — recovery re-ran the whole goal.
pub async fn run_foreman<F, Fut>(
    repo_root: &Path,
    goal: &str,
    max_attempts: u32,
    agent: &dyn WorktreeAgent,
    config: &FarmOutConfig,
    infra: &SharedInfra,
    generate: F,
) -> ForemanRunOutcome
where
    F: Fn(String) -> Fut,
    Fut: Future<Output = Result<String, String>>,
{
    let plan = decompose(repo_root, goal, max_attempts, generate).await;

    // Without a usable decomposition (no valid plan, or the planner judged the
    // work too coupled to split) the whole goal runs as one session instead of
    // farming out pieces that cannot be correct in isolation.
    let decomposable = plan.is_valid() && !plan.prefer_single_session;
    if !decomposable {
        let outcomes = single_session(repo_root, goal, agent, config, infra).await;
        return ForemanRunOutcome {
            plan,
            mode: RunMode::SingleSession,
            outcomes,
            integration: None,
        };
    }

    // Farm the subtasks out, then gather `(subtask id, patch)` for every
    // accepted subtask that actually produced a patch.
    let farmed = run_farm_out(repo_root, &plan.subtasks, agent, config, infra).await;
    let mut accepted: Vec<(String, String)> = Vec::new();
    for result in &farmed.outcomes {
        if !result.is_accepted() {
            continue;
        }
        if let Some(patch) = &result.patch {
            accepted.push((result.subtask_id.clone(), patch.clone()));
        }
    }

    // Integrate and gate the union. With nothing accepted there is nothing to
    // integrate; an error from the integration step is likewise recorded as
    // "no verdict".
    let integration = if !accepted.is_empty() {
        integrate_and_verify(repo_root, "foreman-run", &accepted, config, infra)
            .await
            .ok()
    } else {
        None
    };

    // Report the parallel attempt as-is when it integrated cleanly, or when the
    // caller did not opt into recovery. In the latter case `delivered()` is
    // false and `integration` carries whatever failure evidence exists.
    let union_ok = integration.as_ref().is_some_and(|i| i.integrated_cleanly());
    if union_ok || !config.recover_via_single_session {
        return ForemanRunOutcome {
            plan,
            mode: RunMode::Parallel,
            outcomes: farmed.outcomes,
            integration,
        };
    }

    // Opted-in recovery. Work out which accepted patches blame did NOT
    // implicate; with no verdict at all there is nothing clean to keep.
    let clean_patches: Vec<(String, String)> = integration
        .as_ref()
        .map(|verdict| {
            let implicated = verdict
                .blame
                .as_ref()
                .map(|b| b.implicated_subtasks())
                .unwrap_or_default();
            accepted
                .iter()
                .filter(|(id, _)| !implicated.contains(id))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    // Regional replan first: only worthwhile when some clean work exists AND at
    // least one accepted patch was dropped (a proper subset). Otherwise it would
    // be the whole-goal session under another name.
    let regional_applicable =
        !clean_patches.is_empty() && clean_patches.len() < accepted.len();
    if regional_applicable {
        let attempt =
            regional_replan(repo_root, goal, &clean_patches, agent, config, infra).await;
        if let Some(outcome) = attempt.filter(|o| o.is_accepted()) {
            return ForemanRunOutcome {
                plan,
                mode: RunMode::RegionalReplan,
                outcomes: vec![outcome],
                integration,
            };
        }
    }

    // Last resort: re-run the whole goal as one session. This is a full extra
    // session on top of the failed parallel (and any regional) spend; the
    // failed parallel `integration` stays attached as evidence.
    let outcomes = single_session(repo_root, goal, agent, config, infra).await;
    ForemanRunOutcome {
        plan,
        mode: RunMode::ParallelThenSingleSession,
        outcomes,
        integration,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::patterns::foreman::harness::{AgentRunSummary, ForemanError, WorktreeAgentRequest};
    use async_trait::async_trait;
    use std::path::Path as StdPath;
    use std::process::Command;

    /// Test double for a worktree agent: it writes files directly instead of
    /// running a model.
    struct StubAgent;

    #[async_trait]
    impl WorktreeAgent for StubAgent {
        /// With a declared footprint, write one `pub fn <symbol>() {}` stub into
        /// each declared file (so the edit stays inside the footprint). Without
        /// one — the single-session subtask — write the prompt to `good.txt`,
        /// the marker a goal check can look for.
        async fn run_in(
            &self,
            req: &WorktreeAgentRequest<'_>,
        ) -> Result<AgentRunSummary, ForemanError> {
            if let Some(footprint) = &req.subtask.footprint {
                for write in &footprint.writes {
                    let target = req.cwd.join(&write.file);
                    let body = format!("pub fn {}() {{}}\n", write.symbol);
                    std::fs::write(target, body)
                        .map_err(|e| ForemanError::Agent(e.to_string()))?;
                }
            } else {
                let marker = req.cwd.join("good.txt");
                std::fs::write(marker, &req.subtask.prompt)
                    .map_err(|e| ForemanError::Agent(e.to_string()))?;
            }
            Ok(AgentRunSummary::default())
        }
    }

    /// Run `git <args>` with `cwd` as the working directory.
    ///
    /// # Panics
    ///
    /// Panics when `git` cannot be spawned OR when it exits with a non-zero
    /// status, so a broken fixture fails at the offending command (with git's
    /// stderr in the message) rather than surfacing later as a confusing
    /// assertion failure.
    fn git(cwd: &StdPath, args: &[&str]) {
        let output = Command::new("git")
            .args(args)
            .current_dir(cwd)
            .output()
            .expect("failed to spawn `git`");
        assert!(
            output.status.success(),
            "`git {}` failed ({}): {}",
            args.join(" "),
            output.status,
            String::from_utf8_lossy(&output.stderr).trim(),
        );
    }

    /// Create a throwaway git repository on branch `main` with a single `base`
    /// commit containing `seed.txt`, `x.rs` and `y.rs`. The returned guard owns
    /// the temporary directory.
    fn repo() -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();

        let setup: [&[&str]; 3] = [
            &["init", "-q", "-b", "main"],
            &["config", "user.email", "t@t.t"],
            &["config", "user.name", "t"],
        ];
        for args in setup {
            git(root, args);
        }

        // `x.rs` / `y.rs` carry pre-existing symbols so the footprint analyzer
        // can resolve them (an unknown symbol is fail-closed to uncertain →
        // serialized).
        let files = [
            ("seed.txt", "seed\n"),
            ("x.rs", "pub fn x() {}\n"),
            ("y.rs", "pub fn y() {}\n"),
        ];
        for (name, contents) in files {
            std::fs::write(root.join(name), contents).unwrap();
        }

        git(root, &["add", "-A"]);
        git(root, &["commit", "-qm", "base"]);
        dir
    }

    fn cfg() -> FarmOutConfig {
        FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            ..Default::default()
        }
    }

    /// An unparseable plan must not abort the run: `run_foreman` falls back to
    /// one whole-goal session, and that session delivers.
    #[tokio::test]
    async fn invalid_plan_falls_back_to_single_session_and_delivers() {
        let sandbox = repo();
        let infra = SharedInfra::new();
        let config = cfg();

        // The generator never yields parseable JSON, so decompose gives up.
        let outcome = run_foreman(
            sandbox.path(),
            "do the thing",
            2,
            &StubAgent,
            &config,
            &infra,
            |_prompt| async { Ok(String::from("not json at all")) },
        )
        .await;

        assert_eq!(outcome.mode, RunMode::SingleSession, "{:?}", outcome.plan.issues);
        assert!(outcome.delivered(), "single-session fallback delivered: {outcome:?}");
        assert!(!outcome.plan.is_valid());
    }

    /// Two subtasks writing disjoint files run on the parallel path, and the
    /// integrated union is delivered.
    #[tokio::test]
    async fn disjoint_plan_runs_parallel_and_delivers() {
        let sandbox = repo();
        let infra = SharedInfra::new();
        let config = cfg();
        let plan_json = r#"{"subtasks":[
            {"id":"x","prompt":"x","writes":[{"file":"x.rs","symbol":"x"}]},
            {"id":"y","prompt":"y","writes":[{"file":"y.rs","symbol":"y"}]}
        ]}"#;

        let outcome = run_foreman(
            sandbox.path(),
            "two things",
            2,
            &StubAgent,
            &config,
            &infra,
            |_prompt| {
                let reply = plan_json.to_owned();
                async move { Ok(reply) }
            },
        )
        .await;

        assert_eq!(outcome.mode, RunMode::Parallel, "{:?}", outcome.plan.issues);
        assert!(outcome.delivered(), "parallel union delivered: {outcome:?}");
    }

    #[tokio::test]
    async fn parallel_integration_failure_recovers_via_single_session() {
        let dir = repo();
        let infra = SharedInfra::new();
        // Disjoint plan → runs parallel. Per-worktree gate ("true") accepts each
        // subtask, but the GOAL check requires a `good.txt` the parallel subtasks
        // never produce (they write x.rs/y.rs) → the union fails → recover with a
        // single session, whose stub DOES write good.txt → delivers.
        let cfg = FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            union_verify_command: Some(vec!["sh".into(), "-c".into(), "test -f good.txt".into()]),
            recover_via_single_session: true, // opt in to recovery
            ..Default::default()
        };
        let plan = r#"{"subtasks":[
            {"id":"x","prompt":"x","writes":[{"file":"x.rs","symbol":"x"}]},
            {"id":"y","prompt":"y","writes":[{"file":"y.rs","symbol":"y"}]}
        ]}"#;
        let outcome = run_foreman(dir.path(), "make good.txt", 2, &StubAgent, &cfg, &infra, |_p| {
            let plan = plan.to_string();
            async move { Ok(plan) }
        })
        .await;
        assert_eq!(
            outcome.mode,
            RunMode::ParallelThenSingleSession,
            "parallel failed → recovered: {outcome:?}"
        );
        assert!(outcome.delivered(), "single-session recovery delivered: {outcome:?}");
        // The failed parallel attempt is retained as evidence.
        assert!(outcome.integration.is_some());
    }

    #[tokio::test]
    async fn parallel_failure_recovers_via_regional_replan_when_blame_localizes() {
        let dir = repo();
        let infra = SharedInfra::new();
        // x and y run parallel (disjoint NEW files, planner-accepted) and both
        // pass the per-worktree gate. The union GOAL check fails AND its output
        // names `xx.rs` (as a real compiler error would name the broken file) →
        // blame localizes the failure to subtask x. So clean = {y} ⊊ accepted →
        // REGIONAL replan: resume from y's preserved work; the stub completes the
        // goal (writes good.txt) → the goal check now passes. No whole-goal rerun.
        let cfg = FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            union_verify_command: Some(vec![
                "sh".into(),
                "-c".into(),
                "echo 'compile error in xx.rs'; test -f good.txt".into(),
            ]),
            recover_via_single_session: true,
            ..Default::default()
        };
        // New files (not the repo's pre-existing x.rs/y.rs) so the stub's writes
        // are real, contained patches the blame map can attribute.
        let plan = r#"{"subtasks":[
            {"id":"x","prompt":"x","writes":[{"file":"xx.rs","symbol":"fx"}]},
            {"id":"y","prompt":"y","writes":[{"file":"yy.rs","symbol":"fy"}]}
        ]}"#;
        let outcome = run_foreman(dir.path(), "make good.txt", 2, &StubAgent, &cfg, &infra, |_p| {
            let plan = plan.to_string();
            async move { Ok(plan) }
        })
        .await;
        assert_eq!(
            outcome.mode,
            RunMode::RegionalReplan,
            "localized blame → regional, not whole-goal: {outcome:?}"
        );
        assert!(outcome.delivered(), "regional replan delivered: {outcome:?}");
        // Clean work (y) was preserved into the regional result.
        let patch = outcome.outcomes[0].patch.as_ref().unwrap();
        assert!(patch.contains("yy.rs"), "y's clean work preserved: {patch}");
        assert!(outcome.integration.is_some(), "failed parallel retained as evidence");
    }
}