use std::collections::{BTreeSet, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::Arc;
use async_trait::async_trait;
use super::gate::{
verify_changes, BuildTestStatus, DeclaredFootprint, FileChange, GateConfig, MergeVerdict,
NoVerifyWaiver,
};
use crate::shared::SharedInfra;
use crate::workspace::{AgentWorkspace, WorkspaceConfig};
/// Progress events reported while subtasks are farmed out.
///
/// For one subtask the farm-out loop emits `SubtaskStarted` first, then
/// `SubtaskVerifying` once the agent's changes were collected, and finally
/// `SubtaskGated`. A failure before the gate skips `SubtaskVerifying` and ends
/// with a `SubtaskGated` event whose status is `"error"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForemanProgress {
/// The subtask has been picked up; emitted before its workspace is provisioned.
SubtaskStarted {
/// Id of the subtask this event is about.
subtask_id: String,
/// Index of the subtask in the slice handed to the farm-out call.
index: usize,
/// Zero-based index of the scheduling level the subtask runs in.
level: usize,
/// Number of subtasks in the whole run.
total: usize,
},
/// The agent finished and its changes were collected; the merge gate runs next.
SubtaskVerifying { subtask_id: String },
/// Terminal event for a subtask.
SubtaskGated {
/// Id of the subtask this event is about.
subtask_id: String,
/// Whether the gate verdict counts as accepted; always `false` on errors.
accepted: bool,
/// `"accepted"`, `"rejected"`, `"inconclusive"`, or `"error"` when no verdict was reached.
status: String,
},
}
/// Shared, thread-safe callback that receives every [`ForemanProgress`] event.
pub type ForemanProgressSink = Arc<dyn Fn(ForemanProgress) + Send + Sync>;
/// Maps a gate verdict onto the short status label carried by progress events.
fn verdict_status(verdict: &MergeVerdict) -> &'static str {
    // No catch-all arm on purpose: a new verdict variant must pick its label here.
    match *verdict {
        MergeVerdict::Inconclusive { .. } => "inconclusive",
        MergeVerdict::Rejected { .. } => "rejected",
        MergeVerdict::Accepted { .. } => "accepted",
    }
}
/// Forwards `event` to the progress sink when one is installed; a no-op otherwise.
#[inline]
fn emit(sink: &Option<ForemanProgressSink>, event: ForemanProgress) {
    let Some(deliver) = sink else {
        return;
    };
    deliver(event);
}
/// Failures the foreman can hit while running a subtask or integrating patches.
///
/// Every variant carries a human-readable detail string.
#[derive(Debug, thiserror::Error)]
pub enum ForemanError {
/// Provisioning an agent workspace (git worktree) failed.
#[error("workspace provisioning failed: {0}")]
Workspace(String),
/// The agent itself reported a failure.
#[error("agent execution failed: {0}")]
Agent(String),
/// A `git` invocation could not be spawned or exited unsuccessfully, or a
/// blocking git task panicked.
#[error("git error: {0}")]
Git(String),
}
/// One unit of work handed to an agent in its own worktree.
#[derive(Debug, Clone)]
pub struct Subtask {
/// Identifier used in progress events, outcomes and the workspace name.
pub id: String,
/// Instruction text for the agent.
pub prompt: String,
/// Files the subtask declares it touches; drives `partition_by_files`.
/// An empty list makes the subtask run alone in its own level.
pub files: Vec<String>,
/// Optional symbol-level footprint. When every subtask of a run has one,
/// scheduling uses footprint analysis instead of the file lists, and the
/// footprint's `writes` are handed to the gate as the declared footprint.
pub footprint: Option<car_ast::SymbolFootprint>,
}
impl Subtask {
    /// Builds a subtask described only by its file list, with no symbol footprint.
    pub fn files_only(id: impl Into<String>, prompt: impl Into<String>, files: Vec<String>) -> Self {
        let (id, prompt) = (id.into(), prompt.into());
        Self {
            footprint: None,
            files,
            prompt,
            id,
        }
    }
}
/// What an agent reports back after a run inside a worktree.
#[derive(Debug, Clone, Default)]
pub struct AgentRunSummary {
/// The agent's textual answer. The farm-out code in this file does not read
/// it; only success or failure of the run is used.
pub answer: String,
}
/// Everything an agent needs to execute one subtask inside a worktree.
#[derive(Debug)]
pub struct WorktreeAgentRequest<'a> {
/// The subtask to carry out.
pub subtask: &'a Subtask,
/// Path of the provisioned workspace the agent should work in.
pub cwd: &'a Path,
/// Copied verbatim from `FarmOutConfig::allowed_tools`.
/// NOTE(review): `None` presumably means "no restriction" — confirm with the agent implementation.
pub allowed_tools: Option<Vec<String>>,
/// Copied verbatim from `FarmOutConfig::mcp_endpoint`.
pub mcp_endpoint: Option<String>,
}
/// An agent that can carry out a subtask inside a given worktree.
///
/// The foreman inspects the worktree with `git` after `run_in` returns, so an
/// implementation communicates its work through files under `req.cwd`.
#[async_trait]
pub trait WorktreeAgent: Send + Sync {
/// Runs the agent for `req.subtask` in `req.cwd`.
///
/// An `Err` is recorded as the subtask's error and no gate verdict is produced.
async fn run_in(
&self,
req: &WorktreeAgentRequest<'_>,
) -> Result<AgentRunSummary, ForemanError>;
}
/// Knobs shared by the farm-out, integration and regional-replan entry points.
#[derive(Debug, Clone, Default)]
pub struct FarmOutConfig {
/// Command (argv form) given to the gate for each individual subtask worktree.
pub verify_command: Option<Vec<String>>,
/// Command used when verifying the union of patches and the regional replan;
/// falls back to `verify_command` when unset.
pub union_verify_command: Option<Vec<String>>,
/// Forwarded unchanged to every agent request.
pub allowed_tools: Option<Vec<String>>,
/// Forwarded unchanged to every agent request.
pub mcp_endpoint: Option<String>,
/// NOTE(review): not read anywhere in the visible part of this file;
/// presumably consulted by a caller to pick a recovery strategy — confirm.
pub recover_via_single_session: bool,
/// Directory under which worktrees are created. When unset, a per-repository
/// directory under the system temp dir is used.
pub worktree_base: Option<PathBuf>,
/// Waiver handed to the gate. The tests in this file show that with no verify
/// command and no waiver a subtask is not accepted, while a waiver yields an
/// accepted but not build-verified verdict.
pub no_verify_waiver: Option<NoVerifyWaiver>,
}
/// Default directory for a repository's worktrees:
/// `<temp dir>/car-foreman-worktrees/<16 hex digits>`.
///
/// The last component is a hash of `repo_root`, so one repository path always
/// maps to the same directory and different repositories do not share one.
fn default_worktree_base(repo_root: &Path) -> PathBuf {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let digest = {
        let mut state = DefaultHasher::new();
        repo_root.hash(&mut state);
        state.finish()
    };
    let mut base = std::env::temp_dir();
    base.push("car-foreman-worktrees");
    base.push(format!("{digest:016x}"));
    base
}
/// Builds the workspace configuration for worktrees of `repo_root`, honouring
/// `config.worktree_base` and otherwise using the default temp-dir location.
fn worktree_workspace_config(repo_root: &Path, config: &FarmOutConfig) -> WorkspaceConfig {
    let base = match &config.worktree_base {
        Some(explicit) => explicit.clone(),
        None => default_worktree_base(repo_root),
    };
    WorkspaceConfig::git_worktree_at(repo_root, base)
}
pub fn partition_by_files(subtasks: &[Subtask]) -> Vec<Vec<usize>> {
struct Level {
ids: Vec<usize>,
claimed: HashSet<String>,
open: bool,
}
let mut levels: Vec<Level> = Vec::new();
for (i, st) in subtasks.iter().enumerate() {
let files: HashSet<String> = st.files.iter().cloned().collect();
if files.is_empty() {
levels.push(Level {
ids: vec![i],
claimed: HashSet::new(),
open: false,
});
continue;
}
match levels
.iter_mut()
.find(|l| l.open && l.claimed.is_disjoint(&files))
{
Some(level) => {
level.ids.push(i);
level.claimed.extend(files);
}
None => levels.push(Level {
ids: vec![i],
claimed: files,
open: true,
}),
}
}
levels.into_iter().map(|l| l.ids).collect()
}
/// Depth passed to `car_ast::expand_footprint` when each subtask's declared
/// footprint is widened for scheduling.
/// NOTE(review): presumably the number of dependency hops followed — confirm against `car_ast`.
pub(crate) const FOOTPRINT_BLAST_DEPTH: usize = 3;
/// Decides which subtasks may run together, as levels of indices into `subtasks`.
///
/// Footprint-based planning is used only when every subtask declares a
/// footprint and all ids are unique; otherwise the plan comes from
/// [`partition_by_files`]. The same file-based fallback is used if the
/// footprint plan names an id that is not part of `subtasks`, so a planner
/// inconsistency degrades the schedule instead of panicking.
fn schedule(repo_root: &Path, subtasks: &[Subtask]) -> Vec<Vec<usize>> {
    if subtasks.is_empty() {
        return partition_by_files(subtasks);
    }
    // `None` as soon as a single subtask lacks a footprint.
    let Some(footprints) = subtasks
        .iter()
        .map(|s| s.footprint.as_ref())
        .collect::<Option<Vec<_>>>()
    else {
        return partition_by_files(subtasks);
    };
    // The plan is keyed by id, so duplicate ids cannot be mapped back to indices.
    let mut seen = HashSet::new();
    if !subtasks.iter().all(|s| seen.insert(s.id.as_str())) {
        return partition_by_files(subtasks);
    }
    let index = car_ast::ProjectIndex::build(repo_root);
    let fsubs: Vec<car_ast::FootprintSubtask> = subtasks
        .iter()
        .zip(footprints)
        .map(|(s, footprint)| car_ast::FootprintSubtask {
            id: s.id.clone(),
            footprint: car_ast::expand_footprint(&index, footprint, FOOTPRINT_BLAST_DEPTH),
        })
        .collect();
    let plan = car_ast::analyze(&fsubs);
    // Translate ids back to indices; an unknown id turns the whole mapping into `None`.
    let mapped: Option<Vec<Vec<usize>>> = plan
        .levels
        .iter()
        .map(|level| {
            level
                .iter()
                .map(|id| subtasks.iter().position(|s| &s.id == id))
                .collect::<Option<Vec<usize>>>()
        })
        .collect();
    mapped.unwrap_or_else(|| partition_by_files(subtasks))
}
/// Result of running and gating a single subtask.
#[derive(Debug)]
pub struct SubtaskOutcome {
/// Id of the subtask this outcome belongs to.
pub subtask_id: String,
/// Gate verdict; `None` when the subtask failed before the gate ran.
pub verdict: Option<MergeVerdict>,
/// File changes collected from the worktree; empty on failure.
pub changes: Vec<FileChange>,
/// `git diff HEAD` patch of the worktree; `None` on failure.
pub patch: Option<String>,
/// Rendered error when the subtask failed before the gate ran.
pub error: Option<String>,
}
impl SubtaskOutcome {
    /// `true` only when a verdict exists and it reports acceptance.
    pub fn is_accepted(&self) -> bool {
        match &self.verdict {
            Some(verdict) => verdict.is_accepted(),
            None => false,
        }
    }
}
/// Result of a whole farm-out run.
#[derive(Debug)]
pub struct FarmOutResult {
/// The schedule that was executed: levels of indices into the subtask slice.
pub levels: Vec<Vec<usize>>,
/// One outcome per executed subtask, appended level by level. The order
/// therefore follows `levels`, which can differ from the input order.
pub outcomes: Vec<SubtaskOutcome>,
}
impl FarmOutResult {
    /// Number of outcomes whose verdict reports acceptance.
    pub fn accepted_count(&self) -> usize {
        let mut accepted = 0;
        for outcome in &self.outcomes {
            if outcome.is_accepted() {
                accepted += 1;
            }
        }
        accepted
    }
}
/// Runs every subtask in its own worktree and gates each result, without
/// progress reporting.
///
/// Failures of individual subtasks are captured in their `SubtaskOutcome`
/// rather than returned as an error.
pub async fn run_farm_out(
repo_root: &Path,
subtasks: &[Subtask],
agent: &dyn WorktreeAgent,
config: &FarmOutConfig,
infra: &SharedInfra,
) -> FarmOutResult {
run_farm_out_inner(repo_root, subtasks, agent, config, infra, None).await
}
/// Same as [`run_farm_out`], additionally streaming [`ForemanProgress`] events
/// for every subtask to `progress`.
pub async fn run_farm_out_with_progress(
repo_root: &Path,
subtasks: &[Subtask],
agent: &dyn WorktreeAgent,
config: &FarmOutConfig,
infra: &SharedInfra,
progress: ForemanProgressSink,
) -> FarmOutResult {
run_farm_out_inner(repo_root, subtasks, agent, config, infra, Some(progress)).await
}
/// Shared implementation of the farm-out entry points.
///
/// Computes the schedule, then processes it level by level: all subtasks of a
/// level are awaited together before the next level starts. Outcomes are
/// appended in level order.
async fn run_farm_out_inner(
    repo_root: &Path,
    subtasks: &[Subtask],
    agent: &dyn WorktreeAgent,
    config: &FarmOutConfig,
    infra: &SharedInfra,
    progress: Option<ForemanProgressSink>,
) -> FarmOutResult {
    let levels = schedule(repo_root, subtasks);
    let total = subtasks.len();
    let sink = progress.as_ref();
    let mut outcomes = Vec::with_capacity(total);
    for (level_idx, members) in levels.iter().enumerate() {
        // Build every future of the level first, then drive them concurrently.
        let pending: Vec<_> = members
            .iter()
            .map(|&i| {
                run_one_subtask(
                    repo_root,
                    i,
                    &subtasks[i],
                    agent,
                    config,
                    infra,
                    level_idx,
                    total,
                    sink,
                )
            })
            .collect();
        let finished = futures::future::join_all(pending).await;
        outcomes.extend(finished);
    }
    FarmOutResult { levels, outcomes }
}
/// Runs one subtask end to end: provision a worktree, run the agent, collect
/// the resulting changes and patch, then pass them through the merge gate.
///
/// Never returns an error: any failure before the gate is turned into an
/// outcome with `verdict: None` and the rendered error, accompanied by a
/// `SubtaskGated` progress event with status `"error"`.
///
/// `index` is the subtask's position in the caller's slice, `level` its
/// scheduling level and `total` the number of subtasks in the run; all three
/// are only used for progress reporting and the workspace name.
#[allow(clippy::too_many_arguments)]
async fn run_one_subtask(
repo_root: &Path,
index: usize,
subtask: &Subtask,
agent: &dyn WorktreeAgent,
config: &FarmOutConfig,
infra: &SharedInfra,
level: usize,
total: usize,
progress: Option<&ForemanProgressSink>,
) -> SubtaskOutcome {
let progress = progress.cloned();
emit(
&progress,
ForemanProgress::SubtaskStarted {
subtask_id: subtask.id.clone(),
index,
level,
total,
},
);
// Shared failure path: report an "error" gate event and build the outcome.
let fail = |error: ForemanError| {
emit(
&progress,
ForemanProgress::SubtaskGated {
subtask_id: subtask.id.clone(),
accepted: false,
status: "error".into(),
},
);
SubtaskOutcome {
subtask_id: subtask.id.clone(),
verdict: None,
changes: Vec::new(),
patch: None,
error: Some(error.to_string()),
}
};
// Workspace name combines the zero-padded index with the subtask id.
let ws_name = format!("{index:04}-{}", subtask.id);
let workspace =
match AgentWorkspace::provision(&worktree_workspace_config(repo_root, config), &ws_name) {
Ok(ws) => ws,
Err(e) => return fail(ForemanError::Workspace(e)),
};
let cwd = workspace.path().to_path_buf();
let req = WorktreeAgentRequest {
subtask,
cwd: &cwd,
allowed_tools: config.allowed_tools.clone(),
mcp_endpoint: config.mcp_endpoint.clone(),
};
if let Err(e) = agent.run_in(&req).await {
return fail(e);
}
// The git inspection spawns processes synchronously, so it runs on the
// blocking pool. Changes are collected before the patch is captured.
let cwd_for_blocking = cwd.clone();
let (changes, patch) = match tokio::task::spawn_blocking(move || {
let changes = collect_file_changes(&cwd_for_blocking)?;
let patch = capture_patch(&cwd_for_blocking)?;
Ok::<_, ForemanError>((changes, patch))
})
.await
{
Ok(Ok(v)) => v,
Ok(Err(e)) => return fail(e),
Err(e) => return fail(ForemanError::Git(format!("collect task panicked: {e}"))),
};
emit(
&progress,
ForemanProgress::SubtaskVerifying {
subtask_id: subtask.id.clone(),
},
);
let mut gate_config = GateConfig::new(subtask.id.clone(), &cwd);
gate_config.verify_command = config.verify_command.clone();
gate_config.no_verify_waiver = config.no_verify_waiver.clone();
// Only the footprint's writes are handed to the gate; without a footprint
// the gate gets an unconstrained one.
let footprint = match &subtask.footprint {
Some(fp) => DeclaredFootprint::from_refs(fp.writes.iter().cloned()),
None => DeclaredFootprint::unconstrained(),
};
let verdict = verify_changes(&gate_config, &changes, &footprint, infra).await;
emit(
&progress,
ForemanProgress::SubtaskGated {
subtask_id: subtask.id.clone(),
accepted: verdict.is_accepted(),
status: verdict_status(&verdict).into(),
},
);
SubtaskOutcome {
subtask_id: subtask.id.clone(),
verdict: Some(verdict),
changes,
patch: Some(patch),
error: None,
}
}
/// Outcome of applying a set of accepted patches together and gating the union.
#[derive(Debug)]
pub struct IntegrationResult {
/// Number of patches counted as applied: the number of input patches minus
/// those that failed to apply.
pub applied: usize,
/// One `"<subtask id>: <detail>"` line per patch that failed to apply.
pub apply_conflicts: Vec<String>,
/// Gate verdict for the union; `None` when apply conflicts prevented gating.
pub verdict: Option<MergeVerdict>,
/// Attribution of the failure to subtasks; `None` when the union was accepted.
pub blame: Option<IntegrationBlame>,
}
impl IntegrationResult {
    /// `true` when every patch applied and the union verdict reports acceptance.
    pub fn integrated_cleanly(&self) -> bool {
        if !self.apply_conflicts.is_empty() {
            return false;
        }
        matches!(&self.verdict, Some(verdict) if verdict.is_accepted())
    }
}
/// A patch that `git apply` refused during integration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplyConflict {
/// Subtask whose patch failed to apply.
pub subtask_id: String,
/// Files named in that patch's `diff --git` headers.
pub files: Vec<String>,
/// Rendered error from the failed apply.
pub detail: String,
}
/// A semantic conflict reported by the gate for the union, attributed to subtasks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DuplicateBlame {
/// File the conflict was reported in.
pub file: String,
/// Symbol the conflict was reported for.
pub symbol: String,
/// Subtasks whose patches touch `file`; empty when none is known.
pub candidate_subtask_ids: Vec<String>,
}
/// A failed build/test run on the union, attributed to subtasks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BuildTestFailure {
/// Exit code as reported by the gate, if any.
pub code: Option<i32>,
/// Output of the failed command, copied as reported by the gate.
pub output_tail: String,
/// Subtasks whose files are named in the output, or every subtask of the
/// union when the output names none of them.
pub candidate_subtask_ids: Vec<String>,
}
/// Everything known about why an integration failed and which subtasks are implicated.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IntegrationBlame {
/// Patches that did not apply.
pub apply_conflicts: Vec<ApplyConflict>,
/// Semantic conflicts reported by the gate.
pub duplicate_conflicts: Vec<DuplicateBlame>,
/// Build/test failure of the union, if that is what failed.
pub build_test: Option<BuildTestFailure>,
}
impl IntegrationBlame {
    /// `true` when no apply conflict, no duplicate conflict and no build/test
    /// failure is recorded.
    pub fn is_empty(&self) -> bool {
        let has_apply = !self.apply_conflicts.is_empty();
        let has_duplicates = !self.duplicate_conflicts.is_empty();
        let has_build = self.build_test.is_some();
        !(has_apply || has_duplicates || has_build)
    }

    /// Sorted, de-duplicated ids of every subtask named by any recorded failure.
    pub fn implicated_subtasks(&self) -> BTreeSet<String> {
        let from_apply = self.apply_conflicts.iter().map(|c| &c.subtask_id);
        let from_duplicates = self
            .duplicate_conflicts
            .iter()
            .flat_map(|d| d.candidate_subtask_ids.iter());
        let from_build = self
            .build_test
            .iter()
            .flat_map(|bt| bt.candidate_subtask_ids.iter());
        from_apply
            .chain(from_duplicates)
            .chain(from_build)
            .cloned()
            .collect()
    }
}
/// Returns the sorted, de-duplicated ids of subtasks owning a file whose path
/// occurs verbatim (as a substring) in the build/test `output`.
///
/// An empty result means the output names none of the known files.
fn localize_build_failure(
    output: &str,
    file_to_subtasks: &std::collections::HashMap<String, Vec<String>>,
) -> Vec<String> {
    let implicated: BTreeSet<String> = file_to_subtasks
        .iter()
        .filter(|(file, _)| output.contains(file.as_str()))
        .flat_map(|(_, owners)| owners.iter().cloned())
        .collect();
    implicated.into_iter().collect()
}
/// Lists the files touched by a git-format patch, in first-seen order and
/// without duplicates.
///
/// Only `diff --git a/<old> b/<new>` header lines are inspected and the
/// `<new>` side is reported. For the common case where both sides carry the
/// same path, the header is split symmetrically, so a path that itself
/// contains `" b/"` is still recovered intact. Headers with quoted paths
/// (which git emits for special characters) are not decoded.
fn files_in_patch(patch: &str) -> Vec<String> {
    /// Extracts the `b/` path from the text that follows `diff --git `.
    fn target_path(rest: &str) -> Option<&str> {
        const SEPARATOR: &str = " b/";
        // Same path on both sides: `a/P b/P`. The midpoint split is unambiguous
        // even when `P` contains the separator text.
        if let Some(sides) = rest.strip_prefix("a/") {
            if sides.len() >= SEPARATOR.len() && (sides.len() - SEPARATOR.len()) % 2 == 0 {
                let half = (sides.len() - SEPARATOR.len()) / 2;
                // `get` rejects a split inside a multi-byte character instead of panicking.
                if let (Some(left), Some(tail)) = (sides.get(..half), sides.get(half..)) {
                    if tail.strip_prefix(SEPARATOR) == Some(left) {
                        return Some(left);
                    }
                }
            }
        }
        // Differing sides (rename/copy): everything after the last separator.
        rest.rfind(SEPARATOR).map(|pos| &rest[pos + SEPARATOR.len()..])
    }

    let mut seen: HashSet<&str> = HashSet::new();
    let mut files = Vec::new();
    for line in patch.lines() {
        let Some(rest) = line.strip_prefix("diff --git ") else {
            continue;
        };
        let Some(file) = target_path(rest) else {
            continue;
        };
        if !file.is_empty() && seen.insert(file) {
            files.push(file.to_string());
        }
    }
    files
}
/// Applies all accepted patches together in a fresh staging worktree and runs
/// the merge gate on the union.
///
/// `accepted_patches` holds `(subtask id, patch)` pairs; blank patches are
/// skipped. Patches are applied in order and a failing patch does not stop the
/// remaining ones from being attempted. If any patch fails to apply, the
/// result carries the conflicts with per-subtask blame and no verdict.
/// Otherwise the union is verified with `union_verify_command` (falling back
/// to `verify_command`) against an unconstrained footprint, and a non-accepted
/// verdict is attributed to candidate subtasks via the files their patches touch.
///
/// # Errors
///
/// Returns `ForemanError::Workspace` when the staging worktree cannot be
/// provisioned, and `ForemanError::Git` when collecting the staged changes
/// fails or the blocking task panics. Apply conflicts and gate rejections are
/// reported through `Ok`.
pub async fn integrate_and_verify(
    repo_root: &Path,
    subtask_label: &str,
    accepted_patches: &[(String, String)],
    config: &FarmOutConfig,
    infra: &SharedInfra,
) -> Result<IntegrationResult, ForemanError> {
    // `staging` stays bound until the end of the function so the workspace
    // handle outlives every use of its path.
    let staging = AgentWorkspace::provision(
        &worktree_workspace_config(repo_root, config),
        &format!("integrate-{subtask_label}"),
    )
    .map_err(ForemanError::Workspace)?;
    let staging_path = staging.path().to_path_buf();
    let patches: Vec<(String, String)> = accepted_patches.to_vec();
    let patch_count = patches.len();
    let verify_command = config
        .union_verify_command
        .clone()
        .or_else(|| config.verify_command.clone());
    let label = subtask_label.to_string();

    // Blame bookkeeping: which subtasks touch which file, and the members of
    // the union. Both are de-duplicated so a subtask id that appears on several
    // patches is listed only once as a candidate.
    let mut file_to_subtasks: std::collections::HashMap<String, Vec<String>> =
        std::collections::HashMap::new();
    let mut union_members: Vec<String> = Vec::new();
    for (id, patch) in &patches {
        if !union_members.contains(id) {
            union_members.push(id.clone());
        }
        for file in files_in_patch(patch) {
            let owners = file_to_subtasks.entry(file).or_default();
            if !owners.contains(id) {
                owners.push(id.clone());
            }
        }
    }

    // git runs synchronously, so applying and collecting happen on the blocking pool.
    let staging_for_blocking = staging_path.clone();
    let (conflicts, changes) = tokio::task::spawn_blocking(move || {
        let mut conflicts: Vec<ApplyConflict> = Vec::new();
        for (id, patch) in &patches {
            if patch.trim().is_empty() {
                continue;
            }
            if let Err(e) = git_apply(&staging_for_blocking, patch) {
                conflicts.push(ApplyConflict {
                    subtask_id: id.clone(),
                    files: files_in_patch(patch),
                    detail: e.to_string(),
                });
            }
        }
        let changes = collect_file_changes(&staging_for_blocking)?;
        Ok::<_, ForemanError>((conflicts, changes))
    })
    .await
    .map_err(|e| ForemanError::Git(format!("integrate task panicked: {e}")))??;

    // A union that does not even apply is never gated.
    if !conflicts.is_empty() {
        let apply_conflicts: Vec<String> = conflicts
            .iter()
            .map(|c| format!("{}: {}", c.subtask_id, c.detail))
            .collect();
        return Ok(IntegrationResult {
            applied: patch_count - conflicts.len(),
            apply_conflicts,
            verdict: None,
            blame: Some(IntegrationBlame {
                apply_conflicts: conflicts,
                ..Default::default()
            }),
        });
    }

    let mut gate_config = GateConfig::new(format!("union:{label}"), &staging_path);
    gate_config.verify_command = verify_command;
    gate_config.no_verify_waiver = config.no_verify_waiver.clone();
    let verdict =
        verify_changes(&gate_config, &changes, &DeclaredFootprint::unconstrained(), infra).await;

    let blame = if verdict.is_accepted() {
        None
    } else {
        let ev = verdict.evidence();
        let duplicate_conflicts = ev
            .semantic_conflicts
            .iter()
            .map(|d| DuplicateBlame {
                file: d.file.clone(),
                symbol: d.symbol.clone(),
                candidate_subtask_ids: file_to_subtasks.get(&d.file).cloned().unwrap_or_default(),
            })
            .collect();
        let build_test = match &ev.build_test {
            BuildTestStatus::Failed { code, output } => {
                // Prefer subtasks whose files the output names; when it names
                // none, every member of the union is a candidate.
                let localized = localize_build_failure(output, &file_to_subtasks);
                let candidate_subtask_ids = if localized.is_empty() {
                    union_members.clone()
                } else {
                    localized
                };
                Some(BuildTestFailure {
                    code: *code,
                    output_tail: output.clone(),
                    candidate_subtask_ids,
                })
            }
            _ => None,
        };
        Some(IntegrationBlame {
            apply_conflicts: Vec::new(),
            duplicate_conflicts,
            build_test,
        })
    };
    Ok(IntegrationResult {
        applied: patch_count,
        apply_conflicts: Vec::new(),
        verdict: Some(verdict),
        blame,
    })
}
/// Re-runs the agent on top of the already-clean patches and gates the result.
///
/// A workspace named `regional-replan` is provisioned, every non-blank patch in
/// `clean_patches` is applied in order, and the agent is then run there with a
/// synthetic subtask `__regional_replan__` whose prompt is `goal` and whose
/// file list is empty. The resulting changes and patch (which therefore include
/// the clean patches) are verified with `union_verify_command`, falling back to
/// `verify_command`, against an unconstrained footprint.
///
/// Returns `None` when the workspace cannot be provisioned, a clean patch does
/// not apply, the agent fails, collecting the result fails, or a blocking task
/// panics. Otherwise returns the outcome, whose verdict may still be a rejection.
pub async fn regional_replan(
repo_root: &Path,
goal: &str,
clean_patches: &[(String, String)],
agent: &dyn WorktreeAgent,
config: &FarmOutConfig,
infra: &SharedInfra,
) -> Option<SubtaskOutcome> {
let workspace =
AgentWorkspace::provision(&worktree_workspace_config(repo_root, config), "regional-replan")
.ok()?;
let cwd = workspace.path().to_path_buf();
let cwd_for_apply = cwd.clone();
let clean = clean_patches.to_vec();
// Stage the clean work first; stop at the first patch that does not apply.
let staged = tokio::task::spawn_blocking(move || {
for (_id, patch) in &clean {
if patch.trim().is_empty() {
continue;
}
if git_apply(&cwd_for_apply, patch).is_err() {
return false;
}
}
true
})
.await
.ok()?;
if !staged {
return None;
}
let subtask = Subtask::files_only("__regional_replan__", goal.to_string(), Vec::new());
let req = WorktreeAgentRequest {
subtask: &subtask,
cwd: &cwd,
allowed_tools: config.allowed_tools.clone(),
mcp_endpoint: config.mcp_endpoint.clone(),
};
if agent.run_in(&req).await.is_err() {
return None;
}
let cwd_for_blocking = cwd.clone();
// First `.ok()?` handles a panicked task, the second a git failure.
let (changes, patch) = tokio::task::spawn_blocking(move || {
let changes = collect_file_changes(&cwd_for_blocking)?;
let patch = capture_patch(&cwd_for_blocking)?;
Ok::<_, ForemanError>((changes, patch))
})
.await
.ok()?
.ok()?;
let goal_check = config
.union_verify_command
.clone()
.or_else(|| config.verify_command.clone());
let mut gate_config = GateConfig::new("__regional_replan__", &cwd);
gate_config.verify_command = goal_check;
gate_config.no_verify_waiver = config.no_verify_waiver.clone();
let verdict =
verify_changes(&gate_config, &changes, &DeclaredFootprint::unconstrained(), infra).await;
Some(SubtaskOutcome {
subtask_id: "__regional_replan__".into(),
verdict: Some(verdict),
changes,
patch: Some(patch),
error: None,
})
}
/// Collects every changed path in `worktree` with its content at `HEAD`
/// (`before`) and in the working tree (`after`).
///
/// Parses `git status --porcelain -z --untracked-files=all`, whose entries have
/// the form `XY <path>` and are NUL-separated. A rename or copy, flagged in
/// either the index (`X`) or the worktree (`Y`) column, is followed by one
/// extra field holding the origin path. That field is always consumed so it is
/// never mistaken for an entry of its own. A rename is reported as a deletion
/// of the origin plus an addition of the new path. A copy only reports the
/// addition, because the origin still exists. Entries whose path is readable
/// neither at `HEAD` nor in the worktree are dropped.
///
/// # Errors
///
/// Returns `ForemanError::Git` when `git status` fails.
fn collect_file_changes(worktree: &Path) -> Result<Vec<FileChange>, ForemanError> {
    let porcelain = git(
        worktree,
        &["status", "--porcelain", "-z", "--untracked-files=all"],
    )?;
    let mut changes = Vec::new();
    let mut fields = porcelain.split('\0');
    // `while let` rather than `for`: rename/copy entries pull a second field.
    while let Some(entry) = fields.next() {
        // Two status bytes, a space, then a non-empty path. `get` also skips an
        // entry whose byte 3 is not a character boundary instead of panicking.
        let Some(path) = entry.get(3..).filter(|p| !p.is_empty()) else {
            continue;
        };
        let status = entry.as_bytes();
        let (x, y) = (status[0], status[1]);
        let renamed = x == b'R' || y == b'R';
        let copied = x == b'C' || y == b'C';
        if renamed || copied {
            let origin = fields.next().filter(|o| !o.is_empty());
            if renamed {
                if let Some(old) = origin {
                    changes.push(FileChange {
                        path: old.to_string(),
                        before: read_head(worktree, old),
                        after: None,
                    });
                }
            }
            changes.push(FileChange {
                path: path.to_string(),
                before: None,
                after: read_worktree(worktree, path),
            });
            continue;
        }
        let before = read_head(worktree, path);
        let after = read_worktree(worktree, path);
        if before.is_none() && after.is_none() {
            continue;
        }
        changes.push(FileChange {
            path: path.to_string(),
            before,
            after,
        });
    }
    Ok(changes)
}
/// Content of `path` (relative to `worktree`) as committed at `HEAD`, or
/// `None` when `git show` fails for any reason, e.g. the path is not in `HEAD`.
fn read_head(worktree: &Path, path: &str) -> Option<String> {
    let object = format!("HEAD:./{path}");
    match git(worktree, &["show", object.as_str()]) {
        Ok(contents) => Some(contents),
        Err(_) => None,
    }
}
/// Current on-disk content of `path` inside `worktree`, decoded lossily as
/// UTF-8, or `None` when the file cannot be read.
fn read_worktree(worktree: &Path, path: &str) -> Option<String> {
    let full = worktree.join(path);
    match std::fs::read(full) {
        Ok(raw) => Some(String::from_utf8_lossy(&raw).into_owned()),
        Err(_) => None,
    }
}
/// Produces a patch of everything in `worktree` relative to `HEAD`.
///
/// `git add -AN` is run first, which modifies the index, and then
/// `git diff HEAD` is taken with binary output enabled and with textconv and
/// CRLF conversion disabled.
///
/// # Errors
///
/// Returns `ForemanError::Git` when either git command fails.
fn capture_patch(worktree: &Path) -> Result<String, ForemanError> {
    const DIFF_ARGS: [&str; 6] = [
        "-c",
        "core.autocrlf=false",
        "diff",
        "HEAD",
        "--binary",
        "--no-textconv",
    ];
    git(worktree, &["add", "-AN"])?;
    git(worktree, &DIFF_ARGS)
}
/// Runs `git <args>` in `cwd` and returns its stdout, decoded lossily as UTF-8.
///
/// # Errors
///
/// Returns `ForemanError::Git` when the process cannot be spawned, or with
/// git's trimmed stderr when it exits unsuccessfully.
fn git(cwd: &Path, args: &[&str]) -> Result<String, ForemanError> {
    let mut command = Command::new("git");
    command.args(args).current_dir(cwd);
    let output = match command.output() {
        Ok(output) => output,
        Err(e) => return Err(ForemanError::Git(format!("spawn git: {e}"))),
    };
    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        Err(ForemanError::Git(stderr.trim().to_string()))
    }
}
/// Applies `patch` to the working tree at `cwd` by piping it into
/// `git apply --whitespace=nowarn`.
///
/// The child process is always waited on, including when writing the patch to
/// its stdin fails, so no unreaped process is left behind. In that case git's
/// own stderr is preferred over the bare I/O error when it is non-empty.
///
/// # Errors
///
/// Returns `ForemanError::Git` when git cannot be spawned, the patch cannot be
/// written, waiting fails, or git exits unsuccessfully.
fn git_apply(cwd: &Path, patch: &str) -> Result<(), ForemanError> {
    let mut child = Command::new("git")
        .args(["apply", "--whitespace=nowarn"])
        .current_dir(cwd)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| ForemanError::Git(format!("spawn git apply: {e}")))?;

    // Feed the patch. The handle is dropped at the end of the match arm, which
    // closes the pipe so git sees end-of-input. A failure is remembered rather
    // than returned so the child is still reaped below.
    let fed: Result<(), String> = match child.stdin.take() {
        Some(mut stdin) => stdin
            .write_all(patch.as_bytes())
            .map_err(|e| format!("write patch: {e}")),
        None => {
            // Without a pipe the child could wait on input forever; stop it first.
            let _ = child.kill();
            Err("no stdin for git apply".to_string())
        }
    };

    let out = child
        .wait_with_output()
        .map_err(|e| ForemanError::Git(format!("git apply: {e}")))?;
    let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string();
    match fed {
        Ok(()) if out.status.success() => Ok(()),
        Ok(()) => Err(ForemanError::Git(stderr)),
        // git usually explains why it stopped reading; fall back to the I/O error.
        Err(write_error) if stderr.is_empty() => Err(ForemanError::Git(write_error)),
        Err(_) => Err(ForemanError::Git(stderr)),
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Test helper: a files-only subtask with the prompt `do <id>`.
fn st(id: &str, files: &[&str]) -> Subtask {
    let owned: Vec<String> = files.iter().map(|f| (*f).to_owned()).collect();
    Subtask::files_only(id, format!("do {id}"), owned)
}
// Two subtasks with different files share a single level.
#[test]
fn disjoint_files_pack_into_one_level() {
    let subtasks = [st("a", &["src/a.rs"]), st("b", &["src/b.rs"])];
    let levels = partition_by_files(&subtasks);
    assert_eq!(levels.len(), 1);
    assert_eq!(levels[0].len(), 2);
}
// Two subtasks claiming the same file must land in different levels.
#[test]
fn shared_file_forces_separate_levels() {
    let subtasks = [st("a", &["src/shared.rs"]), st("b", &["src/shared.rs"])];
    let levels = partition_by_files(&subtasks);
    assert_eq!(levels.len(), 2);
}
// A subtask without declared files never shares its level with anything else.
#[test]
fn no_files_subtask_gets_its_own_isolated_level() {
    let subtasks = [st("a", &["x.rs"]), st("nofiles", &[]), st("b", &["y.rs"])];
    let levels = partition_by_files(&subtasks);
    let nofiles_level = levels.iter().find(|members| members.contains(&1)).unwrap();
    assert_eq!(nofiles_level, &vec![1], "no-files subtask is isolated");
}
/// Test helper: runs `git <args>` in `cwd` and panics with git's stderr on failure.
fn git_ok(cwd: &Path, args: &[&str]) {
    let mut cmd = Command::new("git");
    cmd.args(args).current_dir(cwd);
    let out = cmd.output().unwrap();
    assert!(out.status.success(), "git {args:?}: {}", String::from_utf8_lossy(&out.stderr));
}
// Test helper: creates a temporary git repository on branch `main` with a
// configured identity and one commit containing `src/lib.rs`.
fn init_repo() -> tempfile::TempDir {
let dir = tempfile::tempdir().unwrap();
let root = dir.path();
git_ok(root, &["init", "-q", "-b", "main"]);
git_ok(root, &["config", "user.email", "t@t.t"]);
git_ok(root, &["config", "user.name", "t"]);
std::fs::create_dir_all(root.join("src")).unwrap();
std::fs::write(root.join("src/lib.rs"), "pub fn original() {}\n").unwrap();
git_ok(root, &["add", "-A"]);
git_ok(root, &["commit", "-q", "-m", "init"]);
dir
}
// A staged rename of a path containing spaces must show up as a deletion of
// the old path and an addition of the new one.
#[test]
fn collect_changes_handles_rename_with_spaces() {
let repo = init_repo();
let root = repo.path();
std::fs::write(root.join("old name.rs"), "pub fn moved() {}\n").unwrap();
git_ok(root, &["add", "-A"]);
git_ok(root, &["commit", "-q", "-m", "add"]);
git_ok(root, &["mv", "old name.rs", "new name.rs"]);
let changes = collect_file_changes(root).unwrap();
let paths: Vec<_> = changes.iter().map(|c| c.path.as_str()).collect();
assert!(paths.contains(&"old name.rs"), "rename deletion side present: {paths:?}");
assert!(paths.contains(&"new name.rs"), "rename addition side present: {paths:?}");
let old = changes.iter().find(|c| c.path == "old name.rs").unwrap();
assert!(old.before.is_some() && old.after.is_none(), "old path is a deletion");
}
// Test agent that writes fixed content to one path inside the worktree.
struct WriteAgent {
// Path relative to the worktree root.
path: String,
// Content written to that path.
content: String,
}
// Writes `content` to `cwd/path`, creating parent directories on a best-effort
// basis; a failed write is reported as an agent error.
#[async_trait]
impl WorktreeAgent for WriteAgent {
async fn run_in(
&self,
req: &WorktreeAgentRequest<'_>,
) -> Result<AgentRunSummary, ForemanError> {
let target = req.cwd.join(&self.path);
if let Some(parent) = target.parent() {
std::fs::create_dir_all(parent).ok();
}
std::fs::write(target, &self.content).map_err(|e| ForemanError::Agent(e.to_string()))?;
Ok(AgentRunSummary::default())
}
}
/// Test helper: a default config whose per-subtask verify command is `verify`.
fn cfg(verify: &[&str]) -> FarmOutConfig {
    let command = verify.iter().map(|part| part.to_string()).collect::<Vec<_>>();
    FarmOutConfig {
        verify_command: Some(command),
        ..Default::default()
    }
}
// A simple edit with a passing verify command ends up verified, and the
// captured patch contains the added code.
#[tokio::test]
async fn clean_edit_is_verified_through_harness() {
let repo = init_repo();
let agent = WriteAgent {
path: "src/lib.rs".into(),
content: "pub fn original() {}\npub fn added() {}\n".into(),
};
let infra = SharedInfra::new();
let result = run_farm_out(
repo.path(),
&[st("edit", &["src/lib.rs"])],
&agent,
&cfg(&["true"]),
&infra,
)
.await;
let o = &result.outcomes[0];
assert!(o.error.is_none(), "{o:?}");
assert!(o.verdict.as_ref().unwrap().is_verified());
assert!(o.patch.as_ref().unwrap().contains("added"), "patch retained");
}
// The default worktree base lives under the temp dir, not inside the
// repository, and is stable for a given repository path.
#[test]
fn default_worktree_base_is_outside_the_repo() {
let repo = init_repo();
let root = repo.path();
let base = default_worktree_base(root);
assert!(
!base.starts_with(root),
"worktree base {base:?} must not be inside repo {root:?}"
);
assert!(base.starts_with(std::env::temp_dir()));
assert_eq!(base, default_worktree_base(root));
let cfg = FarmOutConfig::default();
// Only checks that building the config with defaults does not panic.
let _ = worktree_workspace_config(root, &cfg); }
// End-to-end check that the directory an agent is run in is not located
// inside the repository being worked on.
#[tokio::test]
async fn worktrees_are_provisioned_outside_the_repo() {
// Agent that records the cwd it was given and writes one file there.
struct CwdProbe {
seen: Arc<std::sync::Mutex<Option<PathBuf>>>,
}
#[async_trait]
impl WorktreeAgent for CwdProbe {
async fn run_in(
&self,
req: &WorktreeAgentRequest<'_>,
) -> Result<AgentRunSummary, ForemanError> {
*self.seen.lock().unwrap() = Some(req.cwd.to_path_buf());
std::fs::write(req.cwd.join("src/added.rs"), "pub fn a() {}\n")
.map_err(|e| ForemanError::Agent(e.to_string()))?;
Ok(AgentRunSummary::default())
}
}
let repo = init_repo();
let repo_root = repo.path().canonicalize().unwrap();
let seen = Arc::new(std::sync::Mutex::new(None));
let agent = CwdProbe { seen: Arc::clone(&seen) };
let infra = SharedInfra::new();
let _ = run_farm_out(
repo.path(),
&[st("edit", &["src/added.rs"])],
&agent,
&cfg(&["true"]),
&infra,
)
.await;
let cwd = seen.lock().unwrap().clone().expect("agent ran");
// Canonicalize so the comparison uses the same path form as `repo_root`;
// fall back to the raw path if that fails.
let cwd = cwd.canonicalize().unwrap_or(cwd);
assert!(
!cwd.starts_with(&repo_root),
"worktree {cwd:?} must be OUTSIDE repo {repo_root:?}"
);
}
// Without a verify command a subtask is not accepted unless a waiver is
// supplied; with a waiver it is accepted but not marked as verified.
#[tokio::test]
async fn no_verify_waiver_accepts_when_no_build_command() {
let repo = init_repo();
let agent = WriteAgent {
path: "src/added.rs".into(),
content: "pub fn added() {}\n".into(),
};
let infra = SharedInfra::new();
let r1 = run_farm_out(
repo.path(),
&[st("edit", &["src/added.rs"])],
&agent,
&FarmOutConfig::default(),
&infra,
)
.await;
assert!(
!r1.outcomes[0].is_accepted(),
"no command + no waiver must not be accepted: {:?}",
r1.outcomes[0]
);
let waived = FarmOutConfig {
no_verify_waiver: Some(NoVerifyWaiver {
class: "no-build-gate".into(),
reason: "no reliable build command for this project".into(),
}),
..Default::default()
};
let r2 = run_farm_out(
repo.path(),
&[st("edit", &["src/added.rs"])],
&agent,
&waived,
&infra,
)
.await;
let o = &r2.outcomes[0];
assert!(o.is_accepted(), "waiver must yield acceptance: {o:?}");
assert!(
!o.verdict.as_ref().unwrap().is_verified(),
"waiver-based acceptance is not build-verified"
);
}
// A successful subtask produces exactly three events, in order: started,
// verifying, gated (accepted).
#[tokio::test]
async fn progress_streams_started_then_gated_per_subtask() {
let repo = init_repo();
let agent = WriteAgent { path: "src/added.rs".into(), content: "pub fn a() {}\n".into() };
let events: Arc<std::sync::Mutex<Vec<ForemanProgress>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
// Sink that appends every event to the shared vector.
let sink: ForemanProgressSink = {
let events = Arc::clone(&events);
Arc::new(move |ev| events.lock().unwrap().push(ev))
};
let infra = SharedInfra::new();
let result = run_farm_out_with_progress(
repo.path(),
&[st("only", &["src/added.rs"])],
&agent,
&cfg(&["true"]),
&infra,
sink,
)
.await;
assert!(result.outcomes[0].is_accepted());
let events = events.lock().unwrap();
assert_eq!(events.len(), 3, "started + verifying + gated: {events:?}");
assert!(matches!(
&events[0],
ForemanProgress::SubtaskStarted { subtask_id, index: 0, level: 0, total: 1 } if subtask_id == "only"
), "first event is started: {:?}", events[0]);
assert!(matches!(
&events[1],
ForemanProgress::SubtaskVerifying { subtask_id } if subtask_id == "only"
), "second event is verifying: {:?}", events[1]);
assert!(matches!(
&events[2],
ForemanProgress::SubtaskGated { subtask_id, accepted: true, status } if subtask_id == "only" && status == "accepted"
), "third event is an accepted gate: {:?}", events[2]);
}
// When the agent fails, the stream is started followed by a non-accepted gate
// event with status "error"; no verifying event is emitted.
#[tokio::test]
async fn progress_reports_error_status_when_agent_fails() {
let repo = init_repo();
// Agent that always fails.
struct FailAgent;
#[async_trait]
impl WorktreeAgent for FailAgent {
async fn run_in(&self, _: &WorktreeAgentRequest<'_>) -> Result<AgentRunSummary, ForemanError> {
Err(ForemanError::Agent("boom".into()))
}
}
let events: Arc<std::sync::Mutex<Vec<ForemanProgress>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
let sink: ForemanProgressSink = {
let events = Arc::clone(&events);
Arc::new(move |ev| events.lock().unwrap().push(ev))
};
let infra = SharedInfra::new();
let _ = run_farm_out_with_progress(
repo.path(),
&[st("boom", &["src/x.rs"])],
&FailAgent,
&cfg(&["true"]),
&infra,
sink,
)
.await;
let events = events.lock().unwrap();
assert_eq!(events.len(), 2, "started + gated(error): {events:?}");
assert!(matches!(
&events[1],
ForemanProgress::SubtaskGated { accepted: false, status, .. } if status == "error"
), "agent failure surfaces as an error gate: {:?}", events[1]);
}
// A subtask that declares it writes only `foo` but also changes `other` must
// be rejected with a containment violation naming `other`.
#[tokio::test]
async fn declared_footprint_containment_rejects_out_of_scope_edit() {
let repo = init_repo(); std::fs::write(
repo.path().join("src/lib.rs"),
"pub fn foo() {}\npub fn other() {}\n",
)
.unwrap();
git_ok(repo.path(), &["commit", "-qam", "two fns"]);
let mut subtask = Subtask::files_only("a", "edit foo", vec!["src/lib.rs".into()]);
subtask.footprint = Some(car_ast::SymbolFootprint::writing([car_ast::SymbolRef::new(
"src/lib.rs",
"foo",
)]));
// The agent rewrites both functions, exceeding the declared footprint.
let agent = WriteAgent {
path: "src/lib.rs".into(),
content: "pub fn foo() -> u8 { 1 }\npub fn other() -> u8 { 2 }\n".into(),
};
let infra = SharedInfra::new();
let result = run_farm_out(repo.path(), &[subtask], &agent, &cfg(&["true"]), &infra).await;
let verdict = result.outcomes[0].verdict.as_ref().unwrap();
assert!(
matches!(verdict, MergeVerdict::Rejected { .. }),
"out-of-footprint edit must be rejected: {verdict:?}"
);
assert!(verdict
.evidence()
.containment_violations
.iter()
.any(|v| v.changed.symbol == "other"));
}
// Running against a plain directory that is not a git repository must yield an
// outcome carrying an error and no verdict, rather than a panic.
#[tokio::test]
async fn workspace_failure_is_captured_not_propagated() {
let dir = tempfile::tempdir().unwrap(); let agent = WriteAgent { path: "x".into(), content: String::new() };
let infra = SharedInfra::new();
let result = run_farm_out(dir.path(), &[st("x", &["a.rs"])], &agent, &cfg(&["true"]), &infra).await;
assert!(result.outcomes[0].verdict.is_none());
assert!(result.outcomes[0].error.is_some());
}
// Two subtasks that each add `foo` are accepted on their own, but integrating
// both patches together must not come out clean.
#[tokio::test]
async fn union_integration_catches_cross_subtask_duplicate() {
let repo = init_repo(); let infra = SharedInfra::new();
// A adds `foo` above `original`, B adds it below.
let agent_a = WriteAgent {
path: "src/lib.rs".into(),
content: "pub fn foo() {}\npub fn original() {}\n".into(), };
let agent_b = WriteAgent {
path: "src/lib.rs".into(),
content: "pub fn original() {}\npub fn foo() {}\n".into(), };
let a = run_farm_out(repo.path(), &[st("a", &["src/lib.rs"])], &agent_a, &cfg(&["true"]), &infra).await;
let b = run_farm_out(repo.path(), &[st("b", &["src/lib.rs"])], &agent_b, &cfg(&["true"]), &infra).await;
assert!(a.outcomes[0].is_accepted(), "A alone: {:?}", a.outcomes[0].verdict);
assert!(b.outcomes[0].is_accepted(), "B alone: {:?}", b.outcomes[0].verdict);
let patches = vec![
("a".to_string(), a.outcomes[0].patch.clone().unwrap()),
("b".to_string(), b.outcomes[0].patch.clone().unwrap()),
];
let integ = integrate_and_verify(repo.path(), "ab", &patches, &cfg(&["true"]), &infra)
.await
.unwrap();
assert!(
!integ.integrated_cleanly(),
"union of two subtasks both adding foo must be rejected: {integ:?}"
);
}
// Two patches rewriting the same line differently must surface as an apply
// conflict during integration.
#[tokio::test]
async fn union_surfaces_overlapping_edit_as_apply_conflict() {
let repo = init_repo();
let infra = SharedInfra::new();
let agent_a = WriteAgent { path: "src/lib.rs".into(), content: "pub fn original() -> u8 { 1 }\n".into() };
let agent_b = WriteAgent { path: "src/lib.rs".into(), content: "pub fn original() -> u16 { 2 }\n".into() };
let a = run_farm_out(repo.path(), &[st("a", &["src/lib.rs"])], &agent_a, &cfg(&["true"]), &infra).await;
let b = run_farm_out(repo.path(), &[st("b", &["src/lib.rs"])], &agent_b, &cfg(&["true"]), &infra).await;
let patches = vec![
("a".to_string(), a.outcomes[0].patch.clone().unwrap()),
("b".to_string(), b.outcomes[0].patch.clone().unwrap()),
];
let integ = integrate_and_verify(repo.path(), "ab", &patches, &cfg(&["true"]), &infra)
.await
.unwrap();
assert!(!integ.apply_conflicts.is_empty(), "overlapping edit must conflict loudly: {integ:?}");
assert!(!integ.integrated_cleanly());
}
// Patches that add different files integrate without conflicts and pass the gate.
#[tokio::test]
async fn union_of_disjoint_subtasks_integrates_cleanly() {
let repo = init_repo();
let infra = SharedInfra::new();
let agent_a = WriteAgent { path: "a.rs".into(), content: "pub fn a() {}\n".into() };
let agent_b = WriteAgent { path: "b.rs".into(), content: "pub fn b() {}\n".into() };
let a = run_farm_out(repo.path(), &[st("a", &["a.rs"])], &agent_a, &cfg(&["true"]), &infra).await;
let b = run_farm_out(repo.path(), &[st("b", &["b.rs"])], &agent_b, &cfg(&["true"]), &infra).await;
let patches = vec![
("a".to_string(), a.outcomes[0].patch.clone().unwrap()),
("b".to_string(), b.outcomes[0].patch.clone().unwrap()),
];
let integ = integrate_and_verify(repo.path(), "ab", &patches, &cfg(&["true"]), &infra)
.await
.unwrap();
assert!(integ.integrated_cleanly(), "disjoint union integrates: {integ:?}");
}
// With a passing per-worktree command and a failing union command, the
// subtask is accepted on its own but the integration is rejected, which shows
// that integration runs `union_verify_command`.
#[tokio::test]
async fn union_uses_union_verify_command_not_worktree_command() {
let repo = init_repo();
let infra = SharedInfra::new();
let agent = WriteAgent {
path: "src/lib.rs".into(),
content: "pub fn original() {}\npub fn added() {}\n".into(),
};
let config = FarmOutConfig {
verify_command: Some(vec!["true".into()]), union_verify_command: Some(vec!["false".into()]), ..Default::default()
};
let r = run_farm_out(repo.path(), &[st("a", &["src/lib.rs"])], &agent, &config, &infra).await;
assert!(r.outcomes[0].is_accepted(), "per-worktree (true) accepts");
let patches = vec![("a".to_string(), r.outcomes[0].patch.clone().unwrap())];
let integ = integrate_and_verify(repo.path(), "u", &patches, &config, &infra)
.await
.unwrap();
assert!(
!integ.integrated_cleanly(),
"union must run union_verify_command (false) and reject: {integ:?}"
);
}
// The regional replan starts from the clean patch, lets the agent add its own
// file, and returns an accepted outcome whose patch contains both.
#[tokio::test]
async fn regional_replan_resumes_from_clean_and_delivers() {
let repo = init_repo();
let infra = SharedInfra::new();
let keeper = WriteAgent { path: "keep.rs".into(), content: "pub fn keep() {}\n".into() };
let k = run_farm_out(repo.path(), &[st("keep", &["keep.rs"])], &keeper, &cfg(&["true"]), &infra).await;
let clean = vec![("keep".to_string(), k.outcomes[0].patch.clone().unwrap())];
let agent = WriteAgent { path: "good.txt".into(), content: "done".into() };
// The goal check passes only when both the clean file and the new file exist.
let config = FarmOutConfig {
union_verify_command: Some(vec![
"sh".into(),
"-c".into(),
"test -f good.txt && test -f keep.rs".into(),
]),
..Default::default()
};
let outcome = regional_replan(repo.path(), "finish it", &clean, &agent, &config, &infra)
.await
.expect("regional ran");
assert!(outcome.is_accepted(), "regional delivered clean+region: {outcome:?}");
let patch = outcome.patch.unwrap();
assert!(patch.contains("keep.rs"), "clean work preserved in result: {patch}");
assert!(patch.contains("good.txt"), "region work present: {patch}");
}
/// When the clean set contains text that is not a valid patch,
/// `regional_replan` must return `None` rather than an outcome.
#[tokio::test]
async fn regional_replan_bails_when_a_clean_patch_does_not_apply() {
    let repo = init_repo();
    let infra = SharedInfra::new();

    let agent = WriteAgent {
        path: "good.txt".into(),
        content: "done".into(),
    };
    // A single "patch" that is plain prose.
    let bogus = (String::from("broken"), String::from("this is not a valid patch\n"));
    let clean = vec![bogus];
    let gate = cfg(&["true"]);

    let outcome = regional_replan(repo.path(), "finish it", &clean, &agent, &gate, &infra).await;
    assert!(outcome.is_none(), "unappliable clean set bails to fallback");
}
/// `localize_build_failure` must return the subtask ids mapped to a file path
/// that appears in the build output, and an empty result when the output
/// mentions none of the mapped paths.
#[test]
fn localize_build_failure_picks_subtasks_whose_files_are_named() {
    // file path -> ids of the subtasks associated with it
    let owners: std::collections::HashMap<String, Vec<String>> =
        [("src/a.rs", "a"), ("src/b.rs", "b")]
            .iter()
            .map(|&(file, id)| (file.to_string(), vec![id.to_string()]))
            .collect();

    let named = localize_build_failure("error[E0277]: in src/a.rs:42:5\n", &owners);
    let expected = vec![String::from("a")];
    assert_eq!(named, expected, "localized to the named file's subtask");

    let unnamed = localize_build_failure("linker error, no file named\n", &owners);
    assert!(unnamed.is_empty());
}
/// `files_in_patch` must yield the paths touched by a two-file unified diff,
/// in the order the files appear in the patch text.
#[test]
fn files_in_patch_parses_target_paths() {
    // One fragment per diff line; the second file section has no `index` line
    // and no hunk.
    let patch = concat!(
        "diff --git a/src/foo.rs b/src/foo.rs\n",
        "index e69de29..abc1234 100644\n",
        "--- a/src/foo.rs\n",
        "+++ b/src/foo.rs\n",
        "@@ -0,0 +1 @@\n",
        "+pub fn foo() {}\n",
        "diff --git a/bar.rs b/bar.rs\n",
        "--- a/bar.rs\n",
        "+++ b/bar.rs\n",
    );

    let expected = vec![String::from("src/foo.rs"), String::from("bar.rs")];
    assert_eq!(files_in_patch(patch), expected);
}
/// Two agents write different contents to `src/lib.rs`. Integrating both
/// patches must produce blame with exactly one apply conflict, attributed to
/// subtask `b` (the second patch in the list) and naming `src/lib.rs`.
#[tokio::test]
async fn blame_attributes_apply_conflict_to_subtask_and_files() {
    let repo = init_repo();
    let infra = SharedInfra::new();

    let agent_a = WriteAgent {
        path: "src/lib.rs".into(),
        content: "pub fn original() -> u8 { 1 }\n".into(),
    };
    let agent_b = WriteAgent {
        path: "src/lib.rs".into(),
        content: "pub fn original() -> u16 { 2 }\n".into(),
    };

    // Each subtask is farmed out on its own against the same repo.
    let gate = cfg(&["true"]);
    let subtasks_a = [st("a", &["src/lib.rs"])];
    let subtasks_b = [st("b", &["src/lib.rs"])];
    let run_a = run_farm_out(repo.path(), &subtasks_a, &agent_a, &gate, &infra).await;
    let run_b = run_farm_out(repo.path(), &subtasks_b, &agent_b, &gate, &infra).await;

    let patch_a = run_a.outcomes[0].patch.clone().unwrap();
    let patch_b = run_b.outcomes[0].patch.clone().unwrap();
    let patches = vec![(String::from("a"), patch_a), (String::from("b"), patch_b)];

    let merged = integrate_and_verify(repo.path(), "ab", &patches, &gate, &infra)
        .await
        .unwrap();

    let blame = merged.blame.expect("apply conflict produces blame");
    assert_eq!(blame.apply_conflicts.len(), 1, "{blame:?}");

    let c = &blame.apply_conflicts[0];
    assert_eq!(c.subtask_id, "b", "the second patch is the one that conflicts");
    let contested = String::from("src/lib.rs");
    assert!(c.files.contains(&contested), "files attributed: {c:?}");
}
/// When the union verify command fails (`false`), the integration's blame
/// must carry a build/test entry with the command's exit code and the
/// contributing subtask id.
#[tokio::test]
async fn blame_carries_union_build_test_failure() {
    let repo = init_repo();
    let infra = SharedInfra::new();

    let writer = WriteAgent {
        path: "src/lib.rs".into(),
        content: "pub fn original() {}\npub fn added() {}\n".into(),
    };
    // Worktree gate uses `true`; union gate uses `false`.
    let config = FarmOutConfig {
        verify_command: Some(vec!["true".into()]),
        union_verify_command: Some(vec!["false".into()]),
        ..Default::default()
    };

    let subtasks = [st("a", &["src/lib.rs"])];
    let farmed = run_farm_out(repo.path(), &subtasks, &writer, &config, &infra).await;
    let only_patch = farmed.outcomes[0].patch.clone().unwrap();
    let patches = vec![(String::from("a"), only_patch)];

    let merged = integrate_and_verify(repo.path(), "u", &patches, &config, &infra)
        .await
        .unwrap();

    let report = merged.blame.expect("rejected union produces blame");
    let bt = report.build_test.expect("union build/test failure recorded");
    assert_eq!(bt.code, Some(1), "`false` exits 1: {bt:?}");

    let expected_ids = vec![String::from("a")];
    assert_eq!(bt.candidate_subtask_ids, expected_ids, "region named: {bt:?}");
}
/// Both subtasks add `pub fn dup()` to `src/lib.rs`, one before the existing
/// content and one after it. The test expects both patches to apply without
/// conflict and the resulting blame to list a duplicate of symbol `dup` in
/// `src/lib.rs` with both subtask ids as candidates.
#[tokio::test]
async fn blame_attributes_duplicate_declaration_to_both_subtasks() {
    let repo = init_repo();

    // Commit a padded `src/lib.rs` first; the agents then insert at opposite
    // ends of this content.
    let pad = "pub fn original() {}\npub fn p1() {}\npub fn p2() {}\npub fn p3() {}\n";
    let lib_rs = repo.path().join("src/lib.rs");
    std::fs::write(lib_rs, pad).unwrap();
    git_ok(repo.path(), &["commit", "-qam", "pad"]);

    let infra = SharedInfra::new();
    let agent_a = WriteAgent {
        path: "src/lib.rs".into(),
        content: format!("pub fn dup() {{}}\n{pad}"),
    };
    let agent_b = WriteAgent {
        path: "src/lib.rs".into(),
        content: format!("{pad}pub fn dup() {{}}\n"),
    };

    let gate = cfg(&["true"]);
    let subtasks_a = [st("a", &["src/lib.rs"])];
    let subtasks_b = [st("b", &["src/lib.rs"])];
    let run_a = run_farm_out(repo.path(), &subtasks_a, &agent_a, &gate, &infra).await;
    let run_b = run_farm_out(repo.path(), &subtasks_b, &agent_b, &gate, &infra).await;

    let patch_a = run_a.outcomes[0].patch.clone().unwrap();
    let patch_b = run_b.outcomes[0].patch.clone().unwrap();
    let patches = vec![(String::from("a"), patch_a), (String::from("b"), patch_b)];

    let integ = integrate_and_verify(repo.path(), "ab", &patches, &gate, &infra)
        .await
        .unwrap();
    assert!(integ.apply_conflicts.is_empty(), "disjoint hunks both apply: {integ:?}");

    let blame = integ.blame.expect("duplicate union produces blame");
    let dup = match blame.duplicate_conflicts.iter().find(|d| d.symbol == "dup") {
        Some(found) => found,
        None => panic!("duplicate `dup` attributed: {blame:?}"),
    };
    assert_eq!(dup.file, "src/lib.rs");

    // Sort a copy so the comparison does not depend on candidate order.
    let mut blamed = dup.candidate_subtask_ids.clone();
    blamed.sort();
    let expected = vec![String::from("a"), String::from("b")];
    assert_eq!(blamed, expected, "both subtasks blamed");
}
}