pitboss/runner/mod.rs
1//! Orchestration loop and event channel.
2//!
3//! The runner owns the per-phase state machine. It snapshots the planning
4//! artifacts, dispatches the implementer agent, validates the agent's output,
5//! runs the project tests, and lands a per-phase commit. Every observable
6//! transition is broadcast on a [`tokio::sync::broadcast`] channel so the CLI
7//! logger and the (later) TUI can subscribe without changing the runner.
8//!
9//! Phase 12 wired the implementer-only flow: agent → validate → tests → commit.
10//! Phase 13 layers a bounded fixer loop on top: when the project tests fail,
11//! the runner dispatches the fixer agent up to
12//! [`crate::config::RetryBudgets::fixer_max_attempts`] times, re-running tests
13//! after each attempt, before halting. Phase 14 inserts the auditor pass
14//! between the (passing) test run and the per-phase commit: when
15//! [`crate::config::AuditConfig::enabled`] is on the runner stages the
16//! implementer's diff, hands it to the auditor agent, re-validates the
17//! planning artifacts, and re-runs the tests before letting the commit land.
18
19pub mod sweep;
20
21use std::collections::HashSet;
22use std::path::{Path, PathBuf};
23use std::time::Duration;
24
25use anyhow::{anyhow, Context, Result};
26use chrono::Utc;
27use tokio::sync::{broadcast, mpsc};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, warn};
30
31use crate::agent::{Agent, AgentEvent, AgentRequest, Role, StopReason};
32use crate::config::Config;
33use crate::deferred::{self, DeferredDoc};
34use crate::git::{self, CommitId, Git};
35use crate::plan::{self, PhaseId, Plan, Snapshot};
36use crate::prompts;
37use crate::state::{self, RunState, TokenUsage};
38use crate::tests as project_tests;
39use crate::util::{paths, write_atomic};
40
/// Wall-clock ceiling for a single agent dispatch: one hour. Deliberately
/// conservative so a wedged agent cannot strand a run indefinitely; phase 18
/// makes this configurable.
const DEFAULT_AGENT_TIMEOUT: Duration = Duration::from_secs(3_600);

/// Slot count of the broadcast channel that fans events out to subscribers.
/// Large enough that a slow subscriber has to fall hundreds of events behind
/// before it lags; sends are best-effort, so a slow subscriber never blocks
/// the runner.
pub const EVENT_CHANNEL_CAPACITY: usize = 256;

/// Slot count of the per-dispatch mpsc channel that links the agent to the
/// runner's forwarder task. The bound applies backpressure to a misbehaving
/// agent that floods events.
const AGENT_EVENT_CHANNEL_CAPACITY: usize = 64;

/// Upper bound on how many stale `## Deferred items` entries are fed into the
/// sweep and sweep-auditor prompts. Beyond this the prompt would balloon
/// without adding value: an agent can only meaningfully address a handful of
/// "high-stakes" items per dispatch, and a runaway map (hundreds of stuck
/// items) almost always means the staleness signal needs operator attention
/// rather than more agent passes.
pub const STALE_ITEMS_PROMPT_CAP: usize = 10;

/// Upper bound on how many stale items each operator-facing surface (the TUI
/// stale panel, `pitboss status`) renders before truncating with a `+N more`
/// footer. Smaller than [`STALE_ITEMS_PROMPT_CAP`] because vertical space on
/// the operator's screen is tighter than the auditor's prompt budget.
pub const STALE_ITEMS_DISPLAY_CAP: usize = 5;
69
/// The reason the runner stopped advancing the plan.
///
/// Every variant carries enough context that the CLI logger (and the
/// eventual TUI) can render a useful one-line message without re-reading log
/// files.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HaltReason {
    /// `plan.md` was modified by the agent. The runner restored it from the
    /// pre-agent snapshot before halting.
    PlanTampered,
    /// `deferred.md` was left unparsable by the agent. The runner restored
    /// it from the pre-agent snapshot before halting. The payload is the
    /// parser's diagnostic.
    DeferredInvalid(String),
    /// The project's test suite failed. The payload is the short summary
    /// captured by [`crate::tests::TestRunner::run`].
    TestsFailed(String),
    /// The agent exited via timeout, cancellation, or an internal error.
    AgentFailure(String),
    /// A configured budget ran out before the next agent dispatch could
    /// fire. The payload is a human-readable summary (`token budget exceeded:
    /// 105000 >= cap 100000`, `USD budget exceeded: $5.0234 >= cap $5.0000`).
    BudgetExceeded(String),
}

impl std::fmt::Display for HaltReason {
    /// Render the halt reason as a single line: a fixed label followed, for
    /// the payload-carrying variants, by `": "` and the payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            // The only variant without a payload: emit its message as-is.
            HaltReason::PlanTampered => {
                return f.write_str("plan.md was modified by the agent");
            }
            HaltReason::DeferredInvalid(detail) => ("deferred.md is invalid", detail),
            HaltReason::TestsFailed(detail) => ("tests failed", detail),
            HaltReason::AgentFailure(detail) => ("agent failure", detail),
            HaltReason::BudgetExceeded(detail) => ("budget exceeded", detail),
        };
        write!(f, "{label}: {detail}")
    }
}
106
107/// Compute `(total_tokens, total_usd)` for a [`TokenUsage`] under the supplied
108/// [`Config`].
109///
110/// `total_tokens` is the simple `input + output` sum from the top-level
111/// counter — that's the figure compared against
112/// [`crate::config::Budgets::max_total_tokens`].
113///
114/// `total_usd` walks each role in [`crate::config::ModelRoles`], looks up the
115/// per-role tokens in `usage.by_role`, and prices them against
116/// [`crate::config::Budgets::pricing`] keyed by the role's configured model
117/// id. Roles whose model is missing from the pricing table contribute zero —
118/// the token-count budget still applies. Agent-supplied `by_role` keys that
119/// don't match a configured role are ignored here, since we have no model
120/// assignment to price them under.
121pub fn budget_totals(config: &Config, usage: &TokenUsage) -> (u64, f64) {
122 let total_tokens = usage.input.saturating_add(usage.output);
123 let mut total_usd = 0.0;
124 let role_models: [(&str, &str); 4] = [
125 ("planner", config.models.planner.as_str()),
126 ("implementer", config.models.implementer.as_str()),
127 ("auditor", config.models.auditor.as_str()),
128 ("fixer", config.models.fixer.as_str()),
129 ];
130 for (role_key, model) in role_models {
131 let Some(role_usage) = usage.by_role.get(role_key) else {
132 continue;
133 };
134 let Some(price) = config.budgets.pricing.get(model) else {
135 continue;
136 };
137 total_usd += price.cost_usd(role_usage.input, role_usage.output);
138 }
139 (total_tokens, total_usd)
140}
141
/// Discriminator carried on [`AuditContext`]. Tells subscribers whether an
/// audit firing belongs to a regular plan phase or to a deferred sweep, so
/// the TUI and loggers can render the right header text without inspecting
/// runner state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditContextKind {
    /// The audit pass follows a regular plan-phase implementer dispatch.
    Phase,
    /// The audit pass follows a deferred-sweep dispatch.
    Sweep,
}
152
/// Payload threaded through the auditor events
/// ([`Event::AuditorStarted`], [`Event::AuditorSkippedNoChanges`]). Carries
/// the phase id under which the audit's attempts are accounted, plus the kind
/// discriminator. Sweep audits set `phase_id` to the most recently completed
/// real phase (the same id [`Event::SweepStarted`] uses) so the running
/// attempts counter still keys on a real phase id.
#[derive(Debug, Clone)]
pub struct AuditContext {
    /// Phase id under which the audit's attempts are tallied.
    pub phase_id: PhaseId,
    /// Whether this is a phase audit or a sweep audit.
    pub kind: AuditContextKind,
}
165
/// Streaming events the runner broadcasts to subscribers. Sends are
/// best-effort: a lagging or absent subscriber never blocks the runner.
///
/// In test builds the enum additionally derives a fieldless
/// `EventDiscriminants` companion (via `strum`) that is iterable and
/// hashable.
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(strum::EnumDiscriminants))]
#[cfg_attr(
    test,
    strum_discriminants(name(EventDiscriminants), derive(strum::EnumIter, Hash))
)]
pub enum Event {
    /// A phase began. Emitted once per implementer dispatch — fixer
    /// re-dispatches inside the same phase emit [`Event::FixerStarted`] instead
    /// so subscribers can distinguish them.
    PhaseStarted {
        /// Phase being entered.
        phase_id: PhaseId,
        /// Phase title from the heading.
        title: String,
        /// 1-based total agent dispatch counter at this phase, mirrored into
        /// [`crate::state::RunState::attempts`]. Stays at `1` for the
        /// implementer dispatch that fires this event.
        attempt: u32,
    },
    /// The runner dispatched the fixer agent after a test failure.
    FixerStarted {
        /// Phase the fixer is operating on.
        phase_id: PhaseId,
        /// 1-based fixer attempt within this phase
        /// (`1..=fixer_max_attempts`).
        fixer_attempt: u32,
        /// Total agent-dispatch counter at this phase (mirrors
        /// [`crate::state::RunState::attempts`]).
        attempt: u32,
    },
    /// The runner dispatched the auditor agent after the test suite passed.
    /// Fires at most once per phase or sweep, and only when the relevant
    /// audit toggle ([`crate::config::AuditConfig::enabled`] for phases,
    /// [`crate::config::SweepConfig::audit_enabled`] for sweeps) is on and
    /// the dispatch produced staged code changes. `context.kind` discriminates
    /// the two so the TUI can render the right header text.
    AuditorStarted {
        /// Audit-pass context: phase id + whether this is a phase or sweep
        /// audit. Phase id is exposed via [`AuditContext::phase_id`] so plain
        /// log consumers don't need to match on the kind.
        context: AuditContext,
        /// Total agent-dispatch counter at this phase (mirrors
        /// [`crate::state::RunState::attempts`]).
        attempt: u32,
    },
    /// The audit pass was skipped because there were no code changes worth
    /// auditing (the index was empty after staging excluded paths).
    AuditorSkippedNoChanges {
        /// Audit-pass context: phase id + whether this is a phase or sweep
        /// audit.
        context: AuditContext,
    },
    /// One line of agent stdout.
    AgentStdout(String),
    /// One line of agent stderr.
    AgentStderr(String),
    /// Agent invoked a tool. Carries the tool name.
    AgentToolUse(String),
    /// The runner began running the project test suite.
    TestStarted,
    /// The test suite finished with the carried summary.
    TestFinished {
        /// Whether the run exited zero.
        passed: bool,
        /// Short summary suitable for inline display.
        summary: String,
    },
    /// The runner skipped tests because no runner was detected and no
    /// `[tests] command = "..."` override was configured.
    TestsSkipped,
    /// A phase's code changes were committed (or skipped because the only
    /// changes were to excluded planning artifacts).
    PhaseCommitted {
        /// Phase that completed.
        phase_id: PhaseId,
        /// Resulting commit, or `None` when only excluded paths changed.
        commit: Option<CommitId>,
    },
    /// The runner stopped without advancing.
    PhaseHalted {
        /// Phase that halted.
        phase_id: PhaseId,
        /// Why the runner halted.
        reason: HaltReason,
    },
    /// The runner advanced past the final phase. No further phases remain.
    RunFinished,
    /// Aggregated token usage was updated after an agent dispatch finished.
    /// Carries a snapshot of [`crate::state::RunState::token_usage`] so
    /// subscribers can show running cost / token totals without needing a
    /// reference to the runner state.
    UsageUpdated(crate::state::TokenUsage),
    /// A deferred-sweep dispatch began. Fires once per sweep step, between
    /// the [`Event::PhaseCommitted`] of the preceding phase and the
    /// [`Event::PhaseStarted`] of the next regular phase. The sweep does not
    /// introduce a synthetic phase id — `after` is the most recently completed
    /// phase under which the sweep's attempts are accounted.
    SweepStarted {
        /// Phase the sweep is firing after.
        after: PhaseId,
        /// Unchecked-item count observed before the sweep dispatched.
        items_pending: usize,
        /// Total agent-dispatch counter under `after` after the sweep's
        /// implementer dispatch is recorded.
        attempt: u32,
    },
    /// A deferred-sweep dispatch finished and its commit (or empty diff) has
    /// landed. `commit` is `None` when the sweep produced no code changes.
    SweepCompleted {
        /// Phase the sweep fired after.
        after: PhaseId,
        /// Number of `## Deferred items` entries the sweep flipped from
        /// `- [ ]` to `- [x]` (and which the post-sweep `DeferredDoc::sweep`
        /// then dropped). New items the agent appended don't pollute this
        /// count.
        resolved: usize,
        /// Resulting commit, or `None` for the no-code-changes case.
        commit: Option<CommitId>,
    },
    /// A deferred-sweep dispatch halted before it could land a commit. Mirrors
    /// [`Event::PhaseHalted`] for the sweep entry point so subscribers can
    /// distinguish "phase failed" from "sweep failed."
    SweepHalted {
        /// Phase the sweep fired after.
        after: PhaseId,
        /// Why the sweep halted.
        reason: HaltReason,
    },
    /// A `## Deferred items` entry's per-sweep attempt counter just crossed
    /// [`crate::config::SweepConfig::escalate_after`]. Transition-only — the
    /// next sweep that increments the same item's counter further does *not*
    /// re-emit, so subscribers (the activity log, `pitboss status`, the TUI)
    /// can treat each occurrence as a fresh "needs human attention" signal.
    DeferredItemStale {
        /// The item text from `deferred.md` (the bytes after `- [ ]`).
        text: String,
        /// The new attempt count, equal to or greater than `escalate_after`.
        attempts: u32,
    },
}
309
/// Outcome of [`Runner::run_phase`]: the runner either advanced past the
/// phase or halted on it.
#[derive(Debug, Clone)]
pub enum PhaseResult {
    /// Phase completed and the runner advanced. `commit` is `None` when the
    /// agent only modified excluded paths.
    Advanced {
        /// Phase that just completed.
        phase_id: PhaseId,
        /// Phase the runner advanced to, or `None` if no phases remain.
        /// [`Runner::run`] treats `None` as the cue to finish the run (or to
        /// enter the final-sweep drain).
        next_phase: Option<PhaseId>,
        /// Resulting commit, or `None` for the excluded-only case.
        commit: Option<CommitId>,
    },
    /// Runner halted; no phase advance.
    Halted {
        /// Phase that was active when the halt fired.
        phase_id: PhaseId,
        /// Why the halt fired.
        reason: HaltReason,
    },
}
331
/// Outcome of [`Runner::run`]: the terminal state of a whole run, as opposed
/// to the per-phase [`PhaseResult`].
#[derive(Debug, Clone)]
pub enum RunSummary {
    /// All phases completed. Returned alongside an [`Event::RunFinished`]
    /// broadcast.
    Finished,
    /// The run halted at the carried phase for the carried reason. Returned
    /// alongside an [`Event::PhaseHalted`] broadcast.
    Halted {
        /// Phase that halted.
        phase_id: PhaseId,
        /// Why the halt fired.
        reason: HaltReason,
    },
}
345
/// Per-phase orchestrator.
///
/// One `Runner` drives a single workspace through its plan. Construct with
/// [`Runner::new`], subscribe one or more receivers via [`Runner::subscribe`],
/// then call [`Runner::run`] (or [`Runner::run_phase`] for tests).
///
/// Generic over the dispatching [`Agent`] and the [`Git`] implementation.
pub struct Runner<A: Agent, G: Git> {
    /// Workspace path; passed to `state::save` and used to locate
    /// `deferred.md`.
    workspace: PathBuf,
    /// Loaded configuration (sweep toggles, model assignments, budgets, ...).
    config: Config,
    /// In-memory copy of the plan being driven.
    plan: Plan,
    /// In-memory copy of the deferred-items document.
    deferred: DeferredDoc,
    /// Cached run state; written back to disk through `state::save`.
    state: RunState,
    /// Agent used for dispatches.
    agent: A,
    /// Git handle used by the runner (also exposed via
    /// [`Runner::git_handle`]).
    git: G,
    /// Sender half of the event broadcast channel. Kept for the lifetime of
    /// the runner so new receivers can be created by [`Runner::subscribe`].
    events_tx: broadcast::Sender<Event>,
    /// When `true`, [`Runner::run_phase`] skips test detection and execution.
    /// Set by `pitboss play --dry-run`, which dispatches the no-op
    /// [`crate::agent::dry_run::DryRunAgent`]: since the agent never modifies
    /// the working tree, running tests would only re-confirm whatever the
    /// pre-run state was and risk halting the dry-run on a flaky suite.
    skip_tests: bool,
    /// Operator-supplied override for the deferred-sweep gate. Driven by
    /// `pitboss play --no-sweep` and `--sweep`; defaults to
    /// [`SweepOverride::None`] which lets the configured trigger run.
    sweep_override: SweepOverride,
}
371
/// Operator-supplied override for the deferred-sweep trigger.
///
/// Set via [`Runner::skip_sweep`] / [`Runner::force_sweep`] (mirrored on the
/// CLI as `pitboss play --no-sweep` / `--sweep`). The two flags are mutually
/// exclusive at the clap level, so the runner only ever sees one of these
/// applied per invocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SweepOverride {
    /// Honor the configured sweep trigger (default).
    None,
    /// Suppress sweeps for the duration of the run. Clears `pending_sweep`
    /// when the override is applied and refuses to arm it from any
    /// subsequent phase commit. The override is not persisted to
    /// `pitboss.toml`.
    Skip,
    /// Force a sweep before the next phase even if the trigger threshold
    /// isn't met. Sets `pending_sweep = true` when the override is applied
    /// and bypasses the trigger re-evaluation in the gate so the sweep
    /// dispatches once. After it lands `pending_sweep` clears normally and
    /// the post-phase trigger reverts to the configured behavior.
    Force,
}
393
394impl<A: Agent, G: Git> Runner<A, G> {
395 /// Build a new runner. The caller has already loaded `config`, `plan`,
396 /// `deferred`, and `state` from the workspace and is responsible for
397 /// having checked out the per-run branch (via [`crate::git::Git`]) before
398 /// calling [`Runner::run`].
399 #[allow(clippy::too_many_arguments)]
400 pub fn new(
401 workspace: impl Into<PathBuf>,
402 config: Config,
403 plan: Plan,
404 deferred: DeferredDoc,
405 state: RunState,
406 agent: A,
407 git: G,
408 ) -> Self {
409 let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
410 Self {
411 workspace: workspace.into(),
412 config,
413 plan,
414 deferred,
415 state,
416 agent,
417 git,
418 events_tx,
419 skip_tests: false,
420 sweep_override: SweepOverride::None,
421 }
422 }
423
424 /// Skip the per-phase test invocation entirely. Used by
425 /// `pitboss play --dry-run` so a no-op agent does not get halted by a
426 /// pre-existing red test suite. The runner emits [`Event::TestsSkipped`]
427 /// in place of [`Event::TestStarted`] / [`Event::TestFinished`] so
428 /// subscribers (logger, TUI) still get a clear signal that tests were
429 /// considered.
430 pub fn skip_tests(mut self, skip: bool) -> Self {
431 self.skip_tests = skip;
432 self
433 }
434
435 /// Suppress deferred sweeps for the duration of this runner. Mirrors the
436 /// CLI flag `pitboss play --no-sweep`: clears `state.pending_sweep`
437 /// immediately so an inherited obligation from a prior run is dropped,
438 /// and refuses to arm the gate from any subsequent phase commit. The
439 /// override is in-memory only — it does not write to `pitboss.toml`.
440 pub fn skip_sweep(mut self, skip: bool) -> Self {
441 if skip {
442 self.sweep_override = SweepOverride::Skip;
443 self.state.pending_sweep = false;
444 }
445 self
446 }
447
448 /// Force a sweep before the next phase, regardless of the configured
449 /// trigger threshold. Mirrors `pitboss play --sweep`: sets
450 /// `state.pending_sweep = true` so the gate fires on the next
451 /// [`Runner::run_phase`], and bypasses the gate's trigger re-evaluation
452 /// so a backlog below `trigger_min_items` still sweeps once. Mutually
453 /// exclusive with [`Runner::skip_sweep`] (enforced at the CLI layer).
454 pub fn force_sweep(mut self, force: bool) -> Self {
455 if force {
456 self.sweep_override = SweepOverride::Force;
457 self.state.pending_sweep = true;
458 }
459 self
460 }
461
462 /// Workspace this runner operates on.
463 pub fn workspace(&self) -> &Path {
464 &self.workspace
465 }
466
    /// Borrow the runner's in-memory plan. Useful for tests asserting state
    /// advance.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
471
    /// Borrow the runner's in-memory deferred doc. This is the cached copy;
    /// it is only as fresh as the runner's last reload of `deferred.md`.
    pub fn deferred(&self) -> &DeferredDoc {
        &self.deferred
    }
476
    /// Borrow the cached run state (read-only). See [`Runner::state_mut`]
    /// for the test-only mutable counterpart.
    pub fn state(&self) -> &RunState {
        &self.state
    }
481
    /// Mutable handle to the cached run state.
    ///
    /// Test-only escape hatch. It is kept `pub` so integration tests can
    /// reach it (`#[cfg(test)]` would only expose it to in-crate unit tests)
    /// and marked `#[doc(hidden)]` to keep it out of the rendered API docs.
    /// A few sweep tests need to seed a multi-step scenario by mutating the
    /// run state between runner operations — re-arming `pending_sweep` to
    /// force an extra sweep, or hand-populating the `deferred_item_attempts`
    /// map. Without this accessor they had to round-trip through
    /// `state::save` + drop runner + `Runner::new`, replumbing plan /
    /// deferred / agent / git by hand.
    ///
    /// Mutations made here are in-memory only; this accessor does not write
    /// the state to disk itself.
    #[doc(hidden)]
    pub fn state_mut(&mut self) -> &mut RunState {
        &mut self.state
    }
496
    /// Borrow the loaded config the runner was constructed with. Used by the
    /// TUI to populate the agent / per-role model header chip.
    pub fn config(&self) -> &Config {
        &self.config
    }
502
    /// Borrow the agent this runner dispatches to. Used by the TUI to read
    /// [`Agent::name`] for the header chip.
    pub fn agent(&self) -> &A {
        &self.agent
    }
508
    /// Borrow the git handle the runner is using. CLI code that needs to run
    /// post-run git operations (e.g., `gh pr create` after a successful run)
    /// reaches in through here, so it reuses the same shell-vs-mock
    /// implementation the runner used during the run.
    pub fn git_handle(&self) -> &G {
        &self.git
    }
516
    /// Subscribe to the runner's event stream. Returns a fresh receiver on
    /// each call; existing subscribers are unaffected. The receiver is
    /// created from the runner's broadcast sender, so subscribe before
    /// driving the runner if the earliest events matter.
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.events_tx.subscribe()
    }
522
523 /// Snapshot of `## Deferred items` entries whose per-sweep attempt counter
524 /// is at or above [`crate::config::SweepConfig::escalate_after`]. Sorted
525 /// by descending attempts (text ascending as a deterministic tiebreaker)
526 /// and capped at [`STALE_ITEMS_PROMPT_CAP`] entries so the sweep prompt
527 /// stays bounded.
528 ///
529 /// The runner consults this every time it builds a sweep or sweep-auditor
530 /// prompt; `pitboss status` and the TUI also call it to surface the
531 /// "needs human attention" list. Returning a fresh `Vec` keeps the caller
532 /// from leaking the underlying `RunState` map.
533 pub fn stale_items(&self) -> Vec<prompts::StaleItem> {
534 let escalate = self.config.sweep.escalate_after.max(1);
535 let mut items: Vec<prompts::StaleItem> = self
536 .state
537 .deferred_item_attempts
538 .iter()
539 .filter(|(_, &n)| n >= escalate)
540 .map(|(text, &attempts)| prompts::StaleItem {
541 text: text.clone(),
542 attempts,
543 })
544 .collect();
545 items.sort_by(|a, b| b.attempts.cmp(&a.attempts).then(a.text.cmp(&b.text)));
546 items.truncate(STALE_ITEMS_PROMPT_CAP);
547 items
548 }
549
550 /// Drive the runner until the plan completes or a phase halts.
551 ///
552 /// Always emits exactly one terminal event before returning:
553 /// [`Event::RunFinished`] on the success path, [`Event::PhaseHalted`] on
554 /// the halt path. Subscribers (logger, TUI) treat either event as the
555 /// signal that no further events will arrive — the broadcast channel
556 /// itself does not close because `Runner` keeps owning the sender for
557 /// post-run lookups (e.g., PR creation reads `runner.state()`).
558 pub async fn run(&mut self) -> Result<RunSummary> {
559 // Resume guard: a previous run drove past the final phase and either
560 // halted inside the final-sweep loop or interrupted before the loop
561 // could clear `pending_sweep`. Re-enter the loop directly so we don't
562 // accidentally re-run the final phase below.
563 if self.is_post_final_phase_state() {
564 return self.finish_or_run_final_sweep_loop().await;
565 }
566 loop {
567 let result = self.run_phase().await?;
568 match result {
569 PhaseResult::Halted { phase_id, reason } => {
570 let _ = self.events_tx.send(Event::PhaseHalted {
571 phase_id: phase_id.clone(),
572 reason: reason.clone(),
573 });
574 return Ok(RunSummary::Halted { phase_id, reason });
575 }
576 PhaseResult::Advanced {
577 next_phase: None, ..
578 } => {
579 return self.finish_or_run_final_sweep_loop().await;
580 }
581 PhaseResult::Advanced { .. } => {}
582 }
583 }
584 }
585
586 /// True when state + plan describe "the final regular phase has committed
587 /// but `Runner::run` hasn't yet emitted [`Event::RunFinished`]". The
588 /// authoritative signal is the persisted `state.post_final_phase` flag,
589 /// set when the final phase commits in [`Runner::run_phase_inner`]. The
590 /// fallback inference (`completed.last() == current_phase &&
591 /// next_phase_id_after(...).is_none()`) covers state files written by
592 /// builds that predate the explicit flag.
593 fn is_post_final_phase_state(&self) -> bool {
594 if self.state.post_final_phase {
595 return true;
596 }
597 let Some(last_completed) = self.state.completed.last() else {
598 return false;
599 };
600 last_completed == &self.plan.current_phase
601 && self.next_phase_id_after(last_completed).is_none()
602 }
603
604 /// Shared tail used by both the success path of [`Runner::run`]'s phase
605 /// loop and the post-final-phase resume guard. Either dispatches the
606 /// bounded final-sweep drain loop (when sweeps are enabled, the trailing
607 /// drain is enabled, the operator did not pass `--no-sweep`, and at least
608 /// one unchecked item remains) or emits [`Event::RunFinished`] and
609 /// returns.
610 async fn finish_or_run_final_sweep_loop(&mut self) -> Result<RunSummary> {
611 let after = self
612 .state
613 .completed
614 .last()
615 .cloned()
616 .unwrap_or_else(|| self.plan.current_phase.clone());
617 if self.should_run_final_sweep_loop() {
618 return self.run_final_sweep_loop(after).await;
619 }
620 // No drain loop: clear any inherited `pending_sweep` (a `--no-sweep`
621 // resume of a halted final-sweep run gets here) before declaring the
622 // run finished, so a follow-up `pitboss play` doesn't see stale state.
623 if self.state.pending_sweep {
624 self.state.pending_sweep = false;
625 state::save(&self.workspace, Some(&self.state))
626 .context("runner: clearing pending_sweep at run finish")?;
627 }
628 let _ = self.events_tx.send(Event::RunFinished);
629 Ok(RunSummary::Finished)
630 }
631
632 /// Decide whether the final-sweep drain loop should run. The master
633 /// `[sweep] enabled` switch dominates the trailing-drain
634 /// `[sweep] final_sweep_enabled` toggle so an operator with sweeps
635 /// disabled never sees a surprise drain pass. `--no-sweep` suppresses the
636 /// loop the same way it suppresses between-phase sweeps.
637 fn should_run_final_sweep_loop(&self) -> bool {
638 if matches!(self.sweep_override, SweepOverride::Skip) {
639 return false;
640 }
641 if !self.config.sweep.enabled {
642 return false;
643 }
644 if !self.config.sweep.final_sweep_enabled {
645 return false;
646 }
647 sweep::unchecked_count(&self.deferred) > 0
648 }
649
650 /// Bounded final-sweep drain. Runs after the final regular phase commits
651 /// (or when a previous run halted inside this loop) and dispatches at most
652 /// [`crate::config::SweepConfig::final_sweep_max_iterations`] sweeps in a
653 /// row. Each iteration must resolve at least one item or the loop exits;
654 /// items the agent genuinely can't fix fall through to phase 05's
655 /// staleness machinery.
656 ///
657 /// `after` is the phase id every dispatch anchors on — the last completed
658 /// regular phase. Each iteration calls [`Runner::run_sweep_step_inner`] so
659 /// the existing sweep/auditor/staleness pipeline runs unchanged; the loop
660 /// adds only the iteration cap, the no-progress exit, and the per-iter
661 /// `pre_unchecked == 0` short-circuit.
662 async fn run_final_sweep_loop(&mut self, after: PhaseId) -> Result<RunSummary> {
663 // The post-final-phase commit reset `consecutive_sweeps` to 0 already,
664 // but a halted-then-resumed loop might inherit a stale value. We
665 // bypass the gate in [`Runner::run_phase_inner`] and dispatch
666 // [`Runner::run_sweep_step_inner`] directly, so the clamp can't pre-empt
667 // the loop — but resetting keeps the persisted counter honest for
668 // observers (`pitboss status`, the TUI).
669 self.state.consecutive_sweeps = 0;
670
671 let max_iters = self.config.sweep.final_sweep_max_iterations.max(1);
672 for _iter in 1..=max_iters {
673 let pre_unchecked = sweep::unchecked_count(&self.deferred);
674 if pre_unchecked == 0 {
675 // Drain succeeded (either the previous iteration cleared the
676 // last item, or we entered with an already-empty backlog).
677 break;
678 }
679 self.state.pending_sweep = true;
680 state::save(&self.workspace, Some(&self.state))
681 .context("runner: persisting pending_sweep for final-sweep iter")?;
682 let result = self.run_sweep_step(after.clone()).await?;
683 match result {
684 PhaseResult::Halted { reason, .. } => {
685 // `pending_sweep` stays true on the halt path so a resume
686 // re-enters this loop from iteration 1. Persist here
687 // because `run_sweep_step_inner`'s halt path doesn't save
688 // (the regular gate relies on `run_phase`'s save wrapper,
689 // which doesn't run on this code path), and phase 05's
690 // staleness counter increments must survive the halt so
691 // cumulative progress carries across resumes.
692 state::save(&self.workspace, Some(&self.state))
693 .context("runner: persisting state at final-sweep halt")?;
694 let _ = self.events_tx.send(Event::PhaseHalted {
695 phase_id: after.clone(),
696 reason: reason.clone(),
697 });
698 return Ok(RunSummary::Halted {
699 phase_id: after,
700 reason,
701 });
702 }
703 PhaseResult::Advanced { .. } => {
704 let post_unchecked = sweep::unchecked_count(&self.deferred);
705 let resolved = pre_unchecked.saturating_sub(post_unchecked);
706 if resolved == 0 {
707 // Stuck items survive the loop and surface via phase
708 // 05's staleness machinery.
709 break;
710 }
711 }
712 }
713 }
714 // Clean exit — `run_sweep_step_inner` already cleared `pending_sweep`
715 // on each successful iteration, but a `pre_unchecked == 0` short
716 // circuit at the top of iteration 1 (resume on a backlog the user
717 // drained by hand) needs us to clear it here too.
718 if self.state.pending_sweep {
719 self.state.pending_sweep = false;
720 state::save(&self.workspace, Some(&self.state))
721 .context("runner: clearing pending_sweep after final-sweep loop")?;
722 }
723 let _ = self.events_tx.send(Event::RunFinished);
724 Ok(RunSummary::Finished)
725 }
726
727 /// Execute the current phase to completion (success or halt).
728 ///
729 /// Persists [`RunState`] to `.pitboss/play/state.json` on every exit — including
730 /// halts — so the attempts counter and accumulated token usage survive a
731 /// halted phase and a subsequent `pitboss play` (or `pitboss rebuy`)
732 /// invocation can pick them up.
733 pub async fn run_phase(&mut self) -> Result<PhaseResult> {
734 let result = self.run_phase_inner().await;
735 if let Err(e) = state::save(&self.workspace, Some(&self.state)) {
736 tracing::error!("runner: failed to persist state.json: {e:#}");
737 }
738 result
739 }
740
741 async fn run_phase_inner(&mut self) -> Result<PhaseResult> {
742 // Pending sweep gate. A regular phase that closed above the trigger
743 // threshold sets `state.pending_sweep = true`; we re-evaluate the
744 // trigger here against the current on-disk deferred so a manual
745 // cleanup between resumes (the user clearing items by hand) drains
746 // the obligation cleanly rather than firing a no-op sweep.
747 if self.state.pending_sweep {
748 // Re-read from disk so external edits between resumes are
749 // observed, not just the cached `self.deferred`.
750 let deferred_path = paths::deferred_path(&self.workspace);
751 let on_disk = match std::fs::read_to_string(&deferred_path) {
752 Ok(s) => s,
753 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
754 Err(e) => {
755 return Err(anyhow::Error::new(e)
756 .context(format!("runner: reading {:?}", &deferred_path)))
757 }
758 };
759 let parsed = deferred::parse(&on_disk).unwrap_or_else(|_| self.deferred.clone());
760 // `--no-sweep` shouldn't have left `pending_sweep` armed (the
761 // builder clears it), but defend against an externally set flag
762 // by short-circuiting here. `--sweep` bypasses the trigger so a
763 // forced sweep below the threshold still fires once.
764 let allow = match self.sweep_override {
765 SweepOverride::Skip => false,
766 SweepOverride::Force => true,
767 SweepOverride::None => sweep::should_run_deferred_sweep(
768 &parsed,
769 &self.config.sweep,
770 self.state.consecutive_sweeps,
771 ),
772 };
773 if allow {
774 if let Some(prompt_after) = self.state.completed.last().cloned() {
775 self.deferred = parsed;
776 let result = self.run_sweep_step(prompt_after).await?;
777 if matches!(result, PhaseResult::Advanced { .. })
778 && matches!(self.sweep_override, SweepOverride::Force)
779 {
780 // `--sweep` is a one-shot directive: fire one
781 // forced sweep at the next inter-phase boundary.
782 // After the sweep advances, demote the override so
783 // subsequent post-phase triggers fall back to the
784 // configured threshold logic and we don't sweep
785 // between every pair of phases.
786 self.sweep_override = SweepOverride::None;
787 }
788 return Ok(result);
789 }
790 // Fresh run + `--sweep` (the only path that lands here with
791 // `state.completed` empty): silently no-op. There is no
792 // completed phase to anchor on, so the sweep would dispatch
793 // with `prompt_after = None` against an empty history —
794 // operators don't expect "between phases" wording before the
795 // first phase has even started. Clear the obligation, log the
796 // skip for visibility, and fall through to phase 01.
797 tracing::info!("skipping forced sweep: no completed phases yet to anchor on");
798 self.state.pending_sweep = false;
799 state::save(&self.workspace, Some(&self.state))
800 .context("runner: persisting state.json after fresh-run force-sweep no-op")?;
801 self.deferred = parsed;
802 } else {
803 self.state.pending_sweep = false;
804 state::save(&self.workspace, Some(&self.state))
805 .context("runner: persisting state.json after sweep gate cleared")?;
806 self.deferred = parsed;
807 }
808 }
809
810 let phase = self
811 .plan
812 .phase(&self.plan.current_phase)
813 .cloned()
814 .ok_or_else(|| {
815 anyhow!(
816 "plan.current_phase {:?} is not present in plan.phases",
817 self.plan.current_phase.as_str()
818 )
819 })?;
820 let phase_id = phase.id.clone();
821
822 if let Some(reason) = self.check_budget() {
823 return Ok(PhaseResult::Halted { phase_id, reason });
824 }
825
826 let attempt = self.bump_attempts(&phase_id);
827 let _ = self.events_tx.send(Event::PhaseStarted {
828 phase_id: phase_id.clone(),
829 title: phase.title.clone(),
830 attempt,
831 });
832
833 let plan_path = paths::plan_path(&self.workspace);
834 let deferred_path = paths::deferred_path(&self.workspace);
835 // Every pitboss artifact lives under `.pitboss/`, which is gitignored,
836 // so one exclude entry covers plan.md, deferred.md, state.json, logs,
837 // snapshots, grind state, etc.
838 let exclude: [&Path; 1] = [Path::new(".pitboss")];
839
840 let spec = DispatchSpec {
841 request: self.implementer_request(&phase, attempt),
842 phase_id: phase_id.clone(),
843 phase: Some(&phase),
844 plan_path: &plan_path,
845 deferred_path: &deferred_path,
846 exclude_paths: &exclude,
847 audit: self
848 .config
849 .audit
850 .enabled
851 .then_some(AuditKind::Phase { phase: &phase }),
852 };
853
854 let has_changes = match self.run_dispatch_pipeline(spec).await? {
855 PipelineOutcome::Halted(reason) => return Ok(PhaseResult::Halted { phase_id, reason }),
856 PipelineOutcome::Staged { has_changes } => has_changes,
857 };
858
859 let commit = if has_changes {
860 let id = self
861 .git
862 .commit(&git::commit_message(&phase_id, &phase.title))
863 .await
864 .context("runner: committing phase")?;
865 Some(id)
866 } else {
867 warn!(phase = %phase_id, "phase produced no code changes; skipping commit");
868 None
869 };
870
871 self.deferred.sweep();
872 write_atomic(
873 &deferred_path,
874 deferred::serialize(&self.deferred).as_bytes(),
875 )
876 .context("runner: writing deferred.md after sweep")?;
877
878 self.state.completed.push(phase_id.clone());
879 // Forward step → re-arm the consecutive-sweep clamp so the next
880 // pending sweep is allowed to fire.
881 self.state.consecutive_sweeps = 0;
882
883 let next_phase = self.next_phase_id_after(&phase_id);
884 if next_phase.is_none() {
885 // Final regular phase just committed. Persist the flag so a
886 // subsequent `pitboss play` resume re-enters the final-sweep
887 // drain loop directly rather than re-running the phase.
888 self.state.post_final_phase = true;
889 }
890 if let Some(ref next) = next_phase {
891 self.plan.set_current_phase(next.clone());
892 write_atomic(&plan_path, plan::serialize(&self.plan).as_bytes())
893 .context("runner: writing plan.md with advanced current_phase")?;
894 // Only consider scheduling a between-phase sweep when there is a
895 // next phase to insert it before. End-of-run sweeps (after the
896 // final phase) belong to phase 08. `--no-sweep` suppresses
897 // arming entirely; `--sweep` arms once and is consumed by the
898 // gate at the top of `run_phase_inner` after the sweep
899 // advances, so a multi-phase fresh run with `--sweep` fires
900 // its forced sweep at *this* boundary (between the just-
901 // committed first phase and the next), then reverts to
902 // threshold-driven behavior for subsequent boundaries.
903 if !matches!(self.sweep_override, SweepOverride::Skip)
904 && (matches!(self.sweep_override, SweepOverride::Force)
905 || sweep::should_run_deferred_sweep(
906 &self.deferred,
907 &self.config.sweep,
908 self.state.consecutive_sweeps,
909 ))
910 {
911 self.state.pending_sweep = true;
912 }
913 }
914
915 state::save(&self.workspace, Some(&self.state)).context("runner: persisting state.json")?;
916
917 let _ = self.events_tx.send(Event::PhaseCommitted {
918 phase_id: phase_id.clone(),
919 commit: commit.clone(),
920 });
921
922 Ok(PhaseResult::Advanced {
923 phase_id,
924 next_phase,
925 commit,
926 })
927 }
928
929 /// Build the implementer [`AgentRequest`] for `phase`. `attempt` is the
930 /// 1-based dispatch counter the caller pulled from [`bump_attempts`]; it
931 /// flows into the per-attempt log filename so a fixer re-dispatch later in
932 /// the phase does not clobber the implementer's log.
933 fn implementer_request(&self, phase: &crate::plan::Phase, attempt: u32) -> AgentRequest {
934 AgentRequest {
935 role: Role::Implementer,
936 model: self.config.models.implementer.clone(),
937 system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
938 user_prompt: prompts::implementer(&self.plan, &self.deferred, phase),
939 workdir: self.workspace.clone(),
940 log_path: self.attempt_log_path(&phase.id, "implementer", attempt),
941 timeout: DEFAULT_AGENT_TIMEOUT,
942 env: std::collections::HashMap::new(),
943 }
944 }
945
946 /// Drive the dispatch → validate → tests → fixer → optional auditor → stage
947 /// chain for one already-built [`AgentRequest`]. The caller is responsible
948 /// for bumping the attempts counter and emitting the "started" event for
949 /// the dispatch type before invoking this; on return either a halt reason
950 /// is surfaced or the working tree has been staged and `has_changes`
951 /// indicates whether anything outside `exclude_paths` ended up in the
952 /// index. Phase 03 reuses this helper from a sweep entry point, which is
953 /// why the caller — not the helper — owns the commit decision.
954 async fn run_dispatch_pipeline(&mut self, spec: DispatchSpec<'_>) -> Result<PipelineOutcome> {
955 let DispatchSpec {
956 request,
957 phase_id,
958 phase,
959 plan_path,
960 deferred_path,
961 exclude_paths,
962 audit,
963 } = spec;
964
965 let test_runner = match self
966 .run_dispatch_through_fixer(request, phase, plan_path, deferred_path, &phase_id)
967 .await?
968 {
969 DispatchOutcome::Halted(reason) => return Ok(PipelineOutcome::Halted(reason)),
970 DispatchOutcome::Continue { test_runner } => test_runner,
971 };
972
973 self.run_audit_and_stage(
974 audit,
975 test_runner.as_ref(),
976 plan_path,
977 deferred_path,
978 exclude_paths,
979 &phase_id,
980 )
981 .await
982 }
983
984 /// First half of the pipeline: dispatch the agent, validate planning
985 /// artifacts, run the project tests, and drive the fixer loop on failure.
986 /// Returns the resolved test runner (or `None` when no runner was
987 /// detected) so the caller can hand it to [`Runner::run_audit_and_stage`]
988 /// without re-detecting.
989 ///
990 /// Split out from [`Runner::run_dispatch_pipeline`] so the sweep call
991 /// site can interpose between dispatch and audit: by then `self.deferred`
992 /// reflects the implementer's edits, so the caller can compute the
993 /// `resolved` / `remaining` lists and build [`AuditKind::Sweep`] with
994 /// the spec-shape fields filled in directly.
995 async fn run_dispatch_through_fixer(
996 &mut self,
997 request: AgentRequest,
998 phase: Option<&crate::plan::Phase>,
999 plan_path: &Path,
1000 deferred_path: &Path,
1001 phase_id: &PhaseId,
1002 ) -> Result<DispatchOutcome> {
1003 let role = request.role;
1004 match self
1005 .dispatch_and_validate(request, role, plan_path, deferred_path)
1006 .await?
1007 {
1008 ValidationResult::Continue => {}
1009 ValidationResult::Halt(reason) => return Ok(DispatchOutcome::Halted(reason)),
1010 }
1011
1012 let test_runner = if self.skip_tests {
1013 debug!("dry-run: skipping test detection and execution");
1014 None
1015 } else {
1016 project_tests::detect(&self.workspace, self.config.tests.command.as_deref())
1017 };
1018 if let Some(runner) = &test_runner {
1019 // The post-dispatch test log shares the agent's attempt number so
1020 // operators can pair them up at a glance. `bump_attempts` was
1021 // called by the caller before building `request`, so reading
1022 // state.attempts here yields exactly that attempt.
1023 let attempt = self.state.attempts.get(phase_id).copied().unwrap_or(0);
1024 let outcome = self.run_tests(runner, phase_id, "tests", attempt).await?;
1025 if !outcome.passed {
1026 match self
1027 .run_fixer_loop(
1028 phase_id,
1029 phase,
1030 runner,
1031 plan_path,
1032 deferred_path,
1033 outcome.summary,
1034 )
1035 .await?
1036 {
1037 FixerLoopResult::Passed => {}
1038 FixerLoopResult::Halted(reason) => {
1039 return Ok(DispatchOutcome::Halted(reason));
1040 }
1041 }
1042 }
1043 } else {
1044 if !self.skip_tests {
1045 debug!("no test runner detected and no override configured; skipping tests");
1046 }
1047 let _ = self.events_tx.send(Event::TestsSkipped);
1048 }
1049
1050 Ok(DispatchOutcome::Continue { test_runner })
1051 }
1052
1053 /// Second half of the pipeline: run the optional auditor pass, then
1054 /// re-stage code-only changes so the caller can decide whether to commit.
1055 ///
1056 /// Split out from [`Runner::run_dispatch_pipeline`] for the same reason
1057 /// as [`Runner::run_dispatch_through_fixer`] — see that method's doc for
1058 /// the why. Phase callers reach this through the
1059 /// [`Runner::run_dispatch_pipeline`] wrapper; sweep callers invoke it
1060 /// directly so they can build [`AuditKind::Sweep`] with the
1061 /// post-dispatch `resolved` / `remaining` lists.
1062 async fn run_audit_and_stage(
1063 &mut self,
1064 audit: Option<AuditKind<'_>>,
1065 test_runner: Option<&project_tests::TestRunner>,
1066 plan_path: &Path,
1067 deferred_path: &Path,
1068 exclude_paths: &[&Path],
1069 phase_id: &PhaseId,
1070 ) -> Result<PipelineOutcome> {
1071 if let Some(audit) = audit {
1072 match self
1073 .run_auditor_pass(
1074 audit,
1075 test_runner,
1076 plan_path,
1077 deferred_path,
1078 exclude_paths,
1079 phase_id,
1080 )
1081 .await?
1082 {
1083 AuditPassResult::Continue => {}
1084 AuditPassResult::Halted(reason) => return Ok(PipelineOutcome::Halted(reason)),
1085 }
1086 }
1087
1088 // Re-stage to capture anything the auditor added or modified. When the
1089 // auditor was skipped (disabled, or no code changes to audit) this is
1090 // the first stage call of the phase.
1091 self.git
1092 .stage_changes(exclude_paths)
1093 .await
1094 .context("runner: staging code-only changes")?;
1095
1096 let has_changes = self
1097 .git
1098 .has_staged_changes()
1099 .await
1100 .context("runner: checking for staged changes")?;
1101
1102 Ok(PipelineOutcome::Staged { has_changes })
1103 }
1104
1105 /// Compare the running [`RunState::token_usage`] against the configured
1106 /// budgets. Returns [`HaltReason::BudgetExceeded`] when either cap has
1107 /// been met or surpassed, otherwise `None`. Called before every agent
1108 /// dispatch so a fresh run never exceeds its budget by more than one
1109 /// dispatch's worth of tokens.
1110 fn check_budget(&self) -> Option<HaltReason> {
1111 let (tokens, usd) = budget_totals(&self.config, &self.state.token_usage);
1112 if let Some(cap) = self.config.budgets.max_total_tokens {
1113 if tokens >= cap {
1114 return Some(HaltReason::BudgetExceeded(format!(
1115 "token budget reached: {tokens} >= cap {cap}"
1116 )));
1117 }
1118 }
1119 if let Some(cap) = self.config.budgets.max_total_usd {
1120 if usd >= cap {
1121 return Some(HaltReason::BudgetExceeded(format!(
1122 "USD budget reached: ${usd:.4} >= cap ${cap:.4}"
1123 )));
1124 }
1125 }
1126 None
1127 }
1128
1129 fn next_phase_id_after(&self, current: &PhaseId) -> Option<PhaseId> {
1130 self.plan
1131 .phases
1132 .iter()
1133 .find(|p| p.id > *current)
1134 .map(|p| p.id.clone())
1135 }
1136
1137 /// Increment and return the per-phase attempt counter. The counter mirrors
1138 /// every agent dispatch made for a phase (implementer + each fixer
1139 /// attempt) so [`crate::state::RunState::attempts`] is the single source
1140 /// of truth for "how many model dispatches did this phase consume."
1141 fn bump_attempts(&mut self, phase_id: &PhaseId) -> u32 {
1142 let entry = self.state.attempts.entry(phase_id.clone()).or_insert(0);
1143 *entry += 1;
1144 *entry
1145 }
1146
1147 fn attempt_log_path(&self, phase_id: &PhaseId, role: &str, attempt: u32) -> PathBuf {
1148 paths::play_logs_dir(&self.workspace)
1149 .join(format!("phase-{}-{}-{}.log", phase_id, role, attempt))
1150 }
1151
1152 /// Per-attempt log path for a sweep dispatch. Distinct prefix from
1153 /// [`Runner::attempt_log_path`] so an operator scanning `.pitboss/play/logs/`
1154 /// can tell sweep dispatches apart from regular phase dispatches at a
1155 /// glance even though both account against `state.attempts[after]`.
1156 fn sweep_log_path(&self, after: &PhaseId, role: &str, attempt: u32) -> PathBuf {
1157 paths::play_logs_dir(&self.workspace)
1158 .join(format!("sweep-after-{}-{}-{}.log", after, role, attempt))
1159 }
1160
1161 /// Run a one-shot deferred sweep against the loaded state without
1162 /// advancing the plan state machine. Backs the `pitboss sweep`
1163 /// subcommand: an operator who edited `deferred.md` by hand or wants to
1164 /// drain a backlog ahead of the next `pitboss play` can invoke this in
1165 /// isolation.
1166 ///
1167 /// `after` is the prompt's `after_phase` label — when `None`, the sweep
1168 /// prompt renders the standalone variant ("no preceding phase to anchor
1169 /// on"). Accounting (attempts counter, log filename, events, commit
1170 /// message) falls back to the plan's current phase when no `after` is
1171 /// supplied so the dispatch still has a stable phase id to key on.
1172 ///
1173 /// `max_items` clamps the prompt's pending-items list to the first N
1174 /// items in document order without changing the on-disk file. Use it to
1175 /// keep a pathological backlog (100+ items) within the agent's effective
1176 /// context window; remaining items surface on the next sweep.
1177 pub async fn run_standalone_sweep(
1178 &mut self,
1179 after: Option<PhaseId>,
1180 max_items: Option<usize>,
1181 persist_state: bool,
1182 ) -> Result<PhaseResult> {
1183 let accounting = after
1184 .clone()
1185 .unwrap_or_else(|| self.plan.current_phase.clone());
1186 self.run_sweep_step_inner(accounting, after, max_items, persist_state)
1187 .await
1188 }
1189
1190 /// Dispatch one sweep anchored on `after`. Common-case wrapper around
1191 /// [`Runner::run_sweep_step_inner`] that fixes `prompt_after = Some(after)`
1192 /// and `max_items = None` — the natural defaults for both the inter-phase
1193 /// gate (when there is at least one completed phase to anchor on) and
1194 /// phase 08's final-sweep drain loop. Bespoke entry points that need
1195 /// either knob ([`Runner::run_standalone_sweep`] for `max_items`, the
1196 /// fresh-run path inside `run_phase_inner` for `prompt_after = None`)
1197 /// continue to call `run_sweep_step_inner` directly.
1198 async fn run_sweep_step(&mut self, after: PhaseId) -> Result<PhaseResult> {
1199 self.run_sweep_step_inner(after.clone(), Some(after), None, true)
1200 .await
1201 }
1202
1203 /// Shared body of the inter-phase sweep gate (in
1204 /// [`Runner::run_phase_inner`]) and [`Runner::run_standalone_sweep`].
1205 ///
1206 /// The sweep reuses [`Runner::run_dispatch_pipeline`] with `phase: None`
1207 /// and `audit: Some(AuditKind::Sweep { .. })` when
1208 /// [`crate::config::SweepConfig::audit_enabled`] is on. The implementer's
1209 /// prompt is built via [`prompts::sweep`]; if tests fail post-dispatch
1210 /// the fixer falls back to [`prompts::fixer_for_sweep`] inside the
1211 /// pipeline.
1212 ///
1213 /// `accounting` is the phase id used everywhere a sweep needs a stable
1214 /// key — `state.attempts`, the per-attempt log filename, the
1215 /// `Sweep{Started,Halted,Completed}` events, and the sweep commit
1216 /// message. `prompt_after` is what the implementer / auditor / fixer
1217 /// prompts read as the `{after}` substitution; `None` selects the
1218 /// standalone-sweep variant in [`prompts::sweep`]. `max_items`, when
1219 /// `Some(n)`, clamps the pending-items list passed into
1220 /// [`prompts::sweep`] to the first `n` pending items in document order.
1221 /// The on-disk `deferred.md` is unchanged: items dropped from the prompt
1222 /// view stay in the file and surface on the next sweep.
1223 async fn run_sweep_step_inner(
1224 &mut self,
1225 accounting: PhaseId,
1226 prompt_after: Option<PhaseId>,
1227 max_items: Option<usize>,
1228 persist_state: bool,
1229 ) -> Result<PhaseResult> {
1230 // Capture pre-sweep accounting. `pre_unchecked` drives the resolved
1231 // count for the commit message; `pre_texts` is the unchecked-item
1232 // text snapshot the post-dispatch step diffs against to derive
1233 // [`AuditKind::Sweep`]'s `resolved` / `remaining` lists and the
1234 // staleness counter increments for surviving items.
1235 let pre_unchecked = sweep::unchecked_count(&self.deferred);
1236 let pre_texts: HashSet<String> = self
1237 .deferred
1238 .items
1239 .iter()
1240 .filter(|i| !i.done)
1241 .map(|i| i.text.clone())
1242 .collect();
1243 // Snapshot the H3 phases block. The sweep prompt forbids touching
1244 // `## Deferred phases`; a mismatch on the way out trips the
1245 // DeferredInvalid guard.
1246 let pre_phases = phases_block_canonical(&self.deferred);
1247
1248 let plan_path = paths::plan_path(&self.workspace);
1249 let deferred_path = paths::deferred_path(&self.workspace);
1250 let pre_deferred_bytes = match std::fs::read(&deferred_path) {
1251 Ok(b) => b,
1252 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Vec::new(),
1253 Err(e) => {
1254 return Err(anyhow::Error::new(e)
1255 .context(format!("runner: reading {:?} before sweep", &deferred_path)))
1256 }
1257 };
1258
1259 if let Some(reason) = self.check_budget() {
1260 // Pre-dispatch budget halt — the agent never ran, so this halt
1261 // doesn't tick the staleness clock. Combined with
1262 // `state.pending_sweep` retrying the same logical sweep on
1263 // resume, ticking here would double-count one operator-visible
1264 // attempt: once for the budget halt, once for the resume's
1265 // post-dispatch increment. Skipping it here keeps "one attempt
1266 // per implementer dispatch" as the rule.
1267 return Ok(PhaseResult::Halted {
1268 phase_id: accounting,
1269 reason,
1270 });
1271 }
1272
1273 let attempt = self.bump_attempts(&accounting);
1274 let _ = self.events_tx.send(Event::SweepStarted {
1275 after: accounting.clone(),
1276 items_pending: pre_unchecked,
1277 attempt,
1278 });
1279
1280 let stale = self.stale_items();
1281 let prompt_doc = match max_items {
1282 Some(n) => clamp_pending_items(&self.deferred, n),
1283 None => self.deferred.clone(),
1284 };
1285 let request = AgentRequest {
1286 role: Role::Implementer,
1287 model: self.config.models.implementer.clone(),
1288 system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
1289 user_prompt: prompts::sweep(&self.plan, &prompt_doc, prompt_after.as_ref(), &stale),
1290 workdir: self.workspace.clone(),
1291 log_path: self.sweep_log_path(&accounting, "implementer", attempt),
1292 timeout: DEFAULT_AGENT_TIMEOUT,
1293 env: std::collections::HashMap::new(),
1294 };
1295
1296 let exclude: [&Path; 1] = [Path::new(".pitboss")];
1297
1298 // Split-pipeline dance: run the implementer dispatch + tests + fixer
1299 // first so `self.deferred` reflects the agent's edits, then build
1300 // [`AuditKind::Sweep`] with the spec-shape `resolved` / `remaining`
1301 // lists computed against `pre_texts`, then run the audit + stage half.
1302 let halt_with_staleness =
1303 |this: &mut Self, reason: HaltReason, pre_texts: &HashSet<String>| {
1304 // Halt-path bookkeeping fires before the halt event so the
1305 // staleness counter for surviving items reflects this attempt.
1306 // For dispatch_and_validate halts, `self.deferred` is the
1307 // pre-dispatch state (validation rolled back). For halts after
1308 // a successful implementer dispatch (test failure, audit
1309 // failure), `self.deferred` reflects the agent's edits — items
1310 // the agent flipped to `- [x]` are not in post_unchecked and
1311 // get pruned, items that survived get incremented.
1312 this.apply_sweep_staleness(pre_texts);
1313 let _ = this.events_tx.send(Event::SweepHalted {
1314 after: accounting.clone(),
1315 reason: reason.clone(),
1316 });
1317 PhaseResult::Halted {
1318 phase_id: accounting.clone(),
1319 reason,
1320 }
1321 };
1322
1323 let test_runner = match self
1324 .run_dispatch_through_fixer(request, None, &plan_path, &deferred_path, &accounting)
1325 .await?
1326 {
1327 DispatchOutcome::Halted(reason) => {
1328 return Ok(halt_with_staleness(self, reason, &pre_texts));
1329 }
1330 DispatchOutcome::Continue { test_runner } => test_runner,
1331 };
1332
1333 let audit = self.config.sweep.audit_enabled.then(|| {
1334 // The sweep prompt forbids rewording, so matching the pre-sweep
1335 // unchecked snapshot by exact text is sound.
1336 let resolved: Vec<String> = self
1337 .deferred
1338 .items
1339 .iter()
1340 .filter(|i| i.done && pre_texts.contains(&i.text))
1341 .map(|i| i.text.clone())
1342 .collect();
1343 let remaining: Vec<String> = self
1344 .deferred
1345 .items
1346 .iter()
1347 .filter(|i| !i.done)
1348 .map(|i| i.text.clone())
1349 .collect();
1350 AuditKind::Sweep {
1351 after: accounting.clone(),
1352 resolved,
1353 remaining,
1354 }
1355 });
1356
1357 let has_changes = match self
1358 .run_audit_and_stage(
1359 audit,
1360 test_runner.as_ref(),
1361 &plan_path,
1362 &deferred_path,
1363 &exclude,
1364 &accounting,
1365 )
1366 .await?
1367 {
1368 PipelineOutcome::Halted(reason) => {
1369 return Ok(halt_with_staleness(self, reason, &pre_texts));
1370 }
1371 PipelineOutcome::Staged { has_changes } => has_changes,
1372 };
1373
1374 // H3 invariant: the sweep prompt forbids editing `## Deferred phases`.
1375 // Restore from the pre-dispatch deferred snapshot when violated so the
1376 // halt is genuinely recoverable.
1377 let post_phases = phases_block_canonical(&self.deferred);
1378 if post_phases != pre_phases {
1379 warn!(
1380 after = %accounting,
1381 "sweep modified ## Deferred phases; restoring deferred.md"
1382 );
1383 self.restore_deferred(&deferred_path, &pre_deferred_bytes, true)?;
1384 // Re-parse the restored bytes so the cached doc agrees with disk.
1385 let restored = std::fs::read_to_string(&deferred_path).unwrap_or_default();
1386 if let Ok(parsed) = deferred::parse(&restored) {
1387 self.deferred = parsed;
1388 }
1389 // After restoration `self.deferred` matches pre-state, so the
1390 // bookkeeping treats every pre-text item as a survivor (+1 attempt).
1391 self.apply_sweep_staleness(&pre_texts);
1392 let reason = HaltReason::DeferredInvalid("sweep modified Deferred phases".into());
1393 let _ = self.events_tx.send(Event::SweepHalted {
1394 after: accounting.clone(),
1395 reason: reason.clone(),
1396 });
1397 return Ok(PhaseResult::Halted {
1398 phase_id: accounting,
1399 reason,
1400 });
1401 }
1402
1403 // `resolved` counts items the sweep flipped from `- [ ]` to `- [x]`.
1404 // `saturating_sub` keeps new items the agent appended (against the
1405 // prompt's instruction) from polluting the count into a negative.
1406 let resolved = pre_unchecked.saturating_sub(sweep::unchecked_count(&self.deferred));
1407
1408 let commit = if has_changes {
1409 let id = self
1410 .git
1411 .commit(&git::commit_message_sweep(&accounting, resolved))
1412 .await
1413 .context("runner: committing sweep")?;
1414 Some(id)
1415 } else {
1416 warn!(after = %accounting, "sweep produced no code changes; skipping commit");
1417 None
1418 };
1419
1420 // Staleness bookkeeping must run before `self.deferred.sweep()` would
1421 // matter, but since the helper filters on `!i.done` either order
1422 // produces the same `post_unchecked_texts`. Doing it here keeps the
1423 // success path symmetrical with the halt paths above.
1424 self.apply_sweep_staleness(&pre_texts);
1425
1426 // Drop the items the agent ticked off so a later regular phase doesn't
1427 // re-render them in the next implementer prompt. Mirrors the
1428 // post-phase sweep call in `run_phase_inner`.
1429 self.deferred.sweep();
1430 write_atomic(
1431 &deferred_path,
1432 deferred::serialize(&self.deferred).as_bytes(),
1433 )
1434 .context("runner: writing deferred.md after sweep step")?;
1435
1436 self.state.pending_sweep = false;
1437 self.state.consecutive_sweeps = self.state.consecutive_sweeps.saturating_add(1);
1438 // `state.completed` tracks plan progress only; sweeps do not push to
1439 // it. `current_phase` already advanced when the preceding phase
1440 // committed, so the runner picks the regular phase up on the next
1441 // `run_phase` call.
1442 //
1443 // `persist_state = false` is the standalone-sweep-on-fresh-workspace
1444 // case: state was synthesized in memory and the caller doesn't want
1445 // an empty run claiming the workspace by leaving a state.json
1446 // behind. The runner skips the write so the CLI doesn't have to
1447 // clean up after it.
1448 if persist_state {
1449 state::save(&self.workspace, Some(&self.state))
1450 .context("runner: persisting state.json after sweep")?;
1451 }
1452
1453 let _ = self.events_tx.send(Event::SweepCompleted {
1454 after: accounting.clone(),
1455 resolved,
1456 commit: commit.clone(),
1457 });
1458
1459 Ok(PhaseResult::Advanced {
1460 phase_id: accounting,
1461 next_phase: Some(self.plan.current_phase.clone()),
1462 commit,
1463 })
1464 }
1465
1466 /// Run a single agent dispatch and validate the planning artifacts on the
1467 /// way out: snapshot `plan.md` + `deferred.md` first, dispatch, then
1468 /// require `plan.md` to be byte-identical and `deferred.md` to re-parse.
1469 /// On any failure mode the artifacts are restored from the pre-dispatch
1470 /// snapshots before returning [`ValidationResult::Halt`].
1471 async fn dispatch_and_validate(
1472 &mut self,
1473 request: AgentRequest,
1474 role: Role,
1475 plan_path: &Path,
1476 deferred_path: &Path,
1477 ) -> Result<ValidationResult> {
1478 let plan_pre =
1479 std::fs::read(plan_path).with_context(|| format!("runner: reading {:?}", plan_path))?;
1480 let plan_hash = Snapshot::of_bytes(&plan_pre);
1481 let (deferred_pre, deferred_existed) = match std::fs::read(deferred_path) {
1482 Ok(b) => (b, true),
1483 Err(e) if e.kind() == std::io::ErrorKind::NotFound => (Vec::new(), false),
1484 Err(e) => {
1485 return Err(
1486 anyhow::Error::new(e).context(format!("runner: reading {:?}", deferred_path))
1487 )
1488 }
1489 };
1490
1491 let dispatch = self.dispatch_agent(request).await?;
1492 // Token usage is folded regardless of whether the dispatch ended
1493 // cleanly — even an aborted run can incur partial spend.
1494 self.fold_token_usage(role, &dispatch);
1495
1496 match &dispatch.stop_reason {
1497 StopReason::Completed => {}
1498 StopReason::Timeout => {
1499 return Ok(ValidationResult::Halt(HaltReason::AgentFailure(format!(
1500 "agent {:?} timed out after {:?}",
1501 self.agent.name(),
1502 DEFAULT_AGENT_TIMEOUT
1503 ))));
1504 }
1505 StopReason::Cancelled => {
1506 return Ok(ValidationResult::Halt(HaltReason::AgentFailure(format!(
1507 "agent {:?} was cancelled",
1508 self.agent.name()
1509 ))));
1510 }
1511 StopReason::Error(msg) => {
1512 return Ok(ValidationResult::Halt(HaltReason::AgentFailure(
1513 msg.clone(),
1514 )));
1515 }
1516 }
1517
1518 let plan_post = std::fs::read(plan_path)
1519 .with_context(|| format!("runner: reading {:?} after agent", plan_path))?;
1520 if Snapshot::of_bytes(&plan_post) != plan_hash {
1521 warn!(role = %role, "agent modified plan.md; restoring from snapshot");
1522 write_atomic(plan_path, &plan_pre).with_context(|| {
1523 format!(
1524 "runner: restoring {:?} from snapshot after tamper",
1525 plan_path
1526 )
1527 })?;
1528 return Ok(ValidationResult::Halt(HaltReason::PlanTampered));
1529 }
1530
1531 let deferred_text = match std::fs::read_to_string(deferred_path) {
1532 Ok(s) => s,
1533 Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
1534 Err(e) => {
1535 return Err(anyhow::Error::new(e)
1536 .context(format!("runner: reading {:?} after agent", deferred_path)))
1537 }
1538 };
1539 match deferred::parse(&deferred_text) {
1540 Ok(parsed) => {
1541 self.deferred = parsed;
1542 }
1543 Err(e) => {
1544 let msg = format!("{e}");
1545 warn!(role = %role, error = %msg, "deferred.md is invalid; restoring");
1546 self.restore_deferred(deferred_path, &deferred_pre, deferred_existed)?;
1547 return Ok(ValidationResult::Halt(HaltReason::DeferredInvalid(msg)));
1548 }
1549 }
1550
1551 Ok(ValidationResult::Continue)
1552 }
1553
1554 /// Run the project's test suite once, emitting [`Event::TestStarted`] /
1555 /// [`Event::TestFinished`] around the call. `log_role` distinguishes the
1556 /// log filename so a fixer-driven re-run does not clobber the prior log.
1557 async fn run_tests(
1558 &self,
1559 runner: &project_tests::TestRunner,
1560 phase_id: &PhaseId,
1561 log_role: &str,
1562 attempt: u32,
1563 ) -> Result<project_tests::TestOutcome> {
1564 let _ = self.events_tx.send(Event::TestStarted);
1565 let test_log = self.attempt_log_path(phase_id, log_role, attempt);
1566 let outcome = runner
1567 .run(test_log)
1568 .await
1569 .context("runner: running project tests")?;
1570 let _ = self.events_tx.send(Event::TestFinished {
1571 passed: outcome.passed,
1572 summary: outcome.summary.clone(),
1573 });
1574 Ok(outcome)
1575 }
1576
1577 /// Drive the bounded fixer loop after the implementer's tests fail.
1578 ///
1579 /// Up to [`crate::config::RetryBudgets::fixer_max_attempts`] dispatches:
1580 /// each one snapshots the planning artifacts, dispatches the fixer agent,
1581 /// validates, then re-runs the tests. The first attempt that produces a
1582 /// passing test run resolves to [`FixerLoopResult::Passed`]; once the
1583 /// budget is exhausted the loop returns the final test summary as
1584 /// [`HaltReason::TestsFailed`].
1585 async fn run_fixer_loop(
1586 &mut self,
1587 phase_id: &PhaseId,
1588 phase: Option<&crate::plan::Phase>,
1589 test_runner: &project_tests::TestRunner,
1590 plan_path: &Path,
1591 deferred_path: &Path,
1592 initial_summary: String,
1593 ) -> Result<FixerLoopResult> {
1594 let budget = self.config.retries.fixer_max_attempts;
1595 if budget == 0 {
1596 return Ok(FixerLoopResult::Halted(HaltReason::TestsFailed(
1597 initial_summary,
1598 )));
1599 }
1600
1601 let mut last_summary = initial_summary;
1602
1603 for fixer_attempt in 1..=budget {
1604 if let Some(reason) = self.check_budget() {
1605 return Ok(FixerLoopResult::Halted(reason));
1606 }
1607 let total_attempt = self.bump_attempts(phase_id);
1608 let _ = self.events_tx.send(Event::FixerStarted {
1609 phase_id: phase_id.clone(),
1610 fixer_attempt,
1611 attempt: total_attempt,
1612 });
1613
1614 let user_prompt = match phase {
1615 Some(p) => {
1616 prompts::fixer_with_deferred(&self.plan, p, &last_summary, &self.deferred)
1617 }
1618 None => prompts::fixer_for_sweep(&self.plan, &self.deferred, &last_summary),
1619 };
1620 let log_path = self.attempt_log_path(phase_id, "fix", fixer_attempt);
1621 let request = AgentRequest {
1622 role: Role::Fixer,
1623 model: self.config.models.fixer.clone(),
1624 system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
1625 user_prompt,
1626 workdir: self.workspace.clone(),
1627 log_path,
1628 timeout: DEFAULT_AGENT_TIMEOUT,
1629 env: std::collections::HashMap::new(),
1630 };
1631
1632 match self
1633 .dispatch_and_validate(request, Role::Fixer, plan_path, deferred_path)
1634 .await?
1635 {
1636 ValidationResult::Continue => {}
1637 ValidationResult::Halt(reason) => return Ok(FixerLoopResult::Halted(reason)),
1638 }
1639
1640 let outcome = self
1641 .run_tests(test_runner, phase_id, "tests", total_attempt)
1642 .await?;
1643 if outcome.passed {
1644 return Ok(FixerLoopResult::Passed);
1645 }
1646 last_summary = outcome.summary;
1647 }
1648
1649 Ok(FixerLoopResult::Halted(HaltReason::TestsFailed(
1650 last_summary,
1651 )))
1652 }
1653
1654 /// Run the auditor agent. Gating on
1655 /// [`crate::config::AuditConfig::enabled`] is the caller's responsibility:
1656 /// [`Runner::run_dispatch_pipeline`] only invokes this helper when its
1657 /// `audit` field is `Some`, so callers that build the spec without
1658 /// populating `audit` get the disabled-by-default behavior for free.
1659 ///
1660 /// Slots in after the test suite (and any fixer dispatches) passes and
1661 /// before the per-phase commit. The runner stages the implementer's code
1662 /// changes so it can hand the resulting `git diff --cached` to the auditor
1663 /// as input; if the staged diff is empty (e.g. the implementer only edited
1664 /// planning artifacts) the audit is skipped. The auditor may inline small
1665 /// fixes or extend `deferred.md`; either way the post-dispatch validation
1666 /// re-parses `deferred.md` and re-runs the project tests since the auditor
1667 /// may have edited code. A test failure post-audit halts the phase rather
1668 /// than re-entering the fixer loop — auditor edits are scoped enough that
1669 /// breaking the build deserves operator attention.
1670 async fn run_auditor_pass(
1671 &mut self,
1672 audit: AuditKind<'_>,
1673 test_runner: Option<&project_tests::TestRunner>,
1674 plan_path: &Path,
1675 deferred_path: &Path,
1676 exclude: &[&Path],
1677 phase_id: &PhaseId,
1678 ) -> Result<AuditPassResult> {
1679 // Stage so we can sample the diff. `stage_changes` is idempotent so
1680 // calling it again post-audit picks up anything the auditor adds.
1681 self.git
1682 .stage_changes(exclude)
1683 .await
1684 .context("runner: staging for audit diff")?;
1685 let diff = self
1686 .git
1687 .staged_diff()
1688 .await
1689 .context("runner: capturing staged diff for auditor")?;
1690
1691 let kind = match &audit {
1692 AuditKind::Phase { .. } => AuditContextKind::Phase,
1693 AuditKind::Sweep { .. } => AuditContextKind::Sweep,
1694 };
1695 let context = AuditContext {
1696 phase_id: phase_id.clone(),
1697 kind,
1698 };
1699
1700 if diff.trim().is_empty() {
1701 let _ = self.events_tx.send(Event::AuditorSkippedNoChanges {
1702 context: context.clone(),
1703 });
1704 return Ok(AuditPassResult::Continue);
1705 }
1706
1707 if let Some(reason) = self.check_budget() {
1708 return Ok(AuditPassResult::Halted(reason));
1709 }
1710
1711 let total_attempt = self.bump_attempts(phase_id);
1712 let _ = self.events_tx.send(Event::AuditorStarted {
1713 context: context.clone(),
1714 attempt: total_attempt,
1715 });
1716
1717 let (user_prompt, log_path) = match audit {
1718 AuditKind::Phase { phase } => (
1719 prompts::auditor_with_deferred(
1720 &self.plan,
1721 phase,
1722 &diff,
1723 &self.deferred,
1724 self.config.audit.small_fix_line_limit,
1725 ),
1726 // Phase auditor only ever runs once per phase, so the per-role
1727 // attempt counter in the log filename stays at 1; the global
1728 // `attempt` counter still bumps so [`RunState::attempts`]
1729 // reflects the spend.
1730 self.attempt_log_path(phase_id, "audit", 1),
1731 ),
1732 AuditKind::Sweep {
1733 after,
1734 resolved,
1735 remaining,
1736 } => {
1737 let stale = self.stale_items();
1738 (
1739 prompts::sweep_auditor(prompts::SweepAuditorPrompt {
1740 plan: &self.plan,
1741 deferred: &self.deferred,
1742 after: &after,
1743 diff: &diff,
1744 resolved: &resolved,
1745 remaining: &remaining,
1746 stale_items: &stale,
1747 small_fix_line_limit: self.config.audit.small_fix_line_limit,
1748 }),
1749 // Sweep audits get a sweep-prefix log path so an operator
1750 // scanning `.pitboss/play/logs/` can tell sweep audits
1751 // apart from regular phase audits at a glance.
1752 self.sweep_log_path(&after, "audit", 1),
1753 )
1754 }
1755 };
1756 let request = AgentRequest {
1757 role: Role::Auditor,
1758 model: self.config.models.auditor.clone(),
1759 system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
1760 user_prompt,
1761 workdir: self.workspace.clone(),
1762 log_path,
1763 timeout: DEFAULT_AGENT_TIMEOUT,
1764 env: std::collections::HashMap::new(),
1765 };
1766
1767 match self
1768 .dispatch_and_validate(request, Role::Auditor, plan_path, deferred_path)
1769 .await?
1770 {
1771 ValidationResult::Continue => {}
1772 ValidationResult::Halt(reason) => return Ok(AuditPassResult::Halted(reason)),
1773 }
1774
1775 if let Some(test_runner) = test_runner {
1776 let outcome = self
1777 .run_tests(test_runner, phase_id, "tests", total_attempt)
1778 .await?;
1779 if !outcome.passed {
1780 return Ok(AuditPassResult::Halted(HaltReason::TestsFailed(
1781 outcome.summary,
1782 )));
1783 }
1784 }
1785
1786 Ok(AuditPassResult::Continue)
1787 }
1788
1789 /// Run the per-item staleness bookkeeping after a sweep dispatch, success
1790 /// or halt. Reads `post_unchecked_texts` straight from `self.deferred`
1791 /// (after restoration, on halt paths) so the same code path serves every
1792 /// sweep exit. Emits [`Event::DeferredItemStale`] for items whose counter
1793 /// just crossed [`crate::config::SweepConfig::escalate_after`] (transition
1794 /// only — items already at or above the threshold do not re-emit).
1795 fn apply_sweep_staleness(&mut self, pre_texts: &HashSet<String>) {
1796 let post_unchecked_texts: HashSet<String> = self
1797 .deferred
1798 .items
1799 .iter()
1800 .filter(|i| !i.done)
1801 .map(|i| i.text.clone())
1802 .collect();
1803 let crossed = sweep::update_sweep_staleness(
1804 &mut self.state.deferred_item_attempts,
1805 pre_texts,
1806 &post_unchecked_texts,
1807 self.config.sweep.escalate_after,
1808 );
1809 for (text, attempts) in crossed {
1810 let _ = self
1811 .events_tx
1812 .send(Event::DeferredItemStale { text, attempts });
1813 }
1814 }
1815
1816 fn fold_token_usage(&mut self, role: Role, dispatch: &AgentDispatch) {
1817 let tokens = &dispatch.outcome_tokens;
1818 self.state.token_usage.input += tokens.input;
1819 self.state.token_usage.output += tokens.output;
1820 let entry = self
1821 .state
1822 .token_usage
1823 .by_role
1824 .entry(role.as_str().to_string())
1825 .or_default();
1826 entry.input += tokens.input;
1827 entry.output += tokens.output;
1828 for (k, v) in &tokens.by_role {
1829 let e = self.state.token_usage.by_role.entry(k.clone()).or_default();
1830 e.input += v.input;
1831 e.output += v.output;
1832 }
1833 let _ = self
1834 .events_tx
1835 .send(Event::UsageUpdated(self.state.token_usage.clone()));
1836 }
1837
1838 fn restore_deferred(
1839 &self,
1840 deferred_path: &Path,
1841 pre_bytes: &[u8],
1842 existed: bool,
1843 ) -> Result<()> {
1844 if existed {
1845 write_atomic(deferred_path, pre_bytes).with_context(|| {
1846 format!(
1847 "runner: restoring {:?} from snapshot after parse failure",
1848 deferred_path
1849 )
1850 })?;
1851 } else if deferred_path.exists() {
1852 std::fs::remove_file(deferred_path).with_context(|| {
1853 format!(
1854 "runner: removing agent-created {:?} after parse failure",
1855 deferred_path
1856 )
1857 })?;
1858 }
1859 Ok(())
1860 }
1861
1862 async fn dispatch_agent(&self, request: AgentRequest) -> Result<AgentDispatch> {
1863 let role = request.role;
1864 let (mpsc_tx, mpsc_rx) = mpsc::channel(AGENT_EVENT_CHANNEL_CAPACITY);
1865 let cancel = CancellationToken::new();
1866 let events_tx = self.events_tx.clone();
1867
1868 let forward = tokio::spawn(forward_agent_events(mpsc_rx, events_tx));
1869
1870 let outcome = self
1871 .agent
1872 .run(request, mpsc_tx, cancel)
1873 .await
1874 .with_context(|| format!("runner: agent {:?} failed to run", self.agent.name()))?;
1875 let _ = forward.await;
1876
1877 Ok(AgentDispatch {
1878 stop_reason: outcome.stop_reason,
1879 outcome_tokens: outcome.tokens,
1880 _role: role,
1881 })
1882 }
1883}
1884
1885/// Clone `doc` and keep only the first `max` pending items (in document
1886/// order). Already-checked items are filtered out by the sweep prompt
1887/// renderer, so they're left alone here. `## Deferred phases` are preserved
1888/// verbatim — the sweep prompt forbids editing them and the auditor cross-
1889/// checks against the full file.
1890///
1891/// Used by [`Runner::run_standalone_sweep`] (and any future
1892/// `--max-items`-style truncation) to keep a pathological backlog within
1893/// the agent's effective context window without touching disk: the original
1894/// `deferred.md` is unchanged, so items not in the prompt view stay pending
1895/// for the next sweep.
1896fn clamp_pending_items(doc: &DeferredDoc, max: usize) -> DeferredDoc {
1897 let mut out = DeferredDoc {
1898 items: Vec::with_capacity(doc.items.len().min(max + 1)),
1899 phases: doc.phases.clone(),
1900 };
1901 let mut pending_kept = 0usize;
1902 for item in &doc.items {
1903 if !item.done {
1904 if pending_kept >= max {
1905 continue;
1906 }
1907 pending_kept += 1;
1908 }
1909 out.items.push(item.clone());
1910 }
1911 out
1912}
1913
1914/// Canonical serialization of just the `## Deferred phases` block of a
1915/// [`DeferredDoc`]. Used by the sweep step to detect whether the agent edited
1916/// any H3 entries — the sweep prompt forbids it, so any difference between the
1917/// pre- and post-dispatch hash trips a halt and a deferred.md rollback.
1918fn phases_block_canonical(doc: &DeferredDoc) -> String {
1919 let phases_only = DeferredDoc {
1920 items: Vec::new(),
1921 phases: doc.phases.clone(),
1922 };
1923 deferred::serialize(&phases_only)
1924}
1925
/// Snapshot of the agent dispatch the runner needs after the call returns.
///
/// Built by [`Runner::dispatch_agent`] from the agent's outcome.
struct AgentDispatch {
    /// Why the agent stopped, copied from the agent outcome.
    stop_reason: StopReason,
    /// Token counts reported by the agent for this dispatch; folded into
    /// the run totals by [`Runner::fold_token_usage`].
    outcome_tokens: crate::state::TokenUsage,
    /// Role of the dispatched request. Presumably kept for debugging; the
    /// leading underscore suggests it is not read anywhere — verify before
    /// relying on it.
    _role: Role,
}
1932
/// Outcome of [`Runner::dispatch_and_validate`]. Either continue with the
/// post-dispatch flow, or short-circuit with a halt reason already populated
/// (snapshots restored, tokens folded).
enum ValidationResult {
    /// The dispatch validated; the caller proceeds to its next step
    /// (for the fixer loop and auditor pass, a test run).
    Continue,
    /// Validation failed. Callers wrap the carried reason in their own
    /// `Halted` variant and return immediately.
    Halt(HaltReason),
}
1940
/// Outcome of [`Runner::run_fixer_loop`]. `Passed` means a fixer attempt
/// produced a passing test run; `Halted` carries the reason — either an agent
/// failure during a fixer dispatch, a planning-artifact tamper, or budget
/// exhaustion (in which case the variant is [`HaltReason::TestsFailed`]).
enum FixerLoopResult {
    /// A test re-run following a fixer dispatch passed.
    Passed,
    /// The loop stopped without a passing run. Also returned, carrying the
    /// initial failure summary, when the configured fixer attempt limit is
    /// zero.
    Halted(HaltReason),
}
1949
/// Outcome of [`Runner::run_auditor_pass`]. `Continue` covers all the keep-
/// going cases (no diff to audit, audit ran and tests still pass); `Halted`
/// carries an agent-side failure, a planning-artifact tamper, or post-audit
/// test breakage. The "audit disabled" case is handled upstream by
/// [`Runner::run_dispatch_pipeline`] only invoking the auditor when the spec
/// carries an [`AuditKind`].
enum AuditPassResult {
    /// Nothing blocks the commit: the staged diff was empty, or the audit
    /// ran and any post-audit test run passed.
    Continue,
    /// The pass stopped the pipeline; the reason comes from the budget
    /// check, dispatch validation, or a failed post-audit test run.
    Halted(HaltReason),
}
1960
/// Inputs to [`Runner::run_dispatch_pipeline`]. The pipeline is the shared
/// dispatch → validate → tests → fixer → optional auditor → stage chain that
/// both the per-phase implementer dispatch (today) and the per-sweep
/// implementer dispatch (phase 03) hand off to. Lifetimes tie everything to
/// the caller's stack frame so the caller keeps ownership of the source
/// [`crate::plan::Phase`] and path buffers.
struct DispatchSpec<'a> {
    /// The pre-built agent request to dispatch first. The caller is
    /// responsible for having bumped [`RunState::attempts`] for `phase_id`
    /// before constructing this so the request's `log_path` reflects the
    /// dispatch's attempt number.
    request: AgentRequest,
    /// Phase id under which this dispatch is recorded — drives the fixer
    /// loop's attempt tracking, the auditor's "started" event, and the
    /// post-dispatch test log filename. For phase-driven dispatches this is
    /// the current phase. For sweep dispatches (phase 03) this is the most
    /// recently completed `after_phase`, since `state.attempts` keys on real
    /// phase ids.
    phase_id: PhaseId,
    /// The current phase, when there is one. The fixer loop uses this to
    /// render `prompts::fixer_with_deferred`; sweep dispatches pass `None` to
    /// fall back to `prompts::fixer_for_sweep`.
    phase: Option<&'a crate::plan::Phase>,
    /// Path to `plan.md` for snapshot validation.
    plan_path: &'a Path,
    /// Path to `deferred.md` for snapshot + parse validation. On a parse
    /// failure the file is restored from its pre-dispatch snapshot.
    deferred_path: &'a Path,
    /// Paths to exclude from `git add` so planning artifacts under
    /// `.pitboss/` never end up in the per-phase commit.
    exclude_paths: &'a [&'a Path],
    /// Whether to run the auditor pass after tests pass. `None` skips the
    /// auditor entirely; `Some(kind)` selects the prompt variant.
    audit: Option<AuditKind<'a>>,
}
1995
/// Selects the auditor prompt variant.
///
/// `Phase` runs the regular auditor against an implementer diff for a plan
/// phase. `Sweep` runs the sweep-specific auditor against a deferred-sweep
/// dispatch; its contract is "for each item the implementer marked `- [x]`,
/// does the diff actually do that work? revert anything unrelated."
///
/// The variant also determines the `AuditContextKind` attached to auditor
/// events and which log path the auditor dispatch writes to.
enum AuditKind<'a> {
    /// Audit a regular plan-phase implementer dispatch. Renders
    /// [`crate::prompts::auditor_with_deferred`] for the carried phase.
    Phase {
        /// The phase whose implementer diff is under review.
        phase: &'a crate::plan::Phase,
    },
    /// Audit a deferred-sweep dispatch. Renders [`crate::prompts::sweep_auditor`]
    /// with the sweep's resolved / remaining item lists threaded through.
    ///
    /// Both lists are computed by the sweep call site after the implementer
    /// dispatch has updated `self.deferred`; the auditor pass just reads them.
    /// See [`Runner::run_dispatch_through_fixer`] /
    /// [`Runner::run_audit_and_stage`] for the split-pipeline shape that
    /// makes this possible.
    Sweep {
        /// Most recently completed plan phase the sweep fired after. Becomes
        /// the `{after}` substitution and selects the sweep-prefix log path.
        after: PhaseId,
        /// Items the implementer flipped from `- [ ]` to `- [x]` during the
        /// sweep dispatch (intersected with the pre-sweep unchecked snapshot
        /// so a reworded item doesn't get miscounted).
        resolved: Vec<String>,
        /// Items still unchecked in `deferred.md` after the sweep dispatch.
        remaining: Vec<String>,
    },
}
2029
/// Outcome of [`Runner::run_dispatch_through_fixer`]. The first half of the
/// pipeline either halts (a validation failure, the fixer loop running out
/// of attempts, or the budget check tripping) or continues and hands back
/// the test runner it resolved so the second half
/// ([`Runner::run_audit_and_stage`]) can re-use it for the post-audit re-run.
enum DispatchOutcome {
    /// The first half stopped the pipeline; the second half is not run.
    Halted(HaltReason),
    /// The first half completed; proceed to the audit-and-stage half.
    Continue {
        /// Test runner detected for this dispatch (or `None` when tests were
        /// skipped or no runner was configured). Re-used by the auditor pass
        /// so a single dispatch only probes for tests once.
        test_runner: Option<project_tests::TestRunner>,
    },
}
2043
/// Outcome of [`Runner::run_dispatch_pipeline`]. The pipeline never commits;
/// `Staged { has_changes }` hands back whether the staged index is non-empty
/// so the caller (today: [`Runner::run_phase_inner`]) can decide whether to
/// commit.
enum PipelineOutcome {
    /// The pipeline stopped early for the carried reason.
    Halted(HaltReason),
    /// The pipeline ran to completion and the final staging step was done.
    Staged {
        /// `true` when `git.has_staged_changes()` returned true after the
        /// final stage call — i.e. the dispatch (and any auditor edits)
        /// produced code outside `exclude_paths`. `false` when the only
        /// changes were inside `.pitboss/` and got excluded.
        has_changes: bool,
    },
}
2058
2059async fn forward_agent_events(mut rx: mpsc::Receiver<AgentEvent>, tx: broadcast::Sender<Event>) {
2060 while let Some(ev) = rx.recv().await {
2061 match ev {
2062 AgentEvent::Stdout(line) => {
2063 let _ = tx.send(Event::AgentStdout(line));
2064 }
2065 AgentEvent::Stderr(line) => {
2066 let _ = tx.send(Event::AgentStderr(line));
2067 }
2068 AgentEvent::ToolUse(name) => {
2069 let _ = tx.send(Event::AgentToolUse(name));
2070 }
2071 AgentEvent::TokenDelta(_) => {
2072 // Token deltas are folded into [`RunState::token_usage`] from
2073 // the final outcome, not the stream — folding here would
2074 // double-count any agent that emits both intermediate deltas
2075 // and a totals report.
2076 }
2077 }
2078 }
2079}
2080
2081/// Build a fresh [`RunState`] for a workspace that has not started a run yet.
2082///
2083/// `now` is the timestamp used to derive both the run id and the per-run
2084/// branch (`<config.git.branch_prefix><utc_timestamp>`). Keeping the timestamp
2085/// explicit makes startup deterministic in tests.
2086pub fn fresh_run_state(plan: &Plan, config: &Config, now: chrono::DateTime<Utc>) -> RunState {
2087 let run_id = now.format("%Y%m%dT%H%M%SZ").to_string();
2088 let branch = git::branch_name(&config.git.branch_prefix, now);
2089 let mut s = RunState::new(run_id, branch, plan.current_phase.clone());
2090 s.started_at = now;
2091 s
2092}
2093
2094/// Subscribe to a runner's event stream and print a human-readable line per
2095/// event to stderr until a terminal event arrives or the channel closes.
2096///
2097/// This is the "no TUI" CLI experience: progress is rendered via plain
2098/// `tracing::info`-style lines so log piping and CI logs work out of the box.
2099///
2100/// The runner emits [`Event::RunFinished`] on the success path and
2101/// [`Event::PhaseHalted`] on the halt path; this function logs the terminal
2102/// event and then returns. Otherwise the broadcast channel would block
2103/// forever after `Runner::run()` returns, because the runner itself keeps
2104/// holding the [`broadcast::Sender`] for post-run lookups.
2105pub async fn log_events(mut rx: broadcast::Receiver<Event>) {
2106 use broadcast::error::RecvError;
2107 loop {
2108 match rx.recv().await {
2109 Ok(event) => {
2110 let terminal = matches!(event, Event::RunFinished | Event::PhaseHalted { .. });
2111 log_event_line(&event);
2112 if terminal {
2113 return;
2114 }
2115 }
2116 Err(RecvError::Closed) => return,
2117 Err(RecvError::Lagged(n)) => {
2118 eprintln!("[pitboss] (logger lagged: dropped {n} events)");
2119 }
2120 }
2121 }
2122}
2123
2124fn log_event_line(event: &Event) {
2125 use crate::style::{self, col};
2126 let c = style::use_color_stderr();
2127
2128 let fm = col(c, style::BOLD_CYAN, "[pitboss]");
2129
2130 match event {
2131 Event::PhaseStarted {
2132 phase_id,
2133 title,
2134 attempt,
2135 } => {
2136 let rule = col(c, style::DARK_GRAY, &"─".repeat(60));
2137 if c {
2138 eprintln!("{rule}");
2139 }
2140 eprintln!(
2141 "{} {}",
2142 col(c, style::BOLD_CYAN, "[pitboss]"),
2143 col(
2144 c,
2145 style::BOLD_WHITE,
2146 &format!("phase {phase_id} ({title}), attempt {attempt}")
2147 )
2148 );
2149 if c {
2150 eprintln!("{rule}");
2151 }
2152 }
2153 Event::FixerStarted {
2154 phase_id,
2155 fixer_attempt,
2156 attempt,
2157 } => {
2158 eprintln!(
2159 "{fm} {}",
2160 col(
2161 c,
2162 style::YELLOW,
2163 &format!(
2164 "phase {phase_id} fixer attempt {fixer_attempt} (total dispatch {attempt})"
2165 )
2166 )
2167 );
2168 }
2169 Event::AuditorStarted { context, attempt } => {
2170 let label = match context.kind {
2171 AuditContextKind::Phase => format!(
2172 "phase {} auditor (total dispatch {attempt})",
2173 context.phase_id
2174 ),
2175 AuditContextKind::Sweep => format!(
2176 "sweep after phase {} auditor (total dispatch {attempt})",
2177 context.phase_id
2178 ),
2179 };
2180 eprintln!("{fm} {}", col(c, style::BLUE, &label));
2181 }
2182 Event::AuditorSkippedNoChanges { context } => {
2183 let label = match context.kind {
2184 AuditContextKind::Phase => format!(
2185 "phase {} auditor skipped: no code changes to audit",
2186 context.phase_id
2187 ),
2188 AuditContextKind::Sweep => format!(
2189 "sweep after phase {} auditor skipped: no code changes to audit",
2190 context.phase_id
2191 ),
2192 };
2193 eprintln!("{fm} {}", col(c, style::DIM, &label));
2194 }
2195 Event::AgentStdout(line) => {
2196 eprintln!("{} {line}", col(c, style::DIM, "[agent]"));
2197 }
2198 Event::AgentStderr(line) => {
2199 eprintln!(
2200 "{} {}",
2201 col(c, style::RED, "[agent:err]"),
2202 col(c, style::RED, line)
2203 );
2204 }
2205 Event::AgentToolUse(name) => {
2206 eprintln!(
2207 "{}",
2208 col(c, style::DARK_GRAY, &format!("[agent:tool] {name}"))
2209 );
2210 }
2211 Event::TestStarted => {
2212 eprintln!("{fm} {}", col(c, style::MAGENTA, "running tests"));
2213 }
2214 Event::TestFinished { passed, summary } => {
2215 if *passed {
2216 eprintln!(
2217 "{fm} {}",
2218 col(c, style::BOLD_GREEN, &format!("tests passed: {summary}"))
2219 );
2220 } else {
2221 eprintln!(
2222 "{fm} {}",
2223 col(c, style::BOLD_RED, &format!("tests failed: {summary}"))
2224 );
2225 }
2226 }
2227 Event::TestsSkipped => {
2228 eprintln!(
2229 "{fm} {}",
2230 col(c, style::DIM, "no test runner detected; skipping")
2231 );
2232 }
2233 Event::PhaseCommitted {
2234 phase_id,
2235 commit: Some(hash),
2236 } => {
2237 eprintln!(
2238 "{fm} {}",
2239 col(
2240 c,
2241 style::GREEN,
2242 &format!("phase {phase_id} committed: {hash}")
2243 )
2244 );
2245 }
2246 Event::PhaseCommitted {
2247 phase_id,
2248 commit: None,
2249 } => {
2250 eprintln!(
2251 "{fm} {}",
2252 col(
2253 c,
2254 style::DIM,
2255 &format!("phase {phase_id} produced no code changes; no commit")
2256 )
2257 );
2258 }
2259 Event::PhaseHalted { phase_id, reason } => {
2260 eprintln!(
2261 "{} {}",
2262 col(c, style::BOLD_RED, "[pitboss]"),
2263 col(
2264 c,
2265 style::BOLD_RED,
2266 &format!("phase {phase_id} halted: {reason}")
2267 )
2268 );
2269 }
2270 Event::RunFinished => {
2271 let rule = col(c, style::BOLD_GREEN, &"─".repeat(60));
2272 if c {
2273 eprintln!("{rule}");
2274 }
2275 eprintln!("{}", col(c, style::BOLD_GREEN, "[pitboss] run finished"));
2276 if c {
2277 eprintln!("{rule}");
2278 }
2279 }
2280 Event::UsageUpdated(_) => {
2281 // Snapshot consumed by the TUI; the plain logger doesn't surface
2282 // running token totals to keep stderr clean.
2283 }
2284 Event::SweepStarted {
2285 after,
2286 items_pending,
2287 attempt,
2288 } => {
2289 eprintln!(
2290 "{fm} {}",
2291 col(
2292 c,
2293 style::BOLD_CYAN,
2294 &format!(
2295 "sweep after phase {after} ({items_pending} pending, total dispatch {attempt})"
2296 )
2297 )
2298 );
2299 }
2300 Event::SweepCompleted {
2301 after,
2302 resolved,
2303 commit: Some(hash),
2304 } => {
2305 eprintln!(
2306 "{fm} {}",
2307 col(
2308 c,
2309 style::GREEN,
2310 &format!("sweep after phase {after} committed: {resolved} resolved ({hash})")
2311 )
2312 );
2313 }
2314 Event::SweepCompleted {
2315 after,
2316 resolved,
2317 commit: None,
2318 } => {
2319 eprintln!(
2320 "{fm} {}",
2321 col(
2322 c,
2323 style::DIM,
2324 &format!(
2325 "sweep after phase {after}: {resolved} resolved; no code changes to commit"
2326 )
2327 )
2328 );
2329 }
2330 Event::SweepHalted { after, reason } => {
2331 eprintln!(
2332 "{} {}",
2333 col(c, style::BOLD_RED, "[pitboss]"),
2334 col(
2335 c,
2336 style::BOLD_RED,
2337 &format!("sweep after phase {after} halted: {reason}")
2338 )
2339 );
2340 }
2341 Event::DeferredItemStale { text, attempts } => {
2342 eprintln!(
2343 "{fm} {}",
2344 col(
2345 c,
2346 style::BOLD_YELLOW,
2347 &format!(
2348 "deferred item stale ({attempts} sweep attempts; needs human attention): {text}"
2349 )
2350 )
2351 );
2352 }
2353 }
2354}
2355
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a phase id literal, panicking on malformed input.
    fn pid(s: &str) -> PhaseId {
        PhaseId::parse(s).unwrap()
    }

    /// A plan with a single phase `01`, which is also the current phase.
    fn single_phase_plan() -> Plan {
        let only_phase = crate::plan::Phase {
            id: pid("01"),
            title: "First".into(),
            body: String::new(),
        };
        Plan::new(pid("01"), vec![only_phase])
    }

    /// The run id and branch are derived from the supplied timestamp, and
    /// the state starts at the plan's current phase with nothing completed.
    #[test]
    fn fresh_run_state_uses_branch_prefix_and_timestamp() {
        let now = chrono::DateTime::parse_from_rfc3339("2026-04-29T14:30:22Z")
            .unwrap()
            .with_timezone(&Utc);

        let state = fresh_run_state(&single_phase_plan(), &Config::default(), now);

        assert_eq!(state.run_id, "20260429T143022Z");
        assert_eq!(state.branch, "pitboss/play/20260429T143022Z");
        assert_eq!(state.started_phase, pid("01"));
        assert_eq!(state.started_at, now);
        assert!(state.completed.is_empty());
    }

    /// Each halt reason renders to text that names the failure.
    #[test]
    fn halt_reason_display_summaries_are_human_readable() {
        assert_eq!(
            HaltReason::PlanTampered.to_string(),
            "plan.md was modified by the agent"
        );

        let substring_cases = [
            (HaltReason::DeferredInvalid("bad".into()), "deferred.md"),
            (HaltReason::TestsFailed("nope".into()), "tests failed"),
            (HaltReason::AgentFailure("boom".into()), "boom"),
        ];
        for (reason, needle) in substring_cases.iter() {
            assert!(reason.to_string().contains(needle));
        }
    }
}
2402}