// pitboss/runner/mod.rs

//! Orchestration loop and event channel.
//!
//! The runner owns the per-phase state machine. It snapshots the planning
//! artifacts, dispatches the implementer agent, validates the agent's output,
//! runs the project tests, and lands a per-phase commit. Every observable
//! transition is broadcast on a [`tokio::sync::broadcast`] channel so the CLI
//! logger and the (later) TUI can subscribe without changing the runner.
//!
//! Phase 12 wired the implementer-only flow: agent → validate → tests → commit.
//! Phase 13 layers a bounded fixer loop on top: when the project tests fail,
//! the runner dispatches the fixer agent up to
//! [`crate::config::RetryBudgets::fixer_max_attempts`] times, re-running tests
//! after each attempt, before halting. Phase 14 inserts the auditor pass
//! between the (passing) test run and the per-phase commit: when
//! [`crate::config::AuditConfig::enabled`] is on, the runner stages the
//! implementer's diff, hands it to the auditor agent, re-validates the
//! planning artifacts, and re-runs the tests before letting the commit land.
18
19pub mod sweep;
20
21use std::collections::HashSet;
22use std::path::{Path, PathBuf};
23use std::time::Duration;
24
25use anyhow::{anyhow, Context, Result};
26use chrono::Utc;
27use tokio::sync::{broadcast, mpsc};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, warn};
30
31use crate::agent::{Agent, AgentEvent, AgentRequest, Role, StopReason};
32use crate::config::Config;
33use crate::deferred::{self, DeferredDoc};
34use crate::git::{self, CommitId, Git};
35use crate::plan::{self, PhaseId, Plan, Snapshot};
36use crate::prompts;
37use crate::state::{self, RunState, TokenUsage};
38use crate::tests as project_tests;
39use crate::util::{paths, write_atomic};
40
41/// Default agent wall-clock cap. Conservative so a stuck agent does not strand
42/// a run for an unbounded time; phase 18 makes this configurable.
43const DEFAULT_AGENT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
44
45/// Capacity of the broadcast channel events fan out on. Enough that a slow
46/// subscriber falls behind by hundreds of events before lagging; sends are
47/// best-effort so a slow subscriber never blocks the runner.
48pub const EVENT_CHANNEL_CAPACITY: usize = 256;
49
50/// Per-dispatch capacity of the mpsc channel between the agent and the
51/// runner's forwarder task. Bounded to apply backpressure on a misbehaving
52/// agent that floods events.
53const AGENT_EVENT_CHANNEL_CAPACITY: usize = 64;
54
55/// Cap on the number of stale `## Deferred items` entries fed into the sweep
56/// and sweep-auditor prompts. Past this, the prompt would balloon without
57/// adding value: the agent can only meaningfully address a handful of
58/// "high-stakes" items per dispatch, and a runaway map (hundreds of stuck
59/// items) almost always means the staleness signal needs operator attention,
60/// not more agent passes.
61pub const STALE_ITEMS_PROMPT_CAP: usize = 10;
62
63/// Cap on the number of stale items each operator-facing surface (the TUI
64/// stale panel, `pitboss status`) renders before truncating with a
65/// `+N more` footer. Lower than [`STALE_ITEMS_PROMPT_CAP`] because vertical
66/// real estate on the operator's screen is tighter than the auditor's
67/// prompt budget.
68pub const STALE_ITEMS_DISPLAY_CAP: usize = 5;
69
70/// Why the runner stopped advancing the plan.
71///
72/// Each variant carries enough context for the CLI logger (and the eventual
73/// TUI) to render a useful single-line message without needing to re-read
74/// log files.
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub enum HaltReason {
77    /// The agent modified `plan.md`. The runner restored from the pre-agent
78    /// snapshot before halting.
79    PlanTampered,
80    /// The agent left `deferred.md` in an unparsable state. The runner
81    /// restored from the pre-agent snapshot before halting. The string is
82    /// the parser's diagnostic.
83    DeferredInvalid(String),
84    /// The project's test suite failed. Holds the short summary captured by
85    /// [`crate::tests::TestRunner::run`].
86    TestsFailed(String),
87    /// The agent exited via timeout, cancellation, or an internal error.
88    AgentFailure(String),
89    /// A configured budget was exhausted before the next agent dispatch
90    /// could fire. Carries a human-readable summary (`token budget exceeded:
91    /// 105000 >= cap 100000`, `USD budget exceeded: $5.0234 >= cap $5.0000`).
92    BudgetExceeded(String),
93}
94
95impl std::fmt::Display for HaltReason {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        match self {
98            HaltReason::PlanTampered => f.write_str("plan.md was modified by the agent"),
99            HaltReason::DeferredInvalid(msg) => write!(f, "deferred.md is invalid: {msg}"),
100            HaltReason::TestsFailed(summary) => write!(f, "tests failed: {summary}"),
101            HaltReason::AgentFailure(msg) => write!(f, "agent failure: {msg}"),
102            HaltReason::BudgetExceeded(msg) => write!(f, "budget exceeded: {msg}"),
103        }
104    }
105}
106
107/// Compute `(total_tokens, total_usd)` for a [`TokenUsage`] under the supplied
108/// [`Config`].
109///
110/// `total_tokens` is the simple `input + output` sum from the top-level
111/// counter — that's the figure compared against
112/// [`crate::config::Budgets::max_total_tokens`].
113///
114/// `total_usd` walks each role in [`crate::config::ModelRoles`], looks up the
115/// per-role tokens in `usage.by_role`, and prices them against
116/// [`crate::config::Budgets::pricing`] keyed by the role's configured model
117/// id. Roles whose model is missing from the pricing table contribute zero —
118/// the token-count budget still applies. Agent-supplied `by_role` keys that
119/// don't match a configured role are ignored here, since we have no model
120/// assignment to price them under.
121pub fn budget_totals(config: &Config, usage: &TokenUsage) -> (u64, f64) {
122    let total_tokens = usage.input.saturating_add(usage.output);
123    let mut total_usd = 0.0;
124    let role_models: [(&str, &str); 4] = [
125        ("planner", config.models.planner.as_str()),
126        ("implementer", config.models.implementer.as_str()),
127        ("auditor", config.models.auditor.as_str()),
128        ("fixer", config.models.fixer.as_str()),
129    ];
130    for (role_key, model) in role_models {
131        let Some(role_usage) = usage.by_role.get(role_key) else {
132            continue;
133        };
134        let Some(price) = config.budgets.pricing.get(model) else {
135            continue;
136        };
137        total_usd += price.cost_usd(role_usage.input, role_usage.output);
138    }
139    (total_tokens, total_usd)
140}
141
142/// Discriminator on [`AuditContext`]. Tells subscribers whether the audit
143/// firing belongs to a regular plan phase or a deferred sweep so the TUI and
144/// loggers can render the right header text without inspecting state.
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub enum AuditContextKind {
147    /// Audit pass for a regular plan-phase implementer dispatch.
148    Phase,
149    /// Audit pass for a deferred-sweep dispatch.
150    Sweep,
151}
152
153/// Payload threaded through the auditor events. Carries the phase id under
154/// which the audit's attempts are accounted plus the kind discriminator. Sweep
155/// audits set `phase_id` to the most recently completed real phase (the same
156/// id [`Event::SweepStarted`] uses) so the running attempts counter still keys
157/// on a real phase id.
158#[derive(Debug, Clone)]
159pub struct AuditContext {
160    /// Phase id under which the audit's attempts are tallied.
161    pub phase_id: PhaseId,
162    /// Whether this is a phase audit or a sweep audit.
163    pub kind: AuditContextKind,
164}
165
166/// Streaming events the runner broadcasts to subscribers. Sends are
167/// best-effort: a lagging or absent subscriber never blocks the runner.
168#[derive(Debug, Clone)]
169#[cfg_attr(test, derive(strum::EnumDiscriminants))]
170#[cfg_attr(
171    test,
172    strum_discriminants(name(EventDiscriminants), derive(strum::EnumIter, Hash))
173)]
174pub enum Event {
175    /// A phase began. Emitted once per implementer dispatch — fixer
176    /// re-dispatches inside the same phase emit [`Event::FixerStarted`] instead
177    /// so subscribers can distinguish them.
178    PhaseStarted {
179        /// Phase being entered.
180        phase_id: PhaseId,
181        /// Phase title from the heading.
182        title: String,
183        /// 1-based total agent dispatch counter at this phase, mirrored into
184        /// [`crate::state::RunState::attempts`]. Stays at `1` for the
185        /// implementer dispatch that fires this event.
186        attempt: u32,
187    },
188    /// The runner dispatched the fixer agent after a test failure.
189    FixerStarted {
190        /// Phase the fixer is operating on.
191        phase_id: PhaseId,
192        /// 1-based fixer attempt within this phase
193        /// (`1..=fixer_max_attempts`).
194        fixer_attempt: u32,
195        /// Total agent-dispatch counter at this phase (mirrors
196        /// [`crate::state::RunState::attempts`]).
197        attempt: u32,
198    },
199    /// The runner dispatched the auditor agent after the test suite passed.
200    /// Fires at most once per phase or sweep, and only when the relevant
201    /// audit toggle ([`crate::config::AuditConfig::enabled`] for phases,
202    /// [`crate::config::SweepConfig::audit_enabled`] for sweeps) is on and
203    /// the dispatch produced staged code changes. `context.kind` discriminates
204    /// the two so the TUI can render the right header text.
205    AuditorStarted {
206        /// Audit-pass context: phase id + whether this is a phase or sweep
207        /// audit. Phase id is exposed via [`AuditContext::phase_id`] so plain
208        /// log consumers don't need to match on the kind.
209        context: AuditContext,
210        /// Total agent-dispatch counter at this phase (mirrors
211        /// [`crate::state::RunState::attempts`]).
212        attempt: u32,
213    },
214    /// The auditor dispatched without finding code changes worth auditing
215    /// (the index was empty after staging excluded paths).
216    AuditorSkippedNoChanges {
217        /// Audit-pass context: phase id + whether this is a phase or sweep
218        /// audit.
219        context: AuditContext,
220    },
221    /// One line of agent stdout.
222    AgentStdout(String),
223    /// One line of agent stderr.
224    AgentStderr(String),
225    /// Agent invoked a tool. Carries the tool name.
226    AgentToolUse(String),
227    /// The runner began running the project test suite.
228    TestStarted,
229    /// The test suite finished with the carried summary.
230    TestFinished {
231        /// Whether the run exited zero.
232        passed: bool,
233        /// Short summary suitable for inline display.
234        summary: String,
235    },
236    /// The runner skipped tests because no runner was detected and no
237    /// `[tests] command = "..."` override was configured.
238    TestsSkipped,
239    /// A phase's code changes were committed (or skipped because the only
240    /// changes were to excluded planning artifacts).
241    PhaseCommitted {
242        /// Phase that completed.
243        phase_id: PhaseId,
244        /// Resulting commit, or `None` when only excluded paths changed.
245        commit: Option<CommitId>,
246    },
247    /// The runner stopped without advancing.
248    PhaseHalted {
249        /// Phase that halted.
250        phase_id: PhaseId,
251        /// Why the runner halted.
252        reason: HaltReason,
253    },
254    /// The runner advanced past the final phase. No further phases remain.
255    RunFinished,
256    /// Aggregated token usage was updated after an agent dispatch finished.
257    /// Carries a snapshot of [`crate::state::RunState::token_usage`] so
258    /// subscribers can show running cost / token totals without needing a
259    /// reference to the runner state.
260    UsageUpdated(crate::state::TokenUsage),
261    /// A deferred-sweep dispatch began. Fires once per sweep step, between
262    /// the [`Event::PhaseCommitted`] of the preceding phase and the
263    /// [`Event::PhaseStarted`] of the next regular phase. The sweep does not
264    /// introduce a synthetic phase id — `after` is the most recently completed
265    /// phase under which the sweep's attempts are accounted.
266    SweepStarted {
267        /// Phase the sweep is firing after.
268        after: PhaseId,
269        /// Unchecked-item count observed before the sweep dispatched.
270        items_pending: usize,
271        /// Total agent-dispatch counter under `after` after the sweep's
272        /// implementer dispatch is recorded.
273        attempt: u32,
274    },
275    /// A deferred-sweep dispatch finished and its commit (or empty diff) has
276    /// landed. `commit` is `None` when the sweep produced no code changes.
277    SweepCompleted {
278        /// Phase the sweep fired after.
279        after: PhaseId,
280        /// Number of `## Deferred items` entries the sweep flipped from
281        /// `- [ ]` to `- [x]` (and which the post-sweep `DeferredDoc::sweep`
282        /// then dropped). New items the agent appended don't pollute this
283        /// count.
284        resolved: usize,
285        /// Resulting commit, or `None` for the no-code-changes case.
286        commit: Option<CommitId>,
287    },
288    /// A deferred-sweep dispatch halted before it could land a commit. Mirrors
289    /// [`Event::PhaseHalted`] for the sweep entry point so subscribers can
290    /// distinguish "phase failed" from "sweep failed."
291    SweepHalted {
292        /// Phase the sweep fired after.
293        after: PhaseId,
294        /// Why the sweep halted.
295        reason: HaltReason,
296    },
297    /// A `## Deferred items` entry's per-sweep attempt counter just crossed
298    /// [`crate::config::SweepConfig::escalate_after`]. Transition-only — the
299    /// next sweep that increments the same item's counter further does *not*
300    /// re-emit, so subscribers (the activity log, [`pitboss status`], the TUI)
301    /// can treat each occurrence as a fresh "needs human attention" signal.
302    DeferredItemStale {
303        /// The item text from `deferred.md` (the bytes after `- [ ]`).
304        text: String,
305        /// The new attempt count, equal to or greater than `escalate_after`.
306        attempts: u32,
307    },
308}
309
310/// Outcome of [`Runner::run_phase`].
311#[derive(Debug, Clone)]
312pub enum PhaseResult {
313    /// Phase completed and the runner advanced. `commit` is `None` when the
314    /// agent only modified excluded paths.
315    Advanced {
316        /// Phase that just completed.
317        phase_id: PhaseId,
318        /// Phase the runner advanced to, or `None` if no phases remain.
319        next_phase: Option<PhaseId>,
320        /// Resulting commit, or `None` for the excluded-only case.
321        commit: Option<CommitId>,
322    },
323    /// Runner halted; no phase advance.
324    Halted {
325        /// Phase that was active when the halt fired.
326        phase_id: PhaseId,
327        /// Why the halt fired.
328        reason: HaltReason,
329    },
330}
331
332/// Outcome of [`Runner::run`].
333#[derive(Debug, Clone)]
334pub enum RunSummary {
335    /// All phases completed.
336    Finished,
337    /// The run halted at the carried phase for the carried reason.
338    Halted {
339        /// Phase that halted.
340        phase_id: PhaseId,
341        /// Why the halt fired.
342        reason: HaltReason,
343    },
344}
345
346/// Per-phase orchestrator.
347///
348/// One `Runner` drives a single workspace through its plan. Construct with
349/// [`Runner::new`], subscribe one or more receivers via [`Runner::subscribe`],
350/// then call [`Runner::run`] (or [`Runner::run_phase`] for tests).
351pub struct Runner<A: Agent, G: Git> {
352    workspace: PathBuf,
353    config: Config,
354    plan: Plan,
355    deferred: DeferredDoc,
356    state: RunState,
357    agent: A,
358    git: G,
359    events_tx: broadcast::Sender<Event>,
360    /// When `true`, [`Runner::run_phase`] skips test detection and execution.
361    /// Set by `pitboss play --dry-run`, which dispatches the no-op
362    /// [`crate::agent::dry_run::DryRunAgent`]: since the agent never modifies
363    /// the working tree, running tests would only re-confirm whatever the
364    /// pre-run state was and risk halting the dry-run on a flaky suite.
365    skip_tests: bool,
366    /// Operator-supplied override for the deferred-sweep gate. Driven by
367    /// `pitboss play --no-sweep` and `--sweep`; defaults to
368    /// [`SweepOverride::None`] which lets the configured trigger run.
369    sweep_override: SweepOverride,
370}
371
372/// Operator-supplied override for the deferred-sweep trigger.
373///
374/// Set via [`Runner::skip_sweep`] / [`Runner::force_sweep`] (mirrored on the
375/// CLI as `pitboss play --no-sweep` / `--sweep`). The two flags are mutually
376/// exclusive at the clap level, so the runner only ever sees one of these
377/// applied per invocation.
378#[derive(Debug, Clone, Copy, PartialEq, Eq)]
379enum SweepOverride {
380    /// Honor the configured sweep trigger (default).
381    None,
382    /// Suppress sweeps for the duration of the run. Clears `pending_sweep`
383    /// at the top of the run and refuses to arm it from any subsequent phase
384    /// commit. The override is not persisted to `pitboss.toml`.
385    Skip,
386    /// Force a sweep before the next phase even if the trigger threshold
387    /// isn't met. Sets `pending_sweep = true` at the top of the run and
388    /// bypasses the trigger re-evaluation in the gate so the sweep dispatches
389    /// once. After it lands `pending_sweep` clears normally and the
390    /// post-phase trigger reverts to the configured behavior.
391    Force,
392}
393
394impl<A: Agent, G: Git> Runner<A, G> {
395    /// Build a new runner. The caller has already loaded `config`, `plan`,
396    /// `deferred`, and `state` from the workspace and is responsible for
397    /// having checked out the per-run branch (via [`crate::git::Git`]) before
398    /// calling [`Runner::run`].
399    #[allow(clippy::too_many_arguments)]
400    pub fn new(
401        workspace: impl Into<PathBuf>,
402        config: Config,
403        plan: Plan,
404        deferred: DeferredDoc,
405        state: RunState,
406        agent: A,
407        git: G,
408    ) -> Self {
409        let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
410        Self {
411            workspace: workspace.into(),
412            config,
413            plan,
414            deferred,
415            state,
416            agent,
417            git,
418            events_tx,
419            skip_tests: false,
420            sweep_override: SweepOverride::None,
421        }
422    }
423
424    /// Skip the per-phase test invocation entirely. Used by
425    /// `pitboss play --dry-run` so a no-op agent does not get halted by a
426    /// pre-existing red test suite. The runner emits [`Event::TestsSkipped`]
427    /// in place of [`Event::TestStarted`] / [`Event::TestFinished`] so
428    /// subscribers (logger, TUI) still get a clear signal that tests were
429    /// considered.
430    pub fn skip_tests(mut self, skip: bool) -> Self {
431        self.skip_tests = skip;
432        self
433    }
434
435    /// Suppress deferred sweeps for the duration of this runner. Mirrors the
436    /// CLI flag `pitboss play --no-sweep`: clears `state.pending_sweep`
437    /// immediately so an inherited obligation from a prior run is dropped,
438    /// and refuses to arm the gate from any subsequent phase commit. The
439    /// override is in-memory only — it does not write to `pitboss.toml`.
440    pub fn skip_sweep(mut self, skip: bool) -> Self {
441        if skip {
442            self.sweep_override = SweepOverride::Skip;
443            self.state.pending_sweep = false;
444        }
445        self
446    }
447
448    /// Force a sweep before the next phase, regardless of the configured
449    /// trigger threshold. Mirrors `pitboss play --sweep`: sets
450    /// `state.pending_sweep = true` so the gate fires on the next
451    /// [`Runner::run_phase`], and bypasses the gate's trigger re-evaluation
452    /// so a backlog below `trigger_min_items` still sweeps once. Mutually
453    /// exclusive with [`Runner::skip_sweep`] (enforced at the CLI layer).
454    pub fn force_sweep(mut self, force: bool) -> Self {
455        if force {
456            self.sweep_override = SweepOverride::Force;
457            self.state.pending_sweep = true;
458        }
459        self
460    }
461
462    /// Workspace this runner operates on.
463    pub fn workspace(&self) -> &Path {
464        &self.workspace
465    }
466
467    /// Borrow the loaded plan. Useful for tests asserting state advance.
468    pub fn plan(&self) -> &Plan {
469        &self.plan
470    }
471
472    /// Borrow the loaded deferred doc.
473    pub fn deferred(&self) -> &DeferredDoc {
474        &self.deferred
475    }
476
477    /// Borrow the run state.
478    pub fn state(&self) -> &RunState {
479        &self.state
480    }
481
482    /// Mutable handle to the cached run state.
483    ///
484    /// Test-only escape hatch (kept `pub` so integration tests can reach it;
485    /// `#[cfg(test)]` would only expose it to in-crate unit tests). A few
486    /// sweep tests need to seed a multi-step scenario by mutating the run
487    /// state between runner operations — re-arming `pending_sweep` to force
488    /// an extra sweep, or hand-populating the `deferred_item_attempts` map.
489    /// Without this accessor they had to round-trip through `state::save`
490    /// + drop runner + `Runner::new`, replumbing plan / deferred / agent /
491    /// git by hand.
492    #[doc(hidden)]
493    pub fn state_mut(&mut self) -> &mut RunState {
494        &mut self.state
495    }
496
497    /// Borrow the loaded config. Used by the TUI to populate the agent /
498    /// per-role model header chip.
499    pub fn config(&self) -> &Config {
500        &self.config
501    }
502
503    /// Borrow the dispatching agent. Used by the TUI to read
504    /// [`Agent::name`] for the header chip.
505    pub fn agent(&self) -> &A {
506        &self.agent
507    }
508
509    /// Borrow the git handle the runner is using. CLI code that needs to call
510    /// post-run git operations (e.g., `gh pr create` after a successful run)
511    /// reaches in through here so the same shell-vs-mock implementation the
512    /// runner used during the run is reused.
513    pub fn git_handle(&self) -> &G {
514        &self.git
515    }
516
517    /// Subscribe to the runner's event stream. Returns a fresh receiver each
518    /// call; existing subscribers are unaffected.
519    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
520        self.events_tx.subscribe()
521    }
522
523    /// Snapshot of `## Deferred items` entries whose per-sweep attempt counter
524    /// is at or above [`crate::config::SweepConfig::escalate_after`]. Sorted
525    /// by descending attempts (text ascending as a deterministic tiebreaker)
526    /// and capped at [`STALE_ITEMS_PROMPT_CAP`] entries so the sweep prompt
527    /// stays bounded.
528    ///
529    /// The runner consults this every time it builds a sweep or sweep-auditor
530    /// prompt; `pitboss status` and the TUI also call it to surface the
531    /// "needs human attention" list. Returning a fresh `Vec` keeps the caller
532    /// from leaking the underlying `RunState` map.
533    pub fn stale_items(&self) -> Vec<prompts::StaleItem> {
534        let escalate = self.config.sweep.escalate_after.max(1);
535        let mut items: Vec<prompts::StaleItem> = self
536            .state
537            .deferred_item_attempts
538            .iter()
539            .filter(|(_, &n)| n >= escalate)
540            .map(|(text, &attempts)| prompts::StaleItem {
541                text: text.clone(),
542                attempts,
543            })
544            .collect();
545        items.sort_by(|a, b| b.attempts.cmp(&a.attempts).then(a.text.cmp(&b.text)));
546        items.truncate(STALE_ITEMS_PROMPT_CAP);
547        items
548    }
549
550    /// Drive the runner until the plan completes or a phase halts.
551    ///
552    /// Always emits exactly one terminal event before returning:
553    /// [`Event::RunFinished`] on the success path, [`Event::PhaseHalted`] on
554    /// the halt path. Subscribers (logger, TUI) treat either event as the
555    /// signal that no further events will arrive — the broadcast channel
556    /// itself does not close because `Runner` keeps owning the sender for
557    /// post-run lookups (e.g., PR creation reads `runner.state()`).
558    pub async fn run(&mut self) -> Result<RunSummary> {
559        // Resume guard: a previous run drove past the final phase and either
560        // halted inside the final-sweep loop or interrupted before the loop
561        // could clear `pending_sweep`. Re-enter the loop directly so we don't
562        // accidentally re-run the final phase below.
563        if self.is_post_final_phase_state() {
564            return self.finish_or_run_final_sweep_loop().await;
565        }
566        loop {
567            let result = self.run_phase().await?;
568            match result {
569                PhaseResult::Halted { phase_id, reason } => {
570                    let _ = self.events_tx.send(Event::PhaseHalted {
571                        phase_id: phase_id.clone(),
572                        reason: reason.clone(),
573                    });
574                    return Ok(RunSummary::Halted { phase_id, reason });
575                }
576                PhaseResult::Advanced {
577                    next_phase: None, ..
578                } => {
579                    return self.finish_or_run_final_sweep_loop().await;
580                }
581                PhaseResult::Advanced { .. } => {}
582            }
583        }
584    }
585
586    /// True when state + plan describe "the final regular phase has committed
587    /// but `Runner::run` hasn't yet emitted [`Event::RunFinished`]". The
588    /// authoritative signal is the persisted `state.post_final_phase` flag,
589    /// set when the final phase commits in [`Runner::run_phase_inner`]. The
590    /// fallback inference (`completed.last() == current_phase &&
591    /// next_phase_id_after(...).is_none()`) covers state files written by
592    /// builds that predate the explicit flag.
593    fn is_post_final_phase_state(&self) -> bool {
594        if self.state.post_final_phase {
595            return true;
596        }
597        let Some(last_completed) = self.state.completed.last() else {
598            return false;
599        };
600        last_completed == &self.plan.current_phase
601            && self.next_phase_id_after(last_completed).is_none()
602    }
603
604    /// Shared tail used by both the success path of [`Runner::run`]'s phase
605    /// loop and the post-final-phase resume guard. Either dispatches the
606    /// bounded final-sweep drain loop (when sweeps are enabled, the trailing
607    /// drain is enabled, the operator did not pass `--no-sweep`, and at least
608    /// one unchecked item remains) or emits [`Event::RunFinished`] and
609    /// returns.
610    async fn finish_or_run_final_sweep_loop(&mut self) -> Result<RunSummary> {
611        let after = self
612            .state
613            .completed
614            .last()
615            .cloned()
616            .unwrap_or_else(|| self.plan.current_phase.clone());
617        if self.should_run_final_sweep_loop() {
618            return self.run_final_sweep_loop(after).await;
619        }
620        // No drain loop: clear any inherited `pending_sweep` (a `--no-sweep`
621        // resume of a halted final-sweep run gets here) before declaring the
622        // run finished, so a follow-up `pitboss play` doesn't see stale state.
623        if self.state.pending_sweep {
624            self.state.pending_sweep = false;
625            state::save(&self.workspace, Some(&self.state))
626                .context("runner: clearing pending_sweep at run finish")?;
627        }
628        let _ = self.events_tx.send(Event::RunFinished);
629        Ok(RunSummary::Finished)
630    }
631
632    /// Decide whether the final-sweep drain loop should run. The master
633    /// `[sweep] enabled` switch dominates the trailing-drain
634    /// `[sweep] final_sweep_enabled` toggle so an operator with sweeps
635    /// disabled never sees a surprise drain pass. `--no-sweep` suppresses the
636    /// loop the same way it suppresses between-phase sweeps.
637    fn should_run_final_sweep_loop(&self) -> bool {
638        if matches!(self.sweep_override, SweepOverride::Skip) {
639            return false;
640        }
641        if !self.config.sweep.enabled {
642            return false;
643        }
644        if !self.config.sweep.final_sweep_enabled {
645            return false;
646        }
647        sweep::unchecked_count(&self.deferred) > 0
648    }
649
650    /// Bounded final-sweep drain. Runs after the final regular phase commits
651    /// (or when a previous run halted inside this loop) and dispatches at most
652    /// [`crate::config::SweepConfig::final_sweep_max_iterations`] sweeps in a
653    /// row. Each iteration must resolve at least one item or the loop exits;
654    /// items the agent genuinely can't fix fall through to phase 05's
655    /// staleness machinery.
656    ///
657    /// `after` is the phase id every dispatch anchors on — the last completed
658    /// regular phase. Each iteration calls [`Runner::run_sweep_step_inner`] so
659    /// the existing sweep/auditor/staleness pipeline runs unchanged; the loop
660    /// adds only the iteration cap, the no-progress exit, and the per-iter
661    /// `pre_unchecked == 0` short-circuit.
662    async fn run_final_sweep_loop(&mut self, after: PhaseId) -> Result<RunSummary> {
663        // The post-final-phase commit reset `consecutive_sweeps` to 0 already,
664        // but a halted-then-resumed loop might inherit a stale value. We
665        // bypass the gate in [`Runner::run_phase_inner`] and dispatch
666        // [`Runner::run_sweep_step_inner`] directly, so the clamp can't pre-empt
667        // the loop — but resetting keeps the persisted counter honest for
668        // observers (`pitboss status`, the TUI).
669        self.state.consecutive_sweeps = 0;
670
671        let max_iters = self.config.sweep.final_sweep_max_iterations.max(1);
672        for _iter in 1..=max_iters {
673            let pre_unchecked = sweep::unchecked_count(&self.deferred);
674            if pre_unchecked == 0 {
675                // Drain succeeded (either the previous iteration cleared the
676                // last item, or we entered with an already-empty backlog).
677                break;
678            }
679            self.state.pending_sweep = true;
680            state::save(&self.workspace, Some(&self.state))
681                .context("runner: persisting pending_sweep for final-sweep iter")?;
682            let result = self.run_sweep_step(after.clone()).await?;
683            match result {
684                PhaseResult::Halted { reason, .. } => {
685                    // `pending_sweep` stays true on the halt path so a resume
686                    // re-enters this loop from iteration 1. Persist here
687                    // because `run_sweep_step_inner`'s halt path doesn't save
688                    // (the regular gate relies on `run_phase`'s save wrapper,
689                    // which doesn't run on this code path), and phase 05's
690                    // staleness counter increments must survive the halt so
691                    // cumulative progress carries across resumes.
692                    state::save(&self.workspace, Some(&self.state))
693                        .context("runner: persisting state at final-sweep halt")?;
694                    let _ = self.events_tx.send(Event::PhaseHalted {
695                        phase_id: after.clone(),
696                        reason: reason.clone(),
697                    });
698                    return Ok(RunSummary::Halted {
699                        phase_id: after,
700                        reason,
701                    });
702                }
703                PhaseResult::Advanced { .. } => {
704                    let post_unchecked = sweep::unchecked_count(&self.deferred);
705                    let resolved = pre_unchecked.saturating_sub(post_unchecked);
706                    if resolved == 0 {
707                        // Stuck items survive the loop and surface via phase
708                        // 05's staleness machinery.
709                        break;
710                    }
711                }
712            }
713        }
714        // Clean exit — `run_sweep_step_inner` already cleared `pending_sweep`
715        // on each successful iteration, but a `pre_unchecked == 0` short
716        // circuit at the top of iteration 1 (resume on a backlog the user
717        // drained by hand) needs us to clear it here too.
718        if self.state.pending_sweep {
719            self.state.pending_sweep = false;
720            state::save(&self.workspace, Some(&self.state))
721                .context("runner: clearing pending_sweep after final-sweep loop")?;
722        }
723        let _ = self.events_tx.send(Event::RunFinished);
724        Ok(RunSummary::Finished)
725    }
726
727    /// Execute the current phase to completion (success or halt).
728    ///
729    /// Persists [`RunState`] to `.pitboss/play/state.json` on every exit — including
730    /// halts — so the attempts counter and accumulated token usage survive a
731    /// halted phase and a subsequent `pitboss play` (or `pitboss rebuy`)
732    /// invocation can pick them up.
733    pub async fn run_phase(&mut self) -> Result<PhaseResult> {
734        let result = self.run_phase_inner().await;
735        if let Err(e) = state::save(&self.workspace, Some(&self.state)) {
736            tracing::error!("runner: failed to persist state.json: {e:#}");
737        }
738        result
739    }
740
741    async fn run_phase_inner(&mut self) -> Result<PhaseResult> {
742        // Pending sweep gate. A regular phase that closed above the trigger
743        // threshold sets `state.pending_sweep = true`; we re-evaluate the
744        // trigger here against the current on-disk deferred so a manual
745        // cleanup between resumes (the user clearing items by hand) drains
746        // the obligation cleanly rather than firing a no-op sweep.
747        if self.state.pending_sweep {
748            // Re-read from disk so external edits between resumes are
749            // observed, not just the cached `self.deferred`.
750            let deferred_path = paths::deferred_path(&self.workspace);
751            let on_disk = match std::fs::read_to_string(&deferred_path) {
752                Ok(s) => s,
753                Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
754                Err(e) => {
755                    return Err(anyhow::Error::new(e)
756                        .context(format!("runner: reading {:?}", &deferred_path)))
757                }
758            };
759            let parsed = deferred::parse(&on_disk).unwrap_or_else(|_| self.deferred.clone());
760            // `--no-sweep` shouldn't have left `pending_sweep` armed (the
761            // builder clears it), but defend against an externally set flag
762            // by short-circuiting here. `--sweep` bypasses the trigger so a
763            // forced sweep below the threshold still fires once.
764            let allow = match self.sweep_override {
765                SweepOverride::Skip => false,
766                SweepOverride::Force => true,
767                SweepOverride::None => sweep::should_run_deferred_sweep(
768                    &parsed,
769                    &self.config.sweep,
770                    self.state.consecutive_sweeps,
771                ),
772            };
773            if allow {
774                if let Some(prompt_after) = self.state.completed.last().cloned() {
775                    self.deferred = parsed;
776                    let result = self.run_sweep_step(prompt_after).await?;
777                    if matches!(result, PhaseResult::Advanced { .. })
778                        && matches!(self.sweep_override, SweepOverride::Force)
779                    {
780                        // `--sweep` is a one-shot directive: fire one
781                        // forced sweep at the next inter-phase boundary.
782                        // After the sweep advances, demote the override so
783                        // subsequent post-phase triggers fall back to the
784                        // configured threshold logic and we don't sweep
785                        // between every pair of phases.
786                        self.sweep_override = SweepOverride::None;
787                    }
788                    return Ok(result);
789                }
790                // Fresh run + `--sweep` (the only path that lands here with
791                // `state.completed` empty): silently no-op. There is no
792                // completed phase to anchor on, so the sweep would dispatch
793                // with `prompt_after = None` against an empty history —
794                // operators don't expect "between phases" wording before the
795                // first phase has even started. Clear the obligation, log the
796                // skip for visibility, and fall through to phase 01.
797                tracing::info!("skipping forced sweep: no completed phases yet to anchor on");
798                self.state.pending_sweep = false;
799                state::save(&self.workspace, Some(&self.state))
800                    .context("runner: persisting state.json after fresh-run force-sweep no-op")?;
801                self.deferred = parsed;
802            } else {
803                self.state.pending_sweep = false;
804                state::save(&self.workspace, Some(&self.state))
805                    .context("runner: persisting state.json after sweep gate cleared")?;
806                self.deferred = parsed;
807            }
808        }
809
810        let phase = self
811            .plan
812            .phase(&self.plan.current_phase)
813            .cloned()
814            .ok_or_else(|| {
815                anyhow!(
816                    "plan.current_phase {:?} is not present in plan.phases",
817                    self.plan.current_phase.as_str()
818                )
819            })?;
820        let phase_id = phase.id.clone();
821
822        if let Some(reason) = self.check_budget() {
823            return Ok(PhaseResult::Halted { phase_id, reason });
824        }
825
826        let attempt = self.bump_attempts(&phase_id);
827        let _ = self.events_tx.send(Event::PhaseStarted {
828            phase_id: phase_id.clone(),
829            title: phase.title.clone(),
830            attempt,
831        });
832
833        let plan_path = paths::plan_path(&self.workspace);
834        let deferred_path = paths::deferred_path(&self.workspace);
835        // Every pitboss artifact lives under `.pitboss/`, which is gitignored,
836        // so one exclude entry covers plan.md, deferred.md, state.json, logs,
837        // snapshots, grind state, etc.
838        let exclude: [&Path; 1] = [Path::new(".pitboss")];
839
840        let spec = DispatchSpec {
841            request: self.implementer_request(&phase, attempt),
842            phase_id: phase_id.clone(),
843            phase: Some(&phase),
844            plan_path: &plan_path,
845            deferred_path: &deferred_path,
846            exclude_paths: &exclude,
847            audit: self
848                .config
849                .audit
850                .enabled
851                .then_some(AuditKind::Phase { phase: &phase }),
852        };
853
854        let has_changes = match self.run_dispatch_pipeline(spec).await? {
855            PipelineOutcome::Halted(reason) => return Ok(PhaseResult::Halted { phase_id, reason }),
856            PipelineOutcome::Staged { has_changes } => has_changes,
857        };
858
859        let commit = if has_changes {
860            let id = self
861                .git
862                .commit(&git::commit_message(&phase_id, &phase.title))
863                .await
864                .context("runner: committing phase")?;
865            Some(id)
866        } else {
867            warn!(phase = %phase_id, "phase produced no code changes; skipping commit");
868            None
869        };
870
871        self.deferred.sweep();
872        write_atomic(
873            &deferred_path,
874            deferred::serialize(&self.deferred).as_bytes(),
875        )
876        .context("runner: writing deferred.md after sweep")?;
877
878        self.state.completed.push(phase_id.clone());
879        // Forward step → re-arm the consecutive-sweep clamp so the next
880        // pending sweep is allowed to fire.
881        self.state.consecutive_sweeps = 0;
882
883        let next_phase = self.next_phase_id_after(&phase_id);
884        if next_phase.is_none() {
885            // Final regular phase just committed. Persist the flag so a
886            // subsequent `pitboss play` resume re-enters the final-sweep
887            // drain loop directly rather than re-running the phase.
888            self.state.post_final_phase = true;
889        }
890        if let Some(ref next) = next_phase {
891            self.plan.set_current_phase(next.clone());
892            write_atomic(&plan_path, plan::serialize(&self.plan).as_bytes())
893                .context("runner: writing plan.md with advanced current_phase")?;
894            // Only consider scheduling a between-phase sweep when there is a
895            // next phase to insert it before. End-of-run sweeps (after the
896            // final phase) belong to phase 08. `--no-sweep` suppresses
897            // arming entirely; `--sweep` arms once and is consumed by the
898            // gate at the top of `run_phase_inner` after the sweep
899            // advances, so a multi-phase fresh run with `--sweep` fires
900            // its forced sweep at *this* boundary (between the just-
901            // committed first phase and the next), then reverts to
902            // threshold-driven behavior for subsequent boundaries.
903            if !matches!(self.sweep_override, SweepOverride::Skip)
904                && (matches!(self.sweep_override, SweepOverride::Force)
905                    || sweep::should_run_deferred_sweep(
906                        &self.deferred,
907                        &self.config.sweep,
908                        self.state.consecutive_sweeps,
909                    ))
910            {
911                self.state.pending_sweep = true;
912            }
913        }
914
915        state::save(&self.workspace, Some(&self.state)).context("runner: persisting state.json")?;
916
917        let _ = self.events_tx.send(Event::PhaseCommitted {
918            phase_id: phase_id.clone(),
919            commit: commit.clone(),
920        });
921
922        Ok(PhaseResult::Advanced {
923            phase_id,
924            next_phase,
925            commit,
926        })
927    }
928
929    /// Build the implementer [`AgentRequest`] for `phase`. `attempt` is the
930    /// 1-based dispatch counter the caller pulled from [`bump_attempts`]; it
931    /// flows into the per-attempt log filename so a fixer re-dispatch later in
932    /// the phase does not clobber the implementer's log.
933    fn implementer_request(&self, phase: &crate::plan::Phase, attempt: u32) -> AgentRequest {
934        AgentRequest {
935            role: Role::Implementer,
936            model: self.config.models.implementer.clone(),
937            system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
938            user_prompt: prompts::implementer(&self.plan, &self.deferred, phase),
939            workdir: self.workspace.clone(),
940            log_path: self.attempt_log_path(&phase.id, "implementer", attempt),
941            timeout: DEFAULT_AGENT_TIMEOUT,
942            env: std::collections::HashMap::new(),
943        }
944    }
945
946    /// Drive the dispatch → validate → tests → fixer → optional auditor → stage
947    /// chain for one already-built [`AgentRequest`]. The caller is responsible
948    /// for bumping the attempts counter and emitting the "started" event for
949    /// the dispatch type before invoking this; on return either a halt reason
950    /// is surfaced or the working tree has been staged and `has_changes`
951    /// indicates whether anything outside `exclude_paths` ended up in the
952    /// index. Phase 03 reuses this helper from a sweep entry point, which is
953    /// why the caller — not the helper — owns the commit decision.
954    async fn run_dispatch_pipeline(&mut self, spec: DispatchSpec<'_>) -> Result<PipelineOutcome> {
955        let DispatchSpec {
956            request,
957            phase_id,
958            phase,
959            plan_path,
960            deferred_path,
961            exclude_paths,
962            audit,
963        } = spec;
964
965        let test_runner = match self
966            .run_dispatch_through_fixer(request, phase, plan_path, deferred_path, &phase_id)
967            .await?
968        {
969            DispatchOutcome::Halted(reason) => return Ok(PipelineOutcome::Halted(reason)),
970            DispatchOutcome::Continue { test_runner } => test_runner,
971        };
972
973        self.run_audit_and_stage(
974            audit,
975            test_runner.as_ref(),
976            plan_path,
977            deferred_path,
978            exclude_paths,
979            &phase_id,
980        )
981        .await
982    }
983
984    /// First half of the pipeline: dispatch the agent, validate planning
985    /// artifacts, run the project tests, and drive the fixer loop on failure.
986    /// Returns the resolved test runner (or `None` when no runner was
987    /// detected) so the caller can hand it to [`Runner::run_audit_and_stage`]
988    /// without re-detecting.
989    ///
990    /// Split out from [`Runner::run_dispatch_pipeline`] so the sweep call
991    /// site can interpose between dispatch and audit: by then `self.deferred`
992    /// reflects the implementer's edits, so the caller can compute the
993    /// `resolved` / `remaining` lists and build [`AuditKind::Sweep`] with
994    /// the spec-shape fields filled in directly.
995    async fn run_dispatch_through_fixer(
996        &mut self,
997        request: AgentRequest,
998        phase: Option<&crate::plan::Phase>,
999        plan_path: &Path,
1000        deferred_path: &Path,
1001        phase_id: &PhaseId,
1002    ) -> Result<DispatchOutcome> {
1003        let role = request.role;
1004        match self
1005            .dispatch_and_validate(request, role, plan_path, deferred_path)
1006            .await?
1007        {
1008            ValidationResult::Continue => {}
1009            ValidationResult::Halt(reason) => return Ok(DispatchOutcome::Halted(reason)),
1010        }
1011
1012        let test_runner = if self.skip_tests {
1013            debug!("dry-run: skipping test detection and execution");
1014            None
1015        } else {
1016            project_tests::detect(&self.workspace, self.config.tests.command.as_deref())
1017        };
1018        if let Some(runner) = &test_runner {
1019            // The post-dispatch test log shares the agent's attempt number so
1020            // operators can pair them up at a glance. `bump_attempts` was
1021            // called by the caller before building `request`, so reading
1022            // state.attempts here yields exactly that attempt.
1023            let attempt = self.state.attempts.get(phase_id).copied().unwrap_or(0);
1024            let outcome = self.run_tests(runner, phase_id, "tests", attempt).await?;
1025            if !outcome.passed {
1026                match self
1027                    .run_fixer_loop(
1028                        phase_id,
1029                        phase,
1030                        runner,
1031                        plan_path,
1032                        deferred_path,
1033                        outcome.summary,
1034                    )
1035                    .await?
1036                {
1037                    FixerLoopResult::Passed => {}
1038                    FixerLoopResult::Halted(reason) => {
1039                        return Ok(DispatchOutcome::Halted(reason));
1040                    }
1041                }
1042            }
1043        } else {
1044            if !self.skip_tests {
1045                debug!("no test runner detected and no override configured; skipping tests");
1046            }
1047            let _ = self.events_tx.send(Event::TestsSkipped);
1048        }
1049
1050        Ok(DispatchOutcome::Continue { test_runner })
1051    }
1052
1053    /// Second half of the pipeline: run the optional auditor pass, then
1054    /// re-stage code-only changes so the caller can decide whether to commit.
1055    ///
1056    /// Split out from [`Runner::run_dispatch_pipeline`] for the same reason
1057    /// as [`Runner::run_dispatch_through_fixer`] — see that method's doc for
1058    /// the why. Phase callers reach this through the
1059    /// [`Runner::run_dispatch_pipeline`] wrapper; sweep callers invoke it
1060    /// directly so they can build [`AuditKind::Sweep`] with the
1061    /// post-dispatch `resolved` / `remaining` lists.
1062    async fn run_audit_and_stage(
1063        &mut self,
1064        audit: Option<AuditKind<'_>>,
1065        test_runner: Option<&project_tests::TestRunner>,
1066        plan_path: &Path,
1067        deferred_path: &Path,
1068        exclude_paths: &[&Path],
1069        phase_id: &PhaseId,
1070    ) -> Result<PipelineOutcome> {
1071        if let Some(audit) = audit {
1072            match self
1073                .run_auditor_pass(
1074                    audit,
1075                    test_runner,
1076                    plan_path,
1077                    deferred_path,
1078                    exclude_paths,
1079                    phase_id,
1080                )
1081                .await?
1082            {
1083                AuditPassResult::Continue => {}
1084                AuditPassResult::Halted(reason) => return Ok(PipelineOutcome::Halted(reason)),
1085            }
1086        }
1087
1088        // Re-stage to capture anything the auditor added or modified. When the
1089        // auditor was skipped (disabled, or no code changes to audit) this is
1090        // the first stage call of the phase.
1091        self.git
1092            .stage_changes(exclude_paths)
1093            .await
1094            .context("runner: staging code-only changes")?;
1095
1096        let has_changes = self
1097            .git
1098            .has_staged_changes()
1099            .await
1100            .context("runner: checking for staged changes")?;
1101
1102        Ok(PipelineOutcome::Staged { has_changes })
1103    }
1104
1105    /// Compare the running [`RunState::token_usage`] against the configured
1106    /// budgets. Returns [`HaltReason::BudgetExceeded`] when either cap has
1107    /// been met or surpassed, otherwise `None`. Called before every agent
1108    /// dispatch so a fresh run never exceeds its budget by more than one
1109    /// dispatch's worth of tokens.
1110    fn check_budget(&self) -> Option<HaltReason> {
1111        let (tokens, usd) = budget_totals(&self.config, &self.state.token_usage);
1112        if let Some(cap) = self.config.budgets.max_total_tokens {
1113            if tokens >= cap {
1114                return Some(HaltReason::BudgetExceeded(format!(
1115                    "token budget reached: {tokens} >= cap {cap}"
1116                )));
1117            }
1118        }
1119        if let Some(cap) = self.config.budgets.max_total_usd {
1120            if usd >= cap {
1121                return Some(HaltReason::BudgetExceeded(format!(
1122                    "USD budget reached: ${usd:.4} >= cap ${cap:.4}"
1123                )));
1124            }
1125        }
1126        None
1127    }
1128
1129    fn next_phase_id_after(&self, current: &PhaseId) -> Option<PhaseId> {
1130        self.plan
1131            .phases
1132            .iter()
1133            .find(|p| p.id > *current)
1134            .map(|p| p.id.clone())
1135    }
1136
1137    /// Increment and return the per-phase attempt counter. The counter mirrors
1138    /// every agent dispatch made for a phase (implementer + each fixer
1139    /// attempt) so [`crate::state::RunState::attempts`] is the single source
1140    /// of truth for "how many model dispatches did this phase consume."
1141    fn bump_attempts(&mut self, phase_id: &PhaseId) -> u32 {
1142        let entry = self.state.attempts.entry(phase_id.clone()).or_insert(0);
1143        *entry += 1;
1144        *entry
1145    }
1146
1147    fn attempt_log_path(&self, phase_id: &PhaseId, role: &str, attempt: u32) -> PathBuf {
1148        paths::play_logs_dir(&self.workspace)
1149            .join(format!("phase-{}-{}-{}.log", phase_id, role, attempt))
1150    }
1151
1152    /// Per-attempt log path for a sweep dispatch. Distinct prefix from
1153    /// [`Runner::attempt_log_path`] so an operator scanning `.pitboss/play/logs/`
1154    /// can tell sweep dispatches apart from regular phase dispatches at a
1155    /// glance even though both account against `state.attempts[after]`.
1156    fn sweep_log_path(&self, after: &PhaseId, role: &str, attempt: u32) -> PathBuf {
1157        paths::play_logs_dir(&self.workspace)
1158            .join(format!("sweep-after-{}-{}-{}.log", after, role, attempt))
1159    }
1160
1161    /// Run a one-shot deferred sweep against the loaded state without
1162    /// advancing the plan state machine. Backs the `pitboss sweep`
1163    /// subcommand: an operator who edited `deferred.md` by hand or wants to
1164    /// drain a backlog ahead of the next `pitboss play` can invoke this in
1165    /// isolation.
1166    ///
1167    /// `after` is the prompt's `after_phase` label — when `None`, the sweep
1168    /// prompt renders the standalone variant ("no preceding phase to anchor
1169    /// on"). Accounting (attempts counter, log filename, events, commit
1170    /// message) falls back to the plan's current phase when no `after` is
1171    /// supplied so the dispatch still has a stable phase id to key on.
1172    ///
1173    /// `max_items` clamps the prompt's pending-items list to the first N
1174    /// items in document order without changing the on-disk file. Use it to
1175    /// keep a pathological backlog (100+ items) within the agent's effective
1176    /// context window; remaining items surface on the next sweep.
1177    pub async fn run_standalone_sweep(
1178        &mut self,
1179        after: Option<PhaseId>,
1180        max_items: Option<usize>,
1181        persist_state: bool,
1182    ) -> Result<PhaseResult> {
1183        let accounting = after
1184            .clone()
1185            .unwrap_or_else(|| self.plan.current_phase.clone());
1186        self.run_sweep_step_inner(accounting, after, max_items, persist_state)
1187            .await
1188    }
1189
1190    /// Dispatch one sweep anchored on `after`. Common-case wrapper around
1191    /// [`Runner::run_sweep_step_inner`] that fixes `prompt_after = Some(after)`
1192    /// and `max_items = None` — the natural defaults for both the inter-phase
1193    /// gate (when there is at least one completed phase to anchor on) and
1194    /// phase 08's final-sweep drain loop. Bespoke entry points that need
1195    /// either knob ([`Runner::run_standalone_sweep`] for `max_items`, the
1196    /// fresh-run path inside `run_phase_inner` for `prompt_after = None`)
1197    /// continue to call `run_sweep_step_inner` directly.
1198    async fn run_sweep_step(&mut self, after: PhaseId) -> Result<PhaseResult> {
1199        self.run_sweep_step_inner(after.clone(), Some(after), None, true)
1200            .await
1201    }
1202
1203    /// Shared body of the inter-phase sweep gate (in
1204    /// [`Runner::run_phase_inner`]) and [`Runner::run_standalone_sweep`].
1205    ///
1206    /// The sweep reuses [`Runner::run_dispatch_pipeline`] with `phase: None`
1207    /// and `audit: Some(AuditKind::Sweep { .. })` when
1208    /// [`crate::config::SweepConfig::audit_enabled`] is on. The implementer's
1209    /// prompt is built via [`prompts::sweep`]; if tests fail post-dispatch
1210    /// the fixer falls back to [`prompts::fixer_for_sweep`] inside the
1211    /// pipeline.
1212    ///
1213    /// `accounting` is the phase id used everywhere a sweep needs a stable
1214    /// key — `state.attempts`, the per-attempt log filename, the
1215    /// `Sweep{Started,Halted,Completed}` events, and the sweep commit
1216    /// message. `prompt_after` is what the implementer / auditor / fixer
1217    /// prompts read as the `{after}` substitution; `None` selects the
1218    /// standalone-sweep variant in [`prompts::sweep`]. `max_items`, when
1219    /// `Some(n)`, clamps the pending-items list passed into
1220    /// [`prompts::sweep`] to the first `n` pending items in document order.
1221    /// The on-disk `deferred.md` is unchanged: items dropped from the prompt
1222    /// view stay in the file and surface on the next sweep.
1223    async fn run_sweep_step_inner(
1224        &mut self,
1225        accounting: PhaseId,
1226        prompt_after: Option<PhaseId>,
1227        max_items: Option<usize>,
1228        persist_state: bool,
1229    ) -> Result<PhaseResult> {
1230        // Capture pre-sweep accounting. `pre_unchecked` drives the resolved
1231        // count for the commit message; `pre_texts` is the unchecked-item
1232        // text snapshot the post-dispatch step diffs against to derive
1233        // [`AuditKind::Sweep`]'s `resolved` / `remaining` lists and the
1234        // staleness counter increments for surviving items.
1235        let pre_unchecked = sweep::unchecked_count(&self.deferred);
1236        let pre_texts: HashSet<String> = self
1237            .deferred
1238            .items
1239            .iter()
1240            .filter(|i| !i.done)
1241            .map(|i| i.text.clone())
1242            .collect();
1243        // Snapshot the H3 phases block. The sweep prompt forbids touching
1244        // `## Deferred phases`; a mismatch on the way out trips the
1245        // DeferredInvalid guard.
1246        let pre_phases = phases_block_canonical(&self.deferred);
1247
1248        let plan_path = paths::plan_path(&self.workspace);
1249        let deferred_path = paths::deferred_path(&self.workspace);
1250        let pre_deferred_bytes = match std::fs::read(&deferred_path) {
1251            Ok(b) => b,
1252            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Vec::new(),
1253            Err(e) => {
1254                return Err(anyhow::Error::new(e)
1255                    .context(format!("runner: reading {:?} before sweep", &deferred_path)))
1256            }
1257        };
1258
1259        if let Some(reason) = self.check_budget() {
1260            // Pre-dispatch budget halt — the agent never ran, so this halt
1261            // doesn't tick the staleness clock. Combined with
1262            // `state.pending_sweep` retrying the same logical sweep on
1263            // resume, ticking here would double-count one operator-visible
1264            // attempt: once for the budget halt, once for the resume's
1265            // post-dispatch increment. Skipping it here keeps "one attempt
1266            // per implementer dispatch" as the rule.
1267            return Ok(PhaseResult::Halted {
1268                phase_id: accounting,
1269                reason,
1270            });
1271        }
1272
1273        let attempt = self.bump_attempts(&accounting);
1274        let _ = self.events_tx.send(Event::SweepStarted {
1275            after: accounting.clone(),
1276            items_pending: pre_unchecked,
1277            attempt,
1278        });
1279
1280        let stale = self.stale_items();
1281        let prompt_doc = match max_items {
1282            Some(n) => clamp_pending_items(&self.deferred, n),
1283            None => self.deferred.clone(),
1284        };
1285        let request = AgentRequest {
1286            role: Role::Implementer,
1287            model: self.config.models.implementer.clone(),
1288            system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
1289            user_prompt: prompts::sweep(&self.plan, &prompt_doc, prompt_after.as_ref(), &stale),
1290            workdir: self.workspace.clone(),
1291            log_path: self.sweep_log_path(&accounting, "implementer", attempt),
1292            timeout: DEFAULT_AGENT_TIMEOUT,
1293            env: std::collections::HashMap::new(),
1294        };
1295
1296        let exclude: [&Path; 1] = [Path::new(".pitboss")];
1297
1298        // Split-pipeline dance: run the implementer dispatch + tests + fixer
1299        // first so `self.deferred` reflects the agent's edits, then build
1300        // [`AuditKind::Sweep`] with the spec-shape `resolved` / `remaining`
1301        // lists computed against `pre_texts`, then run the audit + stage half.
1302        let halt_with_staleness =
1303            |this: &mut Self, reason: HaltReason, pre_texts: &HashSet<String>| {
1304                // Halt-path bookkeeping fires before the halt event so the
1305                // staleness counter for surviving items reflects this attempt.
1306                // For dispatch_and_validate halts, `self.deferred` is the
1307                // pre-dispatch state (validation rolled back). For halts after
1308                // a successful implementer dispatch (test failure, audit
1309                // failure), `self.deferred` reflects the agent's edits — items
1310                // the agent flipped to `- [x]` are not in post_unchecked and
1311                // get pruned, items that survived get incremented.
1312                this.apply_sweep_staleness(pre_texts);
1313                let _ = this.events_tx.send(Event::SweepHalted {
1314                    after: accounting.clone(),
1315                    reason: reason.clone(),
1316                });
1317                PhaseResult::Halted {
1318                    phase_id: accounting.clone(),
1319                    reason,
1320                }
1321            };
1322
1323        let test_runner = match self
1324            .run_dispatch_through_fixer(request, None, &plan_path, &deferred_path, &accounting)
1325            .await?
1326        {
1327            DispatchOutcome::Halted(reason) => {
1328                return Ok(halt_with_staleness(self, reason, &pre_texts));
1329            }
1330            DispatchOutcome::Continue { test_runner } => test_runner,
1331        };
1332
1333        let audit = self.config.sweep.audit_enabled.then(|| {
1334            // The sweep prompt forbids rewording, so matching the pre-sweep
1335            // unchecked snapshot by exact text is sound.
1336            let resolved: Vec<String> = self
1337                .deferred
1338                .items
1339                .iter()
1340                .filter(|i| i.done && pre_texts.contains(&i.text))
1341                .map(|i| i.text.clone())
1342                .collect();
1343            let remaining: Vec<String> = self
1344                .deferred
1345                .items
1346                .iter()
1347                .filter(|i| !i.done)
1348                .map(|i| i.text.clone())
1349                .collect();
1350            AuditKind::Sweep {
1351                after: accounting.clone(),
1352                resolved,
1353                remaining,
1354            }
1355        });
1356
1357        let has_changes = match self
1358            .run_audit_and_stage(
1359                audit,
1360                test_runner.as_ref(),
1361                &plan_path,
1362                &deferred_path,
1363                &exclude,
1364                &accounting,
1365            )
1366            .await?
1367        {
1368            PipelineOutcome::Halted(reason) => {
1369                return Ok(halt_with_staleness(self, reason, &pre_texts));
1370            }
1371            PipelineOutcome::Staged { has_changes } => has_changes,
1372        };
1373
1374        // H3 invariant: the sweep prompt forbids editing `## Deferred phases`.
1375        // Restore from the pre-dispatch deferred snapshot when violated so the
1376        // halt is genuinely recoverable.
1377        let post_phases = phases_block_canonical(&self.deferred);
1378        if post_phases != pre_phases {
1379            warn!(
1380                after = %accounting,
1381                "sweep modified ## Deferred phases; restoring deferred.md"
1382            );
1383            self.restore_deferred(&deferred_path, &pre_deferred_bytes, true)?;
1384            // Re-parse the restored bytes so the cached doc agrees with disk.
1385            let restored = std::fs::read_to_string(&deferred_path).unwrap_or_default();
1386            if let Ok(parsed) = deferred::parse(&restored) {
1387                self.deferred = parsed;
1388            }
1389            // After restoration `self.deferred` matches pre-state, so the
1390            // bookkeeping treats every pre-text item as a survivor (+1 attempt).
1391            self.apply_sweep_staleness(&pre_texts);
1392            let reason = HaltReason::DeferredInvalid("sweep modified Deferred phases".into());
1393            let _ = self.events_tx.send(Event::SweepHalted {
1394                after: accounting.clone(),
1395                reason: reason.clone(),
1396            });
1397            return Ok(PhaseResult::Halted {
1398                phase_id: accounting,
1399                reason,
1400            });
1401        }
1402
1403        // `resolved` counts items the sweep flipped from `- [ ]` to `- [x]`.
1404        // `saturating_sub` keeps new items the agent appended (against the
1405        // prompt's instruction) from polluting the count into a negative.
1406        let resolved = pre_unchecked.saturating_sub(sweep::unchecked_count(&self.deferred));
1407
1408        let commit = if has_changes {
1409            let id = self
1410                .git
1411                .commit(&git::commit_message_sweep(&accounting, resolved))
1412                .await
1413                .context("runner: committing sweep")?;
1414            Some(id)
1415        } else {
1416            warn!(after = %accounting, "sweep produced no code changes; skipping commit");
1417            None
1418        };
1419
1420        // Staleness bookkeeping must run before `self.deferred.sweep()` would
1421        // matter, but since the helper filters on `!i.done` either order
1422        // produces the same `post_unchecked_texts`. Doing it here keeps the
1423        // success path symmetrical with the halt paths above.
1424        self.apply_sweep_staleness(&pre_texts);
1425
1426        // Drop the items the agent ticked off so a later regular phase doesn't
1427        // re-render them in the next implementer prompt. Mirrors the
1428        // post-phase sweep call in `run_phase_inner`.
1429        self.deferred.sweep();
1430        write_atomic(
1431            &deferred_path,
1432            deferred::serialize(&self.deferred).as_bytes(),
1433        )
1434        .context("runner: writing deferred.md after sweep step")?;
1435
1436        self.state.pending_sweep = false;
1437        self.state.consecutive_sweeps = self.state.consecutive_sweeps.saturating_add(1);
1438        // `state.completed` tracks plan progress only; sweeps do not push to
1439        // it. `current_phase` already advanced when the preceding phase
1440        // committed, so the runner picks the regular phase up on the next
1441        // `run_phase` call.
1442        //
1443        // `persist_state = false` is the standalone-sweep-on-fresh-workspace
1444        // case: state was synthesized in memory and the caller doesn't want
1445        // an empty run claiming the workspace by leaving a state.json
1446        // behind. The runner skips the write so the CLI doesn't have to
1447        // clean up after it.
1448        if persist_state {
1449            state::save(&self.workspace, Some(&self.state))
1450                .context("runner: persisting state.json after sweep")?;
1451        }
1452
1453        let _ = self.events_tx.send(Event::SweepCompleted {
1454            after: accounting.clone(),
1455            resolved,
1456            commit: commit.clone(),
1457        });
1458
1459        Ok(PhaseResult::Advanced {
1460            phase_id: accounting,
1461            next_phase: Some(self.plan.current_phase.clone()),
1462            commit,
1463        })
1464    }
1465
1466    /// Run a single agent dispatch and validate the planning artifacts on the
1467    /// way out: snapshot `plan.md` + `deferred.md` first, dispatch, then
1468    /// require `plan.md` to be byte-identical and `deferred.md` to re-parse.
1469    /// On any failure mode the artifacts are restored from the pre-dispatch
1470    /// snapshots before returning [`ValidationResult::Halt`].
1471    async fn dispatch_and_validate(
1472        &mut self,
1473        request: AgentRequest,
1474        role: Role,
1475        plan_path: &Path,
1476        deferred_path: &Path,
1477    ) -> Result<ValidationResult> {
1478        let plan_pre =
1479            std::fs::read(plan_path).with_context(|| format!("runner: reading {:?}", plan_path))?;
1480        let plan_hash = Snapshot::of_bytes(&plan_pre);
1481        let (deferred_pre, deferred_existed) = match std::fs::read(deferred_path) {
1482            Ok(b) => (b, true),
1483            Err(e) if e.kind() == std::io::ErrorKind::NotFound => (Vec::new(), false),
1484            Err(e) => {
1485                return Err(
1486                    anyhow::Error::new(e).context(format!("runner: reading {:?}", deferred_path))
1487                )
1488            }
1489        };
1490
1491        let dispatch = self.dispatch_agent(request).await?;
1492        // Token usage is folded regardless of whether the dispatch ended
1493        // cleanly — even an aborted run can incur partial spend.
1494        self.fold_token_usage(role, &dispatch);
1495
1496        match &dispatch.stop_reason {
1497            StopReason::Completed => {}
1498            StopReason::Timeout => {
1499                return Ok(ValidationResult::Halt(HaltReason::AgentFailure(format!(
1500                    "agent {:?} timed out after {:?}",
1501                    self.agent.name(),
1502                    DEFAULT_AGENT_TIMEOUT
1503                ))));
1504            }
1505            StopReason::Cancelled => {
1506                return Ok(ValidationResult::Halt(HaltReason::AgentFailure(format!(
1507                    "agent {:?} was cancelled",
1508                    self.agent.name()
1509                ))));
1510            }
1511            StopReason::Error(msg) => {
1512                return Ok(ValidationResult::Halt(HaltReason::AgentFailure(
1513                    msg.clone(),
1514                )));
1515            }
1516        }
1517
1518        let plan_post = std::fs::read(plan_path)
1519            .with_context(|| format!("runner: reading {:?} after agent", plan_path))?;
1520        if Snapshot::of_bytes(&plan_post) != plan_hash {
1521            warn!(role = %role, "agent modified plan.md; restoring from snapshot");
1522            write_atomic(plan_path, &plan_pre).with_context(|| {
1523                format!(
1524                    "runner: restoring {:?} from snapshot after tamper",
1525                    plan_path
1526                )
1527            })?;
1528            return Ok(ValidationResult::Halt(HaltReason::PlanTampered));
1529        }
1530
1531        let deferred_text = match std::fs::read_to_string(deferred_path) {
1532            Ok(s) => s,
1533            Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
1534            Err(e) => {
1535                return Err(anyhow::Error::new(e)
1536                    .context(format!("runner: reading {:?} after agent", deferred_path)))
1537            }
1538        };
1539        match deferred::parse(&deferred_text) {
1540            Ok(parsed) => {
1541                self.deferred = parsed;
1542            }
1543            Err(e) => {
1544                let msg = format!("{e}");
1545                warn!(role = %role, error = %msg, "deferred.md is invalid; restoring");
1546                self.restore_deferred(deferred_path, &deferred_pre, deferred_existed)?;
1547                return Ok(ValidationResult::Halt(HaltReason::DeferredInvalid(msg)));
1548            }
1549        }
1550
1551        Ok(ValidationResult::Continue)
1552    }
1553
1554    /// Run the project's test suite once, emitting [`Event::TestStarted`] /
1555    /// [`Event::TestFinished`] around the call. `log_role` distinguishes the
1556    /// log filename so a fixer-driven re-run does not clobber the prior log.
1557    async fn run_tests(
1558        &self,
1559        runner: &project_tests::TestRunner,
1560        phase_id: &PhaseId,
1561        log_role: &str,
1562        attempt: u32,
1563    ) -> Result<project_tests::TestOutcome> {
1564        let _ = self.events_tx.send(Event::TestStarted);
1565        let test_log = self.attempt_log_path(phase_id, log_role, attempt);
1566        let outcome = runner
1567            .run(test_log)
1568            .await
1569            .context("runner: running project tests")?;
1570        let _ = self.events_tx.send(Event::TestFinished {
1571            passed: outcome.passed,
1572            summary: outcome.summary.clone(),
1573        });
1574        Ok(outcome)
1575    }
1576
1577    /// Drive the bounded fixer loop after the implementer's tests fail.
1578    ///
1579    /// Up to [`crate::config::RetryBudgets::fixer_max_attempts`] dispatches:
1580    /// each one snapshots the planning artifacts, dispatches the fixer agent,
1581    /// validates, then re-runs the tests. The first attempt that produces a
1582    /// passing test run resolves to [`FixerLoopResult::Passed`]; once the
1583    /// budget is exhausted the loop returns the final test summary as
1584    /// [`HaltReason::TestsFailed`].
1585    async fn run_fixer_loop(
1586        &mut self,
1587        phase_id: &PhaseId,
1588        phase: Option<&crate::plan::Phase>,
1589        test_runner: &project_tests::TestRunner,
1590        plan_path: &Path,
1591        deferred_path: &Path,
1592        initial_summary: String,
1593    ) -> Result<FixerLoopResult> {
1594        let budget = self.config.retries.fixer_max_attempts;
1595        if budget == 0 {
1596            return Ok(FixerLoopResult::Halted(HaltReason::TestsFailed(
1597                initial_summary,
1598            )));
1599        }
1600
1601        let mut last_summary = initial_summary;
1602
1603        for fixer_attempt in 1..=budget {
1604            if let Some(reason) = self.check_budget() {
1605                return Ok(FixerLoopResult::Halted(reason));
1606            }
1607            let total_attempt = self.bump_attempts(phase_id);
1608            let _ = self.events_tx.send(Event::FixerStarted {
1609                phase_id: phase_id.clone(),
1610                fixer_attempt,
1611                attempt: total_attempt,
1612            });
1613
1614            let user_prompt = match phase {
1615                Some(p) => {
1616                    prompts::fixer_with_deferred(&self.plan, p, &last_summary, &self.deferred)
1617                }
1618                None => prompts::fixer_for_sweep(&self.plan, &self.deferred, &last_summary),
1619            };
1620            let log_path = self.attempt_log_path(phase_id, "fix", fixer_attempt);
1621            let request = AgentRequest {
1622                role: Role::Fixer,
1623                model: self.config.models.fixer.clone(),
1624                system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
1625                user_prompt,
1626                workdir: self.workspace.clone(),
1627                log_path,
1628                timeout: DEFAULT_AGENT_TIMEOUT,
1629                env: std::collections::HashMap::new(),
1630            };
1631
1632            match self
1633                .dispatch_and_validate(request, Role::Fixer, plan_path, deferred_path)
1634                .await?
1635            {
1636                ValidationResult::Continue => {}
1637                ValidationResult::Halt(reason) => return Ok(FixerLoopResult::Halted(reason)),
1638            }
1639
1640            let outcome = self
1641                .run_tests(test_runner, phase_id, "tests", total_attempt)
1642                .await?;
1643            if outcome.passed {
1644                return Ok(FixerLoopResult::Passed);
1645            }
1646            last_summary = outcome.summary;
1647        }
1648
1649        Ok(FixerLoopResult::Halted(HaltReason::TestsFailed(
1650            last_summary,
1651        )))
1652    }
1653
1654    /// Run the auditor agent. Gating on
1655    /// [`crate::config::AuditConfig::enabled`] is the caller's responsibility:
1656    /// [`Runner::run_dispatch_pipeline`] only invokes this helper when its
1657    /// `audit` field is `Some`, so callers that build the spec without
1658    /// populating `audit` get the disabled-by-default behavior for free.
1659    ///
1660    /// Slots in after the test suite (and any fixer dispatches) passes and
1661    /// before the per-phase commit. The runner stages the implementer's code
1662    /// changes so it can hand the resulting `git diff --cached` to the auditor
1663    /// as input; if the staged diff is empty (e.g. the implementer only edited
1664    /// planning artifacts) the audit is skipped. The auditor may inline small
1665    /// fixes or extend `deferred.md`; either way the post-dispatch validation
1666    /// re-parses `deferred.md` and re-runs the project tests since the auditor
1667    /// may have edited code. A test failure post-audit halts the phase rather
1668    /// than re-entering the fixer loop — auditor edits are scoped enough that
1669    /// breaking the build deserves operator attention.
1670    async fn run_auditor_pass(
1671        &mut self,
1672        audit: AuditKind<'_>,
1673        test_runner: Option<&project_tests::TestRunner>,
1674        plan_path: &Path,
1675        deferred_path: &Path,
1676        exclude: &[&Path],
1677        phase_id: &PhaseId,
1678    ) -> Result<AuditPassResult> {
1679        // Stage so we can sample the diff. `stage_changes` is idempotent so
1680        // calling it again post-audit picks up anything the auditor adds.
1681        self.git
1682            .stage_changes(exclude)
1683            .await
1684            .context("runner: staging for audit diff")?;
1685        let diff = self
1686            .git
1687            .staged_diff()
1688            .await
1689            .context("runner: capturing staged diff for auditor")?;
1690
1691        let kind = match &audit {
1692            AuditKind::Phase { .. } => AuditContextKind::Phase,
1693            AuditKind::Sweep { .. } => AuditContextKind::Sweep,
1694        };
1695        let context = AuditContext {
1696            phase_id: phase_id.clone(),
1697            kind,
1698        };
1699
1700        if diff.trim().is_empty() {
1701            let _ = self.events_tx.send(Event::AuditorSkippedNoChanges {
1702                context: context.clone(),
1703            });
1704            return Ok(AuditPassResult::Continue);
1705        }
1706
1707        if let Some(reason) = self.check_budget() {
1708            return Ok(AuditPassResult::Halted(reason));
1709        }
1710
1711        let total_attempt = self.bump_attempts(phase_id);
1712        let _ = self.events_tx.send(Event::AuditorStarted {
1713            context: context.clone(),
1714            attempt: total_attempt,
1715        });
1716
1717        let (user_prompt, log_path) = match audit {
1718            AuditKind::Phase { phase } => (
1719                prompts::auditor_with_deferred(
1720                    &self.plan,
1721                    phase,
1722                    &diff,
1723                    &self.deferred,
1724                    self.config.audit.small_fix_line_limit,
1725                ),
1726                // Phase auditor only ever runs once per phase, so the per-role
1727                // attempt counter in the log filename stays at 1; the global
1728                // `attempt` counter still bumps so [`RunState::attempts`]
1729                // reflects the spend.
1730                self.attempt_log_path(phase_id, "audit", 1),
1731            ),
1732            AuditKind::Sweep {
1733                after,
1734                resolved,
1735                remaining,
1736            } => {
1737                let stale = self.stale_items();
1738                (
1739                    prompts::sweep_auditor(prompts::SweepAuditorPrompt {
1740                        plan: &self.plan,
1741                        deferred: &self.deferred,
1742                        after: &after,
1743                        diff: &diff,
1744                        resolved: &resolved,
1745                        remaining: &remaining,
1746                        stale_items: &stale,
1747                        small_fix_line_limit: self.config.audit.small_fix_line_limit,
1748                    }),
1749                    // Sweep audits get a sweep-prefix log path so an operator
1750                    // scanning `.pitboss/play/logs/` can tell sweep audits
1751                    // apart from regular phase audits at a glance.
1752                    self.sweep_log_path(&after, "audit", 1),
1753                )
1754            }
1755        };
1756        let request = AgentRequest {
1757            role: Role::Auditor,
1758            model: self.config.models.auditor.clone(),
1759            system_prompt: prompts::caveman::system_prompt(&self.config.caveman),
1760            user_prompt,
1761            workdir: self.workspace.clone(),
1762            log_path,
1763            timeout: DEFAULT_AGENT_TIMEOUT,
1764            env: std::collections::HashMap::new(),
1765        };
1766
1767        match self
1768            .dispatch_and_validate(request, Role::Auditor, plan_path, deferred_path)
1769            .await?
1770        {
1771            ValidationResult::Continue => {}
1772            ValidationResult::Halt(reason) => return Ok(AuditPassResult::Halted(reason)),
1773        }
1774
1775        if let Some(test_runner) = test_runner {
1776            let outcome = self
1777                .run_tests(test_runner, phase_id, "tests", total_attempt)
1778                .await?;
1779            if !outcome.passed {
1780                return Ok(AuditPassResult::Halted(HaltReason::TestsFailed(
1781                    outcome.summary,
1782                )));
1783            }
1784        }
1785
1786        Ok(AuditPassResult::Continue)
1787    }
1788
1789    /// Run the per-item staleness bookkeeping after a sweep dispatch, success
1790    /// or halt. Reads `post_unchecked_texts` straight from `self.deferred`
1791    /// (after restoration, on halt paths) so the same code path serves every
1792    /// sweep exit. Emits [`Event::DeferredItemStale`] for items whose counter
1793    /// just crossed [`crate::config::SweepConfig::escalate_after`] (transition
1794    /// only — items already at or above the threshold do not re-emit).
1795    fn apply_sweep_staleness(&mut self, pre_texts: &HashSet<String>) {
1796        let post_unchecked_texts: HashSet<String> = self
1797            .deferred
1798            .items
1799            .iter()
1800            .filter(|i| !i.done)
1801            .map(|i| i.text.clone())
1802            .collect();
1803        let crossed = sweep::update_sweep_staleness(
1804            &mut self.state.deferred_item_attempts,
1805            pre_texts,
1806            &post_unchecked_texts,
1807            self.config.sweep.escalate_after,
1808        );
1809        for (text, attempts) in crossed {
1810            let _ = self
1811                .events_tx
1812                .send(Event::DeferredItemStale { text, attempts });
1813        }
1814    }
1815
1816    fn fold_token_usage(&mut self, role: Role, dispatch: &AgentDispatch) {
1817        let tokens = &dispatch.outcome_tokens;
1818        self.state.token_usage.input += tokens.input;
1819        self.state.token_usage.output += tokens.output;
1820        let entry = self
1821            .state
1822            .token_usage
1823            .by_role
1824            .entry(role.as_str().to_string())
1825            .or_default();
1826        entry.input += tokens.input;
1827        entry.output += tokens.output;
1828        for (k, v) in &tokens.by_role {
1829            let e = self.state.token_usage.by_role.entry(k.clone()).or_default();
1830            e.input += v.input;
1831            e.output += v.output;
1832        }
1833        let _ = self
1834            .events_tx
1835            .send(Event::UsageUpdated(self.state.token_usage.clone()));
1836    }
1837
1838    fn restore_deferred(
1839        &self,
1840        deferred_path: &Path,
1841        pre_bytes: &[u8],
1842        existed: bool,
1843    ) -> Result<()> {
1844        if existed {
1845            write_atomic(deferred_path, pre_bytes).with_context(|| {
1846                format!(
1847                    "runner: restoring {:?} from snapshot after parse failure",
1848                    deferred_path
1849                )
1850            })?;
1851        } else if deferred_path.exists() {
1852            std::fs::remove_file(deferred_path).with_context(|| {
1853                format!(
1854                    "runner: removing agent-created {:?} after parse failure",
1855                    deferred_path
1856                )
1857            })?;
1858        }
1859        Ok(())
1860    }
1861
1862    async fn dispatch_agent(&self, request: AgentRequest) -> Result<AgentDispatch> {
1863        let role = request.role;
1864        let (mpsc_tx, mpsc_rx) = mpsc::channel(AGENT_EVENT_CHANNEL_CAPACITY);
1865        let cancel = CancellationToken::new();
1866        let events_tx = self.events_tx.clone();
1867
1868        let forward = tokio::spawn(forward_agent_events(mpsc_rx, events_tx));
1869
1870        let outcome = self
1871            .agent
1872            .run(request, mpsc_tx, cancel)
1873            .await
1874            .with_context(|| format!("runner: agent {:?} failed to run", self.agent.name()))?;
1875        let _ = forward.await;
1876
1877        Ok(AgentDispatch {
1878            stop_reason: outcome.stop_reason,
1879            outcome_tokens: outcome.tokens,
1880            _role: role,
1881        })
1882    }
1883}
1884
1885/// Clone `doc` and keep only the first `max` pending items (in document
1886/// order). Already-checked items are filtered out by the sweep prompt
1887/// renderer, so they're left alone here. `## Deferred phases` are preserved
1888/// verbatim — the sweep prompt forbids editing them and the auditor cross-
1889/// checks against the full file.
1890///
1891/// Used by [`Runner::run_standalone_sweep`] (and any future
1892/// `--max-items`-style truncation) to keep a pathological backlog within
1893/// the agent's effective context window without touching disk: the original
1894/// `deferred.md` is unchanged, so items not in the prompt view stay pending
1895/// for the next sweep.
1896fn clamp_pending_items(doc: &DeferredDoc, max: usize) -> DeferredDoc {
1897    let mut out = DeferredDoc {
1898        items: Vec::with_capacity(doc.items.len().min(max + 1)),
1899        phases: doc.phases.clone(),
1900    };
1901    let mut pending_kept = 0usize;
1902    for item in &doc.items {
1903        if !item.done {
1904            if pending_kept >= max {
1905                continue;
1906            }
1907            pending_kept += 1;
1908        }
1909        out.items.push(item.clone());
1910    }
1911    out
1912}
1913
1914/// Canonical serialization of just the `## Deferred phases` block of a
1915/// [`DeferredDoc`]. Used by the sweep step to detect whether the agent edited
1916/// any H3 entries — the sweep prompt forbids it, so any difference between the
1917/// pre- and post-dispatch hash trips a halt and a deferred.md rollback.
1918fn phases_block_canonical(doc: &DeferredDoc) -> String {
1919    let phases_only = DeferredDoc {
1920        items: Vec::new(),
1921        phases: doc.phases.clone(),
1922    };
1923    deferred::serialize(&phases_only)
1924}
1925
/// Snapshot of the agent dispatch the runner needs after the call returns.
///
/// Built by [`Runner::dispatch_agent`] from the agent's outcome.
struct AgentDispatch {
    /// Why the agent stopped. `dispatch_and_validate` halts on anything other
    /// than [`StopReason::Completed`].
    stop_reason: StopReason,
    /// Token usage reported in the agent's final outcome; folded into the
    /// run-wide totals by `fold_token_usage`.
    outcome_tokens: crate::state::TokenUsage,
    /// Role copied from the dispatched request. The leading underscore marks
    /// it as currently unread.
    _role: Role,
}
1932
/// Outcome of [`Runner::dispatch_and_validate`]. Either continue with the
/// post-dispatch flow, or short-circuit with a halt reason already populated.
/// Token usage has been folded in both cases. Artifacts are restored from
/// their snapshots only for the tamper / invalid-`deferred.md` halts, not for
/// agent-failure halts.
enum ValidationResult {
    /// The agent completed, `plan.md` is unchanged, and `deferred.md` parsed.
    Continue,
    /// The dispatch must stop; the payload says why.
    Halt(HaltReason),
}
1940
/// Outcome of [`Runner::run_fixer_loop`]. `Passed` means a fixer attempt
/// produced a passing test run; `Halted` carries the reason — a run-budget
/// halt, an agent failure during a fixer dispatch, a planning-artifact
/// tamper, or fixer-attempt exhaustion (in which case the variant is
/// [`HaltReason::TestsFailed`] with the latest test summary).
enum FixerLoopResult {
    /// A test run after some fixer attempt passed.
    Passed,
    /// The loop stopped without a passing test run.
    Halted(HaltReason),
}
1949
/// Outcome of [`Runner::run_auditor_pass`]. `Continue` covers all the keep-
/// going cases (no diff to audit, audit ran and tests still pass, or audit
/// ran and no test runner was supplied); `Halted` carries a run-budget halt,
/// an agent-side failure, a planning-artifact tamper, or post-audit test
/// breakage. The "audit disabled" case is handled upstream: the auditor pass
/// is only invoked when the caller supplies an [`AuditKind`].
enum AuditPassResult {
    /// Nothing blocks the caller from proceeding.
    Continue,
    /// The audit pass stopped the dispatch; the payload says why.
    Halted(HaltReason),
}
1960
/// Inputs to [`Runner::run_dispatch_pipeline`]. The pipeline is the shared
/// dispatch → validate → tests → fixer → optional auditor → stage chain that
/// both the per-phase implementer dispatch (today) and the per-sweep
/// implementer dispatch (phase 03) hand off to. Lifetimes tie everything to
/// the caller's stack frame so the caller keeps ownership of the source
/// [`crate::plan::Phase`] and path buffers.
struct DispatchSpec<'a> {
    /// The pre-built agent request to dispatch first. The caller is
    /// responsible for having bumped [`RunState::attempts`] for `phase_id`
    /// before constructing this so the request's `log_path` reflects the
    /// dispatch's attempt number.
    request: AgentRequest,
    /// Phase id under which this dispatch is recorded — drives the fixer
    /// loop's attempt tracking, the auditor's "started" event, and the
    /// post-dispatch test log filename. For phase-driven dispatches this is
    /// the current phase. For sweep dispatches (phase 03) this is the most
    /// recently completed `after_phase`, since `state.attempts` keys on real
    /// phase ids.
    phase_id: PhaseId,
    /// The current phase, when there is one. The fixer loop uses this to
    /// render `prompts::fixer_with_deferred`; sweep dispatches pass `None` to
    /// fall back to `prompts::fixer_for_sweep`.
    phase: Option<&'a crate::plan::Phase>,
    /// Path to `plan.md` for snapshot validation (the file must come back
    /// byte-identical after each dispatch).
    plan_path: &'a Path,
    /// Path to `deferred.md` for snapshot + parse validation (the file must
    /// still parse after each dispatch).
    deferred_path: &'a Path,
    /// Paths to exclude from `git add` so planning artifacts under
    /// `.pitboss/` never end up in the per-phase commit.
    exclude_paths: &'a [&'a Path],
    /// Whether to run the auditor pass after tests pass. `None` skips the
    /// auditor entirely; `Some(kind)` selects the prompt variant.
    audit: Option<AuditKind<'a>>,
}
1995
/// Selects the auditor prompt variant (and, with it, the audit log path).
///
/// `Phase` runs the regular auditor against an implementer diff for a plan
/// phase. `Sweep` runs the sweep-specific auditor against a deferred-sweep
/// dispatch; its contract is "for each item the implementer marked `- [x]`,
/// does the diff actually do that work? revert anything unrelated."
enum AuditKind<'a> {
    /// Audit a regular plan-phase implementer dispatch. Renders
    /// [`crate::prompts::auditor_with_deferred`] for the carried phase.
    Phase {
        /// The phase whose implementer diff is under review.
        phase: &'a crate::plan::Phase,
    },
    /// Audit a deferred-sweep dispatch. Renders [`crate::prompts::sweep_auditor`]
    /// with the sweep's resolved / remaining item lists threaded through.
    ///
    /// Both lists are computed by the sweep call site after the implementer
    /// dispatch has updated `self.deferred`; the auditor pass just reads them.
    /// See [`Runner::run_dispatch_through_fixer`] /
    /// [`Runner::run_audit_and_stage`] for the split-pipeline shape that
    /// makes this possible.
    Sweep {
        /// Most recently completed plan phase the sweep fired after. Becomes
        /// the `{after}` substitution and selects the sweep-prefix log path.
        after: PhaseId,
        /// Items the implementer flipped from `- [ ]` to `- [x]` during the
        /// sweep dispatch: checked items whose exact text also appears in the
        /// pre-sweep unchecked snapshot, so a reworded item is not counted.
        resolved: Vec<String>,
        /// Items still unchecked in `deferred.md` after the sweep dispatch.
        remaining: Vec<String>,
    },
}
2029
/// Outcome of [`Runner::run_dispatch_through_fixer`]. The first half of the
/// pipeline either halts (validation, fixer-budget, or budget exhaustion) or
/// continues and hands back the test runner it resolved so the second half
/// ([`Runner::run_audit_and_stage`]) can re-use it for the post-audit re-run.
enum DispatchOutcome {
    /// The first half stopped the dispatch; the payload says why.
    Halted(HaltReason),
    /// The first half succeeded and the caller may proceed to audit + stage.
    Continue {
        /// Test runner detected for this dispatch (or `None` when tests were
        /// skipped or no runner was configured). Re-used by the auditor pass
        /// so a single dispatch only probes for tests once.
        test_runner: Option<project_tests::TestRunner>,
    },
}
2043
/// Outcome of [`Runner::run_dispatch_pipeline`]; the sweep step also receives
/// it from [`Runner::run_audit_and_stage`]. The pipeline never commits;
/// `Staged { has_changes }` hands back whether the staged index is non-empty
/// so the caller can decide whether to commit.
enum PipelineOutcome {
    /// The pipeline stopped before staging completed; the payload says why.
    Halted(HaltReason),
    /// The pipeline ran to the end and the index has been staged.
    Staged {
        /// `true` when `git.has_staged_changes()` returned true after the
        /// final stage call — i.e. the dispatch (and any auditor edits)
        /// produced code outside `exclude_paths`. `false` when the only
        /// changes were inside `.pitboss/` and got excluded.
        has_changes: bool,
    },
}
2058
2059async fn forward_agent_events(mut rx: mpsc::Receiver<AgentEvent>, tx: broadcast::Sender<Event>) {
2060    while let Some(ev) = rx.recv().await {
2061        match ev {
2062            AgentEvent::Stdout(line) => {
2063                let _ = tx.send(Event::AgentStdout(line));
2064            }
2065            AgentEvent::Stderr(line) => {
2066                let _ = tx.send(Event::AgentStderr(line));
2067            }
2068            AgentEvent::ToolUse(name) => {
2069                let _ = tx.send(Event::AgentToolUse(name));
2070            }
2071            AgentEvent::TokenDelta(_) => {
2072                // Token deltas are folded into [`RunState::token_usage`] from
2073                // the final outcome, not the stream — folding here would
2074                // double-count any agent that emits both intermediate deltas
2075                // and a totals report.
2076            }
2077        }
2078    }
2079}
2080
2081/// Build a fresh [`RunState`] for a workspace that has not started a run yet.
2082///
2083/// `now` is the timestamp used to derive both the run id and the per-run
2084/// branch (`<config.git.branch_prefix><utc_timestamp>`). Keeping the timestamp
2085/// explicit makes startup deterministic in tests.
2086pub fn fresh_run_state(plan: &Plan, config: &Config, now: chrono::DateTime<Utc>) -> RunState {
2087    let run_id = now.format("%Y%m%dT%H%M%SZ").to_string();
2088    let branch = git::branch_name(&config.git.branch_prefix, now);
2089    let mut s = RunState::new(run_id, branch, plan.current_phase.clone());
2090    s.started_at = now;
2091    s
2092}
2093
2094/// Subscribe to a runner's event stream and print a human-readable line per
2095/// event to stderr until a terminal event arrives or the channel closes.
2096///
2097/// This is the "no TUI" CLI experience: progress is rendered via plain
2098/// `tracing::info`-style lines so log piping and CI logs work out of the box.
2099///
2100/// The runner emits [`Event::RunFinished`] on the success path and
2101/// [`Event::PhaseHalted`] on the halt path; this function logs the terminal
2102/// event and then returns. Otherwise the broadcast channel would block
2103/// forever after `Runner::run()` returns, because the runner itself keeps
2104/// holding the [`broadcast::Sender`] for post-run lookups.
2105pub async fn log_events(mut rx: broadcast::Receiver<Event>) {
2106    use broadcast::error::RecvError;
2107    loop {
2108        match rx.recv().await {
2109            Ok(event) => {
2110                let terminal = matches!(event, Event::RunFinished | Event::PhaseHalted { .. });
2111                log_event_line(&event);
2112                if terminal {
2113                    return;
2114                }
2115            }
2116            Err(RecvError::Closed) => return,
2117            Err(RecvError::Lagged(n)) => {
2118                eprintln!("[pitboss] (logger lagged: dropped {n} events)");
2119            }
2120        }
2121    }
2122}
2123
2124fn log_event_line(event: &Event) {
2125    use crate::style::{self, col};
2126    let c = style::use_color_stderr();
2127
2128    let fm = col(c, style::BOLD_CYAN, "[pitboss]");
2129
2130    match event {
2131        Event::PhaseStarted {
2132            phase_id,
2133            title,
2134            attempt,
2135        } => {
2136            let rule = col(c, style::DARK_GRAY, &"─".repeat(60));
2137            if c {
2138                eprintln!("{rule}");
2139            }
2140            eprintln!(
2141                "{} {}",
2142                col(c, style::BOLD_CYAN, "[pitboss]"),
2143                col(
2144                    c,
2145                    style::BOLD_WHITE,
2146                    &format!("phase {phase_id} ({title}), attempt {attempt}")
2147                )
2148            );
2149            if c {
2150                eprintln!("{rule}");
2151            }
2152        }
2153        Event::FixerStarted {
2154            phase_id,
2155            fixer_attempt,
2156            attempt,
2157        } => {
2158            eprintln!(
2159                "{fm} {}",
2160                col(
2161                    c,
2162                    style::YELLOW,
2163                    &format!(
2164                        "phase {phase_id} fixer attempt {fixer_attempt} (total dispatch {attempt})"
2165                    )
2166                )
2167            );
2168        }
2169        Event::AuditorStarted { context, attempt } => {
2170            let label = match context.kind {
2171                AuditContextKind::Phase => format!(
2172                    "phase {} auditor (total dispatch {attempt})",
2173                    context.phase_id
2174                ),
2175                AuditContextKind::Sweep => format!(
2176                    "sweep after phase {} auditor (total dispatch {attempt})",
2177                    context.phase_id
2178                ),
2179            };
2180            eprintln!("{fm} {}", col(c, style::BLUE, &label));
2181        }
2182        Event::AuditorSkippedNoChanges { context } => {
2183            let label = match context.kind {
2184                AuditContextKind::Phase => format!(
2185                    "phase {} auditor skipped: no code changes to audit",
2186                    context.phase_id
2187                ),
2188                AuditContextKind::Sweep => format!(
2189                    "sweep after phase {} auditor skipped: no code changes to audit",
2190                    context.phase_id
2191                ),
2192            };
2193            eprintln!("{fm} {}", col(c, style::DIM, &label));
2194        }
2195        Event::AgentStdout(line) => {
2196            eprintln!("{} {line}", col(c, style::DIM, "[agent]"));
2197        }
2198        Event::AgentStderr(line) => {
2199            eprintln!(
2200                "{} {}",
2201                col(c, style::RED, "[agent:err]"),
2202                col(c, style::RED, line)
2203            );
2204        }
2205        Event::AgentToolUse(name) => {
2206            eprintln!(
2207                "{}",
2208                col(c, style::DARK_GRAY, &format!("[agent:tool] {name}"))
2209            );
2210        }
2211        Event::TestStarted => {
2212            eprintln!("{fm} {}", col(c, style::MAGENTA, "running tests"));
2213        }
2214        Event::TestFinished { passed, summary } => {
2215            if *passed {
2216                eprintln!(
2217                    "{fm} {}",
2218                    col(c, style::BOLD_GREEN, &format!("tests passed: {summary}"))
2219                );
2220            } else {
2221                eprintln!(
2222                    "{fm} {}",
2223                    col(c, style::BOLD_RED, &format!("tests failed: {summary}"))
2224                );
2225            }
2226        }
2227        Event::TestsSkipped => {
2228            eprintln!(
2229                "{fm} {}",
2230                col(c, style::DIM, "no test runner detected; skipping")
2231            );
2232        }
2233        Event::PhaseCommitted {
2234            phase_id,
2235            commit: Some(hash),
2236        } => {
2237            eprintln!(
2238                "{fm} {}",
2239                col(
2240                    c,
2241                    style::GREEN,
2242                    &format!("phase {phase_id} committed: {hash}")
2243                )
2244            );
2245        }
2246        Event::PhaseCommitted {
2247            phase_id,
2248            commit: None,
2249        } => {
2250            eprintln!(
2251                "{fm} {}",
2252                col(
2253                    c,
2254                    style::DIM,
2255                    &format!("phase {phase_id} produced no code changes; no commit")
2256                )
2257            );
2258        }
2259        Event::PhaseHalted { phase_id, reason } => {
2260            eprintln!(
2261                "{} {}",
2262                col(c, style::BOLD_RED, "[pitboss]"),
2263                col(
2264                    c,
2265                    style::BOLD_RED,
2266                    &format!("phase {phase_id} halted: {reason}")
2267                )
2268            );
2269        }
2270        Event::RunFinished => {
2271            let rule = col(c, style::BOLD_GREEN, &"─".repeat(60));
2272            if c {
2273                eprintln!("{rule}");
2274            }
2275            eprintln!("{}", col(c, style::BOLD_GREEN, "[pitboss] run finished"));
2276            if c {
2277                eprintln!("{rule}");
2278            }
2279        }
2280        Event::UsageUpdated(_) => {
2281            // Snapshot consumed by the TUI; the plain logger doesn't surface
2282            // running token totals to keep stderr clean.
2283        }
2284        Event::SweepStarted {
2285            after,
2286            items_pending,
2287            attempt,
2288        } => {
2289            eprintln!(
2290                "{fm} {}",
2291                col(
2292                    c,
2293                    style::BOLD_CYAN,
2294                    &format!(
2295                        "sweep after phase {after} ({items_pending} pending, total dispatch {attempt})"
2296                    )
2297                )
2298            );
2299        }
2300        Event::SweepCompleted {
2301            after,
2302            resolved,
2303            commit: Some(hash),
2304        } => {
2305            eprintln!(
2306                "{fm} {}",
2307                col(
2308                    c,
2309                    style::GREEN,
2310                    &format!("sweep after phase {after} committed: {resolved} resolved ({hash})")
2311                )
2312            );
2313        }
2314        Event::SweepCompleted {
2315            after,
2316            resolved,
2317            commit: None,
2318        } => {
2319            eprintln!(
2320                "{fm} {}",
2321                col(
2322                    c,
2323                    style::DIM,
2324                    &format!(
2325                        "sweep after phase {after}: {resolved} resolved; no code changes to commit"
2326                    )
2327                )
2328            );
2329        }
2330        Event::SweepHalted { after, reason } => {
2331            eprintln!(
2332                "{} {}",
2333                col(c, style::BOLD_RED, "[pitboss]"),
2334                col(
2335                    c,
2336                    style::BOLD_RED,
2337                    &format!("sweep after phase {after} halted: {reason}")
2338                )
2339            );
2340        }
2341        Event::DeferredItemStale { text, attempts } => {
2342            eprintln!(
2343                "{fm} {}",
2344                col(
2345                    c,
2346                    style::BOLD_YELLOW,
2347                    &format!(
2348                        "deferred item stale ({attempts} sweep attempts; needs human attention): {text}"
2349                    )
2350                )
2351            );
2352        }
2353    }
2354}
2355
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a phase-id literal, panicking if the test input is malformed.
    fn pid(s: &str) -> PhaseId {
        PhaseId::parse(s).unwrap()
    }

    /// A fixed `now` must yield a matching run id, branch, start phase and
    /// start timestamp, with nothing marked completed yet.
    #[test]
    fn fresh_run_state_uses_branch_prefix_and_timestamp() {
        let only_phase = crate::plan::Phase {
            id: pid("01"),
            title: "First".into(),
            body: String::new(),
        };
        let plan = Plan::new(pid("01"), vec![only_phase]);
        let cfg = Config::default();

        let parsed = chrono::DateTime::parse_from_rfc3339("2026-04-29T14:30:22Z").unwrap();
        let now = parsed.with_timezone(&Utc);

        let state = fresh_run_state(&plan, &cfg, now);

        assert_eq!(state.run_id, "20260429T143022Z");
        assert_eq!(state.branch, "pitboss/play/20260429T143022Z");
        assert_eq!(state.started_phase, pid("01"));
        assert_eq!(state.started_at, now);
        assert!(state.completed.is_empty());
    }

    /// Spot-check the `Display` text of several halt reasons.
    #[test]
    fn halt_reason_display_summaries_are_human_readable() {
        let tampered = HaltReason::PlanTampered.to_string();
        assert_eq!(tampered, "plan.md was modified by the agent");

        let deferred = HaltReason::DeferredInvalid("bad".into()).to_string();
        assert!(deferred.contains("deferred.md"));

        let tests_failed = HaltReason::TestsFailed("nope".into()).to_string();
        assert!(tests_failed.contains("tests failed"));

        let agent_failure = HaltReason::AgentFailure("boom".into()).to_string();
        assert!(agent_failure.contains("boom"));
    }
}