pub mod sweep;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crate::agent::{Agent, AgentEvent, AgentRequest, Role, StopReason};
use crate::config::Config;
use crate::deferred::{self, DeferredDoc};
use crate::git::{self, CommitId, Git};
use crate::plan::{self, PhaseId, Plan, Snapshot};
use crate::prompts;
use crate::state::{self, RunState, TokenUsage};
use crate::tests as project_tests;
use crate::util::{paths, write_atomic};
/// Timeout placed on every agent request built in the visible part of this module (one hour).
const DEFAULT_AGENT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
/// Capacity of the broadcast channel that carries runner `Event`s to subscribers.
pub const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Capacity of the per-dispatch mpsc channel an agent uses to stream `AgentEvent`s back.
const AGENT_EVENT_CHANNEL_CAPACITY: usize = 64;
/// Maximum number of stale deferred items `Runner::stale_items` returns.
pub const STALE_ITEMS_PROMPT_CAP: usize = 10;
/// Cap on stale items for display purposes.
/// NOTE(review): not referenced in the visible part of this file; confirm usage at call sites.
pub const STALE_ITEMS_DISPLAY_CAP: usize = 5;
/// Why the runner stopped before completing a phase or sweep.
///
/// The `Display` impl renders each variant as a short human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HaltReason {
/// The agent changed `plan.md`; the runner restores the pre-dispatch bytes and halts.
PlanTampered,
/// `deferred.md` failed to parse after a dispatch (payload: the parse error text), or a
/// sweep altered the deferred-phases block.
DeferredInvalid(String),
/// The project tests were still failing when the runner gave up (payload: test summary).
TestsFailed(String),
/// The agent timed out, was cancelled, or reported an error (payload: description).
AgentFailure(String),
/// A configured token or USD cap was reached (payload: which cap and the totals).
BudgetExceeded(String),
}
impl std::fmt::Display for HaltReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HaltReason::PlanTampered => f.write_str("plan.md was modified by the agent"),
HaltReason::DeferredInvalid(msg) => write!(f, "deferred.md is invalid: {msg}"),
HaltReason::TestsFailed(summary) => write!(f, "tests failed: {summary}"),
HaltReason::AgentFailure(msg) => write!(f, "agent failure: {msg}"),
HaltReason::BudgetExceeded(msg) => write!(f, "budget exceeded: {msg}"),
}
}
}
/// Computes the run-wide totals that budget caps are compared against.
///
/// Returns `(total_tokens, total_usd)`:
/// * `total_tokens` is `usage.input + usage.output`, saturating instead of overflowing.
/// * `total_usd` adds up, for each of the four configured roles (planner, implementer,
///   auditor, fixer), the cost of that role's recorded usage priced with the model
///   configured for the role. A role with no recorded usage, or whose model has no
///   entry in `config.budgets.pricing`, contributes nothing.
pub fn budget_totals(config: &Config, usage: &TokenUsage) -> (u64, f64) {
    let role_models: [(&str, &str); 4] = [
        ("planner", config.models.planner.as_str()),
        ("implementer", config.models.implementer.as_str()),
        ("auditor", config.models.auditor.as_str()),
        ("fixer", config.models.fixer.as_str()),
    ];
    // Accumulate left-to-right from 0.0 so the floating-point result is reproducible.
    let total_usd = role_models
        .iter()
        .fold(0.0_f64, |acc, &(role_key, model)| {
            match (
                usage.by_role.get(role_key),
                config.budgets.pricing.get(model),
            ) {
                (Some(role_usage), Some(price)) => {
                    acc + price.cost_usd(role_usage.input, role_usage.output)
                }
                _ => acc,
            }
        });
    (usage.input.saturating_add(usage.output), total_usd)
}
/// Distinguishes which kind of work an auditor pass is reviewing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditContextKind {
/// The audit covers a regular plan phase.
Phase,
/// The audit covers a deferred-item sweep.
Sweep,
}
/// Identifies the audit an auditor-related [`Event`] refers to.
#[derive(Debug, Clone)]
pub struct AuditContext {
/// Phase the audit is accounted against (for sweeps, the sweep's accounting phase).
pub phase_id: PhaseId,
/// Whether the audit reviews a phase or a sweep.
pub kind: AuditContextKind,
}
/// Progress notifications broadcast by the runner to every subscriber.
///
/// In test builds an `EventDiscriminants` companion enum is derived via `strum`.
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(strum::EnumDiscriminants))]
#[cfg_attr(
test,
strum_discriminants(name(EventDiscriminants), derive(strum::EnumIter, Hash))
)]
pub enum Event {
/// A phase dispatch is about to begin; `attempt` is the phase's attempt counter after
/// it was bumped for this dispatch.
PhaseStarted {
phase_id: PhaseId,
title: String,
attempt: u32,
},
/// A fixer dispatch is about to begin. `fixer_attempt` counts from 1 within the current
/// fixer loop; `attempt` is the phase's overall attempt counter.
FixerStarted {
phase_id: PhaseId,
fixer_attempt: u32,
attempt: u32,
},
/// An auditor dispatch is about to begin.
AuditorStarted {
context: AuditContext,
attempt: u32,
},
/// The auditor was skipped because the staged diff was empty.
AuditorSkippedNoChanges {
context: AuditContext,
},
/// A line forwarded from the agent's stdout event stream.
AgentStdout(String),
/// A line forwarded from the agent's stderr event stream.
AgentStderr(String),
/// A tool-use notification forwarded from the agent (payload: the reported name).
AgentToolUse(String),
/// The project test runner is about to execute.
TestStarted,
/// The project test runner finished.
TestFinished {
passed: bool,
summary: String,
},
/// No tests were run: either test execution was disabled or no runner was detected.
TestsSkipped,
/// A phase completed; `commit` is `None` when the phase produced no staged changes.
PhaseCommitted {
phase_id: PhaseId,
commit: Option<CommitId>,
},
/// The run stopped at `phase_id` for `reason`.
PhaseHalted {
phase_id: PhaseId,
reason: HaltReason,
},
/// The run completed without halting.
RunFinished,
/// Cumulative token usage after a dispatch's tokens were folded in.
UsageUpdated(crate::state::TokenUsage),
/// A sweep dispatch is about to begin with `items_pending` unchecked deferred items.
SweepStarted {
after: PhaseId,
items_pending: usize,
attempt: u32,
},
/// A sweep completed; `resolved` is the drop in unchecked items and `commit` is `None`
/// when the sweep produced no staged changes.
SweepCompleted {
after: PhaseId,
resolved: usize,
commit: Option<CommitId>,
},
/// A sweep stopped for `reason`.
SweepHalted {
after: PhaseId,
reason: HaltReason,
},
/// Emitted for each item reported by `sweep::update_sweep_staleness`
/// (presumably items whose attempt count just reached the escalation threshold — confirm
/// against that helper).
DeferredItemStale {
text: String,
attempts: u32,
},
}
/// Outcome of a single runner step (one phase or one sweep).
#[derive(Debug, Clone)]
pub enum PhaseResult {
/// The step completed. For a phase, `next_phase` is `None` when no later phase exists
/// in the plan; for a sweep it carries the plan's current phase. `commit` is `None`
/// when nothing was committed.
Advanced {
phase_id: PhaseId,
next_phase: Option<PhaseId>,
commit: Option<CommitId>,
},
/// The step stopped early for `reason`.
Halted {
phase_id: PhaseId,
reason: HaltReason,
},
}
/// Final outcome of [`Runner::run`].
#[derive(Debug, Clone)]
pub enum RunSummary {
/// Every phase (and any final sweep loop) completed without a halt.
Finished,
/// The run stopped at `phase_id` for `reason`.
Halted {
phase_id: PhaseId,
reason: HaltReason,
},
}
/// Drives a plan phase by phase: dispatches agents, runs tests, audits, commits, and
/// interleaves deferred-item sweeps, broadcasting [`Event`]s along the way.
pub struct Runner<A: Agent, G: Git> {
// Root directory that plan/deferred/state/log paths are resolved against.
workspace: PathBuf,
// Run configuration (models, budgets, sweep/audit/test settings, ...).
config: Config,
// In-memory plan; `current_phase` is advanced as phases complete.
plan: Plan,
// In-memory copy of deferred.md, refreshed after each validated dispatch.
deferred: DeferredDoc,
// Persisted run state (attempt counters, completed phases, token usage, sweep flags).
state: RunState,
// Agent backend used for every dispatch.
agent: A,
// Git backend used for staging, diffing, and committing.
git: G,
// Sender half of the event broadcast channel; receivers come from `subscribe`.
events_tx: broadcast::Sender<Event>,
// When true, test detection and execution are skipped entirely.
skip_tests: bool,
// Caller-requested override of the normal sweep gating.
sweep_override: SweepOverride,
}
/// Caller-requested override of the runner's normal sweep gating.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SweepOverride {
/// No override: sweeps run when `sweep::should_run_deferred_sweep` says so.
None,
/// Never run sweeps (set by `Runner::skip_sweep`).
Skip,
/// Run a sweep regardless of the gate (set by `Runner::force_sweep`); reset to `None`
/// after a forced sweep advances.
Force,
}
impl<A: Agent, G: Git> Runner<A, G> {
#[allow(clippy::too_many_arguments)]
pub fn new(
workspace: impl Into<PathBuf>,
config: Config,
plan: Plan,
deferred: DeferredDoc,
state: RunState,
agent: A,
git: G,
) -> Self {
let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
Self {
workspace: workspace.into(),
config,
plan,
deferred,
state,
agent,
git,
events_tx,
skip_tests: false,
sweep_override: SweepOverride::None,
}
}
/// Builder-style setter: when `skip` is true, test detection and execution are skipped.
pub fn skip_tests(mut self, skip: bool) -> Self {
self.skip_tests = skip;
self
}
/// Builder-style switch that disables sweeps for this runner.
///
/// When `skip` is true the override is set to `Skip` and any pending sweep recorded in
/// the state is cleared. Passing `false` leaves the runner untouched.
pub fn skip_sweep(mut self, skip: bool) -> Self {
    if !skip {
        return self;
    }
    self.state.pending_sweep = false;
    self.sweep_override = SweepOverride::Skip;
    self
}
/// Builder-style switch that forces a sweep on the next step.
///
/// When `force` is true the override is set to `Force` and the state is marked as having
/// a pending sweep. Passing `false` leaves the runner untouched.
pub fn force_sweep(mut self, force: bool) -> Self {
    if !force {
        return self;
    }
    self.state.pending_sweep = true;
    self.sweep_override = SweepOverride::Force;
    self
}
/// Returns the workspace root this runner operates on.
pub fn workspace(&self) -> &Path {
&self.workspace
}
/// Returns the runner's in-memory plan.
pub fn plan(&self) -> &Plan {
&self.plan
}
/// Returns the runner's in-memory copy of the deferred document.
pub fn deferred(&self) -> &DeferredDoc {
&self.deferred
}
/// Returns the current run state.
pub fn state(&self) -> &RunState {
&self.state
}
/// Mutable access to the run state; hidden from rendered docs.
#[doc(hidden)]
pub fn state_mut(&mut self) -> &mut RunState {
&mut self.state
}
/// Returns the run configuration.
pub fn config(&self) -> &Config {
&self.config
}
/// Returns the agent backend.
pub fn agent(&self) -> &A {
&self.agent
}
/// Returns the git backend.
pub fn git_handle(&self) -> &G {
&self.git
}
/// Creates a new receiver on the runner's event broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.events_tx.subscribe()
}
/// Lists deferred items whose recorded attempt count has reached the escalation
/// threshold.
///
/// The threshold is `config.sweep.escalate_after`, treated as at least 1. Results are
/// ordered by attempts (highest first), ties broken by item text ascending, and capped
/// at [`STALE_ITEMS_PROMPT_CAP`] entries.
pub fn stale_items(&self) -> Vec<prompts::StaleItem> {
    let threshold = self.config.sweep.escalate_after.max(1);
    let mut stale: Vec<prompts::StaleItem> = Vec::new();
    for (text, &attempts) in self.state.deferred_item_attempts.iter() {
        if attempts >= threshold {
            stale.push(prompts::StaleItem {
                text: text.clone(),
                attempts,
            });
        }
    }
    stale.sort_by(|lhs, rhs| {
        rhs.attempts
            .cmp(&lhs.attempts)
            .then_with(|| lhs.text.cmp(&rhs.text))
    });
    stale.truncate(STALE_ITEMS_PROMPT_CAP);
    stale
}
/// Runs the plan to completion or until a step halts.
///
/// If the state already indicates that the final phase has completed, this goes straight
/// to the finishing logic (which may run the final sweep loop). Otherwise it executes
/// steps via `run_phase` until one halts or the last phase advances.
///
/// # Errors
/// Propagates any error from the individual steps or from the finishing logic.
pub async fn run(&mut self) -> Result<RunSummary> {
    if self.is_post_final_phase_state() {
        return self.finish_or_run_final_sweep_loop().await;
    }
    loop {
        match self.run_phase().await? {
            // More phases remain (or a sweep just ran): keep stepping.
            PhaseResult::Advanced {
                next_phase: Some(_),
                ..
            } => continue,
            // The last phase advanced: fall through to the finishing logic.
            PhaseResult::Advanced {
                next_phase: None, ..
            } => break,
            PhaseResult::Halted { phase_id, reason } => {
                let halted = Event::PhaseHalted {
                    phase_id: phase_id.clone(),
                    reason: reason.clone(),
                };
                let _ = self.events_tx.send(halted);
                return Ok(RunSummary::Halted { phase_id, reason });
            }
        }
    }
    self.finish_or_run_final_sweep_loop().await
}
/// Reports whether the run has already completed its final phase.
///
/// True when the state's `post_final_phase` flag is set, or when the most recently
/// completed phase equals the plan's current phase and no later phase exists.
fn is_post_final_phase_state(&self) -> bool {
    if self.state.post_final_phase {
        return true;
    }
    match self.state.completed.last() {
        None => false,
        Some(last_completed) => {
            last_completed == &self.plan.current_phase
                && self.next_phase_id_after(last_completed).is_none()
        }
    }
}
/// Finishes the run, first handing over to the final sweep loop when it applies.
///
/// The sweep anchor is the last completed phase, falling back to the plan's current
/// phase. When no final sweep is due, a leftover `pending_sweep` flag is cleared and
/// persisted, then `RunFinished` is broadcast.
///
/// # Errors
/// Fails if the state cannot be saved or the final sweep loop fails.
async fn finish_or_run_final_sweep_loop(&mut self) -> Result<RunSummary> {
    let after = match self.state.completed.last() {
        Some(last) => last.clone(),
        None => self.plan.current_phase.clone(),
    };
    if self.should_run_final_sweep_loop() {
        return self.run_final_sweep_loop(after).await;
    }
    // Clear the flag and persist only when it was actually set.
    if std::mem::replace(&mut self.state.pending_sweep, false) {
        state::save(&self.workspace, Some(&self.state))
            .context("runner: clearing pending_sweep at run finish")?;
    }
    let _ = self.events_tx.send(Event::RunFinished);
    Ok(RunSummary::Finished)
}
/// Decides whether the final sweep loop should run.
///
/// Requires all of: sweeps not overridden to `Skip`, sweeps enabled, the final sweep
/// enabled, and at least one unchecked deferred item.
fn should_run_final_sweep_loop(&self) -> bool {
    let sweep_cfg = &self.config.sweep;
    !matches!(self.sweep_override, SweepOverride::Skip)
        && sweep_cfg.enabled
        && sweep_cfg.final_sweep_enabled
        && sweep::unchecked_count(&self.deferred) > 0
}
/// Repeats sweep steps after the final phase until nothing is left, no progress is made,
/// the iteration cap is hit, or a sweep halts.
///
/// `after` is the phase every sweep in the loop is anchored to and accounted against.
/// The cap is `config.sweep.final_sweep_max_iterations`, treated as at least 1.
///
/// # Errors
/// Fails if persisting state or running a sweep step fails.
async fn run_final_sweep_loop(&mut self, after: PhaseId) -> Result<RunSummary> {
// Start the loop with a fresh consecutive-sweep counter.
self.state.consecutive_sweeps = 0;
let max_iters = self.config.sweep.final_sweep_max_iterations.max(1);
for _iter in 1..=max_iters {
let pre_unchecked = sweep::unchecked_count(&self.deferred);
if pre_unchecked == 0 {
break;
}
// Record the pending sweep on disk before dispatching it.
self.state.pending_sweep = true;
state::save(&self.workspace, Some(&self.state))
.context("runner: persisting pending_sweep for final-sweep iter")?;
let result = self.run_sweep_step(after.clone()).await?;
match result {
PhaseResult::Halted { reason, .. } => {
state::save(&self.workspace, Some(&self.state))
.context("runner: persisting state at final-sweep halt")?;
// The halt is reported against `after`, not the phase id carried by the result.
let _ = self.events_tx.send(Event::PhaseHalted {
phase_id: after.clone(),
reason: reason.clone(),
});
return Ok(RunSummary::Halted {
phase_id: after,
reason,
});
}
PhaseResult::Advanced { .. } => {
let post_unchecked = sweep::unchecked_count(&self.deferred);
let resolved = pre_unchecked.saturating_sub(post_unchecked);
// A sweep that resolved nothing ends the loop.
if resolved == 0 {
break;
}
}
}
}
// Make sure no pending-sweep flag survives the end of the run.
if self.state.pending_sweep {
self.state.pending_sweep = false;
state::save(&self.workspace, Some(&self.state))
.context("runner: clearing pending_sweep after final-sweep loop")?;
}
let _ = self.events_tx.send(Event::RunFinished);
Ok(RunSummary::Finished)
}
/// Executes one runner step (a pending sweep or the current phase) and then persists
/// the run state regardless of how the step ended.
///
/// A failure to save the state is logged and does not replace the step's own result.
///
/// # Errors
/// Returns whatever error the step itself produced.
pub async fn run_phase(&mut self) -> Result<PhaseResult> {
    let outcome = self.run_phase_inner().await;
    match state::save(&self.workspace, Some(&self.state)) {
        Ok(_) => {}
        Err(e) => {
            tracing::error!("runner: failed to persist state.json: {e:#}");
        }
    }
    outcome
}
/// Performs one step: a pending sweep if one is due, otherwise the plan's current phase.
///
/// Phase flow: budget check, attempt bump, `PhaseStarted`, implementer dispatch through
/// tests/fixer/audit/staging, optional commit, deferred.md rewrite, plan advance, sweep
/// scheduling, state save, `PhaseCommitted`.
///
/// # Errors
/// Fails on I/O errors, a `current_phase` missing from the plan, or git/agent errors.
async fn run_phase_inner(&mut self) -> Result<PhaseResult> {
if self.state.pending_sweep {
// Re-read deferred.md from disk so the sweep gate sees the latest content.
let deferred_path = paths::deferred_path(&self.workspace);
let on_disk = match std::fs::read_to_string(&deferred_path) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
Err(e) => {
return Err(anyhow::Error::new(e)
.context(format!("runner: reading {:?}", &deferred_path)))
}
};
// An unparsable file falls back to the in-memory document; the parse error is dropped.
let parsed = deferred::parse(&on_disk).unwrap_or_else(|_| self.deferred.clone());
let allow = match self.sweep_override {
SweepOverride::Skip => false,
SweepOverride::Force => true,
SweepOverride::None => sweep::should_run_deferred_sweep(
&parsed,
&self.config.sweep,
self.state.consecutive_sweeps,
),
};
if allow {
// A sweep is anchored on the most recently completed phase.
if let Some(prompt_after) = self.state.completed.last().cloned() {
self.deferred = parsed;
let result = self.run_sweep_step(prompt_after).await?;
// A forced sweep is one-shot: once it advances, the override is reset.
if matches!(result, PhaseResult::Advanced { .. })
&& matches!(self.sweep_override, SweepOverride::Force)
{
self.sweep_override = SweepOverride::None;
}
return Ok(result);
}
// Nothing completed yet, so there is no anchor: drop the pending sweep and run the phase.
tracing::info!("skipping forced sweep: no completed phases yet to anchor on");
self.state.pending_sweep = false;
state::save(&self.workspace, Some(&self.state))
.context("runner: persisting state.json after fresh-run force-sweep no-op")?;
self.deferred = parsed;
} else {
// The gate declined the sweep: clear the flag and continue with the phase.
self.state.pending_sweep = false;
state::save(&self.workspace, Some(&self.state))
.context("runner: persisting state.json after sweep gate cleared")?;
self.deferred = parsed;
}
}
let phase = self
.plan
.phase(&self.plan.current_phase)
.cloned()
.ok_or_else(|| {
anyhow!(
"plan.current_phase {:?} is not present in plan.phases",
self.plan.current_phase.as_str()
)
})?;
let phase_id = phase.id.clone();
// The budget is checked before the attempt counter is bumped.
if let Some(reason) = self.check_budget() {
return Ok(PhaseResult::Halted { phase_id, reason });
}
let attempt = self.bump_attempts(&phase_id);
let _ = self.events_tx.send(Event::PhaseStarted {
phase_id: phase_id.clone(),
title: phase.title.clone(),
attempt,
});
let plan_path = paths::plan_path(&self.workspace);
let deferred_path = paths::deferred_path(&self.workspace);
// `.pitboss` is passed to git staging as an excluded path.
let exclude: [&Path; 1] = [Path::new(".pitboss")];
let spec = DispatchSpec {
request: self.implementer_request(&phase, attempt),
phase_id: phase_id.clone(),
phase: Some(&phase),
plan_path: &plan_path,
deferred_path: &deferred_path,
exclude_paths: &exclude,
audit: self
.config
.audit
.enabled
.then_some(AuditKind::Phase { phase: &phase }),
};
let has_changes = match self.run_dispatch_pipeline(spec).await? {
PipelineOutcome::Halted(reason) => return Ok(PhaseResult::Halted { phase_id, reason }),
PipelineOutcome::Staged { has_changes } => has_changes,
};
// Commit only when staging actually captured something.
let commit = if has_changes {
let id = self
.git
.commit(&git::commit_message(&phase_id, &phase.title))
.await
.context("runner: committing phase")?;
Some(id)
} else {
warn!(phase = %phase_id, "phase produced no code changes; skipping commit");
None
};
// Apply the document's own `sweep()` pass and write the result back to disk.
self.deferred.sweep();
write_atomic(
&deferred_path,
deferred::serialize(&self.deferred).as_bytes(),
)
.context("runner: writing deferred.md after sweep")?;
self.state.completed.push(phase_id.clone());
self.state.consecutive_sweeps = 0;
let next_phase = self.next_phase_id_after(&phase_id);
if next_phase.is_none() {
self.state.post_final_phase = true;
}
if let Some(ref next) = next_phase {
self.plan.set_current_phase(next.clone());
write_atomic(&plan_path, plan::serialize(&self.plan).as_bytes())
.context("runner: writing plan.md with advanced current_phase")?;
// Schedule a sweep before the next phase when forced or when the gate allows it.
if !matches!(self.sweep_override, SweepOverride::Skip)
&& (matches!(self.sweep_override, SweepOverride::Force)
|| sweep::should_run_deferred_sweep(
&self.deferred,
&self.config.sweep,
self.state.consecutive_sweeps,
))
{
self.state.pending_sweep = true;
}
}
state::save(&self.workspace, Some(&self.state)).context("runner: persisting state.json")?;
let _ = self.events_tx.send(Event::PhaseCommitted {
phase_id: phase_id.clone(),
commit: commit.clone(),
});
Ok(PhaseResult::Advanced {
phase_id,
next_phase,
commit,
})
}
/// Builds the implementer request for `phase`.
///
/// `attempt` only influences the log file name. The request uses the configured
/// implementer model, the module-wide default timeout, and an empty environment.
fn implementer_request(&self, phase: &crate::plan::Phase, attempt: u32) -> AgentRequest {
    let model = self.config.models.implementer.clone();
    let system_prompt = prompts::caveman::system_prompt(&self.config.caveman);
    let user_prompt = prompts::implementer(&self.plan, &self.deferred, phase);
    let workdir = self.workspace.clone();
    let log_path = self.attempt_log_path(&phase.id, "implementer", attempt);
    AgentRequest {
        role: Role::Implementer,
        model,
        system_prompt,
        user_prompt,
        workdir,
        log_path,
        timeout: DEFAULT_AGENT_TIMEOUT,
        env: std::collections::HashMap::new(),
    }
}
/// Runs the full per-dispatch pipeline described by `spec`: agent dispatch with
/// validation, tests and fixer loop, then the optional audit and staging.
///
/// Returns `Halted` as soon as any stage halts, otherwise `Staged` with whether staging
/// captured any changes.
///
/// # Errors
/// Propagates errors from either stage.
async fn run_dispatch_pipeline(&mut self, spec: DispatchSpec<'_>) -> Result<PipelineOutcome> {
    let dispatched = self
        .run_dispatch_through_fixer(
            spec.request,
            spec.phase,
            spec.plan_path,
            spec.deferred_path,
            &spec.phase_id,
        )
        .await?;
    let test_runner = match dispatched {
        DispatchOutcome::Continue { test_runner } => test_runner,
        DispatchOutcome::Halted(reason) => return Ok(PipelineOutcome::Halted(reason)),
    };
    self.run_audit_and_stage(
        spec.audit,
        test_runner.as_ref(),
        spec.plan_path,
        spec.deferred_path,
        spec.exclude_paths,
        &spec.phase_id,
    )
    .await
}
/// Dispatches `request`, validates the result, then runs the project tests and, if they
/// fail, the fixer loop.
///
/// `phase` is `Some` for phase dispatches and `None` for sweeps; it is only forwarded to
/// the fixer loop. On success the detected test runner (if any) is returned so later
/// stages can reuse it.
///
/// # Errors
/// Propagates errors from dispatching, running tests, or the fixer loop.
async fn run_dispatch_through_fixer(
&mut self,
request: AgentRequest,
phase: Option<&crate::plan::Phase>,
plan_path: &Path,
deferred_path: &Path,
phase_id: &PhaseId,
) -> Result<DispatchOutcome> {
// Read the role before `request` is moved into the dispatch.
let role = request.role;
match self
.dispatch_and_validate(request, role, plan_path, deferred_path)
.await?
{
ValidationResult::Continue => {}
ValidationResult::Halt(reason) => return Ok(DispatchOutcome::Halted(reason)),
}
let test_runner = if self.skip_tests {
debug!("dry-run: skipping test detection and execution");
None
} else {
project_tests::detect(&self.workspace, self.config.tests.command.as_deref())
};
if let Some(runner) = &test_runner {
// Test logs are labelled with the attempt counter already recorded for this phase.
let attempt = self.state.attempts.get(phase_id).copied().unwrap_or(0);
let outcome = self.run_tests(runner, phase_id, "tests", attempt).await?;
if !outcome.passed {
match self
.run_fixer_loop(
phase_id,
phase,
runner,
plan_path,
deferred_path,
outcome.summary,
)
.await?
{
FixerLoopResult::Passed => {}
FixerLoopResult::Halted(reason) => {
return Ok(DispatchOutcome::Halted(reason));
}
}
}
} else {
// `TestsSkipped` is emitted both when tests are disabled and when none were detected.
if !self.skip_tests {
debug!("no test runner detected and no override configured; skipping tests");
}
let _ = self.events_tx.send(Event::TestsSkipped);
}
Ok(DispatchOutcome::Continue { test_runner })
}
/// Runs the auditor pass when `audit` is provided, then stages changes and reports
/// whether anything ended up staged.
///
/// # Errors
/// Propagates auditor errors and git staging/inspection failures.
async fn run_audit_and_stage(
    &mut self,
    audit: Option<AuditKind<'_>>,
    test_runner: Option<&project_tests::TestRunner>,
    plan_path: &Path,
    deferred_path: &Path,
    exclude_paths: &[&Path],
    phase_id: &PhaseId,
) -> Result<PipelineOutcome> {
    if let Some(kind) = audit {
        let verdict = self
            .run_auditor_pass(
                kind,
                test_runner,
                plan_path,
                deferred_path,
                exclude_paths,
                phase_id,
            )
            .await?;
        if let AuditPassResult::Halted(reason) = verdict {
            return Ok(PipelineOutcome::Halted(reason));
        }
    }
    // Stage again after the audit so any auditor edits are included.
    self.git
        .stage_changes(exclude_paths)
        .await
        .context("runner: staging code-only changes")?;
    let has_changes = self
        .git
        .has_staged_changes()
        .await
        .context("runner: checking for staged changes")?;
    Ok(PipelineOutcome::Staged { has_changes })
}
/// Compares current usage with the configured caps.
///
/// Returns `Some(HaltReason::BudgetExceeded)` when the token total or the USD total has
/// reached its cap (the token cap is checked first), otherwise `None`. A cap that is
/// not configured is never exceeded.
fn check_budget(&self) -> Option<HaltReason> {
    let (tokens, usd) = budget_totals(&self.config, &self.state.token_usage);
    match self.config.budgets.max_total_tokens {
        Some(cap) if tokens >= cap => {
            return Some(HaltReason::BudgetExceeded(format!(
                "token budget reached: {tokens} >= cap {cap}"
            )));
        }
        _ => {}
    }
    match self.config.budgets.max_total_usd {
        Some(cap) if usd >= cap => Some(HaltReason::BudgetExceeded(format!(
            "USD budget reached: ${usd:.4} >= cap ${cap:.4}"
        ))),
        _ => None,
    }
}
/// Returns the id of the first phase, in plan order, whose id is greater than `current`,
/// or `None` when there is no such phase.
fn next_phase_id_after(&self, current: &PhaseId) -> Option<PhaseId> {
    for candidate in self.plan.phases.iter() {
        if candidate.id > *current {
            return Some(candidate.id.clone());
        }
    }
    None
}
/// Increments the attempt counter for `phase_id` and returns the new value
/// (1 on the first call for a given phase).
fn bump_attempts(&mut self, phase_id: &PhaseId) -> u32 {
    *self
        .state
        .attempts
        .entry(phase_id.clone())
        .and_modify(|count| *count += 1)
        .or_insert(1)
}
/// Builds the log file path for one phase attempt:
/// `<play logs dir>/phase-<phase_id>-<role>-<attempt>.log`.
fn attempt_log_path(&self, phase_id: &PhaseId, role: &str, attempt: u32) -> PathBuf {
    let file_name = format!("phase-{}-{}-{}.log", phase_id, role, attempt);
    let logs_dir = paths::play_logs_dir(&self.workspace);
    logs_dir.join(file_name)
}
/// Builds the log file path for one sweep attempt:
/// `<play logs dir>/sweep-after-<after>-<role>-<attempt>.log`.
fn sweep_log_path(&self, after: &PhaseId, role: &str, attempt: u32) -> PathBuf {
    let file_name = format!("sweep-after-{}-{}-{}.log", after, role, attempt);
    let logs_dir = paths::play_logs_dir(&self.workspace);
    logs_dir.join(file_name)
}
/// Runs a single sweep outside the normal phase loop.
///
/// * `after` — phase the sweep prompt is anchored on; also used for accounting. When
///   `None`, accounting falls back to the plan's current phase and the prompt receives
///   no anchor.
/// * `max_items` — optional cap on how many pending items are shown in the prompt.
/// * `persist_state` — whether to save the run state after a successful sweep.
///
/// # Errors
/// Propagates any error from the sweep step.
pub async fn run_standalone_sweep(
    &mut self,
    after: Option<PhaseId>,
    max_items: Option<usize>,
    persist_state: bool,
) -> Result<PhaseResult> {
    let accounting = match &after {
        Some(anchor) => anchor.clone(),
        None => self.plan.current_phase.clone(),
    };
    self.run_sweep_step_inner(accounting, after, max_items, persist_state)
        .await
}
/// Runs one in-loop sweep anchored on and accounted against `after`, with no item cap
/// and with state persistence enabled.
async fn run_sweep_step(&mut self, after: PhaseId) -> Result<PhaseResult> {
    let prompt_after = Some(after.clone());
    self.run_sweep_step_inner(after, prompt_after, None, true)
        .await
}
/// Core sweep implementation shared by in-loop and standalone sweeps.
///
/// * `accounting` — phase id used for attempt counters, log names, events, and results.
/// * `prompt_after` — optional anchor phase passed to the sweep prompt.
/// * `max_items` — optional cap on pending items included in the prompt document.
/// * `persist_state` — whether to save the run state after a successful sweep.
///
/// # Errors
/// Fails on I/O errors and on agent, test, or git errors from the stages it drives.
async fn run_sweep_step_inner(
&mut self,
accounting: PhaseId,
prompt_after: Option<PhaseId>,
max_items: Option<usize>,
persist_state: bool,
) -> Result<PhaseResult> {
// Snapshot the pre-sweep picture: pending count, pending texts, the canonical phases
// block, and the raw bytes of deferred.md.
let pre_unchecked = sweep::unchecked_count(&self.deferred);
let pre_texts: HashSet<String> = self
.deferred
.items
.iter()
.filter(|i| !i.done)
.map(|i| i.text.clone())
.collect();
let pre_phases = phases_block_canonical(&self.deferred);
let plan_path = paths::plan_path(&self.workspace);
let deferred_path = paths::deferred_path(&self.workspace);
let pre_deferred_bytes = match std::fs::read(&deferred_path) {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Vec::new(),
Err(e) => {
return Err(anyhow::Error::new(e)
.context(format!("runner: reading {:?} before sweep", &deferred_path)))
}
};
// NOTE(review): a budget halt here returns without emitting `SweepHalted`.
if let Some(reason) = self.check_budget() {
return Ok(PhaseResult::Halted {
phase_id: accounting,
reason,
});
}
let attempt = self.bump_attempts(&accounting);
let _ = self.events_tx.send(Event::SweepStarted {
after: accounting.clone(),
items_pending: pre_unchecked,
attempt,
});
let stale = self.stale_items();
// Only the prompt sees the clamped document; `self.deferred` is left untouched.
let prompt_doc = match max_items {
Some(n) => clamp_pending_items(&self.deferred, n),
None => self.deferred.clone(),
};
let request = AgentRequest {
role: Role::Implementer,
model: self.config.models.implementer.clone(),
system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
user_prompt: prompts::sweep(&self.plan, &prompt_doc, prompt_after.as_ref(), &stale),
workdir: self.workspace.clone(),
log_path: self.sweep_log_path(&accounting, "implementer", attempt),
timeout: DEFAULT_AGENT_TIMEOUT,
env: std::collections::HashMap::new(),
};
let exclude: [&Path; 1] = [Path::new(".pitboss")];
// Shared halt path: update staleness bookkeeping, emit `SweepHalted`, build the result.
let halt_with_staleness =
|this: &mut Self, reason: HaltReason, pre_texts: &HashSet<String>| {
this.apply_sweep_staleness(pre_texts);
let _ = this.events_tx.send(Event::SweepHalted {
after: accounting.clone(),
reason: reason.clone(),
});
PhaseResult::Halted {
phase_id: accounting.clone(),
reason,
}
};
let test_runner = match self
.run_dispatch_through_fixer(request, None, &plan_path, &deferred_path, &accounting)
.await?
{
DispatchOutcome::Halted(reason) => {
return Ok(halt_with_staleness(self, reason, &pre_texts));
}
DispatchOutcome::Continue { test_runner } => test_runner,
};
// For the sweep auditor: items that were pending before and are now done, and items
// that are still pending.
let audit = self.config.sweep.audit_enabled.then(|| {
let resolved: Vec<String> = self
.deferred
.items
.iter()
.filter(|i| i.done && pre_texts.contains(&i.text))
.map(|i| i.text.clone())
.collect();
let remaining: Vec<String> = self
.deferred
.items
.iter()
.filter(|i| !i.done)
.map(|i| i.text.clone())
.collect();
AuditKind::Sweep {
after: accounting.clone(),
resolved,
remaining,
}
});
let has_changes = match self
.run_audit_and_stage(
audit,
test_runner.as_ref(),
&plan_path,
&deferred_path,
&exclude,
&accounting,
)
.await?
{
PipelineOutcome::Halted(reason) => {
return Ok(halt_with_staleness(self, reason, &pre_texts));
}
PipelineOutcome::Staged { has_changes } => has_changes,
};
// A sweep must leave the deferred-phases block unchanged; otherwise restore and halt.
let post_phases = phases_block_canonical(&self.deferred);
if post_phases != pre_phases {
warn!(
after = %accounting,
"sweep modified ## Deferred phases; restoring deferred.md"
);
// NOTE(review): `existed` is hard-coded to `true`, so if deferred.md was absent before
// the sweep an empty file is written instead of the file being removed — confirm intent.
self.restore_deferred(&deferred_path, &pre_deferred_bytes, true)?;
let restored = std::fs::read_to_string(&deferred_path).unwrap_or_default();
if let Ok(parsed) = deferred::parse(&restored) {
self.deferred = parsed;
}
self.apply_sweep_staleness(&pre_texts);
let reason = HaltReason::DeferredInvalid("sweep modified Deferred phases".into());
let _ = self.events_tx.send(Event::SweepHalted {
after: accounting.clone(),
reason: reason.clone(),
});
return Ok(PhaseResult::Halted {
phase_id: accounting,
reason,
});
}
let resolved = pre_unchecked.saturating_sub(sweep::unchecked_count(&self.deferred));
let commit = if has_changes {
let id = self
.git
.commit(&git::commit_message_sweep(&accounting, resolved))
.await
.context("runner: committing sweep")?;
Some(id)
} else {
warn!(after = %accounting, "sweep produced no code changes; skipping commit");
None
};
// Staleness is recorded before the document's own `sweep()` pass runs.
self.apply_sweep_staleness(&pre_texts);
self.deferred.sweep();
write_atomic(
&deferred_path,
deferred::serialize(&self.deferred).as_bytes(),
)
.context("runner: writing deferred.md after sweep step")?;
self.state.pending_sweep = false;
self.state.consecutive_sweeps = self.state.consecutive_sweeps.saturating_add(1);
if persist_state {
state::save(&self.workspace, Some(&self.state))
.context("runner: persisting state.json after sweep")?;
}
let _ = self.events_tx.send(Event::SweepCompleted {
after: accounting.clone(),
resolved,
commit: commit.clone(),
});
// A sweep does not advance the plan; `next_phase` is always the plan's current phase.
Ok(PhaseResult::Advanced {
phase_id: accounting,
next_phase: Some(self.plan.current_phase.clone()),
commit,
})
}
/// Dispatches `request` to the agent, records its token usage, and validates the
/// planning artifacts afterwards.
///
/// Steps:
/// 1. Snapshot `plan.md` and `deferred.md` (a missing `deferred.md` is allowed).
/// 2. Run the agent and fold its token usage into the run state under `role`.
/// 3. Halt with `AgentFailure` if the agent timed out, was cancelled, or errored.
/// 4. Halt with `PlanTampered` (after restoring the snapshot) if `plan.md` changed.
/// 5. Re-parse `deferred.md`; on success it replaces the in-memory document, on failure
///    the snapshot is restored and the result is a `DeferredInvalid` halt.
///
/// # Errors
/// Fails on I/O errors reading or restoring the artifacts and on agent launch errors.
async fn dispatch_and_validate(
    &mut self,
    request: AgentRequest,
    role: Role,
    plan_path: &Path,
    deferred_path: &Path,
) -> Result<ValidationResult> {
    let plan_pre =
        std::fs::read(plan_path).with_context(|| format!("runner: reading {:?}", plan_path))?;
    let plan_hash = Snapshot::of_bytes(&plan_pre);
    let (deferred_pre, deferred_existed) = match std::fs::read(deferred_path) {
        Ok(b) => (b, true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => (Vec::new(), false),
        Err(e) => {
            return Err(
                anyhow::Error::new(e).context(format!("runner: reading {:?}", deferred_path))
            )
        }
    };
    // Remember the timeout this request actually carries before `request` is moved, so
    // the timeout message stays accurate even if a caller uses a non-default value.
    let timeout = request.timeout;
    let dispatch = self.dispatch_agent(request).await?;
    self.fold_token_usage(role, &dispatch);
    match &dispatch.stop_reason {
        StopReason::Completed => {}
        StopReason::Timeout => {
            return Ok(ValidationResult::Halt(HaltReason::AgentFailure(format!(
                "agent {:?} timed out after {:?}",
                self.agent.name(),
                timeout
            ))));
        }
        StopReason::Cancelled => {
            return Ok(ValidationResult::Halt(HaltReason::AgentFailure(format!(
                "agent {:?} was cancelled",
                self.agent.name()
            ))));
        }
        StopReason::Error(msg) => {
            return Ok(ValidationResult::Halt(HaltReason::AgentFailure(
                msg.clone(),
            )));
        }
    }
    // plan.md is read-only for agents: any change is reverted and halts the step.
    let plan_post = std::fs::read(plan_path)
        .with_context(|| format!("runner: reading {:?} after agent", plan_path))?;
    if Snapshot::of_bytes(&plan_post) != plan_hash {
        warn!(role = %role, "agent modified plan.md; restoring from snapshot");
        write_atomic(plan_path, &plan_pre).with_context(|| {
            format!(
                "runner: restoring {:?} from snapshot after tamper",
                plan_path
            )
        })?;
        return Ok(ValidationResult::Halt(HaltReason::PlanTampered));
    }
    // A missing deferred.md is treated as an empty document.
    let deferred_text = match std::fs::read_to_string(deferred_path) {
        Ok(s) => s,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
        Err(e) => {
            return Err(anyhow::Error::new(e)
                .context(format!("runner: reading {:?} after agent", deferred_path)))
        }
    };
    match deferred::parse(&deferred_text) {
        Ok(parsed) => {
            self.deferred = parsed;
        }
        Err(e) => {
            let msg = format!("{e}");
            warn!(role = %role, error = %msg, "deferred.md is invalid; restoring");
            self.restore_deferred(deferred_path, &deferred_pre, deferred_existed)?;
            return Ok(ValidationResult::Halt(HaltReason::DeferredInvalid(msg)));
        }
    }
    Ok(ValidationResult::Continue)
}
/// Runs the project tests once, bracketing the run with `TestStarted` and
/// `TestFinished` events.
///
/// The log file is named from `phase_id`, `log_role`, and `attempt`.
///
/// # Errors
/// Fails if the test runner itself cannot be executed.
async fn run_tests(
    &self,
    runner: &project_tests::TestRunner,
    phase_id: &PhaseId,
    log_role: &str,
    attempt: u32,
) -> Result<project_tests::TestOutcome> {
    let _ = self.events_tx.send(Event::TestStarted);
    let log_file = self.attempt_log_path(phase_id, log_role, attempt);
    let outcome = runner
        .run(log_file)
        .await
        .context("runner: running project tests")?;
    let finished = Event::TestFinished {
        passed: outcome.passed,
        summary: outcome.summary.clone(),
    };
    let _ = self.events_tx.send(finished);
    Ok(outcome)
}
/// Repeatedly dispatches the fixer agent and re-runs the tests until they pass, the
/// retry budget (`config.retries.fixer_max_attempts`) is used up, or a dispatch halts.
///
/// `phase` selects the prompt: `Some` uses the phase fixer prompt, `None` the sweep one.
/// `initial_summary` is the failing test summary that triggered the loop.
///
/// # Errors
/// Propagates errors from dispatching the fixer or running the tests.
async fn run_fixer_loop(
&mut self,
phase_id: &PhaseId,
phase: Option<&crate::plan::Phase>,
test_runner: &project_tests::TestRunner,
plan_path: &Path,
deferred_path: &Path,
initial_summary: String,
) -> Result<FixerLoopResult> {
let budget = self.config.retries.fixer_max_attempts;
// With no fixer attempts allowed, the original failure is final.
if budget == 0 {
return Ok(FixerLoopResult::Halted(HaltReason::TestsFailed(
initial_summary,
)));
}
let mut last_summary = initial_summary;
for fixer_attempt in 1..=budget {
// The token/USD budget is re-checked before every fixer dispatch.
if let Some(reason) = self.check_budget() {
return Ok(FixerLoopResult::Halted(reason));
}
let total_attempt = self.bump_attempts(phase_id);
let _ = self.events_tx.send(Event::FixerStarted {
phase_id: phase_id.clone(),
fixer_attempt,
attempt: total_attempt,
});
let user_prompt = match phase {
Some(p) => {
prompts::fixer_with_deferred(&self.plan, p, &last_summary, &self.deferred)
}
None => prompts::fixer_for_sweep(&self.plan, &self.deferred, &last_summary),
};
// The fixer log is numbered by the loop-local attempt, the test log by the phase total.
let log_path = self.attempt_log_path(phase_id, "fix", fixer_attempt);
let request = AgentRequest {
role: Role::Fixer,
model: self.config.models.fixer.clone(),
system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
user_prompt,
workdir: self.workspace.clone(),
log_path,
timeout: DEFAULT_AGENT_TIMEOUT,
env: std::collections::HashMap::new(),
};
match self
.dispatch_and_validate(request, Role::Fixer, plan_path, deferred_path)
.await?
{
ValidationResult::Continue => {}
ValidationResult::Halt(reason) => return Ok(FixerLoopResult::Halted(reason)),
}
let outcome = self
.run_tests(test_runner, phase_id, "tests", total_attempt)
.await?;
if outcome.passed {
return Ok(FixerLoopResult::Passed);
}
// Feed the newest failure summary into the next fixer prompt.
last_summary = outcome.summary;
}
Ok(FixerLoopResult::Halted(HaltReason::TestsFailed(
last_summary,
)))
}
/// Stages the current changes, shows the staged diff to the auditor agent, and re-runs
/// the tests afterwards when a test runner is available.
///
/// The pass is skipped (with `AuditorSkippedNoChanges`) when the staged diff is empty.
/// Failing tests after the audit halt the step; no fixer loop is run at this stage.
///
/// # Errors
/// Propagates git, agent dispatch, and test execution errors.
async fn run_auditor_pass(
&mut self,
audit: AuditKind<'_>,
test_runner: Option<&project_tests::TestRunner>,
plan_path: &Path,
deferred_path: &Path,
exclude: &[&Path],
phase_id: &PhaseId,
) -> Result<AuditPassResult> {
self.git
.stage_changes(exclude)
.await
.context("runner: staging for audit diff")?;
let diff = self
.git
.staged_diff()
.await
.context("runner: capturing staged diff for auditor")?;
let kind = match &audit {
AuditKind::Phase { .. } => AuditContextKind::Phase,
AuditKind::Sweep { .. } => AuditContextKind::Sweep,
};
let context = AuditContext {
phase_id: phase_id.clone(),
kind,
};
// Nothing staged means nothing to audit.
if diff.trim().is_empty() {
let _ = self.events_tx.send(Event::AuditorSkippedNoChanges {
context: context.clone(),
});
return Ok(AuditPassResult::Continue);
}
if let Some(reason) = self.check_budget() {
return Ok(AuditPassResult::Halted(reason));
}
let total_attempt = self.bump_attempts(phase_id);
let _ = self.events_tx.send(Event::AuditorStarted {
context: context.clone(),
attempt: total_attempt,
});
// NOTE(review): both audit log paths use a fixed attempt number of 1, so a repeated
// audit for the same phase/sweep reuses the same file name — confirm this is intended.
let (user_prompt, log_path) = match audit {
AuditKind::Phase { phase } => (
prompts::auditor_with_deferred(
&self.plan,
phase,
&diff,
&self.deferred,
self.config.audit.small_fix_line_limit,
),
self.attempt_log_path(phase_id, "audit", 1),
),
AuditKind::Sweep {
after,
resolved,
remaining,
} => {
let stale = self.stale_items();
(
prompts::sweep_auditor(prompts::SweepAuditorPrompt {
plan: &self.plan,
deferred: &self.deferred,
after: &after,
diff: &diff,
resolved: &resolved,
remaining: &remaining,
stale_items: &stale,
small_fix_line_limit: self.config.audit.small_fix_line_limit,
}),
self.sweep_log_path(&after, "audit", 1),
)
}
};
let request = AgentRequest {
role: Role::Auditor,
model: self.config.models.auditor.clone(),
system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
user_prompt,
workdir: self.workspace.clone(),
log_path,
timeout: DEFAULT_AGENT_TIMEOUT,
env: std::collections::HashMap::new(),
};
match self
.dispatch_and_validate(request, Role::Auditor, plan_path, deferred_path)
.await?
{
ValidationResult::Continue => {}
ValidationResult::Halt(reason) => return Ok(AuditPassResult::Halted(reason)),
}
// Re-run the tests after the auditor dispatch; a failure here halts immediately.
if let Some(test_runner) = test_runner {
let outcome = self
.run_tests(test_runner, phase_id, "tests", total_attempt)
.await?;
if !outcome.passed {
return Ok(AuditPassResult::Halted(HaltReason::TestsFailed(
outcome.summary,
)));
}
}
Ok(AuditPassResult::Continue)
}
/// Updates per-item sweep attempt bookkeeping after a sweep and announces newly stale
/// items.
///
/// `pre_texts` holds the texts of the items that were unchecked before the sweep; the
/// currently unchecked texts are collected here. Both sets go to
/// `sweep::update_sweep_staleness`, and every `(text, attempts)` pair it returns is
/// broadcast as `DeferredItemStale`.
fn apply_sweep_staleness(&mut self, pre_texts: &HashSet<String>) {
    let mut still_unchecked: HashSet<String> = HashSet::new();
    for item in self.deferred.items.iter().filter(|item| !item.done) {
        still_unchecked.insert(item.text.clone());
    }
    let crossed = sweep::update_sweep_staleness(
        &mut self.state.deferred_item_attempts,
        pre_texts,
        &still_unchecked,
        self.config.sweep.escalate_after,
    );
    for (text, attempts) in crossed {
        let stale = Event::DeferredItemStale { text, attempts };
        let _ = self.events_tx.send(stale);
    }
}
/// Adds the token usage reported by one dispatch to the run totals and broadcasts the
/// updated totals as `UsageUpdated`.
///
/// The dispatch's input/output counts are added to the overall totals and to the entry
/// for `role`; any per-role breakdown carried by the dispatch is merged in as well.
///
/// All additions saturate rather than overflow, matching how `budget_totals` combines
/// the same counters, so an extreme count can never panic or wrap the bookkeeping.
fn fold_token_usage(&mut self, role: Role, dispatch: &AgentDispatch) {
    let tokens = &dispatch.outcome_tokens;
    let usage = &mut self.state.token_usage;
    usage.input = usage.input.saturating_add(tokens.input);
    usage.output = usage.output.saturating_add(tokens.output);
    let entry = usage.by_role.entry(role.as_str().to_string()).or_default();
    entry.input = entry.input.saturating_add(tokens.input);
    entry.output = entry.output.saturating_add(tokens.output);
    for (k, v) in &tokens.by_role {
        let e = usage.by_role.entry(k.clone()).or_default();
        e.input = e.input.saturating_add(v.input);
        e.output = e.output.saturating_add(v.output);
    }
    let _ = self
        .events_tx
        .send(Event::UsageUpdated(self.state.token_usage.clone()));
}
/// Puts `deferred.md` back to its pre-dispatch condition.
///
/// When the file `existed` beforehand, `pre_bytes` is written back atomically. When it
/// did not, a file that is present now is removed; if none is present nothing happens.
///
/// # Errors
/// Fails if the write or the removal fails.
fn restore_deferred(
    &self,
    deferred_path: &Path,
    pre_bytes: &[u8],
    existed: bool,
) -> Result<()> {
    if !existed {
        if deferred_path.exists() {
            std::fs::remove_file(deferred_path).with_context(|| {
                format!(
                    "runner: removing agent-created {:?} after parse failure",
                    deferred_path
                )
            })?;
        }
        return Ok(());
    }
    write_atomic(deferred_path, pre_bytes).with_context(|| {
        format!(
            "runner: restoring {:?} from snapshot after parse failure",
            deferred_path
        )
    })?;
    Ok(())
}
/// Runs the agent for `request` while forwarding its streamed events to the runner's
/// broadcast channel.
///
/// A forwarding task is spawned before the agent starts and awaited after the agent
/// returns successfully. The agent receives a fresh cancellation token that this
/// method never cancels.
///
/// # Errors
/// Fails if the agent's `run` call returns an error.
async fn dispatch_agent(&self, request: AgentRequest) -> Result<AgentDispatch> {
    let role = request.role;
    let (agent_tx, agent_rx) = mpsc::channel(AGENT_EVENT_CHANNEL_CAPACITY);
    let forwarder = tokio::spawn(forward_agent_events(agent_rx, self.events_tx.clone()));
    let outcome = self
        .agent
        .run(request, agent_tx, CancellationToken::new())
        .await
        .with_context(|| format!("runner: agent {:?} failed to run", self.agent.name()))?;
    // The agent's sender is gone by now, so the forwarder drains what is left and ends.
    let _ = forwarder.await;
    Ok(AgentDispatch {
        stop_reason: outcome.stop_reason,
        outcome_tokens: outcome.tokens,
        _role: role,
    })
}
}
/// Returns a copy of `doc` that keeps every completed item but at most `max` pending
/// (not-done) items, preserving the original item order. The phases are copied as-is.
///
/// Any `max` is accepted, including `usize::MAX`: the capacity hint uses a saturating
/// add so a "no limit" value cannot overflow.
fn clamp_pending_items(doc: &DeferredDoc, max: usize) -> DeferredDoc {
    let mut out = DeferredDoc {
        // Capacity is only a hint; `saturating_add` keeps `max == usize::MAX` safe.
        items: Vec::with_capacity(doc.items.len().min(max.saturating_add(1))),
        phases: doc.phases.clone(),
    };
    let mut pending_kept = 0usize;
    for item in &doc.items {
        if !item.done {
            // Pending items beyond the cap are dropped; done items always pass through.
            if pending_kept >= max {
                continue;
            }
            pending_kept += 1;
        }
        out.items.push(item.clone());
    }
    out
}
/// Serializes only the phases of `doc` (with an empty item list), producing a string
/// that can be compared to detect changes to the deferred-phases block.
fn phases_block_canonical(doc: &DeferredDoc) -> String {
    deferred::serialize(&DeferredDoc {
        items: Vec::new(),
        phases: doc.phases.clone(),
    })
}
/// What the runner keeps from a single agent run.
struct AgentDispatch {
// Why the agent stopped (completed, timeout, cancelled, or error).
stop_reason: StopReason,
// Token usage reported by the agent for this run.
outcome_tokens: crate::state::TokenUsage,
// Role the request was dispatched under; not read in the visible code.
_role: Role,
}
/// Result of dispatching an agent and validating plan.md / deferred.md afterwards.
enum ValidationResult {
/// The agent completed and both artifacts passed validation.
Continue,
/// The step must stop for the given reason.
Halt(HaltReason),
}
/// Result of the fixer retry loop.
enum FixerLoopResult {
/// The tests passed after a fixer attempt.
Passed,
/// The loop stopped without passing tests (retries exhausted, budget, or a dispatch halt).
Halted(HaltReason),
}
/// Result of an audit pass: continue, or halt with a [`HaltReason`].
// NOTE(review): the audit pass itself is outside this excerpt.
enum AuditPassResult {
/// No halt was requested.
Continue,
/// The pass stopped, carrying the reason.
Halted(HaltReason),
}
/// Bundle of inputs describing one agent dispatch. All references borrow for
/// the lifetime `'a`.
// NOTE(review): the consumer of this struct is outside this excerpt; the
// per-field notes below state only what the types show.
struct DispatchSpec<'a> {
/// The request to hand to the agent.
request: AgentRequest,
/// Identifier of the phase this dispatch is associated with.
phase_id: PhaseId,
/// The phase itself, when one is available.
phase: Option<&'a crate::plan::Phase>,
/// Path of the plan file.
plan_path: &'a Path,
/// Path of the deferred-items file.
deferred_path: &'a Path,
/// Paths to exclude — presumably from staging/diffing; confirm at the use site.
exclude_paths: &'a [&'a Path],
/// Audit to run for this dispatch, if any.
audit: Option<AuditKind<'a>>,
}
/// Which kind of audit a dispatch carries: one tied to a phase, or one tied
/// to a sweep.
enum AuditKind<'a> {
/// Audit of a single phase.
Phase {
/// The phase being audited.
phase: &'a crate::plan::Phase,
},
/// Audit of a sweep.
Sweep {
/// Phase the sweep ran after.
after: PhaseId,
/// Presumably the texts of deferred items the sweep resolved — confirm.
resolved: Vec<String>,
/// Presumably the texts of deferred items still pending — confirm.
remaining: Vec<String>,
},
}
/// Result of a dispatch step: halted with a [`HaltReason`], or free to
/// continue.
enum DispatchOutcome {
/// The step stopped, carrying the reason.
Halted(HaltReason),
/// The step may continue.
Continue {
/// Test runner to use next, if one is available.
// NOTE(review): `None` presumably means no runner was detected — confirm.
test_runner: Option<project_tests::TestRunner>,
},
}
/// Result of a pipeline run: halted with a [`HaltReason`], or staged.
enum PipelineOutcome {
/// The pipeline stopped, carrying the reason.
Halted(HaltReason),
/// The pipeline reached the staged state.
Staged {
/// Whether there are changes — presumably whether a commit is warranted;
/// confirm at the use site.
has_changes: bool,
},
}
/// Republishes agent events from `rx` onto the broadcast channel `tx` until
/// `rx` yields `None`.
///
/// `Stdout`, `Stderr` and `ToolUse` are translated to the matching runner
/// [`Event`]; `TokenDelta` is dropped here. A failed broadcast send is
/// ignored.
async fn forward_agent_events(mut rx: mpsc::Receiver<AgentEvent>, tx: broadcast::Sender<Event>) {
    while let Some(agent_event) = rx.recv().await {
        let relayed = match agent_event {
            AgentEvent::Stdout(line) => Event::AgentStdout(line),
            AgentEvent::Stderr(line) => Event::AgentStderr(line),
            AgentEvent::ToolUse(name) => Event::AgentToolUse(name),
            // Token deltas are not republished.
            AgentEvent::TokenDelta(_) => continue,
        };
        let _ = tx.send(relayed);
    }
}
/// Builds the initial [`RunState`] for a run starting at `now`.
///
/// The run id is `now` formatted as `%Y%m%dT%H%M%SZ`, the branch comes from
/// `git::branch_name` with the configured prefix and `now`, and the starting
/// phase is `plan.current_phase`. `started_at` is set to `now`.
pub fn fresh_run_state(plan: &Plan, config: &Config, now: chrono::DateTime<Utc>) -> RunState {
    let mut state = RunState::new(
        now.format("%Y%m%dT%H%M%SZ").to_string(),
        git::branch_name(&config.git.branch_prefix, now),
        plan.current_phase.clone(),
    );
    state.started_at = now;
    state
}
/// Logs every event received on `rx` to stderr via `log_event_line`.
///
/// Returns once a `RunFinished` or `PhaseHalted` event has been logged, or
/// when the channel is closed. If the receiver lagged, a notice with the
/// number of dropped events is printed and receiving continues.
pub async fn log_events(mut rx: broadcast::Receiver<Event>) {
    use broadcast::error::RecvError;
    loop {
        let event = match rx.recv().await {
            Ok(event) => event,
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(n)) => {
                eprintln!("[pitboss] (logger lagged: dropped {n} events)");
                continue;
            }
        };
        log_event_line(&event);
        // These two events end the logging loop.
        if matches!(event, Event::RunFinished | Event::PhaseHalted { .. }) {
            break;
        }
    }
}
/// Writes a human-readable rendering of one [`Event`] to stderr.
///
/// Styling goes through `style::col`, which receives the flag returned by
/// `style::use_color_stderr()`. Horizontal rules around `PhaseStarted` and
/// `RunFinished` are printed only when that flag is true. `UsageUpdated`
/// prints nothing.
fn log_event_line(event: &Event) {
    use crate::style::{self, col};
    let color = style::use_color_stderr();
    let prefix = col(color, style::BOLD_CYAN, "[pitboss]");
    match event {
        Event::PhaseStarted {
            phase_id,
            title,
            attempt,
        } => {
            let rule = col(color, style::DARK_GRAY, &"─".repeat(60));
            let headline = format!("phase {phase_id} ({title}), attempt {attempt}");
            if color {
                eprintln!("{rule}");
            }
            eprintln!("{prefix} {}", col(color, style::BOLD_WHITE, &headline));
            if color {
                eprintln!("{rule}");
            }
        }
        Event::FixerStarted {
            phase_id,
            fixer_attempt,
            attempt,
        } => {
            let msg = format!(
                "phase {phase_id} fixer attempt {fixer_attempt} (total dispatch {attempt})"
            );
            eprintln!("{prefix} {}", col(color, style::YELLOW, &msg));
        }
        Event::AuditorStarted { context, attempt } => {
            let msg = match context.kind {
                AuditContextKind::Phase => format!(
                    "phase {} auditor (total dispatch {attempt})",
                    context.phase_id
                ),
                AuditContextKind::Sweep => format!(
                    "sweep after phase {} auditor (total dispatch {attempt})",
                    context.phase_id
                ),
            };
            eprintln!("{prefix} {}", col(color, style::BLUE, &msg));
        }
        Event::AuditorSkippedNoChanges { context } => {
            let msg = match context.kind {
                AuditContextKind::Phase => format!(
                    "phase {} auditor skipped: no code changes to audit",
                    context.phase_id
                ),
                AuditContextKind::Sweep => format!(
                    "sweep after phase {} auditor skipped: no code changes to audit",
                    context.phase_id
                ),
            };
            eprintln!("{prefix} {}", col(color, style::DIM, &msg));
        }
        Event::AgentStdout(line) => {
            // Only the tag is styled; the agent's own text is printed as-is.
            let tag = col(color, style::DIM, "[agent]");
            eprintln!("{tag} {line}");
        }
        Event::AgentStderr(line) => {
            let tag = col(color, style::RED, "[agent:err]");
            eprintln!("{tag} {}", col(color, style::RED, line));
        }
        Event::AgentToolUse(name) => {
            let msg = format!("[agent:tool] {name}");
            eprintln!("{}", col(color, style::DARK_GRAY, &msg));
        }
        Event::TestStarted => {
            eprintln!("{prefix} {}", col(color, style::MAGENTA, "running tests"));
        }
        Event::TestFinished { passed, summary } => {
            let verdict = if *passed {
                col(
                    color,
                    style::BOLD_GREEN,
                    &format!("tests passed: {summary}"),
                )
            } else {
                col(color, style::BOLD_RED, &format!("tests failed: {summary}"))
            };
            eprintln!("{prefix} {verdict}");
        }
        Event::TestsSkipped => {
            let note = col(color, style::DIM, "no test runner detected; skipping");
            eprintln!("{prefix} {note}");
        }
        Event::PhaseCommitted { phase_id, commit } => match commit {
            Some(hash) => {
                let msg = format!("phase {phase_id} committed: {hash}");
                eprintln!("{prefix} {}", col(color, style::GREEN, &msg));
            }
            None => {
                let msg = format!("phase {phase_id} produced no code changes; no commit");
                eprintln!("{prefix} {}", col(color, style::DIM, &msg));
            }
        },
        Event::PhaseHalted { phase_id, reason } => {
            // Halts use a red prefix instead of the shared cyan one.
            let red_prefix = col(color, style::BOLD_RED, "[pitboss]");
            let msg = format!("phase {phase_id} halted: {reason}");
            eprintln!("{red_prefix} {}", col(color, style::BOLD_RED, &msg));
        }
        Event::RunFinished => {
            let rule = col(color, style::BOLD_GREEN, &"─".repeat(60));
            if color {
                eprintln!("{rule}");
            }
            eprintln!(
                "{}",
                col(color, style::BOLD_GREEN, "[pitboss] run finished")
            );
            if color {
                eprintln!("{rule}");
            }
        }
        Event::UsageUpdated(_) => {
            // Usage updates are not logged.
        }
        Event::SweepStarted {
            after,
            items_pending,
            attempt,
        } => {
            let msg = format!(
                "sweep after phase {after} ({items_pending} pending, total dispatch {attempt})"
            );
            eprintln!("{prefix} {}", col(color, style::BOLD_CYAN, &msg));
        }
        Event::SweepCompleted {
            after,
            resolved,
            commit,
        } => match commit {
            Some(hash) => {
                let msg =
                    format!("sweep after phase {after} committed: {resolved} resolved ({hash})");
                eprintln!("{prefix} {}", col(color, style::GREEN, &msg));
            }
            None => {
                let msg = format!(
                    "sweep after phase {after}: {resolved} resolved; no code changes to commit"
                );
                eprintln!("{prefix} {}", col(color, style::DIM, &msg));
            }
        },
        Event::SweepHalted { after, reason } => {
            let red_prefix = col(color, style::BOLD_RED, "[pitboss]");
            let msg = format!("sweep after phase {after} halted: {reason}");
            eprintln!("{red_prefix} {}", col(color, style::BOLD_RED, &msg));
        }
        Event::DeferredItemStale { text, attempts } => {
            let msg = format!(
                "deferred item stale ({attempts} sweep attempts; needs human attention): {text}"
            );
            eprintln!("{prefix} {}", col(color, style::BOLD_YELLOW, &msg));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a phase id literal, panicking on malformed input (test-only).
    fn pid(s: &str) -> PhaseId {
        PhaseId::parse(s).unwrap()
    }

    /// `fresh_run_state` derives the run id and branch from the supplied
    /// timestamp and starts with nothing completed.
    #[test]
    fn fresh_run_state_uses_branch_prefix_and_timestamp() {
        let plan = Plan::new(
            pid("01"),
            vec![crate::plan::Phase {
                id: pid("01"),
                title: "First".into(),
                body: String::new(),
            }],
        );
        let cfg = Config::default();
        let now = chrono::DateTime::parse_from_rfc3339("2026-04-29T14:30:22Z")
            .unwrap()
            .with_timezone(&Utc);
        let state = fresh_run_state(&plan, &cfg, now);
        assert_eq!(state.run_id, "20260429T143022Z");
        assert_eq!(state.branch, "pitboss/play/20260429T143022Z");
        assert_eq!(state.started_phase, pid("01"));
        assert_eq!(state.started_at, now);
        assert!(state.completed.is_empty());
    }

    /// Every `HaltReason` variant renders a recognisable summary, and the
    /// payload-carrying variants include their payload.
    #[test]
    fn halt_reason_display_summaries_are_human_readable() {
        assert_eq!(
            HaltReason::PlanTampered.to_string(),
            "plan.md was modified by the agent"
        );

        let deferred = HaltReason::DeferredInvalid("bad".into()).to_string();
        assert!(deferred.contains("deferred.md"));
        assert!(deferred.contains("bad"));

        let tests_failed = HaltReason::TestsFailed("nope".into()).to_string();
        assert!(tests_failed.contains("tests failed"));
        assert!(tests_failed.contains("nope"));

        assert!(HaltReason::AgentFailure("boom".into())
            .to_string()
            .contains("boom"));

        // Previously uncovered variant.
        let budget = HaltReason::BudgetExceeded("cap hit".into()).to_string();
        assert!(budget.contains("budget exceeded"));
        assert!(budget.contains("cap hit"));
    }
}