use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::util::paths::grind_runs_dir;
use crate::util::write_atomic;
use super::budget::BudgetSnapshot;
use super::plan::GrindPlan;
use super::prompt::PromptDoc;
use super::run_dir::{RunPaths, SessionRecord, SessionStatus, STATE_FILENAME};
use super::scheduler::{Scheduler, SchedulerState};
/// Lifecycle status of a grind run, persisted in `state.json`.
///
/// Serialized in lowercase (e.g. `"active"`) via serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RunStatus {
    /// Run is in progress (or was interrupted without reaching a terminal state).
    Active,
    /// Run finished normally; terminal.
    Completed,
    /// Run was stopped early; still resumable (see `is_resumable`).
    Aborted,
    /// Run ended in an unrecoverable error; terminal.
    Failed,
}
impl RunStatus {
    /// Whether a run in this status may be picked up again by a resume.
    ///
    /// `Active` and `Aborted` runs can be resumed; `Completed` and `Failed`
    /// are terminal.
    pub fn is_resumable(self) -> bool {
        match self {
            RunStatus::Active | RunStatus::Aborted => true,
            RunStatus::Completed | RunStatus::Failed => false,
        }
    }
}
/// Snapshot of a grind run, persisted as pretty JSON in the run directory's
/// `state.json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RunState {
    /// Identifier of the run; matches the run's directory name on disk.
    pub run_id: String,
    /// Git branch associated with the run.
    pub branch: String,
    /// Name of the plan the run was started with (checked on resume).
    pub plan_name: String,
    /// Prompt names in the plan at run start; used to detect prompt-set
    /// changes on resume.
    pub prompt_names: Vec<String>,
    /// Persisted scheduler position (rotation counter, per-prompt run counts).
    pub scheduler_state: SchedulerState,
    /// Budget consumed so far (iterations, tokens, cost, failure streak).
    pub budget_consumed: BudgetSnapshot,
    /// Sequence number of the last session this state accounts for.
    pub last_session_seq: u32,
    /// When the run began.
    pub started_at: DateTime<Utc>,
    /// When this state was last written; used to order run listings.
    pub last_updated_at: DateTime<Utc>,
    /// Current lifecycle status.
    pub status: RunStatus,
}
impl RunState {
    /// Serialize this state as pretty JSON with a trailing newline and
    /// atomically replace the run's `state.json`.
    pub fn write(&self, paths: &RunPaths) -> Result<()> {
        let mut payload =
            serde_json::to_vec_pretty(self).context("grind state: serializing RunState")?;
        payload.push(b'\n');
        write_atomic(&paths.state, &payload)?;
        Ok(())
    }

    /// Read the state from its conventional location inside a run directory.
    pub fn read(paths: &RunPaths) -> Result<Self> {
        Self::read_path(&paths.state)
    }

    /// Read and parse a `state.json` at an arbitrary path, surfacing the
    /// path in both the read and the parse error context.
    pub fn read_path(path: &Path) -> Result<Self> {
        let raw = fs::read_to_string(path)
            .with_context(|| format!("grind state: reading {}", path.display()))?;
        serde_json::from_str(&raw)
            .with_context(|| format!("grind state: parsing {}", path.display()))
    }
}
/// A run discovered on disk: its id, the path to its `state.json`, and the
/// parsed state.
#[derive(Debug, Clone)]
pub struct RunListing {
    /// Directory name of the run under the grind runs root.
    pub run_id: String,
    /// Path to the run's `state.json`.
    pub state_path: PathBuf,
    /// Parsed contents of `state.json`.
    pub state: RunState,
}
/// Scan the grind runs directory and return every run with a readable
/// `state.json`, ordered newest-first by `last_updated_at`.
///
/// A missing runs directory yields an empty list. Non-directory entries,
/// unreadable/unparseable state files, and non-UTF-8 directory names are
/// all silently skipped.
pub fn list_runs(repo_root: &Path) -> Vec<RunListing> {
    let grind_root = grind_runs_dir(repo_root);
    let Ok(entries) = fs::read_dir(&grind_root) else {
        return Vec::new();
    };
    let mut listings: Vec<RunListing> = entries
        .flatten()
        .filter_map(|entry| {
            let dir = entry.path();
            if !dir.is_dir() {
                return None;
            }
            // Skip directories whose name is not valid UTF-8.
            let run_id = dir.file_name()?.to_str()?.to_string();
            let state_path = dir.join(STATE_FILENAME);
            let state = RunState::read_path(&state_path).ok()?;
            Some(RunListing {
                run_id,
                state_path,
                state,
            })
        })
        .collect();
    listings.sort_by_key(|l| std::cmp::Reverse(l.state.last_updated_at));
    listings
}
/// Pick the most recently updated run whose status allows resuming, if any.
pub fn most_recent_resumable(repo_root: &Path) -> Option<RunListing> {
    // `list_runs` is already sorted newest-first, so the first resumable
    // hit is the most recent one.
    let runs = list_runs(repo_root);
    runs.into_iter().find(|r| r.state.status.is_resumable())
}
/// Everything that can prevent resuming a grind run.
///
/// Display strings are produced by `thiserror`'s `#[error]` attributes and
/// are user-facing; several suggest starting a fresh run as the remedy.
#[derive(Debug, thiserror::Error)]
pub enum ResumeError {
    /// No run with a resumable status exists under the grind runs root.
    #[error("no resumable grind run found under {dir}")]
    NoResumableRun {
        dir: PathBuf,
    },
    /// An explicitly requested run id has no directory on disk.
    #[error("grind run {run_id:?} not found at {dir}")]
    RunNotFound {
        run_id: String,
        dir: PathBuf,
    },
    /// The run directory exists but its `state.json` could not be read or
    /// parsed; the underlying anyhow error is preserved as the source.
    #[error("grind run {run_id:?}: failed to read state: {source:#}")]
    StateUnreadable {
        run_id: String,
        #[source]
        source: anyhow::Error,
    },
    /// The run is in a terminal status (`Completed`/`Failed`).
    #[error(
        "grind run {run_id:?} is {status:?} and cannot be resumed; start a new run with `pitboss grind`"
    )]
    NotResumable {
        run_id: String,
        status: RunStatus,
    },
    /// The plan name on disk no longer matches the current plan.
    #[error(
        "grind run {run_id:?}: plan name changed (was {original:?}, now {current:?}); start a new run with `pitboss grind`"
    )]
    PlanRenamed {
        run_id: String,
        original: String,
        current: String,
    },
    /// Prompts were added and/or removed since the run started.
    #[error(
        "grind run {run_id:?}: prompt set changed (added: {added:?}, removed: {removed:?}); start a new run with `pitboss grind`"
    )]
    PromptSetChanged {
        run_id: String,
        added: Vec<String>,
        removed: Vec<String>,
    },
    /// `state.json` and `sessions.jsonl` disagree in a way replay cannot
    /// reconcile (state ahead of log, or replay divergence).
    #[error(
        "grind run {run_id:?}: state.json out of sync with sessions.jsonl (state says \
        last_session_seq={state_seq}, log tail has {jsonl_seq}); start a new run or repair \
        state.json by hand"
    )]
    StateOutOfSync {
        run_id: String,
        state_seq: u32,
        jsonl_seq: u32,
    },
}
/// Compare two prompt-name lists as sets (order- and duplicate-insensitive).
///
/// Returns `None` when both lists describe the same set of names; otherwise
/// `Some((added, removed))`, where `added` holds names present only in
/// `current` and `removed` names present only in `original`. Both output
/// vectors are sorted and deduplicated.
pub fn diff_prompt_names(
    original: &[String],
    current: &[String],
) -> Option<(Vec<String>, Vec<String>)> {
    use std::collections::BTreeSet;
    // Sets give us sort + dedup + O(log n) membership in one step, instead
    // of the previous sort/dedup followed by O(n*m) `Vec::contains` scans.
    let before: BTreeSet<&String> = original.iter().collect();
    let after: BTreeSet<&String> = current.iter().collect();
    if before == after {
        return None;
    }
    // BTreeSet::difference iterates in ascending order, so the outputs stay
    // sorted exactly as before.
    let added: Vec<String> = after.difference(&before).map(|n| (*n).clone()).collect();
    let removed: Vec<String> = before.difference(&after).map(|n| (*n).clone()).collect();
    Some((added, removed))
}
/// Locate the run to resume.
///
/// With an explicit `requested` id, the run directory must exist and its
/// state must parse (terminal status is NOT checked here — see
/// `validate_resume`). With `None`, the newest resumable run is chosen.
pub fn resolve_target(
    repo_root: &Path,
    requested: Option<&str>,
) -> Result<RunListing, ResumeError> {
    let grind_root = grind_runs_dir(repo_root);
    let Some(id) = requested else {
        return most_recent_resumable(repo_root)
            .ok_or(ResumeError::NoResumableRun { dir: grind_root });
    };
    let dir = grind_root.join(id);
    if !dir.is_dir() {
        return Err(ResumeError::RunNotFound {
            run_id: id.to_string(),
            dir,
        });
    }
    let state_path = dir.join(STATE_FILENAME);
    let state = RunState::read_path(&state_path).map_err(|source| ResumeError::StateUnreadable {
        run_id: id.to_string(),
        source,
    })?;
    Ok(RunListing {
        run_id: id.to_string(),
        state_path,
        state,
    })
}
/// Check that a resolved run can actually be resumed against the current
/// plan configuration.
///
/// Rejects terminal statuses, a renamed plan, and any change to the prompt
/// set; on success the listing is passed back through unchanged.
pub fn validate_resume(
    listing: RunListing,
    current_plan_name: &str,
    current_prompt_names: &[String],
) -> Result<RunListing, ResumeError> {
    let status = listing.state.status;
    if !status.is_resumable() {
        return Err(ResumeError::NotResumable {
            run_id: listing.run_id,
            status,
        });
    }
    if listing.state.plan_name != current_plan_name {
        return Err(ResumeError::PlanRenamed {
            run_id: listing.run_id,
            original: listing.state.plan_name,
            current: current_plan_name.to_string(),
        });
    }
    match diff_prompt_names(&listing.state.prompt_names, current_prompt_names) {
        Some((added, removed)) => Err(ResumeError::PromptSetChanged {
            run_id: listing.run_id,
            added,
            removed,
        }),
        None => Ok(listing),
    }
}
/// Result of reconciling persisted `RunState` with the session log.
#[derive(Debug, Clone, PartialEq)]
pub struct ReconciledState {
    /// Scheduler position after any replayed records.
    pub scheduler_state: SchedulerState,
    /// Budget totals after any replayed records.
    pub budget_consumed: BudgetSnapshot,
    /// Highest session sequence number now accounted for.
    pub last_session_seq: u32,
    /// Number of log records replayed (0 when state was already in sync).
    pub records_replayed: usize,
}
/// Reconcile `state.json` against the `sessions.jsonl` log tail.
///
/// Three cases, keyed on the log's highest `seq` vs `state.last_session_seq`:
/// - equal: the state is authoritative; return it unchanged with
///   `records_replayed == 0`.
/// - log behind state: the state claims sessions the log never recorded —
///   unrecoverable, `StateOutOfSync`.
/// - log ahead of state: replay the missing records through a scheduler
///   seeded with the persisted state, checking that each record's prompt
///   matches what the scheduler would have picked, and re-apply each
///   record's budget effects.
///
/// # Errors
/// `ResumeError::StateOutOfSync` when the state is ahead of the log, or
/// when a replayed record's prompt diverges from the scheduler's pick.
pub fn reconstruct_state_from_log(
    state: &RunState,
    log_records: &[SessionRecord],
    plan: &GrindPlan,
    prompts: &BTreeMap<String, PromptDoc>,
) -> Result<ReconciledState, ResumeError> {
    // Highest sequence number present in the log; 0 for an empty log.
    let jsonl_seq = log_records.iter().map(|r| r.seq).max().unwrap_or(0);
    if jsonl_seq == state.last_session_seq {
        // Already in sync: hand back the persisted state verbatim.
        return Ok(ReconciledState {
            scheduler_state: state.scheduler_state.clone(),
            budget_consumed: state.budget_consumed,
            last_session_seq: state.last_session_seq,
            records_replayed: 0,
        });
    }
    if jsonl_seq < state.last_session_seq {
        // State claims more sessions than the log has — cannot reconcile.
        return Err(ResumeError::StateOutOfSync {
            run_id: state.run_id.clone(),
            state_seq: state.last_session_seq,
            jsonl_seq,
        });
    }
    // Log is ahead: collect the records the state has not seen, in seq order.
    let mut missing: Vec<&SessionRecord> = log_records
        .iter()
        .filter(|r| r.seq > state.last_session_seq)
        .collect();
    missing.sort_by_key(|r| r.seq);
    // Seed a scheduler with the persisted position so its picks reproduce
    // the original run's prompt order during replay.
    let mut sched =
        Scheduler::with_state(plan.clone(), prompts.clone(), state.scheduler_state.clone());
    let mut budget = state.budget_consumed;
    for rec in &missing {
        let picked = sched.next();
        match picked {
            Some(p) if p.meta.name == rec.prompt => {
                sched.record_run(&p.meta.name);
            }
            _ => {
                // Scheduler would have picked a different prompt (or none):
                // the log and state no longer describe the same run.
                return Err(ResumeError::StateOutOfSync {
                    run_id: state.run_id.clone(),
                    state_seq: state.last_session_seq,
                    jsonl_seq,
                });
            }
        }
        // Every replayed record counts as an iteration and contributes its
        // token/cost usage, regardless of its status.
        budget.iterations = budget.iterations.saturating_add(1);
        budget.tokens_input = budget.tokens_input.saturating_add(rec.tokens.input);
        budget.tokens_output = budget.tokens_output.saturating_add(rec.tokens.output);
        budget.cost_usd += rec.cost_usd;
        // Failure streak: successes reset it, failures extend it, and
        // aborted/skipped sessions leave it untouched.
        match rec.status {
            SessionStatus::Ok | SessionStatus::Dirty => {
                budget.consecutive_failures = 0;
            }
            SessionStatus::Error | SessionStatus::Timeout => {
                budget.consecutive_failures = budget.consecutive_failures.saturating_add(1);
            }
            SessionStatus::Aborted | SessionStatus::Skipped => {}
        }
    }
    Ok(ReconciledState {
        scheduler_state: sched.state().clone(),
        budget_consumed: budget,
        last_session_seq: jsonl_seq,
        records_replayed: missing.len(),
    })
}
/// Assemble a fresh `RunState`, stamping `last_updated_at` with the
/// current time.
#[allow(clippy::too_many_arguments)]
pub fn build_state(
    run_id: String,
    branch: String,
    plan_name: String,
    prompt_names: Vec<String>,
    scheduler_state: SchedulerState,
    budget_consumed: BudgetSnapshot,
    last_session_seq: u32,
    started_at: DateTime<Utc>,
    status: RunStatus,
) -> RunState {
    // Only `last_updated_at` is synthesized here; everything else is
    // passed through from the caller.
    let last_updated_at = Utc::now();
    RunState {
        run_id,
        branch,
        plan_name,
        prompt_names,
        scheduler_state,
        budget_consumed,
        last_session_seq,
        started_at,
        last_updated_at,
        status,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grind::run_dir::RunPaths;
    use std::collections::BTreeMap;
    use tempfile::tempdir;

    /// Canonical `RunState` fixture: two prompts ("alpha" run twice,
    /// "bravo" once), three sessions of budget consumed, fixed timestamps.
    fn fixture_state(run_id: &str, status: RunStatus) -> RunState {
        let mut runs = BTreeMap::new();
        runs.insert("alpha".to_string(), 2u32);
        runs.insert("bravo".to_string(), 1u32);
        RunState {
            run_id: run_id.to_string(),
            branch: format!("pitboss/grind/{run_id}"),
            plan_name: "default".into(),
            prompt_names: vec!["alpha".into(), "bravo".into()],
            scheduler_state: SchedulerState {
                rotation: 3,
                runs_per_prompt: runs,
            },
            budget_consumed: BudgetSnapshot {
                iterations: 3,
                tokens_input: 1500,
                tokens_output: 750,
                cost_usd: 0.045,
                consecutive_failures: 0,
            },
            last_session_seq: 3,
            started_at: "2026-04-30T17:00:00Z".parse().unwrap(),
            last_updated_at: "2026-04-30T17:30:00Z".parse().unwrap(),
            status,
        }
    }

    #[test]
    fn round_trips_through_disk() {
        let repo = tempdir().unwrap();
        let run_id = "20260430T180000Z-rt00";
        let paths = RunPaths::for_run(repo.path(), run_id);
        fs::create_dir_all(&paths.root).unwrap();
        let state = fixture_state(run_id, RunStatus::Active);
        state.write(&paths).unwrap();
        let back = RunState::read(&paths).unwrap();
        assert_eq!(back, state);
    }

    #[test]
    fn malformed_state_is_rejected_with_path_in_error() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("state.json");
        fs::write(&path, "{not json").unwrap();
        let err = RunState::read_path(&path).unwrap_err();
        assert!(
            err.to_string().contains("state.json"),
            "expected error to surface path, got: {err}"
        );
    }

    #[test]
    fn list_runs_returns_runs_sorted_by_last_updated_desc() {
        let repo = tempdir().unwrap();
        // Write runs out of timestamp order to prove sorting is real.
        for (run_id, ts) in [
            ("rid-old", "2026-04-30T10:00:00Z"),
            ("rid-new", "2026-04-30T18:00:00Z"),
            ("rid-mid", "2026-04-30T15:00:00Z"),
        ] {
            let paths = RunPaths::for_run(repo.path(), run_id);
            fs::create_dir_all(&paths.root).unwrap();
            let mut s = fixture_state(run_id, RunStatus::Active);
            s.last_updated_at = ts.parse().unwrap();
            s.write(&paths).unwrap();
        }
        let listings = list_runs(repo.path());
        let ids: Vec<&str> = listings.iter().map(|l| l.run_id.as_str()).collect();
        assert_eq!(ids, vec!["rid-new", "rid-mid", "rid-old"]);
    }

    #[test]
    fn most_recent_resumable_skips_terminal_runs() {
        let repo = tempdir().unwrap();
        // Newest run is Completed (terminal), so the Aborted one should win.
        for (run_id, ts, status) in [
            (
                "rid-completed",
                "2026-04-30T18:00:00Z",
                RunStatus::Completed,
            ),
            ("rid-aborted", "2026-04-30T17:00:00Z", RunStatus::Aborted),
            ("rid-active", "2026-04-30T16:00:00Z", RunStatus::Active),
        ] {
            let paths = RunPaths::for_run(repo.path(), run_id);
            fs::create_dir_all(&paths.root).unwrap();
            let mut s = fixture_state(run_id, status);
            s.last_updated_at = ts.parse().unwrap();
            s.write(&paths).unwrap();
        }
        let pick = most_recent_resumable(repo.path()).unwrap();
        assert_eq!(pick.run_id, "rid-aborted");
    }

    #[test]
    fn most_recent_resumable_returns_none_when_grind_dir_is_missing() {
        let repo = tempdir().unwrap();
        assert!(most_recent_resumable(repo.path()).is_none());
    }

    #[test]
    fn resolve_target_explicit_run_not_found() {
        let repo = tempdir().unwrap();
        let err = resolve_target(repo.path(), Some("ghost")).unwrap_err();
        assert!(matches!(err, ResumeError::RunNotFound { .. }));
    }

    #[test]
    fn resolve_target_default_no_resumable() {
        let repo = tempdir().unwrap();
        let err = resolve_target(repo.path(), None).unwrap_err();
        assert!(matches!(err, ResumeError::NoResumableRun { .. }));
    }

    #[test]
    fn validate_resume_rejects_terminal_status() {
        let repo = tempdir().unwrap();
        let run_id = "rid";
        let paths = RunPaths::for_run(repo.path(), run_id);
        fs::create_dir_all(&paths.root).unwrap();
        let s = fixture_state(run_id, RunStatus::Completed);
        s.write(&paths).unwrap();
        let listing = resolve_target(repo.path(), Some(run_id)).unwrap();
        let err =
            validate_resume(listing, "default", &["alpha".into(), "bravo".into()]).unwrap_err();
        assert!(matches!(err, ResumeError::NotResumable { .. }));
    }

    #[test]
    fn validate_resume_detects_removed_prompt() {
        let repo = tempdir().unwrap();
        let run_id = "rid";
        let paths = RunPaths::for_run(repo.path(), run_id);
        fs::create_dir_all(&paths.root).unwrap();
        let s = fixture_state(run_id, RunStatus::Active);
        s.write(&paths).unwrap();
        let listing = resolve_target(repo.path(), Some(run_id)).unwrap();
        // Fixture has alpha+bravo; current set drops bravo.
        let err = validate_resume(listing, "default", &["alpha".into()]).unwrap_err();
        match err {
            ResumeError::PromptSetChanged { removed, added, .. } => {
                assert_eq!(removed, vec!["bravo".to_string()]);
                assert!(added.is_empty());
            }
            other => panic!("expected PromptSetChanged, got {other:?}"),
        }
    }

    #[test]
    fn validate_resume_detects_added_prompt() {
        let repo = tempdir().unwrap();
        let run_id = "rid";
        let paths = RunPaths::for_run(repo.path(), run_id);
        fs::create_dir_all(&paths.root).unwrap();
        let s = fixture_state(run_id, RunStatus::Active);
        s.write(&paths).unwrap();
        let listing = resolve_target(repo.path(), Some(run_id)).unwrap();
        // Fixture has alpha+bravo; current set gains charlie.
        let err = validate_resume(
            listing,
            "default",
            &["alpha".into(), "bravo".into(), "charlie".into()],
        )
        .unwrap_err();
        match err {
            ResumeError::PromptSetChanged { added, removed, .. } => {
                assert_eq!(added, vec!["charlie".to_string()]);
                assert!(removed.is_empty());
            }
            other => panic!("expected PromptSetChanged, got {other:?}"),
        }
    }

    #[test]
    fn validate_resume_accepts_unchanged_prompt_set() {
        let repo = tempdir().unwrap();
        let run_id = "rid";
        let paths = RunPaths::for_run(repo.path(), run_id);
        fs::create_dir_all(&paths.root).unwrap();
        let s = fixture_state(run_id, RunStatus::Active);
        s.write(&paths).unwrap();
        let listing = resolve_target(repo.path(), Some(run_id)).unwrap();
        // Same names in a different order must compare equal (set semantics).
        let ok = validate_resume(listing, "default", &["bravo".into(), "alpha".into()]).unwrap();
        assert_eq!(ok.run_id, run_id);
    }

    #[test]
    fn validate_resume_rejects_renamed_plan() {
        let repo = tempdir().unwrap();
        let run_id = "rid";
        let paths = RunPaths::for_run(repo.path(), run_id);
        fs::create_dir_all(&paths.root).unwrap();
        let s = fixture_state(run_id, RunStatus::Active);
        s.write(&paths).unwrap();
        let listing = resolve_target(repo.path(), Some(run_id)).unwrap();
        let err =
            validate_resume(listing, "fp-cleanup", &["alpha".into(), "bravo".into()]).unwrap_err();
        assert!(matches!(err, ResumeError::PlanRenamed { .. }));
    }

    #[test]
    fn run_status_is_resumable_truth_table() {
        assert!(RunStatus::Active.is_resumable());
        assert!(RunStatus::Aborted.is_resumable());
        assert!(!RunStatus::Completed.is_resumable());
        assert!(!RunStatus::Failed.is_resumable());
    }

    #[test]
    fn missing_state_json_is_skipped_in_listings() {
        let repo = tempdir().unwrap();
        // A run directory without a state.json must not appear in listings.
        let dir = repo.path().join(".pitboss/grind/runs/no-state");
        fs::create_dir_all(&dir).unwrap();
        assert!(list_runs(repo.path()).is_empty());
    }

    /// Minimal "alpha"/Ok session record with zero usage.
    fn fixture_session_record(seq: u32) -> super::super::run_dir::SessionRecord {
        fixture_session_record_named(seq, "alpha", SessionStatus::Ok, 0, 0, 0.0)
    }

    /// Session record with an explicit prompt, status, and usage numbers.
    fn fixture_session_record_named(
        seq: u32,
        prompt: &str,
        status: SessionStatus,
        input: u64,
        output: u64,
        cost: f64,
    ) -> SessionRecord {
        use crate::git::CommitId;
        use crate::state::TokenUsage;
        use std::collections::HashMap;
        use std::path::PathBuf;
        SessionRecord {
            seq,
            run_id: "rid".into(),
            prompt: prompt.into(),
            started_at: "2026-04-30T18:00:00Z".parse().unwrap(),
            ended_at: "2026-04-30T18:01:00Z".parse().unwrap(),
            status,
            summary: Some(format!("session {seq}")),
            commit: Some(CommitId::new(format!("abc{seq:040}"))),
            tokens: TokenUsage {
                input,
                output,
                by_role: HashMap::new(),
            },
            cost_usd: cost,
            transcript_path: PathBuf::from(format!("transcripts/session-{seq:04}.log")),
        }
    }

    /// Prompt doc with neutral scheduling metadata (weight 1, every 1).
    fn fixture_prompt(name: &str) -> PromptDoc {
        use crate::grind::prompt::PromptMeta;
        PromptDoc {
            meta: PromptMeta {
                name: name.into(),
                description: format!("desc for {name}"),
                weight: 1,
                every: 1,
                max_runs: None,
                verify: false,
                parallel_safe: false,
                tags: vec![],
                max_session_seconds: None,
                max_session_cost_usd: None,
            },
            body: format!("body for {name}"),
            source_path: PathBuf::from(format!("/fixture/{name}.md")),
            source_kind: crate::grind::prompt::PromptSource::Project,
        }
    }

    /// One-prompt plan plus the name->doc lookup the scheduler needs.
    fn fixture_plan_one_prompt(name: &str) -> (GrindPlan, BTreeMap<String, PromptDoc>) {
        use crate::grind::plan::default_plan_from_dir;
        let prompts = vec![fixture_prompt(name)];
        let plan = default_plan_from_dir(&prompts);
        let lookup: BTreeMap<String, PromptDoc> = prompts
            .into_iter()
            .map(|p| (p.meta.name.clone(), p))
            .collect();
        (plan, lookup)
    }

    /// Derive a RunState whose seq and scheduler counters agree with the
    /// given log records, so reconstruction sees them as in sync.
    fn fixture_state_aligned_with_log(records: &[SessionRecord]) -> RunState {
        let mut state = fixture_state("rid", RunStatus::Active);
        state.last_session_seq = records.iter().map(|r| r.seq).max().unwrap_or(0);
        let mut runs: BTreeMap<String, u32> = BTreeMap::new();
        for r in records {
            *runs.entry(r.prompt.clone()).or_default() += 1;
        }
        state.scheduler_state = SchedulerState {
            rotation: records.len() as u64,
            runs_per_prompt: runs,
        };
        state
    }

    #[test]
    fn reconstruct_returns_identity_when_state_matches_jsonl_tail() {
        let (plan, prompts) = fixture_plan_one_prompt("alpha");
        let records = vec![
            fixture_session_record(1),
            fixture_session_record(2),
            fixture_session_record(3),
        ];
        let state = fixture_state_aligned_with_log(&records);
        let recon = reconstruct_state_from_log(&state, &records, &plan, &prompts).unwrap();
        assert_eq!(recon.records_replayed, 0);
        assert_eq!(recon.last_session_seq, 3);
        assert_eq!(recon.scheduler_state, state.scheduler_state);
    }

    #[test]
    fn reconstruct_passes_on_empty_log_with_zero_state() {
        let (plan, prompts) = fixture_plan_one_prompt("alpha");
        let records: Vec<SessionRecord> = Vec::new();
        let mut state = fixture_state("rid", RunStatus::Active);
        state.last_session_seq = 0;
        state.scheduler_state = SchedulerState::default();
        let recon = reconstruct_state_from_log(&state, &records, &plan, &prompts).unwrap();
        assert_eq!(recon.records_replayed, 0);
        assert_eq!(recon.last_session_seq, 0);
    }

    #[test]
    fn reconstruct_replays_missing_records_when_jsonl_is_ahead() {
        let (plan, prompts) = fixture_plan_one_prompt("alpha");
        let records = vec![
            fixture_session_record_named(1, "alpha", SessionStatus::Ok, 100, 50, 0.01),
            fixture_session_record_named(2, "alpha", SessionStatus::Ok, 100, 50, 0.01),
            fixture_session_record_named(3, "alpha", SessionStatus::Ok, 200, 100, 0.02),
        ];
        // State only knows about the first two records; seq 3 must replay.
        let state = fixture_state_aligned_with_log(&records[..2]);
        let original_budget = state.budget_consumed;
        let recon = reconstruct_state_from_log(&state, &records, &plan, &prompts).unwrap();
        assert_eq!(recon.records_replayed, 1);
        assert_eq!(recon.last_session_seq, 3);
        assert_eq!(recon.scheduler_state.rotation, 3);
        assert_eq!(recon.scheduler_state.runs_per_prompt.get("alpha"), Some(&3));
        assert_eq!(
            recon.budget_consumed.iterations,
            original_budget.iterations + 1
        );
        assert_eq!(
            recon.budget_consumed.tokens_input,
            original_budget.tokens_input + 200
        );
        assert_eq!(
            recon.budget_consumed.tokens_output,
            original_budget.tokens_output + 100
        );
        assert!((recon.budget_consumed.cost_usd - (original_budget.cost_usd + 0.02)).abs() < 1e-9);
    }

    #[test]
    fn reconstruct_resets_consecutive_failures_on_replayed_success() {
        let (plan, prompts) = fixture_plan_one_prompt("alpha");
        let records = vec![
            fixture_session_record_named(1, "alpha", SessionStatus::Error, 0, 0, 0.0),
            fixture_session_record_named(2, "alpha", SessionStatus::Ok, 0, 0, 0.0),
        ];
        let mut state = fixture_state_aligned_with_log(&records[..1]);
        state.budget_consumed.consecutive_failures = 1;
        let recon = reconstruct_state_from_log(&state, &records, &plan, &prompts).unwrap();
        assert_eq!(recon.budget_consumed.consecutive_failures, 0);
    }

    #[test]
    fn reconstruct_rejects_when_state_claims_more_than_jsonl_has() {
        let (plan, prompts) = fixture_plan_one_prompt("alpha");
        let records = vec![fixture_session_record(1)];
        let mut state = fixture_state_aligned_with_log(&records);
        // State claims seq 5 but the log only reaches seq 1.
        state.last_session_seq = 5;
        let err = reconstruct_state_from_log(&state, &records, &plan, &prompts).unwrap_err();
        assert!(matches!(err, ResumeError::StateOutOfSync { .. }));
    }

    #[test]
    fn reconstruct_rejects_when_scheduler_diverges_from_recorded_prompt() {
        let (plan, prompts) = fixture_plan_one_prompt("alpha");
        let records = vec![
            fixture_session_record_named(1, "alpha", SessionStatus::Ok, 0, 0, 0.0),
            // "ghost" is not in the plan, so replay cannot match it.
            fixture_session_record_named(2, "ghost", SessionStatus::Ok, 0, 0, 0.0),
        ];
        let state = fixture_state_aligned_with_log(&records[..1]);
        let err = reconstruct_state_from_log(&state, &records, &plan, &prompts).unwrap_err();
        assert!(matches!(err, ResumeError::StateOutOfSync { .. }));
    }
}