use std::future::Future;
use std::path::Path;
use super::harness::{
integrate_and_verify, regional_replan, run_farm_out, FarmOutConfig, IntegrationResult, Subtask,
SubtaskOutcome, WorktreeAgent,
};
use super::planner::{decompose, DecomposeResult};
use crate::shared::SharedInfra;
/// Which execution strategy ultimately produced a [`ForemanRunOutcome`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RunMode {
    /// The plan was farmed out as parallel subtasks and that result is being
    /// reported (either the union integrated cleanly, or recovery is disabled).
    Parallel,
    /// The plan was invalid or asked for a single session, so the whole goal
    /// ran as one subtask without attempting a parallel farm-out.
    SingleSession,
    /// The parallel attempt did not integrate cleanly and the whole goal was
    /// re-run as one subtask.
    ParallelThenSingleSession,
    /// The parallel attempt did not integrate cleanly, but a regional replan
    /// built on the non-implicated patches was accepted.
    RegionalReplan,
}
/// Everything [`run_foreman`] reports back about one end-to-end run.
#[derive(Debug)]
pub struct ForemanRunOutcome {
    /// The decomposition result the run was based on (kept even when invalid).
    pub plan: DecomposeResult,
    /// The strategy that produced `outcomes`.
    pub mode: RunMode,
    /// Per-subtask outcomes for `mode`. The single-session and regional-replan
    /// paths store exactly one outcome here.
    pub outcomes: Vec<SubtaskOutcome>,
    /// Result of integrating the parallel patches, when integration was
    /// attempted and did not return an error. Retained on the recovery paths
    /// as a record of the parallel attempt.
    pub integration: Option<IntegrationResult>,
}
impl ForemanRunOutcome {
    /// Reports whether the run produced an acceptable result.
    ///
    /// * [`RunMode::Parallel`]: `true` only when an integration result is
    ///   present and it reports `integrated_cleanly()`.
    /// * Every other mode: `true` only when the first entry of `outcomes`
    ///   exists and reports `is_accepted()`.
    pub fn delivered(&self) -> bool {
        match self.mode {
            RunMode::Parallel => {
                matches!(&self.integration, Some(result) if result.integrated_cleanly())
            }
            RunMode::SingleSession
            | RunMode::ParallelThenSingleSession
            | RunMode::RegionalReplan => {
                matches!(self.outcomes.first(), Some(first) if first.is_accepted())
            }
        }
    }
}
/// Runs the entire `goal` as one subtask through [`run_farm_out`].
///
/// The subtask is verified with `config.union_verify_command` when that is
/// set, otherwise with `config.verify_command`. Only the verify command,
/// `allowed_tools` and `mcp_endpoint` are carried over from `config`; every
/// other setting falls back to `FarmOutConfig::default()`.
///
/// Returns the outcomes reported by the farm-out (one is expected, and this
/// is checked in debug builds).
async fn single_session(
    repo_root: &Path,
    goal: &str,
    agent: &dyn WorktreeAgent,
    config: &FarmOutConfig,
    infra: &SharedInfra,
) -> Vec<SubtaskOutcome> {
    // The union-level check takes precedence over the per-subtask check.
    let verify_command = match &config.union_verify_command {
        Some(union_check) => Some(union_check.clone()),
        None => config.verify_command.clone(),
    };
    let session_config = FarmOutConfig {
        verify_command,
        allowed_tools: config.allowed_tools.clone(),
        mcp_endpoint: config.mcp_endpoint.clone(),
        ..Default::default()
    };

    // A single subtask with an empty file list carrying the whole goal as its prompt.
    let whole_goal = Subtask::files_only("__single_session__", goal.to_string(), Vec::new());
    let report = run_farm_out(repo_root, &[whole_goal], agent, &session_config, infra).await;

    let results = report.outcomes;
    debug_assert_eq!(results.len(), 1);
    results
}
pub async fn run_foreman<F, Fut>(
repo_root: &Path,
goal: &str,
max_attempts: u32,
agent: &dyn WorktreeAgent,
config: &FarmOutConfig,
infra: &SharedInfra,
generate: F,
) -> ForemanRunOutcome
where
F: Fn(String) -> Fut,
Fut: Future<Output = Result<String, String>>,
{
let plan = decompose(repo_root, goal, max_attempts, generate).await;
if !plan.is_valid() || plan.prefer_single_session {
return ForemanRunOutcome {
plan,
mode: RunMode::SingleSession,
outcomes: single_session(repo_root, goal, agent, config, infra).await,
integration: None,
};
}
let farmed = run_farm_out(repo_root, &plan.subtasks, agent, config, infra).await;
let accepted: Vec<(String, String)> = farmed
.outcomes
.iter()
.filter(|o| o.is_accepted())
.filter_map(|o| o.patch.clone().map(|p| (o.subtask_id.clone(), p)))
.collect();
let integration = if accepted.is_empty() {
None
} else {
integrate_and_verify(repo_root, "foreman-run", &accepted, config, infra)
.await
.ok()
};
if integration.as_ref().is_some_and(|i| i.integrated_cleanly())
|| !config.recover_via_single_session
{
return ForemanRunOutcome {
plan,
mode: RunMode::Parallel,
outcomes: farmed.outcomes,
integration,
};
}
let clean_patches: Vec<(String, String)> = match &integration {
Some(i) => {
let implicated = i
.blame
.as_ref()
.map(|b| b.implicated_subtasks())
.unwrap_or_default();
farmed
.outcomes
.iter()
.filter(|o| o.is_accepted() && !implicated.contains(&o.subtask_id))
.filter_map(|o| o.patch.clone().map(|p| (o.subtask_id.clone(), p)))
.collect()
}
None => Vec::new(),
};
if !clean_patches.is_empty() && clean_patches.len() < accepted.len() {
if let Some(outcome) =
regional_replan(repo_root, goal, &clean_patches, agent, config, infra).await
{
if outcome.is_accepted() {
return ForemanRunOutcome {
plan,
mode: RunMode::RegionalReplan,
outcomes: vec![outcome],
integration,
};
}
}
}
ForemanRunOutcome {
plan,
mode: RunMode::ParallelThenSingleSession,
outcomes: single_session(repo_root, goal, agent, config, infra).await,
integration,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::patterns::foreman::harness::{AgentRunSummary, ForemanError, WorktreeAgentRequest};
    use async_trait::async_trait;
    use std::path::Path as StdPath;
    use std::process::Command;

    /// Deterministic stand-in for a real agent.
    ///
    /// * Subtask with a footprint: each declared write becomes a file at
    ///   `<cwd>/<file>` containing `pub fn <symbol>() {}`.
    /// * Subtask without a footprint: the prompt is written to
    ///   `<cwd>/good.txt`.
    struct StubAgent;

    #[async_trait]
    impl WorktreeAgent for StubAgent {
        async fn run_in(
            &self,
            req: &WorktreeAgentRequest<'_>,
        ) -> Result<AgentRunSummary, ForemanError> {
            match &req.subtask.footprint {
                Some(fp) => {
                    for w in &fp.writes {
                        std::fs::write(
                            req.cwd.join(&w.file),
                            format!("pub fn {}() {{}}\n", w.symbol),
                        )
                        .map_err(|e| ForemanError::Agent(e.to_string()))?;
                    }
                }
                None => {
                    std::fs::write(req.cwd.join("good.txt"), &req.subtask.prompt)
                        .map_err(|e| ForemanError::Agent(e.to_string()))?;
                }
            }
            Ok(AgentRunSummary::default())
        }
    }

    /// Runs `git <args>` in `cwd` and panics if git cannot be spawned or exits
    /// unsuccessfully.
    ///
    /// Failing loudly here keeps a broken fixture (e.g. a git too old for
    /// `init -b`, or a rejected commit) from surfacing later as an unrelated
    /// assertion failure.
    fn git(cwd: &StdPath, args: &[&str]) {
        let output = Command::new("git")
            .args(args)
            .current_dir(cwd)
            .output()
            .expect("failed to spawn `git`; is it installed and on PATH?");
        assert!(
            output.status.success(),
            "git {args:?} failed in {}: {}",
            cwd.display(),
            String::from_utf8_lossy(&output.stderr)
        );
    }

    /// Creates a temporary git repository on branch `main` with one commit
    /// containing `seed.txt`, `x.rs` and `y.rs`.
    fn repo() -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        git(root, &["init", "-q", "-b", "main"]);
        git(root, &["config", "user.email", "t@t.t"]);
        git(root, &["config", "user.name", "t"]);
        std::fs::write(root.join("seed.txt"), "seed\n").unwrap();
        std::fs::write(root.join("x.rs"), "pub fn x() {}\n").unwrap();
        std::fs::write(root.join("y.rs"), "pub fn y() {}\n").unwrap();
        git(root, &["add", "-A"]);
        git(root, &["commit", "-qm", "base"]);
        dir
    }

    /// Default config whose verify command (`true`) always succeeds.
    fn cfg() -> FarmOutConfig {
        FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            ..Default::default()
        }
    }

    /// A planner reply that is not JSON yields an invalid plan, so the run
    /// goes straight to the single-session path.
    #[tokio::test]
    async fn invalid_plan_falls_back_to_single_session_and_delivers() {
        let dir = repo();
        let infra = SharedInfra::new();
        let outcome = run_foreman(
            dir.path(),
            "do the thing",
            2,
            &StubAgent,
            &cfg(),
            &infra,
            |_p| async move { Ok("not json at all".to_string()) },
        )
        .await;
        assert_eq!(outcome.mode, RunMode::SingleSession, "{:?}", outcome.plan.issues);
        assert!(outcome.delivered(), "single-session fallback delivered: {outcome:?}");
        assert!(!outcome.plan.is_valid());
    }

    /// Two subtasks writing different files run in parallel and integrate.
    #[tokio::test]
    async fn disjoint_plan_runs_parallel_and_delivers() {
        let dir = repo();
        let infra = SharedInfra::new();
        let plan = r#"{"subtasks":[
{"id":"x","prompt":"x","writes":[{"file":"x.rs","symbol":"x"}]},
{"id":"y","prompt":"y","writes":[{"file":"y.rs","symbol":"y"}]}
]}"#;
        let outcome = run_foreman(dir.path(), "two things", 2, &StubAgent, &cfg(), &infra, |_p| {
            let plan = plan.to_string();
            async move { Ok(plan) }
        })
        .await;
        assert_eq!(outcome.mode, RunMode::Parallel, "{:?}", outcome.plan.issues);
        assert!(outcome.delivered(), "parallel union delivered: {outcome:?}");
    }

    /// The union check requires `good.txt`, which the stub only writes for a
    /// subtask without a footprint, so the parallel union is expected to fail
    /// and the whole-goal single session to recover.
    #[tokio::test]
    async fn parallel_integration_failure_recovers_via_single_session() {
        let dir = repo();
        let infra = SharedInfra::new();
        let cfg = FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            union_verify_command: Some(vec!["sh".into(), "-c".into(), "test -f good.txt".into()]),
            recover_via_single_session: true,
            ..Default::default()
        };
        let plan = r#"{"subtasks":[
{"id":"x","prompt":"x","writes":[{"file":"x.rs","symbol":"x"}]},
{"id":"y","prompt":"y","writes":[{"file":"y.rs","symbol":"y"}]}
]}"#;
        let outcome = run_foreman(dir.path(), "make good.txt", 2, &StubAgent, &cfg, &infra, |_p| {
            let plan = plan.to_string();
            async move { Ok(plan) }
        })
        .await;
        assert_eq!(
            outcome.mode,
            RunMode::ParallelThenSingleSession,
            "parallel failed → recovered: {outcome:?}"
        );
        assert!(outcome.delivered(), "single-session recovery delivered: {outcome:?}");
        assert!(outcome.integration.is_some());
    }

    /// The union check prints a message naming `xx.rs` before failing; the
    /// test expects that to implicate only subtask `x`, so the recovery is a
    /// regional replan that keeps `y`'s patch.
    #[tokio::test]
    async fn parallel_failure_recovers_via_regional_replan_when_blame_localizes() {
        let dir = repo();
        let infra = SharedInfra::new();
        let cfg = FarmOutConfig {
            verify_command: Some(vec!["true".into()]),
            union_verify_command: Some(vec![
                "sh".into(),
                "-c".into(),
                "echo 'compile error in xx.rs'; test -f good.txt".into(),
            ]),
            recover_via_single_session: true,
            ..Default::default()
        };
        let plan = r#"{"subtasks":[
{"id":"x","prompt":"x","writes":[{"file":"xx.rs","symbol":"fx"}]},
{"id":"y","prompt":"y","writes":[{"file":"yy.rs","symbol":"fy"}]}
]}"#;
        let outcome = run_foreman(dir.path(), "make good.txt", 2, &StubAgent, &cfg, &infra, |_p| {
            let plan = plan.to_string();
            async move { Ok(plan) }
        })
        .await;
        assert_eq!(
            outcome.mode,
            RunMode::RegionalReplan,
            "localized blame → regional, not whole-goal: {outcome:?}"
        );
        assert!(outcome.delivered(), "regional replan delivered: {outcome:?}");
        let patch = outcome.outcomes[0].patch.as_ref().unwrap();
        assert!(patch.contains("yy.rs"), "y's clean work preserved: {patch}");
        assert!(outcome.integration.is_some(), "failed parallel retained as evidence");
    }
}