use std::path::PathBuf;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, Result};
use chrono::Local;
use serde::Serialize;
use crate::guard;
use crate::inspect;
use crate::packet::{self, PacketInputs};
use crate::schemas::{RunResult, TaskState, WorkQueue, WorkerProfile};
use crate::state::{self, write_str, Workspace};
use crate::{compact, evaluator, routing, telemetry, workers};
/// Handle to a worker session that the next task may continue (a "hot chain").
///
/// `run_auto` builds one after a task finishes `Done` with a session id and offers it
/// to the next task only when that task depends on `prev_task_id`; `run_next` chains
/// into it only when the resolved worker is the same `worker_id`.
#[derive(Clone, Debug)]
pub struct ChainHandle {
    /// Task whose run produced `session`.
    pub prev_task_id: String,
    /// Worker that owns `session`; chaining only continues with this same worker.
    pub worker_id: String,
    /// Worker session id to resume.
    pub session: String,
    /// Number of tasks already run in this chain (starts at 1).
    pub length: u32,
}

/// Maximum chain length: once a chain reaches this many tasks `run_auto` stops offering it.
pub const CHAIN_CAP: u32 = 3;

/// Options for a single `run_next` call. `RunOptions::default()` means "prepare only,
/// auto-select the task, no overrides".
#[derive(Clone, Debug, Default)]
pub struct RunOptions {
    /// Spawn the worker; when false only the run directory, packet and `run.yaml` are written.
    pub execute: bool,
    /// Worker to use instead of the task's preferred worker / the routing default.
    pub worker_override: Option<String>,
    /// Explicit task id; `None` selects the next eligible queued task.
    pub target: Option<String>,
    /// The user's answer to the task's latest question, included in the packet.
    pub answer: Option<String>,
    /// Run the worker with full access (also enabled by `default_access: full` in config).
    pub full_access: bool,
    /// Skip the ambiguity gate when the task is selected automatically.
    pub accept_ambiguity: bool,
    /// Session from the previous task to continue, if any.
    pub chain: Option<ChainHandle>,
}
/// Outcome of a `run_next` call.
#[derive(Debug)]
pub struct RunReport {
    /// Id of the run (also the run directory's name).
    pub run_id: String,
    /// Task that was prepared / run.
    pub task_id: String,
    /// Worker that was (or would have been) used.
    pub worker_id: String,
    /// Absolute path of the run directory.
    pub run_dir: PathBuf,
    /// The run directory and packet were written.
    pub prepared: bool,
    /// The worker was actually spawned.
    pub executed: bool,
    /// Human-readable progress lines for display.
    pub lines: Vec<String>,
    /// State the task was left in; `None` for a prepare-only run.
    pub result_state: Option<TaskState>,
    /// Worker session id, when one is known (used for chaining).
    pub session: Option<String>,
    /// The run continued a previous task's session.
    pub chained: bool,
}
/// Record written to `<run_dir>/run.yaml` when `run_next` prepares a run.
///
/// Field order is the serialized key order. Other code in this file scans `run.yaml`
/// line by line for `task_id:` and `worktree:`, so those keys must stay plain
/// top-level scalars.
#[derive(Serialize)]
pub(crate) struct RunRecord {
    /// Record format version (`run_next` writes 1).
    pub schema_version: u32,
    /// Run id; in `run_next` this is also the run directory's name.
    pub run_id: String,
    /// Task this run belongs to (looked up by `latest_run_for`).
    pub task_id: String,
    /// Intent id copied from the work queue.
    pub intent_id: String,
    /// Worker id chosen for the run.
    pub worker: String,
    /// `"running"` when executed, `"prepared"` otherwise; not updated afterwards in this file.
    pub state: String,
    /// RFC 3339 local timestamp of when the record was created.
    pub started_at: String,
    /// Worktree path; `"."` (or empty) means the main checkout (see `run_worktree`).
    pub worktree: String,
}
/// Prepares one task from the work queue and, when `opts.execute` is set, runs it.
///
/// The task is `opts.target` if given, otherwise the best eligible queued task from
/// [`select_next`]. Preparation allocates a fresh run directory under `.agents/runs/`
/// and writes a repo summary, the compiled task packet and `run.yaml`. Execution then
/// resolves the worker, consumes the task's approval (if it needs one), runs pre-run
/// hooks, marks the task `Running`, spawns the worker (resuming its session after
/// transient failures, up to the profile's `max_retries`), evaluates the result together
/// with post-run hooks and the task's validation commands, writes checkpoint/handoff
/// files, records telemetry, and saves the evaluated state into the latest queue.
///
/// # Errors
/// Fails when workspace files cannot be loaded, the ambiguity gate blocks automatic
/// selection, the target is missing or nothing is eligible, no invocable worker was
/// resolved (execute only), the task still needs approval, or spawning the worker /
/// writing run files fails. A blocking pre-run hook or a user cancellation is reported
/// through the returned [`RunReport`] instead.
pub fn run_next(ws: &Workspace, opts: &RunOptions) -> Result<RunReport> {
    let mut queue = ws.load_queue()?;
    let workers = ws.load_workers()?;
    let billing = ws.load_billing()?;
    let intent = ws.load_intent()?;
    let config = ws.load_config()?;
    // The ambiguity gate only guards automatic selection; an explicit target or
    // `accept_ambiguity` skips it.
    if opts.target.is_none() && !opts.accept_ambiguity {
        if let Some(i) = &intent {
            if crate::planner::intent_gated(i, config.ambiguity_gate) {
                return Err(anyhow!(
                    "the plan is still guessing (ambiguity: high, {} open question(s), \
                     interview turn {}/{}). Answer with `a` in the TUI or `yardlet answer`, \
                     or override with --accept-ambiguity.",
                    i.open_questions.len(),
                    i.interview_turns,
                    crate::planner::INTERVIEW_CAP
                ));
            }
        }
    }
    let idx = match &opts.target {
        Some(id) => queue
            .tasks
            .iter()
            .position(|t| &t.id == id)
            .ok_or_else(|| anyhow!("task {id} not found in the queue"))?,
        None => {
            select_next(&queue, opts)?.ok_or_else(|| anyhow!("no eligible queued task to run"))?
        }
    };
    let task = queue.tasks[idx].clone();
    // Only an answered run needs the question it answers.
    let prior_question = if opts.answer.is_some() {
        latest_question_for(ws, &task.id)
    } else {
        None
    };
    // A Partial task continues from its previous run's checkpoint.
    let continuation = if task.state == TaskState::Partial {
        continuation_context(ws, &task.id)
    } else {
        None
    };
    let resolved = routing::resolve_worker_for_task(
        ws,
        &workers,
        &billing,
        opts.worker_override.as_deref(),
        &task,
    );
    // Worker that was asked for; used when routing could not resolve an invocable
    // worker so the packet and run record still name it.
    let candidate_id = opts
        .worker_override
        .clone()
        .filter(|s| !s.is_empty())
        .or_else(|| (!task.preferred_worker.is_empty()).then(|| task.preferred_worker.clone()))
        .unwrap_or_else(|| workers.routing.default_worker.clone());
    let worker_id = resolved
        .as_ref()
        .map(|r| r.worker_id.clone())
        .unwrap_or_else(|_| candidate_id.clone());
    // Always a brand-new directory: reusing an earlier attempt's dir (same second)
    // would leave its result.json behind to be mistaken for this run's.
    let (run_id, run_dir) = allocate_run_dir(&ws.runs_dir())?;
    std::fs::create_dir_all(run_dir.join("evidence"))?;
    let run_dir_rel = format!(".agents/runs/{run_id}");
    let mut lines = Vec::new();
    lines.push(format!("selected task {} ({})", task.id, task.title));
    if let Some(rat) = &task.worker_rationale {
        lines.push(format!("planner rationale: {rat}"));
    }
    lines.push(format!("run dir: {run_dir_rel}"));
    let summary = inspect::summarize(&ws.root);
    write_str(
        &run_dir.join("evidence").join("repo-summary.md"),
        &inspect::to_markdown(&summary),
    )?;
    // Text used to pick the packet language: raw request, else intent summary, else title.
    let lang_sample = intent
        .as_ref()
        .map(|i| {
            if !i.raw_request.is_empty() {
                i.raw_request.clone()
            } else {
                i.summary.clone()
            }
        })
        .unwrap_or_else(|| task.title.clone());
    let language = packet::resolve_language(&config.language, &lang_sample);
    let images: Vec<String> = intent
        .as_ref()
        .map(|i| i.images.clone())
        .unwrap_or_default();
    let role_notes = packet::load_role_notes(&ws.root, packet::role_for(&task.kind));
    let harness = packet::discover_harness(&ws.root, config.harness_discovery);
    let chained_from = opts.chain.as_ref().map(|c| c.prev_task_id.clone());
    let packet_text = packet::compile(&PacketInputs {
        worker_id: &worker_id,
        task: &task,
        intent: intent.as_ref(),
        repo: &summary,
        run_dir_rel: &run_dir_rel,
        prior_question: prior_question.as_deref(),
        user_answer: opts.answer.as_deref(),
        continuation: continuation.as_deref(),
        chained_from: chained_from.as_deref(),
        language: &language,
        images: &images,
        role_notes: &role_notes,
        harness: &harness,
    });
    write_str(&workers::packet_path(&run_dir), &packet_text)?;
    let record = RunRecord {
        schema_version: 1,
        run_id: run_id.clone(),
        task_id: task.id.clone(),
        intent_id: queue.intent_id.clone(),
        worker: worker_id.clone(),
        state: if opts.execute { "running" } else { "prepared" }.to_string(),
        started_at: Local::now().to_rfc3339(),
        worktree: ".".to_string(),
    };
    state::save_yaml(&run_dir.join("run.yaml"), &record)?;
    let billing_present = guard::present_billing_env(&billing.blocked_worker_env_names);
    if !billing_present.is_empty() {
        lines.push(format!(
            "billing env present in parent ({}); will be scrubbed before worker runs",
            billing_present.len()
        ));
    }
    if !opts.execute {
        lines.push(String::new());
        match &resolved {
            Ok(r) => lines.push(format!("will use {} ({})", r.worker_id, r.reason)),
            Err(e) => lines.push(format!("no invocable worker: {e}")),
        }
        lines.push("re-run with --execute to invoke the worker.".to_string());
        return Ok(RunReport {
            run_id,
            task_id: task.id,
            worker_id,
            run_dir,
            prepared: true,
            executed: false,
            lines,
            result_state: None,
            session: None,
            chained: false,
        });
    }
    // Resolve everything that can fail before the worker starts *first*, so a
    // misconfigured worker does not consume the task's one-shot approval.
    let resolved = resolved?;
    let reason = resolved.reason;
    let bin = resolved.bin;
    let mut eff_profile = find_worker(&workers.workers, &worker_id)?.clone();
    // Per-task model/effort overrides win over the worker profile's defaults.
    if !task.model.trim().is_empty() {
        eff_profile.model = task.model.clone();
    }
    if !task.effort.trim().is_empty() {
        eff_profile.effort = task.effort.clone();
    }
    let full_access = opts.full_access || config.default_access.eq_ignore_ascii_case("full");
    let env = guard::sanitized_worker_env_for(&billing, &eff_profile.invocation.pass_env)
        .map_err(|e| anyhow!(e))?;
    let timeout = Duration::from_secs(eff_profile.limits.max_wall_minutes as u64 * 60);
    if task.approval_required() {
        if crate::approvals::is_granted(ws, &task.id) {
            crate::approvals::consume(ws, &task.id)?;
            lines.push(format!("approval consumed for {}", task.id));
        } else {
            return Err(anyhow!(
                "task {} requires approval. Run `yardlet approve {}` first, then \
                 `yardlet run --task {} --execute`.",
                task.id,
                task.id,
                task.id
            ));
        }
    }
    lines.push(format!("worker: {worker_id} ({reason})"));
    let pre = crate::hooks::run_phase(ws, crate::hooks::Phase::Pre, &task.id, &run_dir, &worker_id);
    if !pre.ok() {
        for f in &pre.failures {
            lines.push(format!("pre-run hook blocked the run: {}", f.summary()));
        }
        // Write onto the freshest queue so concurrent edits made while preparing survive.
        save_task_state_on_latest_queue(ws, &mut queue, &task.id, TaskState::Failed)?;
        return Ok(RunReport {
            run_id: run_id.clone(),
            task_id: task.id.clone(),
            worker_id: worker_id.clone(),
            run_dir: run_dir.clone(),
            prepared: true,
            executed: false,
            lines,
            result_state: Some(TaskState::Failed),
            session: None,
            chained: false,
        });
    }
    save_task_state_on_latest_queue(ws, &mut queue, &task.id, TaskState::Running)?;
    // Only continue the offered session when it belongs to the worker we resolved.
    let chained = opts
        .chain
        .as_ref()
        .is_some_and(|c| c.worker_id == worker_id);
    if chained {
        lines.push(format!(
            "chaining into {}'s session (task {} of a hot chain)",
            worker_id,
            opts.chain.as_ref().map(|c| c.length + 1).unwrap_or(1)
        ));
    }
    let log_path = run_dir.join("worker-output.log");
    // Session handed to the worker: the chained one, or a generated id for claude-code.
    // Codex picks its own id, which is discovered after the spawn.
    let mut session_id: Option<String> = if chained {
        opts.chain.as_ref().map(|c| c.session.clone())
    } else if worker_id == "claude-code" {
        Some(gen_session_uuid(&run_id))
    } else {
        None
    };
    let baseline_fp = evaluator::dirty_fingerprints(&ws.root);
    let started_sys = std::time::SystemTime::now();
    let run_started = std::time::Instant::now();
    let mut outcome = workers::spawn(
        &eff_profile,
        &bin,
        &packet_text,
        &ws.root,
        &env,
        &log_path,
        timeout,
        full_access,
        &images,
        session_id.as_deref(),
        chained,
    )?;
    if worker_id == "codex" && session_id.is_none() {
        session_id = find_codex_session(started_sys);
    }
    let cancelled_marker = run_dir.join("cancelled");
    let max_retries = eff_profile.limits.max_retries as u32;
    let mut resumes = 0u32;
    // Resume the same session after a transient failure unless the user cancelled or
    // the retry budget is spent.
    while session_id.is_some()
        && !cancelled_marker.exists()
        && is_transient_failure(&outcome, &run_dir)
        && resumes < max_retries
    {
        resumes += 1;
        lines.push(format!(
            "transient failure; resuming session ({resumes}/{max_retries})"
        ));
        let cont = "The previous run was interrupted by a connection error before it finished. \
                    Continue from where you left off, complete the task, and write the result file \
                    exactly as specified in the original task packet.";
        outcome = workers::spawn(
            &eff_profile,
            &bin,
            cont,
            &ws.root,
            &env,
            &log_path,
            timeout,
            full_access,
            &images,
            session_id.as_deref(),
            true,
        )?;
    }
    let wall_seconds = run_started.elapsed().as_secs();
    if cancelled_marker.exists() {
        let _ = std::fs::remove_file(&cancelled_marker);
        save_task_state_on_latest_queue(ws, &mut queue, &task.id, TaskState::Queued)?;
        lines.push(format!("stopped by user; {} requeued", task.id));
        return Ok(RunReport {
            run_id: run_id.clone(),
            task_id: task.id.clone(),
            worker_id: worker_id.clone(),
            run_dir: run_dir.clone(),
            prepared: true,
            executed: true,
            lines,
            result_state: Some(TaskState::Queued),
            session: session_id.clone(),
            chained,
        });
    }
    lines.push(format!(
        "worker outcome: {} (exit_ok={}, timed_out={})",
        outcome.note, outcome.exit_ok, outcome.timed_out
    ));
    // Files the worker touched: dirty-file fingerprints after the run versus the
    // baseline taken before it (None when either snapshot is unavailable).
    let evidence: Option<Vec<String>> =
        match (&baseline_fp, evaluator::dirty_fingerprints(&ws.root)) {
            (Some(base), Some(after)) => Some(evaluator::worker_touched(base, &after)),
            _ => None,
        };
    let mut eval = evaluator::evaluate(&run_dir, &run_id, &task, evidence.as_deref());
    let post = crate::hooks::run_phase(
        ws,
        crate::hooks::Phase::Post,
        &task.id,
        &run_dir,
        &worker_id,
    );
    if !post.ok() {
        for f in &post.failures {
            lines.push(format!(
                "post-run hook failed (blocks Done): {}",
                f.summary()
            ));
            eval.checks
                .push(evaluator::fatal_failure("post-run hook", f.summary()));
        }
        if eval.next_task_state == TaskState::Done {
            eval.next_task_state = TaskState::Failed;
        }
    }
    let validation_cmds = validation_commands(&task);
    let (validation_ran, validation_passed) =
        run_validation_commands(&validation_cmds, &ws.root, &run_dir, &billing);
    // Failing validation, or required validation that never ran, blocks Done.
    if (validation_ran && !validation_passed) || (validation_required(&task) && !validation_ran) {
        lines.push("validation failed (blocks Done)".to_string());
        eval.checks.push(evaluator::fatal_failure(
            "validation",
            "configured validation did not pass",
        ));
        if eval.next_task_state == TaskState::Done {
            eval.next_task_state = TaskState::Failed;
        }
    }
    state::write_str(
        &run_dir.join("evaluation.json"),
        &serde_json::to_string_pretty(&eval)?,
    )?;
    let result: Option<crate::schemas::RunResult> =
        std::fs::read_to_string(run_dir.join("result.json"))
            .ok()
            .and_then(|t| serde_json::from_str(&t).ok());
    let intent_summary = intent.as_ref().map(|i| i.summary.as_str()).unwrap_or("");
    compact::write_checkpoint(&run_dir, &task, &eval, result.as_ref(), intent_summary)?;
    compact::write_handoff(&run_dir, &task, &eval, result.as_ref())?;
    if let Some(r) = &result {
        let learned = crate::skills::record_run_suggestions(ws, &r.harness_suggestions);
        if !learned.is_empty() {
            lines.push(format!("learned skill(s): {}", learned.join(", ")));
        }
        let rules = crate::skills::record_run_rules(ws, &r.harness_suggestions);
        if !rules.is_empty() {
            lines.push(format!("learned rule(s): {}", rules.join(", ")));
        }
    }
    save_task_state_on_latest_queue(ws, &mut queue, &task.id, eval.next_task_state)?;
    let user_override = opts.worker_override.as_ref().map(|o| {
        let from = if task.preferred_worker.is_empty() {
            "(default)".to_string()
        } else {
            task.preferred_worker.clone()
        };
        format!("{from}->{o}")
    });
    // Telemetry is best-effort; a failed append must not fail the run.
    let _ = telemetry::append_run(
        ws,
        &telemetry::RunTelemetry {
            ts: Local::now().to_rfc3339(),
            task_id: task.id.clone(),
            kind: task.kind.clone(),
            risk: task.risk.clone(),
            worker: worker_id.clone(),
            chosen_reason: reason.clone(),
            result_status: result
                .as_ref()
                .map(|r| r.status.clone())
                .unwrap_or_else(|| "no-result".to_string()),
            eval_state: format!("{:?}", eval.next_task_state),
            wall_seconds,
            user_override,
            skills: task.skills.clone(),
            verdict_pass: result.as_ref().and_then(|r| {
                (!r.verdict.is_empty())
                    .then(|| (r.verdict.iter().filter(|v| v.pass).count(), r.verdict.len()))
            }),
        },
    );
    lines.push(format!("evaluation status: {}", eval.status));
    lines.push(format!("next task state: {:?}", eval.next_task_state));
    Ok(RunReport {
        run_id,
        task_id: task.id,
        worker_id,
        run_dir,
        prepared: true,
        executed: true,
        lines,
        result_state: Some(eval.next_task_state),
        session: session_id,
        chained,
    })
}

/// Creates a new, previously unused run directory under `runs_dir`; returns `(run_id, path)`.
///
/// Run ids are second-resolution timestamps (`run-YYYYmmdd-HHMMSS`). `create_dir` is
/// atomic, so on a collision (a retry started within the same second) a zero-padded
/// counter is appended (`run-…-002`, `run-…-003`, …). Within one second this keeps
/// lexicographic order equal to creation order, which `latest_run_for` relies on.
fn allocate_run_dir(runs_dir: &std::path::Path) -> Result<(String, PathBuf)> {
    std::fs::create_dir_all(runs_dir)?;
    let stamp = Local::now().format("%Y%m%d-%H%M%S").to_string();
    for n in 1u32..=999 {
        let run_id = if n == 1 {
            format!("run-{stamp}")
        } else {
            format!("run-{stamp}-{n:03}")
        };
        let dir = runs_dir.join(&run_id);
        match std::fs::create_dir(&dir) {
            Ok(()) => return Ok((run_id, dir)),
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
            Err(e) => return Err(e.into()),
        }
    }
    Err(anyhow!(
        "no free run directory for timestamp {stamp} under {}",
        runs_dir.display()
    ))
}
/// Runs queued tasks one after another until the queue drains or something needs the user.
///
/// Every message is passed to `on_event` as it happens and also collected into the
/// returned list. Before the loop, an unconsumed plan and orphaned runs are recovered and,
/// unless `accept_ambiguity` is set, a still-ambiguous intent stops the run. Each
/// iteration then:
/// - stops when `pause` is set, when a task is `NeedsUser`/`Blocked`, or when a `Partial`
///   task has a merge conflict;
/// - waits (5 s polls, up to 360 of them) on a task another session still has `Running`;
/// - retries the first `Failed`/`Partial` task; otherwise runs a parallel batch of
///   independent ready tasks when `max_parallel > 1` and the git preflight passes;
///   otherwise runs the next eligible task through [`run_next`] (`bypass` = full access);
/// - stops once a task has been attempted more than twice.
///
/// A task finishing `Done` with a worker session may hand that session to the next task
/// when the next task depends on it (up to [`CHAIN_CAP`] tasks).
///
/// `accept_ambiguity` is forwarded to [`run_next`], so the override also applies to the
/// tasks it selects automatically.
///
/// # Errors
/// Propagates queue-loading errors and any error from [`run_next`] or the parallel batch.
#[allow(clippy::too_many_arguments)]
pub fn run_auto<F: FnMut(&str)>(
    ws: &Workspace,
    bypass: bool,
    pause: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
    parallel: Option<usize>,
    accept_ambiguity: bool,
    mut on_event: F,
) -> Result<Vec<String>> {
    use std::collections::HashMap;
    let max_parallel = parallel
        .or_else(|| ws.load_config().ok().map(|c| c.max_parallel))
        .unwrap_or(1)
        .max(1);
    let mut parallel_warned = false;
    let mut out = Vec::new();
    let mut emit = |s: String| {
        on_event(&s);
        out.push(s);
    };
    // Runs started per task id; more than two stops the loop.
    let mut attempts: HashMap<String, u32> = HashMap::new();
    // 5-second polls spent waiting on a task another session still has Running.
    let mut waits: HashMap<String, u32> = HashMap::new();
    let mut chain: Option<ChainHandle> = None;
    // Only used with `select_next`, which ignores its options.
    let probe_opts = RunOptions {
        execute: false,
        worker_override: None,
        target: None,
        answer: None,
        full_access: false,
        accept_ambiguity,
        chain: None,
    };
    if let Some(m) = crate::planner::recover_unconsumed_plan(ws) {
        emit(m);
    }
    for m in recover_orphans(ws) {
        emit(m);
    }
    if !accept_ambiguity {
        let gate_on = ws.load_config().map(|c| c.ambiguity_gate).unwrap_or(true);
        if let Ok(Some(i)) = ws.load_intent() {
            if crate::planner::intent_gated(&i, gate_on) {
                emit(format!(
                    "stopped: the plan is still guessing (ambiguity high, interview turn \
                     {}/{}) \u{2014} answer its questions (a) or run with --accept-ambiguity",
                    i.interview_turns,
                    crate::planner::INTERVIEW_CAP
                ));
                for q in i.open_questions.iter().take(5) {
                    emit(format!(" ? {q}"));
                }
                return Ok(out);
            }
        }
    }
    loop {
        if pause
            .as_ref()
            .map(|p| p.load(std::sync::atomic::Ordering::Relaxed))
            .unwrap_or(false)
        {
            emit("paused: stopped after the current task (run auto again to resume)".to_string());
            break;
        }
        let queue = ws.load_queue()?;
        // A Running task at the top of the loop belongs to another session: try to
        // recover it, otherwise keep polling.
        if let Some(t) = queue.tasks.iter().find(|t| t.state == TaskState::Running) {
            let task_id = t.id.clone();
            for m in recover_orphans(ws) {
                if !m.starts_with("adopted:") {
                    emit(m);
                }
            }
            let still_running = ws
                .load_queue()?
                .tasks
                .iter()
                .any(|x| x.state == TaskState::Running);
            if still_running {
                let n = waits.entry(task_id.clone()).or_default();
                *n += 1;
                if *n == 1 {
                    emit(format!(
                        "waiting for {task_id}'s worker from a previous session\u{2026}"
                    ));
                }
                // 360 polls x 5 s = 30 minutes.
                if *n > 360 {
                    emit(format!(
                        "stopped: {task_id} has run for 30+ minutes \u{2014} kill its worker \
                         or keep waiting, then run auto again"
                    ));
                    break;
                }
                std::thread::sleep(Duration::from_secs(5));
            }
            continue;
        }
        if let Some(t) = queue
            .tasks
            .iter()
            .find(|t| matches!(t.state, TaskState::NeedsUser | TaskState::Blocked))
        {
            emit(format!(
                "stopped: {} is {:?} \u{2014} answer (a) or resolve it, then run auto again",
                t.id, t.state
            ));
            break;
        }
        if let Some(t) = queue.tasks.iter().find(|t| t.state == TaskState::Partial) {
            if partial_is_conflict(ws, &t.id) {
                emit(format!(
                    "stopped: {} has a merge conflict \u{2014} resolve it (see handoff), then \
                     run auto again",
                    t.id
                ));
                break;
            }
        }
        let retry_target = queue
            .tasks
            .iter()
            .find(|t| matches!(t.state, TaskState::Failed | TaskState::Partial))
            .map(|t| t.id.clone());
        if retry_target.is_none() && max_parallel > 1 {
            let ready = crate::parallel::ready_independent(&queue, max_parallel);
            if ready.len() >= 2 {
                match crate::parallel::git_preflight(&ws.root) {
                    Ok(()) => {
                        let mut capped = false;
                        for &i in &ready {
                            let n = attempts.entry(queue.tasks[i].id.clone()).or_default();
                            *n += 1;
                            capped |= *n > 2;
                        }
                        if capped {
                            emit(
                                "stopped: a task did not complete after retries \u{2014} needs you"
                                    .to_string(),
                            );
                            break;
                        }
                        // Parallel runs use their own sessions; a chain cannot span a batch.
                        chain = None;
                        crate::parallel::run_batch(ws, &ready, bypass, |s| {
                            emit(s.to_string());
                        })?;
                        continue;
                    }
                    Err(why) => {
                        if !parallel_warned {
                            emit(format!("parallel off ({why}); running sequentially"));
                            parallel_warned = true;
                        }
                    }
                }
            }
        }
        let task_id = match &retry_target {
            Some(id) => id.clone(),
            None => match select_next(&queue, &probe_opts)? {
                Some(idx) => queue.tasks[idx].id.clone(),
                None => {
                    let waiting: Vec<&str> = queue
                        .tasks
                        .iter()
                        .filter(|t| t.state == TaskState::Queued)
                        .map(|t| t.id.as_str())
                        .collect();
                    if waiting.is_empty() {
                        emit("done: queue drained, all tasks complete".to_string());
                    } else {
                        emit(format!(
                            "stopped: {} waiting on approval or dependencies",
                            waiting.join(", ")
                        ));
                    }
                    break;
                }
            },
        };
        let n = attempts.entry(task_id.clone()).or_default();
        *n += 1;
        if *n > 2 {
            emit(format!(
                "stopped: {task_id} did not complete after retries \u{2014} needs you"
            ));
            break;
        }
        // Offer the previous session only to a fresh (non-retry) task that depends on the
        // task which produced it, and only while the chain is below the cap.
        let offer = chain
            .as_ref()
            .filter(|c| {
                retry_target.is_none()
                    && c.length < CHAIN_CAP
                    && queue
                        .tasks
                        .iter()
                        .find(|t| t.id == task_id)
                        .is_some_and(|t| t.depends_on.contains(&c.prev_task_id))
            })
            .cloned();
        emit(format!("running {task_id}\u{2026}"));
        let report = run_next(
            ws,
            &RunOptions {
                execute: true,
                worker_override: None,
                target: retry_target.clone(),
                answer: None,
                full_access: bypass,
                // Forward the caller's override; otherwise `run_next` re-applies the
                // ambiguity gate and fails even though the user accepted the ambiguity.
                accept_ambiguity,
                chain: offer.clone(),
            },
        )?;
        let state = report.result_state.unwrap_or(TaskState::Failed);
        emit(format!("{} \u{2192} {:?}", report.task_id, state));
        chain = if state == TaskState::Done {
            report.session.as_ref().map(|sess| ChainHandle {
                prev_task_id: report.task_id.clone(),
                worker_id: report.worker_id.clone(),
                session: sess.clone(),
                length: if report.chained {
                    offer.map(|o| o.length + 1).unwrap_or(1)
                } else {
                    1
                },
            })
        } else {
            None
        };
        match state {
            TaskState::Done | TaskState::Queued => continue,
            TaskState::Blocked => {
                emit(format!(
                    "stopped: {} blocked \u{2014} see `yardlet handoff`",
                    report.task_id
                ));
                break;
            }
            TaskState::NeedsUser => {
                emit(format!(
                    "stopped: {} needs you \u{2014} `yardlet answer \"...\"`",
                    report.task_id
                ));
                break;
            }
            TaskState::Partial => {
                emit(format!(
                    "{} is partial \u{2014} continuing from its checkpoint",
                    report.task_id
                ));
                continue;
            }
            TaskState::Failed => {
                emit(format!("{} failed; retrying", report.task_id));
                continue;
            }
            TaskState::Running => break,
        }
    }
    Ok(out)
}
/// Picks the index of the queued task to run next, or `None` when nothing is eligible.
///
/// A task is eligible when it is `Queued`, its dependencies are met, and it does not
/// need approval while the queue's selection policy skips such tasks. Among eligible
/// tasks the smallest `priority` value wins; ties go to the earliest task in the queue.
/// `_opts` is currently unused.
pub fn select_next(queue: &crate::schemas::WorkQueue, _opts: &RunOptions) -> Result<Option<usize>> {
    let policy = &queue.selection_policy;
    let eligible = queue.tasks.iter().enumerate().filter(|(_, candidate)| {
        candidate.state == TaskState::Queued
            && !(policy.skip_if_approval_required && candidate.approval_required())
            && queue.deps_met(candidate)
    });
    // `min_by_key` returns the first of several equal minima, preserving queue order on ties.
    let winner = eligible.min_by_key(|(_, candidate)| candidate.priority);
    Ok(winner.map(|(index, _)| index))
}
/// Finds the most recent run directory for `task_id`; returns `(dir_name, path)`.
///
/// Scans `ws.runs_dir()` for `run-*` directories whose `run.yaml` has a `task_id:`
/// line equal to `task_id`, and returns the one whose name sorts last (run names start
/// with a timestamp). `task_id` values are compared with surrounding YAML quotes
/// stripped, since quoted scalars such as `task_id: '001'` are valid YAML.
/// Returns `None` when the runs directory cannot be read or no run matches.
pub(crate) fn latest_run_for(ws: &Workspace, task_id: &str) -> Option<(String, PathBuf)> {
    let mut best: Option<(String, PathBuf)> = None;
    for entry in std::fs::read_dir(ws.runs_dir()).ok()?.flatten() {
        let dir = entry.path();
        let Some(name) = dir.file_name().and_then(|n| n.to_str()).map(String::from) else {
            continue;
        };
        if !name.starts_with("run-") {
            continue;
        }
        // A name that cannot beat the current best needs no run.yaml read.
        if best.as_ref().is_some_and(|(n, _)| name <= *n) {
            continue;
        }
        let yaml = std::fs::read_to_string(dir.join("run.yaml")).unwrap_or_default();
        let tid = yaml.lines().find_map(|l| {
            l.trim()
                .strip_prefix("task_id:")
                .map(|v| v.trim().trim_matches(|c: char| c == '"' || c == '\''))
        });
        if tid != Some(task_id) {
            continue;
        }
        best = Some((name, dir));
    }
    best
}
/// Derives a session id in UUID form from `seed` and the current process id.
///
/// The same seed yields the same id within one process. The bits are hash-derived, so
/// the version nibble is forced to 4 and the variant bits to `10xx`. Without that, the
/// string would only look like a UUID and strict UUID validators would reject it.
pub(crate) fn gen_session_uuid(seed: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut h1 = DefaultHasher::new();
    seed.hash(&mut h1);
    std::process::id().hash(&mut h1);
    let a = h1.finish();
    let mut h2 = DefaultHasher::new();
    (a, seed).hash(&mut h2);
    let b = h2.finish();
    // Hex digit 12 (first digit of the third group) is bits 15..12 of `a`: version = 4.
    let a = (a & !0xF000) | 0x4000;
    // Hex digit 16 (first digit of the fourth group) is the top nibble of `b`: variant 10xx.
    let b = (b & 0x3FFF_FFFF_FFFF_FFFF) | 0x8000_0000_0000_0000;
    let hex = format!("{a:016x}{b:016x}");
    format!(
        "{}-{}-{}-{}-{}",
        &hex[0..8],
        &hex[8..12],
        &hex[12..16],
        &hex[16..20],
        &hex[20..32]
    )
}
/// Guesses the id of the Codex session started by the current run.
///
/// Recursively scans `$HOME/.codex/sessions` for `rollout-*.jsonl` files modified no
/// earlier than 3 s before `after`, and returns the trailing 36 characters of the newest
/// one's file stem (the session id). Returns `None` when `HOME` is unset or nothing matches.
fn find_codex_session(after: std::time::SystemTime) -> Option<String> {
    fn walk(
        dir: &std::path::Path,
        after: std::time::SystemTime,
        best: &mut Option<(std::time::SystemTime, String)>,
    ) {
        // Length of a hyphenated UUID.
        const ID_LEN: usize = 36;
        let Ok(rd) = std::fs::read_dir(dir) else {
            return;
        };
        for e in rd.flatten() {
            let p = e.path();
            if p.is_dir() {
                walk(&p, after, best);
                continue;
            }
            let Some(stem) = p
                .file_name()
                .and_then(|n| n.to_str())
                .and_then(|n| n.strip_suffix(".jsonl"))
            else {
                continue;
            };
            if !stem.starts_with("rollout-") || stem.len() < ID_LEN {
                continue;
            }
            // `get` instead of indexing: for a non-ASCII name the byte offset can fall inside
            // a multi-byte character, and slicing there would panic.
            let Some(id) = stem.get(stem.len() - ID_LEN..) else {
                continue;
            };
            let Ok(mt) = e.metadata().and_then(|m| m.modified()) else {
                continue;
            };
            // 3 s of slack for coarse mtimes / clock skew around the spawn.
            if mt + std::time::Duration::from_secs(3) < after {
                continue;
            }
            if best.as_ref().map(|(t, _)| mt > *t).unwrap_or(true) {
                *best = Some((mt, id.to_string()));
            }
        }
    }
    let home = std::env::var_os("HOME")?;
    let base = std::path::Path::new(&home).join(".codex/sessions");
    let mut best = None;
    walk(&base, after, &mut best);
    best.map(|(_, id)| id)
}
/// True when a worker run looks interrupted rather than finished: it exited unsuccessfully
/// without timing out and left no `result.json` in `run_dir`. Such runs are resumed.
fn is_transient_failure(outcome: &workers::WorkerOutcome, run_dir: &std::path::Path) -> bool {
    if outcome.exit_ok || outcome.timed_out {
        return false;
    }
    let wrote_result = run_dir.join("result.json").exists();
    !wrote_result
}
/// Shell commands listed in the task's `validation` spec.
///
/// Accepts either a mapping with a `commands:` sequence or a bare sequence. Non-string
/// entries are ignored; no spec yields an empty list.
fn validation_commands(task: &crate::schemas::Task) -> Vec<String> {
    let spec = match &task.validation {
        Some(spec) => spec,
        None => return Vec::new(),
    };
    let listed = match spec.get("commands").and_then(|c| c.as_sequence()) {
        Some(seq) => Some(seq),
        None => spec.as_sequence(),
    };
    let mut commands = Vec::new();
    if let Some(items) = listed {
        for item in items.iter() {
            if let Some(text) = item.as_str() {
                commands.push(text.to_string());
            }
        }
    }
    commands
}
/// Whether the task's `validation` spec sets `required: true`. A missing spec, missing
/// key or non-boolean value all mean "not required".
fn validation_required(task: &crate::schemas::Task) -> bool {
    match task.validation.as_ref().and_then(|spec| spec.get("required")) {
        Some(flag) => flag.as_bool().unwrap_or(false),
        None => false,
    }
}
const VALIDATION_TIMEOUT: Duration = Duration::from_secs(300);
fn kill_validation_child(child: &mut std::process::Child) {
#[cfg(unix)]
{
let pgid = child.id();
let _ = std::process::Command::new("kill")
.arg("-9")
.arg(format!("-{pgid}"))
.status();
}
let _ = child.kill();
let _ = child.wait();
}
/// Runs the task's validation commands with `sh -c` in `cwd` and records the results.
///
/// Each command runs with a billing-scrubbed copy of the current environment, stdin
/// closed, and (on Unix) in its own process group. Output goes to
/// `run_dir/validation-<i>.log`, or is discarded if that log cannot be opened. A command
/// still running after [`VALIDATION_TIMEOUT`] is killed. A summary is written to
/// `run_dir/validation.json` (best effort).
///
/// Returns `(ran, all_passed)`: `ran` is false when `cmds` is empty (in which case
/// `all_passed` is true); a spawn failure or timeout counts as a failure.
fn run_validation_commands(
    cmds: &[String],
    cwd: &std::path::Path,
    run_dir: &std::path::Path,
    billing: &crate::schemas::BillingPolicy,
) -> (bool, bool) {
    use std::process::{Command, Stdio};
    let env = guard::scrub_env(std::env::vars(), &billing.blocked_worker_env_names);
    let mut results = Vec::with_capacity(cmds.len());
    let mut all_passed = true;
    for (i, c) in cmds.iter().enumerate() {
        let log_rel = format!("validation-{i}.log");
        let log = std::fs::File::create(run_dir.join(&log_rel)).ok();
        let mut cmd = Command::new("sh");
        cmd.arg("-c")
            .arg(c)
            .current_dir(cwd)
            .env_clear()
            .envs(env.iter().map(|(k, v)| (k.as_str(), v.as_str())))
            .stdin(Stdio::null());
        #[cfg(unix)]
        {
            use std::os::unix::process::CommandExt;
            cmd.process_group(0);
        }
        // Without a log file, discard output rather than inheriting the parent's
        // stdout/stderr (which may be driving a TUI).
        let streams = log
            .as_ref()
            .and_then(|f| Some((f.try_clone().ok()?, f.try_clone().ok()?)));
        match streams {
            Some((o, e)) => {
                cmd.stdout(Stdio::from(o)).stderr(Stdio::from(e));
            }
            None => {
                cmd.stdout(Stdio::null()).stderr(Stdio::null());
            }
        }
        let started = Instant::now();
        let (passed, code, timed_out) = match cmd.spawn() {
            Ok(mut child) => loop {
                match child.try_wait() {
                    Ok(Some(status)) => break (status.success(), status.code(), false),
                    Ok(None) if started.elapsed() > VALIDATION_TIMEOUT => {
                        kill_validation_child(&mut child);
                        break (false, None, true);
                    }
                    Ok(None) => std::thread::sleep(Duration::from_millis(100)),
                    Err(_) => {
                        // The child can no longer be observed; kill it so its process
                        // group does not outlive the run.
                        kill_validation_child(&mut child);
                        break (false, None, false);
                    }
                }
            },
            Err(_) => (false, None, false),
        };
        if !passed {
            all_passed = false;
        }
        results.push(serde_json::json!({
            "command": c,
            "passed": passed,
            "exit_code": code,
            "timed_out": timed_out,
            "log": log_rel,
        }));
    }
    let report = serde_json::json!({
        "ran": !cmds.is_empty(),
        "all_passed": all_passed,
        "note": "planner-authored commands, run by Yardlet with a billing-scrubbed env; \
                 not sandboxed like a worker",
        "commands": results,
    });
    let _ = write_str(
        &run_dir.join("validation.json"),
        &serde_json::to_string_pretty(&report).unwrap_or_default(),
    );
    (!cmds.is_empty(), all_passed)
}
/// Separate worktree recorded in `run_dir/run.yaml`, if any.
///
/// Uses the first `worktree:` line (double quotes stripped). Returns `None` when the file
/// is unreadable, the key is absent, or the value is empty or `"."` (the main checkout).
fn run_worktree(run_dir: &std::path::Path) -> Option<PathBuf> {
    let text = std::fs::read_to_string(run_dir.join("run.yaml")).ok()?;
    for line in text.lines() {
        let Some(rest) = line.trim().strip_prefix("worktree:") else {
            continue;
        };
        let value = rest.trim().trim_matches('"');
        return if value.is_empty() || value == "." {
            None
        } else {
            Some(PathBuf::from(value))
        };
    }
    None
}
/// Pid from `run_dir/worker.pid` if that process is still alive.
///
/// Liveness is probed with `kill -0 <pid>`. Returns `None` when:
/// - the file is missing or not a `u32`;
/// - the pid is outside `1..=i32::MAX`;
/// - the probe fails (including when the `kill` utility is unavailable).
///
/// Pid 0 (and values that do not fit `pid_t`) would make `kill` address a process group
/// rather than the worker, so a corrupt pid file would otherwise look alive forever.
pub(crate) fn live_worker_pid(run_dir: &std::path::Path) -> Option<u32> {
    let pid: u32 = std::fs::read_to_string(run_dir.join("worker.pid"))
        .ok()?
        .trim()
        .parse()
        .ok()?;
    if pid == 0 || i32::try_from(pid).is_err() {
        return None;
    }
    std::process::Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .ok()?
        .success()
        .then_some(pid)
}
/// True when a run's worker left a `worker.pid`, is no longer alive, and did write a
/// `result.json`: the result exists but was never evaluated by the session that started it.
fn is_orphaned_unfinalized(run_dir: &std::path::Path) -> bool {
    if !run_dir.join("worker.pid").exists() {
        return false;
    }
    if live_worker_pid(run_dir).is_some() {
        return false;
    }
    run_dir.join("result.json").exists()
}
/// Reconciles queue tasks whose runs were left behind by a previous session.
///
/// Considers every `Running` task, and every `Failed` task whose latest run is orphaned
/// but unfinalized (see `is_orphaned_unfinalized`), using the task's latest run:
/// - `Running` and its worker is still alive: left alone, reported as `adopted: …`.
///   This is checked before looking for `result.json`, so a worker that has written its
///   result but not yet exited is not finalized underneath it.
/// - `result.json` present: re-evaluated and `worker.pid` removed. If the new state is
///   `Done` and the run used a separate worktree, the worktree is merged back. A conflict
///   or integration error downgrades the task to `Partial` and writes `partial-reason`;
///   a clean merge removes the worktree.
/// - Otherwise: any worktree is removed and the task is requeued.
///
/// Returns human-readable messages. The queue is saved (best effort) only if a task
/// changed; if the queue cannot be loaded, nothing happens.
pub(crate) fn recover_orphans(ws: &Workspace) -> Vec<String> {
    let mut msgs = Vec::new();
    let Ok(mut q) = ws.load_queue() else {
        return msgs;
    };
    let mut requeued = Vec::new();
    let mut finished = Vec::new();
    for t in q.tasks.iter_mut() {
        // Only Running/Failed tasks can be orphans; skip the run-dir scan for the rest.
        let latest = match t.state {
            TaskState::Running => latest_run_for(ws, &t.id),
            TaskState::Failed => match latest_run_for(ws, &t.id) {
                Some(run) if is_orphaned_unfinalized(&run.1) => Some(run),
                _ => continue,
            },
            _ => continue,
        };
        if t.state == TaskState::Running {
            if let Some((_, run_dir)) = &latest {
                if let Some(pid) = live_worker_pid(run_dir) {
                    msgs.push(format!(
                        "adopted: {} still running from a previous session (pid {pid})",
                        t.id
                    ));
                    continue;
                }
            }
        }
        match latest {
            Some((run_id, run_dir)) if run_dir.join("result.json").exists() => {
                // Evidence comes from the run's own worktree when it still exists,
                // otherwise from the main checkout.
                let evidence = run_worktree(&run_dir)
                    .filter(|w| w.exists())
                    .and_then(|w| evaluator::changed_paths(&w))
                    .or_else(|| evaluator::changed_paths(&ws.root));
                let eval = evaluator::evaluate(&run_dir, &run_id, t, evidence.as_deref());
                let _ = std::fs::remove_file(run_dir.join("worker.pid"));
                t.state = eval.next_task_state;
                if let Some(wt) = run_worktree(&run_dir).filter(|w| w.exists()) {
                    let branch = format!("yard/{}", t.id.to_lowercase());
                    if t.state == TaskState::Done {
                        match crate::parallel::integrate_worktree(&ws.root, &wt, &branch, &t.id) {
                            Ok(crate::parallel::Integration::Conflict(why)) => {
                                t.state = TaskState::Partial;
                                let _ =
                                    write_str(&run_dir.join("partial-reason"), "merge_conflict");
                                msgs.push(format!(
                                    "{}: merge conflict on recovery ({}); worktree kept at {}",
                                    t.id,
                                    why.trim(),
                                    wt.display()
                                ));
                            }
                            Ok(_) => crate::parallel::remove_worktree(&ws.root, &wt, &branch),
                            Err(e) => {
                                t.state = TaskState::Partial;
                                let _ =
                                    write_str(&run_dir.join("partial-reason"), "merge_conflict");
                                msgs.push(format!("{}: recovery integration error: {e}", t.id));
                            }
                        }
                    }
                }
                finished.push(format!("{} \u{2192} {:?}", t.id, t.state));
            }
            run => {
                // No result and no live worker: discard the worktree and start over.
                if let Some((_, run_dir)) = run {
                    if let Some(wt) = run_worktree(&run_dir).filter(|w| w.exists()) {
                        let branch = format!("yard/{}", t.id.to_lowercase());
                        crate::parallel::remove_worktree(&ws.root, &wt, &branch);
                    }
                }
                t.state = TaskState::Queued;
                requeued.push(t.id.clone());
            }
        }
    }
    if !finished.is_empty() || !requeued.is_empty() {
        let _ = ws.save_queue(&q);
        if !finished.is_empty() {
            msgs.push(format!(
                "recovered completed run(s): {}",
                finished.join(", ")
            ));
        }
        if !requeued.is_empty() {
            msgs.push(format!(
                "requeued interrupted task(s): {}",
                requeued.join(", ")
            ));
        }
    }
    msgs
}
/// Looks up the worker profile with the given `id`.
///
/// # Errors
/// Fails when no profile in `workers` has that id.
pub(crate) fn find_worker<'a>(workers: &'a [WorkerProfile], id: &str) -> Result<&'a WorkerProfile> {
    match workers.iter().find(|profile| profile.id == id) {
        Some(profile) => Ok(profile),
        None => Err(anyhow!("worker '{id}' is not defined in .agents/workers.yaml")),
    }
}
/// Sets `task_id`'s state on the freshest on-disk queue and saves it.
///
/// The queue is re-loaded so edits made while a run was in flight are kept; if loading
/// fails, `fallback_queue` is used instead. When the task is found, the saved queue also
/// replaces `*fallback_queue`. When the (re-loaded) queue no longer contains the task,
/// the state is set on `fallback_queue` (if present there) and that queue is saved.
///
/// NOTE(review): that last path writes the caller's older queue over the freshly loaded
/// one; confirm this is intended when the task has disappeared (e.g. after a replan).
fn save_task_state_on_latest_queue(
    ws: &Workspace,
    fallback_queue: &mut WorkQueue,
    task_id: &str,
    state: TaskState,
) -> Result<()> {
    let mut fresh = match ws.load_queue() {
        Ok(loaded) => loaded,
        Err(_) => fallback_queue.clone(),
    };
    let found = fresh.tasks.iter_mut().find(|entry| entry.id == task_id);
    match found {
        Some(entry) => {
            entry.state = state;
            ws.save_queue(&fresh)?;
            *fallback_queue = fresh;
            Ok(())
        }
        None => {
            if let Some(entry) = fallback_queue.tasks.iter_mut().find(|entry| entry.id == task_id) {
                entry.state = state;
            }
            ws.save_queue(fallback_queue)
        }
    }
}
/// Builds the "continue from here" context for a task, from its latest run.
///
/// Combines the run's `checkpoint.md` with the `compact_summary` and unresolved
/// validation failures from its `result.json`. The result is capped at 6 KiB (cut on a
/// char boundary, then marked `[truncated]`) and trimmed. Returns `None` when the task
/// has no run or nothing useful was found.
pub(crate) fn continuation_context(ws: &Workspace, task_id: &str) -> Option<String> {
    const CAP: usize = 6 * 1024;
    let (_, run_dir) = latest_run_for(ws, task_id)?;
    let mut ctx = String::new();
    if let Ok(checkpoint) = std::fs::read_to_string(run_dir.join("checkpoint.md")) {
        ctx.push_str(checkpoint.trim());
        ctx.push_str("\n\n");
    }
    let previous = std::fs::read_to_string(run_dir.join("result.json"))
        .ok()
        .and_then(|raw| serde_json::from_str::<RunResult>(&raw).ok());
    if let Some(prev) = previous {
        if !prev.compact_summary.is_empty() {
            ctx.push_str("Previous run summary: ");
            ctx.push_str(&prev.compact_summary);
            ctx.push('\n');
        }
        if !prev.validation.failures.is_empty() {
            ctx.push_str("Unresolved failures:\n");
            for failure in prev.validation.failures.iter() {
                ctx.push_str("- ");
                ctx.push_str(failure);
                ctx.push('\n');
            }
        }
    }
    if ctx.len() > CAP {
        // Index 0 is always a boundary, so `find` always succeeds.
        let cut = (0..=CAP).rev().find(|&i| ctx.is_char_boundary(i)).unwrap_or(0);
        ctx.truncate(cut);
        ctx.push_str("\n[truncated]");
    }
    let body = ctx.trim();
    if body.is_empty() {
        None
    } else {
        Some(body.to_owned())
    }
}
/// Whether the task's latest run recorded a `partial-reason` file (written when a
/// worktree merge conflicted or failed). Any such file counts, whatever it contains.
pub(crate) fn partial_is_conflict(ws: &Workspace, task_id: &str) -> bool {
    match latest_run_for(ws, task_id) {
        Some((_, run_dir)) => run_dir.join("partial-reason").exists(),
        None => false,
    }
}
/// Most recent non-blank `question_for_user` a worker asked for `task_id`.
///
/// Scans every run directory's `result.json` that parses and belongs to `task_id`. Runs
/// are ranked by the modification time of `result.json` itself. The directory's mtime is
/// not used because it changes whenever any entry in it is created or removed (evaluation,
/// checkpoint, `worker.pid` cleanup during recovery, …), which could let an older run
/// outrank a newer one. Returns `None` if the runs directory is unreadable or no question
/// was found.
pub fn latest_question_for(ws: &Workspace, task_id: &str) -> Option<String> {
    let mut best: Option<(SystemTime, String)> = None;
    for entry in std::fs::read_dir(ws.runs_dir()).ok()?.flatten() {
        let result_path = entry.path().join("result.json");
        let Ok(text) = std::fs::read_to_string(&result_path) else {
            continue;
        };
        let Ok(result) = serde_json::from_str::<RunResult>(&text) else {
            continue;
        };
        if result.task_id != task_id {
            continue;
        }
        let Some(q) = result.question_for_user.filter(|q| !q.trim().is_empty()) else {
            continue;
        };
        let mtime = std::fs::metadata(&result_path)
            .and_then(|m| m.modified())
            .unwrap_or(UNIX_EPOCH);
        if best.as_ref().map(|(t, _)| mtime > *t).unwrap_or(true) {
            best = Some((mtime, q));
        }
    }
    best.map(|(_, q)| q)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schemas::{SelectionPolicy, Task, WorkQueue};
#[test]
fn validation_runner_blocks_on_failure() {
let dir = std::env::temp_dir().join(format!("yard-valrun-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let billing = crate::schemas::BillingPolicy::default();
let (ran, passed) = run_validation_commands(&["true".to_string()], &dir, &dir, &billing);
assert!(ran && passed);
let (ran, passed) = run_validation_commands(&["false".to_string()], &dir, &dir, &billing);
assert!(ran && !passed);
assert!(dir.join("validation.json").is_file());
let (ran, _) = run_validation_commands(&[], &dir, &dir, &billing);
assert!(!ran);
let _ = std::fs::remove_dir_all(&dir);
}
fn task(id: &str, state: TaskState, priority: i64, needs_approval: bool) -> Task {
Task {
id: id.into(),
title: id.into(),
state,
priority,
risk: String::new(),
kind: String::new(),
preferred_worker: String::new(),
model: String::new(),
effort: String::new(),
depends_on: vec![],
skills: vec![],
required_capabilities: vec![],
allowed_scope: vec![],
acceptance: vec![],
validation: None,
approval: if needs_approval {
Some(crate::yaml::from_str("required: true").unwrap())
} else {
None
},
interaction: None,
worker_rationale: None,
}
}
fn queue(tasks: Vec<Task>) -> WorkQueue {
WorkQueue {
schema_version: 1,
queue_id: "q".into(),
intent_id: String::new(),
selection_policy: SelectionPolicy::default(),
tasks,
}
}
fn opts() -> RunOptions {
RunOptions {
execute: false,
worker_override: None,
target: None,
answer: None,
full_access: false,
accept_ambiguity: false,
chain: None,
}
}
#[test]
fn picks_lowest_priority_queued() {
let q = queue(vec![
task("A", TaskState::Queued, 30, false),
task("B", TaskState::Queued, 10, false),
task("C", TaskState::Queued, 20, false),
]);
assert_eq!(select_next(&q, &opts()).unwrap(), Some(1)); }
#[test]
fn skips_non_queued_and_approval_required() {
let q = queue(vec![
task("done", TaskState::Done, 5, false),
task("gated", TaskState::Queued, 1, true), task("ready", TaskState::Queued, 40, false),
]);
assert_eq!(select_next(&q, &opts()).unwrap(), Some(2)); }
#[test]
fn none_when_no_eligible() {
let q = queue(vec![
task("a", TaskState::Done, 1, false),
task("b", TaskState::Blocked, 2, false),
]);
assert_eq!(select_next(&q, &opts()).unwrap(), None);
}
/// A `Running` task whose run dir already holds a finished run is recovered.
///
/// Setup: the run dir contains a `result.json` with status `done`, a handoff,
/// and a `run.yaml` pointing at a git worktree that contains a worker-written
/// `feature.txt`.
///
/// After `recover_orphans`:
/// - a "recovered" message is reported;
/// - the task is `Done`;
/// - `feature.txt` is present in the main checkout;
/// - the worktree directory is gone.
#[test]
fn recovery_merges_a_finished_orphaned_worktree_run() {
    let root = std::env::temp_dir().join(format!("yard-orphan-wt-{}", std::process::id()));
    let _ = std::fs::remove_dir_all(&root);
    std::fs::create_dir_all(&root).unwrap();
    // Runs `git -C <root> <args>`; on failure, includes git's stderr so the cause is visible.
    let sh = |args: &[&str]| {
        let out = std::process::Command::new("git")
            .arg("-C")
            .arg(&root)
            .args(args)
            .output()
            .unwrap();
        assert!(
            out.status.success(),
            "git {args:?} failed: {}",
            String::from_utf8_lossy(&out.stderr)
        );
    };
    sh(&["init", "-q"]);
    std::fs::write(root.join("base.txt"), "base\n").unwrap();
    sh(&["add", "base.txt"]);
    // Identity is passed inline so the commit works without a global git config.
    sh(&[
        "-c",
        "user.name=t",
        "-c",
        "user.email=t@t",
        "commit",
        "-q",
        "-m",
        "init",
    ]);
    let ws = Workspace::at(&root);
    let mut t = task("YARD-001", TaskState::Running, 10, false);
    t.kind = "implementation".into();
    // `t` is not used again, so move it into the queue rather than cloning it.
    ws.save_queue(&queue(vec![t])).unwrap();
    let run_id = "run-20990101-000000-yard-001";
    let run_dir = ws.runs_dir().join(run_id);
    std::fs::create_dir_all(&run_dir).unwrap();
    let wt = ws.agents_dir().join("worktrees").join("yard-001");
    sh(&[
        "worktree",
        "add",
        &wt.display().to_string(),
        "-b",
        "yard/yard-001",
    ]);
    // Uncommitted worker output left behind in the worktree.
    std::fs::write(wt.join("feature.txt"), "from worker\n").unwrap();
    let result = crate::schemas::RunResult {
        schema_version: 1,
        run_id: run_id.into(),
        task_id: "YARD-001".into(),
        status: "done".into(),
        intent_adherence: Default::default(),
        changes: Default::default(),
        validation: Default::default(),
        question_for_user: None,
        compact_summary: "ok".into(),
        verdict: vec![],
        harness_suggestions: vec![],
    };
    write_str(
        &run_dir.join("result.json"),
        &serde_json::to_string(&result).unwrap(),
    )
    .unwrap();
    write_str(&run_dir.join("handoff.md"), "# Handoff\n").unwrap();
    write_str(
        &run_dir.join("run.yaml"),
        &format!(
            "run_id: {run_id}\ntask_id: YARD-001\nworktree: {}\n",
            wt.display()
        ),
    )
    .unwrap();
    let msgs = recover_orphans(&ws);
    assert!(msgs.iter().any(|m| m.contains("recovered")), "{msgs:?}");
    let q = ws.load_queue().unwrap();
    assert_eq!(q.tasks[0].state, TaskState::Done);
    assert_eq!(
        std::fs::read_to_string(root.join("feature.txt")).unwrap(),
        "from worker\n"
    );
    assert!(!wt.exists());
    let _ = std::fs::remove_dir_all(&root);
}
/// A `Running` task whose run dir names a live `worker.pid` is adopted and left
/// `Running`. Once the pid file is gone, the next recovery pass requeues it.
#[test]
fn recovery_adopts_a_live_orphaned_worker() {
    let pid = std::process::id();
    let root = std::env::temp_dir().join(format!("yard-adopt-{}", pid));
    let _ = std::fs::remove_dir_all(&root);
    let ws = Workspace::at(&root);
    let running = task("YARD-001", TaskState::Running, 10, false);
    ws.save_queue(&queue(vec![running])).unwrap();

    let run_dir = ws.runs_dir().join("run-20990101-000000-yard-001");
    let pid_file = run_dir.join("worker.pid");
    std::fs::create_dir_all(&run_dir).unwrap();
    write_str(&run_dir.join("run.yaml"), "task_id: YARD-001\n").unwrap();
    // The test process is alive by construction, so its pid stands in for a live worker.
    write_str(&pid_file, &pid.to_string()).unwrap();

    // First pass: the worker is alive, so the task is adopted and stays Running.
    let msgs = recover_orphans(&ws);
    assert!(msgs.iter().any(|m| m.starts_with("adopted:")), "{msgs:?}");
    assert_eq!(ws.load_queue().unwrap().tasks[0].state, TaskState::Running);

    // Second pass: with no pid file left, the task goes back to Queued.
    std::fs::remove_file(&pid_file).unwrap();
    let msgs = recover_orphans(&ws);
    assert!(msgs.iter().any(|m| m.contains("requeued")), "{msgs:?}");
    assert_eq!(ws.load_queue().unwrap().tasks[0].state, TaskState::Queued);

    let _ = std::fs::remove_dir_all(&root);
}
/// A task already marked `Failed` is salvaged to `Done` when its run dir shows
/// a finished run.
///
/// Setup: the run dir has a full `result.json` with status `done`, a handoff,
/// a `run.yaml`, and a `worker.pid` holding 2147483647 (`i32::MAX`).
///
/// After recovery the pid file is removed, and a second pass must not report
/// another recovery.
#[test]
fn recovery_salvages_a_failed_task_whose_orphan_run_actually_finished() {
    let root = std::env::temp_dir().join(format!("yard-salvage-{}", std::process::id()));
    let _ = std::fs::remove_dir_all(&root);
    std::fs::create_dir_all(&root).unwrap();
    // Best effort: whether `git init` succeeds is intentionally ignored here.
    let _ = std::process::Command::new("git")
        .args(["init"])
        .current_dir(&root)
        .output();

    let ws = Workspace::at(&root);
    let failed = {
        let mut t = task("YARD-001", TaskState::Failed, 10, false);
        t.kind = "implementation".into();
        t
    };
    ws.save_queue(&queue(vec![failed])).unwrap();

    let run_id = "run-20990101-000000-yard-001";
    let run_dir = ws.runs_dir().join(run_id);
    std::fs::create_dir_all(&run_dir).unwrap();
    let finished = crate::schemas::RunResult {
        schema_version: 1,
        run_id: run_id.into(),
        task_id: "YARD-001".into(),
        status: "done".into(),
        intent_adherence: Default::default(),
        changes: Default::default(),
        validation: Default::default(),
        question_for_user: None,
        compact_summary: "ok".into(),
        verdict: vec![],
        harness_suggestions: vec![],
    };
    let result_json = serde_json::to_string(&finished).unwrap();
    write_str(&run_dir.join("result.json"), &result_json).unwrap();
    write_str(&run_dir.join("handoff.md"), "# Handoff\n").unwrap();
    let run_yaml = format!("run_id: {run_id}\ntask_id: YARD-001\n");
    write_str(&run_dir.join("run.yaml"), &run_yaml).unwrap();
    // NOTE(review): i32::MAX is presumably never a live pid, so the worker reads as dead.
    let pid_file = run_dir.join("worker.pid");
    write_str(&pid_file, "2147483647").unwrap();

    let msgs = recover_orphans(&ws);
    assert!(msgs.iter().any(|m| m.contains("recovered")), "{msgs:?}");
    assert_eq!(ws.load_queue().unwrap().tasks[0].state, TaskState::Done);
    assert!(!pid_file.exists());

    // Recovery must be idempotent: nothing left to salvage on a second pass.
    let again = recover_orphans(&ws);
    let recovered_again = again.iter().any(|m| m.contains("recovered"));
    assert!(
        !recovered_again,
        "second pass should not re-recover: {again:?}"
    );
    let _ = std::fs::remove_dir_all(&root);
}
/// A `Failed` task is not recovered and stays `Failed` when its run dir holds
/// only a bare `{"status":"done"}` `result.json` and a `run.yaml` (no handoff,
/// no `worker.pid`).
#[test]
fn recovery_leaves_a_genuinely_failed_task_alone() {
    let root = std::env::temp_dir().join(format!("yard-realfail-{}", std::process::id()));
    let _ = std::fs::remove_dir_all(&root);
    let ws = Workspace::at(&root);

    let mut failed = task("YARD-001", TaskState::Failed, 10, false);
    failed.kind = "implementation".into();
    ws.save_queue(&queue(vec![failed])).unwrap();

    let run_id = "run-20990101-000000-yard-001";
    let run_dir = ws.runs_dir().join(run_id);
    std::fs::create_dir_all(&run_dir).unwrap();
    write_str(&run_dir.join("result.json"), r#"{"status":"done"}"#).unwrap();
    let run_yaml = format!("run_id: {run_id}\ntask_id: YARD-001\n");
    write_str(&run_dir.join("run.yaml"), &run_yaml).unwrap();

    let msgs = recover_orphans(&ws);
    let recovered = msgs.iter().any(|m| m.contains("recovered"));
    assert!(!recovered, "{msgs:?}");
    assert_eq!(ws.load_queue().unwrap().tasks[0].state, TaskState::Failed);
    let _ = std::fs::remove_dir_all(&root);
}
/// `save_task_state_on_latest_queue` writes the new state into the queue as it
/// currently is on disk, so a task that appeared on disk after the caller's
/// in-memory snapshot is kept.
#[test]
fn final_state_update_preserves_tasks_added_during_run() {
    let tag = format!("yard-preserve-queue-edits-{}", std::process::id());
    let root = std::env::temp_dir().join(tag);
    let _ = std::fs::remove_dir_all(&root);
    let ws = Workspace::at(&root);

    // Snapshot held by the run: only YARD-010, still Running.
    let mut stale = queue(vec![task("YARD-010", TaskState::Running, 10, false)]);
    // The on-disk queue has since gained YARD-011.
    let on_disk = queue(vec![
        task("YARD-010", TaskState::Done, 10, false),
        task("YARD-011", TaskState::Queued, 20, false),
    ]);
    ws.save_queue(&on_disk).unwrap();

    save_task_state_on_latest_queue(&ws, &mut stale, "YARD-010", TaskState::Partial).unwrap();

    let reloaded = ws.load_queue().unwrap();
    assert_eq!(reloaded.tasks.len(), 2);
    let (first, second) = (&reloaded.tasks[0], &reloaded.tasks[1]);
    assert_eq!(first.id, "YARD-010");
    assert_eq!(first.state, TaskState::Partial);
    assert_eq!(second.id, "YARD-011");
    assert_eq!(second.state, TaskState::Queued);
    let _ = std::fs::remove_dir_all(&root);
}
/// A queued task whose dependency is still pending in the queue is skipped, even
/// when its own priority is better.
///
/// Once the dependency is `Done`, the task becomes selectable. A dependency on
/// an id that is absent from the queue does not block selection.
#[test]
fn skips_tasks_with_unmet_dependencies() {
    let mut a = task("A", TaskState::Queued, 10, false);
    // B deliberately has a better (lower) priority than A. Without dependency
    // gating B would be picked first, so the `Some(0)` below can only hold
    // because B is waiting on A.
    let mut b = task("B", TaskState::Queued, 5, false);
    b.depends_on = vec!["A".into()];
    let q = queue(vec![a.clone(), b.clone()]);
    assert_eq!(select_next(&q, &opts()).unwrap(), Some(0));

    // With A done, B's dependency is satisfied and B is selected.
    a.state = TaskState::Done;
    let q = queue(vec![a, b.clone()]);
    assert_eq!(select_next(&q, &opts()).unwrap(), Some(1));

    // A dependency on an id that is not in the queue does not hold B back.
    b.depends_on = vec!["GHOST".into()];
    let q = queue(vec![b]);
    assert_eq!(select_next(&q, &opts()).unwrap(), Some(0));
}
}