// car-multi 0.28.0
//
// Multi-agent coordination patterns for Common Agent Runtime

//! `run_foreman` — the full pipeline as one reusable call.
//!
//! Decompose a goal, then EITHER farm the subtasks out and verify the integrated
//! union, OR — when the plan doesn't decompose (invalid, or no parallelism worth
//! it) — **fall back to a single whole-goal session**.
//!
//! Optionally (off by default; `FarmOutConfig::recover_via_single_session`), when
//! the plan DID decompose but the integrated union was rejected — an integration
//! failure the symbol-footprint planner structurally can't foresee — it
//! **recovers**. It first tries a regional replan: when blame localizes the
//! failure to a proper subset of the accepted subtasks, it resumes from the clean
//! (non-implicated) patches and completes the goal in one session. If that is not
//! applicable or does not deliver, it re-runs the whole goal as one session. So a
//! delivery-first caller never just gives up; a cost-sensitive caller leaves
//! recovery off and inspects `delivered()` + the retained `integration` evidence.
//! Recovery costs up to a full extra (serial) session — plus any failed regional
//! attempt — on top of the failed parallel spend, hence opt-in. Either recovery
//! is graded by the SAME merge-verify gate, so it can only turn a non-delivery
//! into a *gated* delivery, never weaken soundness.
//!
//! The daemon's `foreman.run` is a thin caller of this, and the B7 eval exercises
//! the exact same path (including the fallback), so what the eval measures is
//! what production does.

use std::future::Future;
use std::path::Path;

use super::harness::{
    integrate_and_verify, regional_replan, run_farm_out, FarmOutConfig, IntegrationResult, Subtask,
    SubtaskOutcome, WorktreeAgent,
};
use super::planner::{decompose, DecomposeResult};
use crate::shared::SharedInfra;

/// How `run_foreman` executed the goal.
///
/// The mode records which path produced the reported outcome; it does not by
/// itself say whether that outcome was accepted — use
/// `ForemanRunOutcome::delivered` for that.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// The goal decomposed and was farmed out, and the parallel result is what
    /// is reported. This covers both the case where the integrated union was
    /// accepted and the case where it was rejected (or never ran because no
    /// subtask was accepted) while recovery was not opted into.
    Parallel,
    /// The plan didn't decompose (invalid or no real parallelism), so the whole
    /// goal ran as one session.
    SingleSession,
    /// The goal decomposed and was farmed out, but the integrated union was
    /// rejected (an integration failure the planner didn't foresee) — so it
    /// recovered by re-running the whole goal as one session.
    ParallelThenSingleSession,
    /// The integrated union was rejected, but blame localized the failure to a
    /// proper subset of the accepted subtasks — so it recovered by RESUMING from
    /// the clean (non-implicated) patches and completing the goal in one session,
    /// redoing only the failing region while preserving the successful parallel
    /// work. Cheaper than `ParallelThenSingleSession`; tried first.
    RegionalReplan,
}

/// Outcome of a full `run_foreman` pass.
#[derive(Debug)]
pub struct ForemanRunOutcome {
    /// The decomposition the planner produced (kept even when it was invalid
    /// and the run fell back to a single session).
    pub plan: DecomposeResult,
    /// Which execution path produced `outcomes`.
    pub mode: RunMode,
    /// In `Parallel` mode, the per-subtask outcomes of the farm-out. In
    /// `SingleSession`, `ParallelThenSingleSession` and `RegionalReplan` modes,
    /// the outcome of the one whole-goal / regional session (`delivered()`
    /// reads the first entry).
    pub outcomes: Vec<SubtaskOutcome>,
    /// The integrated-union verdict of the parallel attempt. `None` in
    /// `SingleSession` mode, when no subtask produced an accepted patch, or
    /// when integration itself returned an error. In the recovery modes
    /// (`ParallelThenSingleSession`, `RegionalReplan`) the failed parallel
    /// verdict is retained here as evidence.
    pub integration: Option<IntegrationResult>,
}

impl ForemanRunOutcome {
    /// Whether the run ended in a gate-accepted result.
    ///
    /// * `Parallel` — true only when an integration verdict exists and it
    ///   integrated cleanly.
    /// * `SingleSession`, `ParallelThenSingleSession`, `RegionalReplan` — true
    ///   only when the first entry of `outcomes` (the single gated session)
    ///   exists and was accepted.
    pub fn delivered(&self) -> bool {
        match self.mode {
            RunMode::Parallel => {
                matches!(&self.integration, Some(verdict) if verdict.integrated_cleanly())
            }
            // In these modes `outcomes` carries the one session that was run
            // instead of (or after) the parallel attempt.
            RunMode::SingleSession
            | RunMode::ParallelThenSingleSession
            | RunMode::RegionalReplan => {
                matches!(self.outcomes.first(), Some(attempt) if attempt.is_accepted())
            }
        }
    }
}

/// Execute the entire goal as a single farmed-out subtask and hand back the
/// resulting outcomes.
///
/// The lone worktree is the complete solution, so it is graded by the goal
/// check: `union_verify_command` when set, otherwise `verify_command`. Only
/// `allowed_tools` and `mcp_endpoint` are carried over from `config`; every
/// other setting is taken from `FarmOutConfig::default()`.
async fn single_session(
    repo_root: &Path,
    goal: &str,
    agent: &dyn WorktreeAgent,
    config: &FarmOutConfig,
    infra: &SharedInfra,
) -> Vec<SubtaskOutcome> {
    // Prefer the union (goal) check; fall back to the per-worktree check.
    let verify_command = config
        .union_verify_command
        .as_ref()
        .or(config.verify_command.as_ref())
        .cloned();
    let session_config = FarmOutConfig {
        verify_command,
        allowed_tools: config.allowed_tools.clone(),
        mcp_endpoint: config.mcp_endpoint.clone(),
        ..Default::default()
    };

    // One subtask carrying the whole goal as its prompt, with no declared files.
    let subtasks = [Subtask::files_only(
        "__single_session__",
        goal.to_string(),
        Vec::new(),
    )];
    let report = run_farm_out(repo_root, &subtasks, agent, &session_config, infra).await;
    let outcomes = report.outcomes;

    // One subtask went in, so one outcome is expected back; `delivered()`
    // relies on this when it inspects `outcomes.first()`.
    debug_assert_eq!(outcomes.len(), 1);
    outcomes
}

/// Drive the complete Foreman pipeline for one goal.
///
/// * `generate` powers the planner (`decompose`), which gets up to
///   `max_attempts` tries.
/// * `agent` executes every farmed-out subtask, and also the single-session
///   fallback / recovery runs.
/// * `config.verify_command` is the per-worktree regression check and
///   `config.union_verify_command` the integrated-union goal check; single
///   sessions are graded by the goal check (union, falling back to
///   per-worktree).
///
/// The returned `ForemanRunOutcome::mode` records which path ran:
///
/// 1. Plan invalid or planner prefers a single session → `SingleSession`.
/// 2. Farm-out integrated cleanly, or recovery is not enabled → `Parallel`
///    (`delivered()` is false if the union did not integrate cleanly).
/// 3. Recovery enabled and blame leaves a non-empty proper subset of clean
///    patches, and the regional session is accepted → `RegionalReplan`.
/// 4. Otherwise → `ParallelThenSingleSession`.
///
/// Note: an `Err` from `integrate_and_verify` is discarded and treated the
/// same as "no integration verdict" (`integration == None`).
pub async fn run_foreman<F, Fut>(
    repo_root: &Path,
    goal: &str,
    max_attempts: u32,
    agent: &dyn WorktreeAgent,
    config: &FarmOutConfig,
    infra: &SharedInfra,
    generate: F,
) -> ForemanRunOutcome
where
    F: Fn(String) -> Fut,
    Fut: Future<Output = Result<String, String>>,
{
    let plan = decompose(repo_root, goal, max_attempts, generate).await;

    // No usable decomposition (planner failed, or the work is too coupled to be
    // worth coordinating): run the whole goal in one session rather than
    // farming out pieces that can't be correct in isolation, or giving up.
    let decomposable = plan.is_valid() && !plan.prefer_single_session;
    if !decomposable {
        let outcomes = single_session(repo_root, goal, agent, config, infra).await;
        return ForemanRunOutcome {
            plan,
            mode: RunMode::SingleSession,
            outcomes,
            integration: None,
        };
    }

    // Farm the subtasks out, then collect (subtask id, patch) for every
    // accepted outcome that actually carries a patch.
    let farmed = run_farm_out(repo_root, &plan.subtasks, agent, config, infra).await;
    let mut accepted: Vec<(String, String)> = Vec::new();
    for done in farmed.outcomes.iter().filter(|o| o.is_accepted()) {
        if let Some(patch) = &done.patch {
            accepted.push((done.subtask_id.clone(), patch.clone()));
        }
    }

    // Gate the integrated union — only when there is something to integrate.
    let integration = if accepted.is_empty() {
        None
    } else {
        let verdict =
            integrate_and_verify(repo_root, "foreman-run", &accepted, config, infra).await;
        verdict.ok()
    };

    // Either the union is sound, or the caller did not opt into recovery. In
    // both cases report the parallel outcome; on a rejected union `delivered()`
    // is false and `integration` holds the failure evidence so the caller can
    // decide whether a retry is worth paying for.
    let union_accepted = matches!(&integration, Some(verdict) if verdict.integrated_cleanly());
    if union_accepted || !config.recover_via_single_session {
        return ForemanRunOutcome {
            plan,
            mode: RunMode::Parallel,
            outcomes: farmed.outcomes,
            integration,
        };
    }

    // Opted-in recovery. Work out which accepted patches blame did NOT
    // implicate; with no verdict at all there is nothing clean to preserve.
    let clean_patches: Vec<(String, String)> = integration
        .as_ref()
        .map(|verdict| {
            let implicated = verdict
                .blame
                .as_ref()
                .map(|b| b.implicated_subtasks())
                .unwrap_or_default();
            farmed
                .outcomes
                .iter()
                .filter(|o| o.is_accepted() && !implicated.contains(&o.subtask_id))
                .filter_map(|o| o.patch.clone().map(|p| (o.subtask_id.clone(), p)))
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    // Regional replan is only meaningful when some clean work survives AND at
    // least one accepted patch was dropped; otherwise it would be the same as
    // the whole-goal session below.
    let blame_localized = !clean_patches.is_empty() && clean_patches.len() < accepted.len();
    if blame_localized {
        let regional =
            regional_replan(repo_root, goal, &clean_patches, agent, config, infra).await;
        if let Some(outcome) = regional.filter(|o| o.is_accepted()) {
            return ForemanRunOutcome {
                plan,
                mode: RunMode::RegionalReplan,
                outcomes: vec![outcome],
                integration,
            };
        }
    }

    // Regional recovery was not applicable or did not deliver: redo the whole
    // goal in one session. This is a full extra session on top of the failed
    // parallel (and any regional) spend; the failed verdict is kept as evidence.
    let outcomes = single_session(repo_root, goal, agent, config, infra).await;
    ForemanRunOutcome {
        plan,
        mode: RunMode::ParallelThenSingleSession,
        outcomes,
        integration,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::patterns::foreman::harness::{AgentRunSummary, ForemanError, WorktreeAgentRequest};
    use async_trait::async_trait;
    use std::path::Path as StdPath;
    use std::process::Command;

    struct StubAgent;

    #[async_trait]
    impl WorktreeAgent for StubAgent {
        async fn run_in(
            &self,
            req: &WorktreeAgentRequest<'_>,
        ) -> Result<AgentRunSummary, ForemanError> {
            // Write exactly the declared symbols (stays inside the footprint so
            // the gate's containment accepts); fall back to a free file when no
            // footprint was declared (the single-session subtask).
            match &req.subtask.footprint {
                Some(fp) => {
                    for w in &fp.writes {
                        std::fs::write(
                            req.cwd.join(&w.file),
                            format!("pub fn {}() {{}}\n", w.symbol),
                        )
                        .map_err(|e| ForemanError::Agent(e.to_string()))?;
                    }
                }
                None => {
                    // The single-session subtask has no footprint. Write the
                    // marker the goal check can look for.
                    std::fs::write(req.cwd.join("good.txt"), &req.subtask.prompt)
                        .map_err(|e| ForemanError::Agent(e.to_string()))?;
                }
            }
            Ok(AgentRunSummary::default())
        }
    }

    /// Run `git <args>` in `cwd` for test fixture setup.
    ///
    /// # Panics
    ///
    /// Panics if `git` cannot be spawned or exits with a non-zero status. A
    /// silently failed `init`/`commit` would otherwise leave a broken fixture
    /// and surface later as a confusing pipeline failure.
    fn git(cwd: &StdPath, args: &[&str]) {
        let out = Command::new("git")
            .args(args)
            .current_dir(cwd)
            .output()
            .unwrap_or_else(|e| panic!("failed to spawn `git {}`: {e}", args.join(" ")));
        assert!(
            out.status.success(),
            "`git {}` failed ({}): {}",
            args.join(" "),
            out.status,
            String::from_utf8_lossy(&out.stderr).trim()
        );
    }

    /// Build a throwaway git repository on branch `main` with one base commit
    /// containing `seed.txt`, `x.rs` and `y.rs`.
    fn repo() -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();

        let setup: [&[&str]; 3] = [
            &["init", "-q", "-b", "main"],
            &["config", "user.email", "t@t.t"],
            &["config", "user.name", "t"],
        ];
        for args in setup {
            git(root, args);
        }

        // `x.rs` / `y.rs` hold pre-existing symbols so the footprint analyzer
        // can resolve them (an unknown symbol is fail-closed to uncertain →
        // serialized).
        let files = [
            ("seed.txt", "seed\n"),
            ("x.rs", "pub fn x() {}\n"),
            ("y.rs", "pub fn y() {}\n"),
        ];
        for (name, contents) in files {
            std::fs::write(root.join(name), contents).unwrap();
        }

        git(root, &["add", "-A"]);
        git(root, &["commit", "-qm", "base"]);
        dir
    }

    /// Default config whose verify command is the always-succeeding `true`.
    fn cfg() -> FarmOutConfig {
        let verify_command = Some(vec!["true".into()]);
        FarmOutConfig {
            verify_command,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn invalid_plan_falls_back_to_single_session_and_delivers() {
        let dir = repo();
        let infra = SharedInfra::new();
        let config = cfg();

        // The generator never yields a parseable plan, so decompose gives up.
        // run_foreman must not bail: it runs one session and delivers.
        let unparseable =
            |_prompt: String| async move { Ok::<String, String>("not json at all".to_string()) };

        let outcome = run_foreman(
            dir.path(),
            "do the thing",
            2,
            &StubAgent,
            &config,
            &infra,
            unparseable,
        )
        .await;

        assert_eq!(
            outcome.mode,
            RunMode::SingleSession,
            "{:?}",
            outcome.plan.issues
        );
        assert!(
            outcome.delivered(),
            "single-session fallback delivered: {outcome:?}"
        );
        assert!(!outcome.plan.is_valid());
    }

    #[tokio::test]
    async fn disjoint_plan_runs_parallel_and_delivers() {
        let dir = repo();
        let infra = SharedInfra::new();
        let config = cfg();

        // Two subtasks writing different files/symbols.
        let plan = r#"{"subtasks":[
            {"id":"x","prompt":"x","writes":[{"file":"x.rs","symbol":"x"}]},
            {"id":"y","prompt":"y","writes":[{"file":"y.rs","symbol":"y"}]}
        ]}"#;
        let generate = |_prompt: String| {
            let reply = plan.to_string();
            async move { Ok::<String, String>(reply) }
        };

        let outcome = run_foreman(
            dir.path(),
            "two things",
            2,
            &StubAgent,
            &config,
            &infra,
            generate,
        )
        .await;

        assert_eq!(outcome.mode, RunMode::Parallel, "{:?}", outcome.plan.issues);
        assert!(outcome.delivered(), "parallel union delivered: {outcome:?}");
    }

    #[tokio::test]
    async fn parallel_integration_failure_recovers_via_single_session() {
        let dir = repo();
        let infra = SharedInfra::new();

        // The per-worktree gate ("true") accepts each subtask, but the GOAL
        // check demands a `good.txt` that the parallel subtasks never create
        // (they write x.rs / y.rs). The union therefore fails, and recovery
        // runs a single session whose stub DOES write good.txt → delivers.
        let config = FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            union_verify_command: Some(vec!["sh".into(), "-c".into(), "test -f good.txt".into()]),
            recover_via_single_session: true, // opt in to recovery
            ..Default::default()
        };
        let plan = r#"{"subtasks":[
            {"id":"x","prompt":"x","writes":[{"file":"x.rs","symbol":"x"}]},
            {"id":"y","prompt":"y","writes":[{"file":"y.rs","symbol":"y"}]}
        ]}"#;
        let generate = |_prompt: String| {
            let reply = plan.to_string();
            async move { Ok::<String, String>(reply) }
        };

        let outcome = run_foreman(
            dir.path(),
            "make good.txt",
            2,
            &StubAgent,
            &config,
            &infra,
            generate,
        )
        .await;

        assert_eq!(
            outcome.mode,
            RunMode::ParallelThenSingleSession,
            "parallel failed → recovered: {outcome:?}"
        );
        assert!(
            outcome.delivered(),
            "single-session recovery delivered: {outcome:?}"
        );
        // The rejected parallel attempt stays attached as evidence.
        assert!(outcome.integration.is_some());
    }

    #[tokio::test]
    async fn parallel_failure_recovers_via_regional_replan_when_blame_localizes() {
        let dir = repo();
        let infra = SharedInfra::new();

        // x and y run in parallel on disjoint NEW files and both pass the
        // per-worktree gate. The union GOAL check fails and its output names
        // `xx.rs` (as a real compiler error would name the broken file), which
        // is meant to let blame pin the failure on subtask x. That leaves
        // clean = {y}, a proper subset of accepted, so a REGIONAL replan runs:
        // resume from y's work, the stub writes good.txt, and the goal check
        // passes without a whole-goal rerun.
        let config = FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            union_verify_command: Some(vec![
                "sh".into(),
                "-c".into(),
                "echo 'compile error in xx.rs'; test -f good.txt".into(),
            ]),
            recover_via_single_session: true,
            ..Default::default()
        };
        // New files rather than the repo's existing x.rs / y.rs, so the stub's
        // writes are real, contained patches the blame map can attribute.
        let plan = r#"{"subtasks":[
            {"id":"x","prompt":"x","writes":[{"file":"xx.rs","symbol":"fx"}]},
            {"id":"y","prompt":"y","writes":[{"file":"yy.rs","symbol":"fy"}]}
        ]}"#;
        let generate = |_prompt: String| {
            let reply = plan.to_string();
            async move { Ok::<String, String>(reply) }
        };

        let outcome = run_foreman(
            dir.path(),
            "make good.txt",
            2,
            &StubAgent,
            &config,
            &infra,
            generate,
        )
        .await;

        assert_eq!(
            outcome.mode,
            RunMode::RegionalReplan,
            "localized blame → regional, not whole-goal: {outcome:?}"
        );
        assert!(
            outcome.delivered(),
            "regional replan delivered: {outcome:?}"
        );
        // y's clean work must be carried into the regional result.
        let patch = outcome.outcomes[0].patch.as_ref().unwrap();
        assert!(patch.contains("yy.rs"), "y's clean work preserved: {patch}");
        assert!(
            outcome.integration.is_some(),
            "failed parallel retained as evidence"
        );
    }
}