use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use tokio::sync::Mutex as TokioMutex;
use tracing::{debug, warn};
use crate::git::{CommitId, Git, ShellGit};
use super::prompt::PromptDoc;
use super::run_dir::{RunPaths, SessionStatus};
/// File name of the scratchpad that `SessionWorktree::create` writes at the root of each session
/// worktree. `SessionWorktree::merge_into` also lists this path among the exclusions it passes
/// when committing and stashing session changes.
const PER_SESSION_SCRATCHPAD: &str = "scratchpad.md";
/// Handle to one session's git worktree plus the bookkeeping needed to merge it, remove it, or
/// move it aside after a failure.
pub struct SessionWorktree {
/// Session sequence number; rendered zero-padded to four digits in directory and branch names.
seq: u32,
/// Worktree directory, `<worktrees>/session-NNNN`.
path: PathBuf,
/// Name of the session branch the worktree was created with.
branch: String,
/// Directory `<worktrees>/failed` that `quarantine` moves the worktree into.
failed_root: PathBuf,
/// Path of the scratchpad file inside the worktree.
scratchpad_path: PathBuf,
/// Scratchpad content that was written when the worktree was created.
seed_scratchpad: String,
/// Git handle constructed from the worktree path.
worktree_git: ShellGit,
}
impl SessionWorktree {
pub async fn create(
repo_git: &dyn Git,
run_paths: &RunPaths,
run_id: &str,
run_branch: &str,
seq: u32,
scratchpad_seed: &str,
) -> Result<Self> {
let path = run_paths.worktrees.join(format!("session-{seq:04}"));
let branch = session_branch_name(run_id, seq);
let failed_root = run_paths.worktrees.join("failed");
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("worktree: ensuring parent dir {:?}", parent))?;
}
repo_git
.add_worktree(&path, &branch, run_branch)
.await
.with_context(|| {
format!(
"worktree: creating session-{seq:04} branch {branch:?} at {:?}",
path
)
})?;
let scratchpad_path = path.join(PER_SESSION_SCRATCHPAD);
std::fs::write(&scratchpad_path, scratchpad_seed).with_context(|| {
format!(
"worktree: seeding scratchpad at {:?} ({} bytes)",
scratchpad_path,
scratchpad_seed.len()
)
})?;
let worktree_git = ShellGit::new(path.clone());
Ok(Self {
seq,
path,
branch,
failed_root,
scratchpad_path,
seed_scratchpad: scratchpad_seed.to_string(),
worktree_git,
})
}
pub fn seq(&self) -> u32 {
self.seq
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn branch(&self) -> &str {
&self.branch
}
pub fn scratchpad_path(&self) -> &Path {
&self.scratchpad_path
}
pub fn scratchpad_seed(&self) -> &str {
&self.seed_scratchpad
}
pub fn worktree_git(&self) -> &ShellGit {
&self.worktree_git
}
pub async fn cleanup(&self, repo_git: &dyn Git) -> Result<()> {
repo_git
.remove_worktree(&self.path)
.await
.with_context(|| format!("worktree: removing {:?}", self.path))?;
repo_git
.delete_branch(&self.branch)
.await
.with_context(|| format!("worktree: deleting branch {:?}", self.branch))?;
Ok(())
}
pub async fn quarantine(&self, repo_git: &dyn Git) -> Result<PathBuf> {
std::fs::create_dir_all(&self.failed_root)
.with_context(|| format!("worktree: creating {:?}", self.failed_root))?;
let dest = self.failed_root.join(format!("session-{:04}", self.seq));
if dest.exists() {
std::fs::remove_dir_all(&dest)
.with_context(|| format!("worktree: clearing stale {:?}", dest))?;
}
if self.path.exists() {
std::fs::rename(&self.path, &dest)
.with_context(|| format!("worktree: quarantining {:?} to {:?}", self.path, dest))?;
}
let _ = repo_git.remove_worktree(&self.path).await;
repo_git
.delete_branch(&self.branch)
.await
.with_context(|| format!("worktree: deleting branch {:?}", self.branch))?;
Ok(dest)
}
#[allow(clippy::too_many_arguments)]
pub async fn merge_into<G: Git + ?Sized>(
&self,
repo_git: &G,
run_branch: &str,
run_branch_lock: &TokioMutex<()>,
prompt: &PromptDoc,
run_id: &str,
starting_status: SessionStatus,
starting_summary: String,
) -> Result<MergeOutcome> {
let g = self.worktree_git();
let _guard = run_branch_lock.lock().await;
let mut status = starting_status;
let mut summary = starting_summary;
let mut commit: Option<CommitId> = None;
let mut sync_ok = true;
if status == SessionStatus::Ok
|| status == SessionStatus::Error
|| status == SessionStatus::Skipped
{
if let Err(e) = g.merge_ff_only(run_branch).await {
warn!(
run_id = %run_id,
seq = self.seq,
error = %format!("{e:#}"),
prompt = %prompt.meta.name,
"grind: parallel_safe contract violation (worktree sync)"
);
status = SessionStatus::Error;
summary = parallel_safe_violation_summary(
&prompt.meta.name,
ParallelSafeViolationSite::WorktreeSync,
);
sync_ok = false;
}
}
let pitboss_rel = Path::new(".pitboss");
let scratchpad_rel = Path::new(PER_SESSION_SCRATCHPAD);
let parallel_exclusions: [&Path; 2] = [pitboss_rel, scratchpad_rel];
if sync_ok {
commit = match status {
SessionStatus::Ok | SessionStatus::Error | SessionStatus::Skipped => {
try_commit_session(g, self.seq, prompt, run_id, ¶llel_exclusions).await?
}
_ => None,
};
}
if sync_ok && commit.is_some() {
if let Err(e) = repo_git.merge_ff_only(self.branch()).await {
warn!(
run_id = %run_id,
seq = self.seq,
error = %format!("{e:#}"),
prompt = %prompt.meta.name,
"grind: parallel_safe contract violation (run-branch ff)"
);
status = SessionStatus::Error;
summary = parallel_safe_violation_summary(
&prompt.meta.name,
ParallelSafeViolationSite::RunBranchMerge,
);
commit = None;
}
}
if sync_ok {
let stash_label = format!("grind/{}/session-{:04}-leftover", run_id, self.seq);
match g.stash_push(&stash_label, ¶llel_exclusions).await {
Ok(true) => {
warn!(
run_id = %run_id,
seq = self.seq,
stash = %stash_label,
"grind: leftover changes stashed (parallel)"
);
if status == SessionStatus::Ok {
status = SessionStatus::Dirty;
}
}
Ok(false) => {}
Err(e) => {
warn!(
run_id = %run_id,
seq = self.seq,
error = %format!("{e:#}"),
"grind: stash_push failed (parallel) — treating as merge conflict"
);
if status == SessionStatus::Ok || status == SessionStatus::Dirty {
status = SessionStatus::Error;
summary = merge_conflict_summary(&prompt.meta.name, &e);
}
}
}
}
Ok(MergeOutcome {
status,
summary,
commit,
sync_ok,
})
}
}
/// Result of `SessionWorktree::merge_into`: the session's final status plus what was committed.
#[derive(Debug, Clone)]
pub struct MergeOutcome {
/// Session status after the merge attempt; may differ from the status passed in.
pub status: SessionStatus,
/// The caller's summary, unless `merge_into` replaced it with a contract-violation or
/// merge-conflict message.
pub summary: String,
/// Id of the session commit, or `None` when nothing was committed or the run-branch
/// fast-forward failed.
pub commit: Option<CommitId>,
/// `false` only when fast-forwarding the worktree to the run branch failed.
pub sync_ok: bool,
}
/// Stages the session's changes (minus the `exclude` paths) and commits them if anything ended
/// up staged.
///
/// Returns `Ok(Some(id))` with the new commit's id, or `Ok(None)` when staging produced nothing
/// to commit. The commit subject has the form
/// `[pitboss/grind] <prompt name> session-NNNN (<run id>)`.
///
/// # Errors
///
/// Propagates failures from staging, from the staged-changes check, and from the commit itself,
/// each wrapped with a message naming the session.
pub(crate) async fn try_commit_session<G: Git + ?Sized>(
    git: &G,
    seq: u32,
    prompt: &PromptDoc,
    run_id: &str,
    exclude: &[&Path],
) -> Result<Option<CommitId>> {
    git.stage_changes(exclude)
        .await
        .with_context(|| format!("grind: staging session {seq} changes"))?;

    let anything_staged = git
        .has_staged_changes()
        .await
        .with_context(|| format!("grind: checking staged changes for session {seq}"))?;

    if anything_staged {
        let subject = format!(
            "[pitboss/grind] {} session-{seq:04} ({run_id})",
            prompt.meta.name
        );
        let commit_id = git
            .commit(&subject)
            .await
            .with_context(|| format!("grind: committing session {seq}"))?;
        return Ok(Some(commit_id));
    }

    debug!(seq, prompt = %prompt.meta.name, "grind: no code changes to commit");
    Ok(None)
}
/// Builds the one-line session summary used when leftover changes could not be stashed.
///
/// Only the first line of the error's alternate (`{:#}`) rendering is kept, trimmed of
/// surrounding whitespace. When that line is empty the summary names the prompt alone.
pub(crate) fn merge_conflict_summary(prompt_name: &str, err: &anyhow::Error) -> String {
    let rendered = format!("{err:#}");
    let headline = rendered.lines().next().map(str::trim).unwrap_or_default();
    match headline {
        "" => format!("merge conflict at session end (prompt {prompt_name})"),
        detail => format!("merge conflict at session end (prompt {prompt_name}): {detail}"),
    }
}
/// Removes `session-N` worktree directories whose sequence number is greater than
/// `last_session_seq`, along with their session branches, and returns how many were swept.
///
/// Only directories directly under `run_paths.worktrees` named `session-<u32>` are considered;
/// the `failed` directory, other directories, and plain files are left alone. If the worktrees
/// directory cannot be read the function returns 0 without touching git.
///
/// Every step is best-effort and logged on failure. A `remove_worktree` failure still falls
/// through to deleting the directory by hand. A failure to delete the directory skips the
/// branch deletion and the entry is not counted. A `delete_branch` failure is logged but the
/// entry is still counted.
pub async fn sweep_stale_session_worktrees(
    repo_git: &dyn Git,
    run_paths: &RunPaths,
    run_id: &str,
    last_session_seq: u32,
) -> usize {
    let entries = match std::fs::read_dir(&run_paths.worktrees) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };

    let mut swept = 0usize;
    for dir_entry in entries.flatten() {
        let candidate = dir_entry.path();
        if !candidate.is_dir() {
            continue;
        }

        // Resolve the directory name to a stale sequence number, or skip the entry.
        let stale_seq = candidate
            .file_name()
            .and_then(|os_name| os_name.to_str())
            .filter(|dir_name| *dir_name != "failed")
            .and_then(|dir_name| dir_name.strip_prefix("session-"))
            .and_then(|digits| digits.parse::<u32>().ok())
            .filter(|parsed| *parsed > last_session_seq);
        let Some(seq) = stale_seq else {
            continue;
        };

        let branch = session_branch_name(run_id, seq);

        if let Err(e) = repo_git.remove_worktree(&candidate).await {
            warn!(
                run_id = %run_id,
                seq,
                path = %candidate.display(),
                error = %format!("{e:#}"),
                "grind: resume sweep: remove_worktree failed"
            );
        }

        // The directory may still be there after the git call; delete it directly.
        if candidate.exists() {
            if let Err(e) = std::fs::remove_dir_all(&candidate) {
                warn!(
                    run_id = %run_id,
                    seq,
                    path = %candidate.display(),
                    error = %format!("{e:#}"),
                    "grind: resume sweep: remove_dir_all failed"
                );
                continue;
            }
        }

        if let Err(e) = repo_git.delete_branch(&branch).await {
            warn!(
                run_id = %run_id,
                seq,
                branch = %branch,
                error = %format!("{e:#}"),
                "grind: resume sweep: delete_branch failed"
            );
        }

        swept += 1;
    }
    swept
}
/// Returns the branch name for session `seq` of run `run_id`:
/// `pitboss/grind/<run_id>-session-NNNN`, with `seq` zero-padded to at least four digits.
pub fn session_branch_name(run_id: &str, seq: u32) -> String {
    let session_part = format!("session-{seq:04}");
    ["pitboss/grind/", run_id, "-", session_part.as_str()].concat()
}
/// Which step of the session merge detected a `parallel_safe` contract violation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelSafeViolationSite {
    /// Reported when fast-forwarding the session worktree to the run branch fails.
    WorktreeSync,
    /// Reported when fast-forwarding to the session branch through the repository handle fails.
    RunBranchMerge,
}

impl ParallelSafeViolationSite {
    /// Short human-readable name of the site, used inside summary messages.
    fn label(self) -> &'static str {
        match self {
            Self::WorktreeSync => "worktree sync",
            Self::RunBranchMerge => "run-branch merge",
        }
    }
}
/// Formats the session summary for a `parallel_safe` contract violation, naming the offending
/// prompt and, in parentheses, the site where the violation was detected.
pub fn parallel_safe_violation_summary(
    prompt_name: &str,
    site: ParallelSafeViolationSite,
) -> String {
    let site_label = site.label();
    format!("parallel_safe contract violated by prompt {prompt_name} ({site_label})")
}
/// Merges one session's view of the scratchpad back into the run-level scratchpad file.
///
/// * If `session_view` equals `seed`, the session changed nothing and the file is left untouched.
/// * If the run-level file still equals `seed`, it is replaced wholesale by `session_view`.
/// * Otherwise the run-level file has changed since the session was seeded, so `session_view` is
///   appended to it between `<!-- pitboss:session-NNNN -->` and
///   `<!-- /pitboss:session-NNNN -->` marker lines.
///
/// A missing run-level file is treated as empty.
///
/// # Errors
///
/// Returns an error if the run-level file exists but cannot be read (for example permission
/// denied or invalid UTF-8), or if the merged content cannot be written. Read failures are
/// reported rather than treated as an empty file, because writing after a failed read would
/// discard the existing scratchpad.
pub fn merge_scratchpad_into_run(
    run_scratchpad_path: &Path,
    session_view: &str,
    seed: &str,
    seq: u32,
) -> Result<()> {
    if session_view == seed {
        return Ok(());
    }

    let current_run = match std::fs::read_to_string(run_scratchpad_path) {
        Ok(text) => text,
        // No run-level scratchpad yet: start from empty.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(),
        Err(e) => {
            return Err(anyhow!(e).context(format!(
                "scratchpad merge: reading {:?}",
                run_scratchpad_path
            )));
        }
    };

    let new_content = if current_run == seed {
        session_view.to_string()
    } else {
        let mut buf = current_run;
        // Make sure the opening marker starts on its own line.
        if !buf.is_empty() && !buf.ends_with('\n') {
            buf.push('\n');
        }
        buf.push_str(&format!("<!-- pitboss:session-{seq:04} -->\n"));
        if !session_view.is_empty() {
            buf.push_str(session_view);
            if !session_view.ends_with('\n') {
                buf.push('\n');
            }
        }
        buf.push_str(&format!("<!-- /pitboss:session-{seq:04} -->\n"));
        buf
    };

    std::fs::write(run_scratchpad_path, new_content)
        .with_context(|| format!("scratchpad merge: writing {:?}", run_scratchpad_path))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `RunPaths` whose entries all live under `run_root`. Nothing is created on disk.
    fn run_paths_under(run_root: &Path) -> RunPaths {
        RunPaths {
            root: run_root.to_path_buf(),
            sessions_jsonl: run_root.join("sessions.jsonl"),
            sessions_md: run_root.join("sessions.md"),
            scratchpad: run_root.join("scratchpad.md"),
            transcripts: run_root.join("transcripts"),
            worktrees: run_root.join("worktrees"),
            state: run_root.join("state.json"),
        }
    }

    #[test]
    fn session_branch_name_uses_canonical_pattern() {
        assert_eq!(
            session_branch_name("20260430T120000Z-aaaa", 7),
            "pitboss/grind/20260430T120000Z-aaaa-session-0007"
        );
    }

    /// Sessions 1..=5 exist on disk and the last recorded session is 2, so only 3..=5 may be
    /// swept; `failed/`, unrelated directories, and loose files must survive.
    #[tokio::test]
    async fn sweep_removes_only_seqs_above_last_session_seq() {
        use crate::git::{MockGit, MockOp};

        let dir = tempfile::tempdir().unwrap();
        let run_root = dir.path().join("run-root");
        std::fs::create_dir_all(&run_root).unwrap();
        let paths = run_paths_under(&run_root);
        std::fs::create_dir_all(&paths.worktrees).unwrap();

        for seq in 1..=5u32 {
            let p = paths.worktrees.join(format!("session-{seq:04}"));
            std::fs::create_dir_all(&p).unwrap();
            std::fs::write(p.join("marker"), format!("{seq}")).unwrap();
        }
        // Entries the sweep must ignore.
        let failed = paths.worktrees.join("failed").join("session-0009");
        std::fs::create_dir_all(&failed).unwrap();
        std::fs::write(failed.join("marker"), b"forensic").unwrap();
        std::fs::create_dir_all(paths.worktrees.join("scratch")).unwrap();
        std::fs::write(paths.worktrees.join("loose-file"), b"x").unwrap();

        let git = MockGit::new();
        let removed = sweep_stale_session_worktrees(&git, &paths, "rid", 2).await;
        assert_eq!(removed, 3, "expected to remove session-0003..0005");

        for seq in 1..=2u32 {
            assert!(
                paths.worktrees.join(format!("session-{seq:04}")).exists(),
                "session-{seq:04} should still exist"
            );
        }
        for seq in 3..=5u32 {
            assert!(
                !paths.worktrees.join(format!("session-{seq:04}")).exists(),
                "session-{seq:04} should have been swept"
            );
        }
        assert!(failed.exists(), "failed/ entry should survive sweep");
        assert!(paths.worktrees.join("scratch").exists());
        assert!(paths.worktrees.join("loose-file").exists());

        let ops = git.ops();
        let removes: Vec<&PathBuf> = ops
            .iter()
            .filter_map(|op| match op {
                MockOp::RemoveWorktree(p) => Some(p),
                _ => None,
            })
            .collect();
        let deletes: Vec<&String> = ops
            .iter()
            .filter_map(|op| match op {
                MockOp::DeleteBranch(b) => Some(b),
                _ => None,
            })
            .collect();
        assert_eq!(removes.len(), 3);
        assert_eq!(deletes.len(), 3);
        for seq in 3..=5u32 {
            let expected = format!("pitboss/grind/rid-session-{seq:04}");
            assert!(
                deletes.iter().any(|b| **b == expected),
                "missing delete_branch for {expected}"
            );
        }
    }

    #[tokio::test]
    async fn sweep_no_op_when_worktrees_dir_missing() {
        use crate::git::MockGit;

        let dir = tempfile::tempdir().unwrap();
        let run_root = dir.path().join("run-root");
        std::fs::create_dir_all(&run_root).unwrap();
        // The worktrees directory is deliberately never created.
        let paths = run_paths_under(&run_root);

        let git = MockGit::new();
        let removed = sweep_stale_session_worktrees(&git, &paths, "rid", 0).await;
        assert_eq!(removed, 0);
        assert!(git.ops().is_empty());
    }

    #[test]
    fn merge_conflict_summary_names_prompt_and_includes_first_error_line() {
        let err = anyhow!("stash failed\nstderr: CONFLICT (content)");
        let s = merge_conflict_summary("fp-hunter", &err);
        assert!(s.contains("fp-hunter"), "summary missing prompt name: {s}");
        assert!(s.contains("merge conflict"), "summary missing label: {s}");
        assert!(
            s.contains("stash failed"),
            "summary missing first err line: {s}"
        );
        assert!(
            !s.contains("CONFLICT"),
            "summary leaked tail of multi-line err: {s}"
        );
    }

    #[test]
    fn parallel_safe_violation_summary_names_prompt_and_site() {
        assert_eq!(
            parallel_safe_violation_summary("fp-hunter", ParallelSafeViolationSite::WorktreeSync),
            "parallel_safe contract violated by prompt fp-hunter (worktree sync)"
        );
        assert_eq!(
            parallel_safe_violation_summary("fp-hunter", ParallelSafeViolationSite::RunBranchMerge),
            "parallel_safe contract violated by prompt fp-hunter (run-branch merge)"
        );
    }

    #[test]
    fn merge_scratchpad_noop_when_session_did_not_touch_view() {
        let dir = tempfile::tempdir().unwrap();
        let run_pad = dir.path().join("scratchpad.md");
        std::fs::write(&run_pad, "run state\n").unwrap();
        let seed = "seed\n";
        merge_scratchpad_into_run(&run_pad, seed, seed, 1).unwrap();
        assert_eq!(std::fs::read_to_string(&run_pad).unwrap(), "run state\n");
    }

    #[test]
    fn merge_scratchpad_fast_merges_when_run_level_is_unchanged() {
        let dir = tempfile::tempdir().unwrap();
        let run_pad = dir.path().join("scratchpad.md");
        std::fs::write(&run_pad, "seed\n").unwrap();
        let seed = "seed\n";
        let session_view = "seed\n- session 1 added a line\n";
        merge_scratchpad_into_run(&run_pad, session_view, seed, 1).unwrap();
        assert_eq!(std::fs::read_to_string(&run_pad).unwrap(), session_view);
    }

    #[test]
    fn merge_scratchpad_appends_labeled_view_on_concurrent_modification() {
        let dir = tempfile::tempdir().unwrap();
        let run_pad = dir.path().join("scratchpad.md");
        // Another session already merged its line, so the run-level file no longer equals the seed.
        std::fs::write(&run_pad, "seed\n- session 1 added a line\n").unwrap();
        let seed = "seed\n";
        let session_view = "seed\n- session 2 added a different line\n";
        merge_scratchpad_into_run(&run_pad, session_view, seed, 2).unwrap();
        let after = std::fs::read_to_string(&run_pad).unwrap();
        assert!(after.contains("- session 1 added a line"));
        assert!(after.contains("<!-- pitboss:session-0002 -->"));
        assert!(after.contains("- session 2 added a different line"));
        assert!(after.contains("<!-- /pitboss:session-0002 -->"));
    }
}