use car_proto::{RunRecord, RunTermination};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
/// Per-agent run cap used by `RetentionConfig::default`.
pub const DEFAULT_MAX_RUNS_PER_AGENT: usize = 50;
/// Age limit, in days, used by `RetentionConfig::default`.
pub const DEFAULT_MAX_AGE_DAYS: i64 = 30;
/// Lifecycle state of a run, as derived from the records in its trace file.
///
/// Variants serialize under snake_case names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
/// No `Ended` record was loaded for the run.
InProgress,
/// The run's `Ended` record carries an `Outcome` termination.
Completed,
/// The run's `Ended` record carries an `Incomplete` termination.
Incomplete,
}
/// Condensed view of a single run trace, as produced by `summarize_file`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunSummary {
/// Run identifier taken from the trace's `Started` record.
pub run_id: String,
/// Agent identifier taken from the trace's `Started` record.
pub agent_id: String,
/// Intent text taken from the trace's `Started` record.
pub intent: String,
/// Start timestamp taken from the trace's `Started` record.
pub started_at: DateTime<Utc>,
/// Timestamp of the `Ended` record; `None` (and omitted when serialized)
/// while the run has no `Ended` record.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ended_at: Option<DateTime<Utc>>,
/// Status derived from the presence and termination of the `Ended` record.
pub status: RunStatus,
/// Number of `Turn` records loaded from the trace.
pub turn_count: usize,
}
/// Retention limits applied by `RunStore::gc`.
#[derive(Debug, Clone, Copy)]
pub struct RetentionConfig {
/// How many of an agent's most recently started terminal runs survive the
/// count-based eviction; in-progress runs are not counted.
pub max_per_agent: usize,
/// Terminal runs whose end time lies more than this many days in the past
/// are evicted.
pub max_age_days: i64,
}
impl Default for RetentionConfig {
    /// Builds the retention policy from the crate-level default constants.
    fn default() -> Self {
        let max_per_agent = DEFAULT_MAX_RUNS_PER_AGENT;
        let max_age_days = DEFAULT_MAX_AGE_DAYS;
        Self {
            max_per_agent,
            max_age_days,
        }
    }
}
/// Deserialization target for `config.toml`; only the `runs` table is read.
#[derive(Debug, Clone, Default, Deserialize)]
struct RunsConfigFile {
/// Contents of the `[runs]` table; defaults to an empty section when absent.
#[serde(default)]
runs: RunsSection,
}
/// Optional retention overrides from the `[runs]` table of `config.toml`.
#[derive(Debug, Clone, Default, Deserialize)]
struct RunsSection {
/// Override for `RetentionConfig::max_per_agent`, if present.
#[serde(default)]
max_per_agent: Option<usize>,
/// Override for `RetentionConfig::max_age_days`, if present.
#[serde(default)]
max_age_days: Option<i64>,
}
impl RetentionConfig {
    /// Loads the retention policy from `<car_dir>/config.toml`.
    ///
    /// Each key present in the `[runs]` table overrides the matching default.
    /// A missing, unreadable or unparseable file is not an error: the
    /// defaults are returned unchanged.
    pub fn from_car_dir(car_dir: &Path) -> Self {
        let defaults = Self::default();
        // Any failure along the way collapses to "no overrides".
        let overrides = std::fs::read_to_string(car_dir.join("config.toml"))
            .ok()
            .and_then(|text| toml::from_str::<RunsConfigFile>(&text).ok())
            .map(|file| file.runs)
            .unwrap_or_default();
        Self {
            max_per_agent: overrides.max_per_agent.unwrap_or(defaults.max_per_agent),
            max_age_days: overrides.max_age_days.unwrap_or(defaults.max_age_days),
        }
    }
}
/// File-backed store of run traces: one `.jsonl` file per run, grouped in
/// one directory per agent under `root`.
#[derive(Debug, Clone)]
pub struct RunStore {
/// Directory that holds the per-agent run directories.
root: PathBuf,
/// Limits applied by `gc`.
retention: RetentionConfig,
}
impl RunStore {
/// Creates a store rooted at `runs_root` with the given retention policy.
///
/// Nothing is touched on disk here; directories are created on first write.
pub fn new(runs_root: PathBuf, retention: RetentionConfig) -> Self {
    let root = runs_root;
    Self { root, retention }
}
/// Creates a store next to a journal directory.
///
/// The parent of `journal_dir` is used as the car directory (falling back
/// to `.` when there is no parent); traces live in its `runs` subdirectory
/// and retention is read from its `config.toml`.
pub fn from_journal_dir(journal_dir: &Path) -> Self {
    let car_dir = match journal_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => PathBuf::from("."),
    };
    let retention = RetentionConfig::from_car_dir(&car_dir);
    Self::new(car_dir.join("runs"), retention)
}
/// Returns the directory under which the per-agent run directories live.
pub fn root(&self) -> &Path {
&self.root
}
/// Computes `<root>/<agent>/<run>.jsonl`, with both ids passed through
/// `sanitize` so they cannot introduce extra path components.
fn run_path(&self, agent_id: &str, run_id: &str) -> PathBuf {
    let file_name = format!("{}.jsonl", sanitize(run_id));
    let mut path = self.root.join(sanitize(agent_id));
    path.push(file_name);
    path
}
/// Makes sure the root directory exists with owner-only permissions.
///
/// The backup-exclusion marker is written only when the directory did not
/// exist before this call.
fn ensure_root(&self) -> std::io::Result<()> {
    let root = self.root.as_path();
    // Sample existence before creating so first-time setup can be detected.
    let newly_created = !root.exists();
    std::fs::create_dir_all(root)?;
    set_dir_perms(root)?;
    if newly_created {
        mark_backup_excluded(root);
    }
    Ok(())
}
/// Makes sure the root and the agent's directory exist with owner-only
/// permissions, returning the agent directory path.
fn ensure_agent_dir(&self, agent_id: &str) -> std::io::Result<PathBuf> {
    self.ensure_root()?;
    let agent_dir = self.root.join(sanitize(agent_id));
    std::fs::create_dir_all(&agent_dir).and_then(|()| set_dir_perms(&agent_dir))?;
    Ok(agent_dir)
}
/// Appends `records` to the run's trace file as one JSON document per line.
///
/// An empty slice is a no-op that touches nothing on disk. A newly created
/// file gets owner-only permissions. If an existing file does not end in a
/// newline, one is written first so the new records start on their own line.
///
/// # Errors
///
/// Returns any I/O error from directory or file handling, or an
/// `InvalidData` error wrapping a serialization failure.
pub fn append_records(
    &self,
    agent_id: &str,
    run_id: &str,
    records: &[RunRecord],
) -> std::io::Result<()> {
    if records.is_empty() {
        return Ok(());
    }
    self.ensure_agent_dir(agent_id)?;
    let path = self.run_path(agent_id, run_id);
    // Must be sampled before `open`, which creates the file.
    let is_new_file = !path.exists();
    let mut file = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(&path)?;
    let mut payload: Vec<u8> = Vec::new();
    if is_new_file {
        set_file_perms(&path)?;
    } else if last_byte_is_not_newline(&path)? {
        // Terminate the unterminated tail so it cannot swallow the next record.
        payload.push(b'\n');
    }
    for record in records {
        serde_json::to_writer(&mut payload, record)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        payload.push(b'\n');
    }
    // The whole batch goes out through a single `write_all` call.
    file.write_all(&payload)
}
/// Appends a `Started` record to the trace identified by the ids inside
/// `started`.
pub fn write_started(&self, started: &car_proto::RunStarted) -> std::io::Result<()> {
    let records = [RunRecord::Started(started.clone())];
    self.append_records(&started.agent_id, &started.run_id, &records)
}
/// Appends `turns` to the run's trace file.
///
/// This forwards directly to `append_records`; the records are not checked
/// to be `Turn` variants.
pub fn append_turns(
&self,
agent_id: &str,
run_id: &str,
turns: &[RunRecord],
) -> std::io::Result<()> {
self.append_records(agent_id, run_id, turns)
}
/// Appends an `Ended` record to the trace identified by the ids inside
/// `ended`.
pub fn write_ended(&self, ended: &car_proto::RunEnded) -> std::io::Result<()> {
    let records = [RunRecord::Ended(ended.clone())];
    self.append_records(&ended.agent_id, &ended.run_id, &records)
}
/// Loads the trace for `run_id`, searching every agent directory.
///
/// Returns `None` when no agent directory contains a file for the run.
pub fn get_run_trace(&self, run_id: &str) -> Option<Vec<RunRecord>> {
    self.resolve_run_path(run_id)
        .map(|path| load_records(&path))
}
/// Loads the trace for `run_id` from the given agent's directory, or
/// returns `None` when that file does not exist.
pub fn get_run_trace_for(&self, agent_id: &str, run_id: &str) -> Option<Vec<RunRecord>> {
    let path = self.run_path(agent_id, run_id);
    path.exists().then(|| load_records(&path))
}
/// Summarizes every `.jsonl` trace in the agent's directory, most recently
/// started first.
///
/// An unreadable or missing directory yields an empty list; files that
/// cannot be summarized are left out.
pub fn list_runs(&self, agent_id: &str) -> Vec<RunSummary> {
    let agent_dir = self.root.join(sanitize(agent_id));
    let Ok(entries) = std::fs::read_dir(&agent_dir) else {
        return Vec::new();
    };
    let mut summaries: Vec<RunSummary> = entries
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("jsonl"))
        .filter_map(|path| summarize_file(&path))
        .collect();
    // Stable sort, newest start time first.
    summaries.sort_by_key(|summary| std::cmp::Reverse(summary.started_at));
    summaries
}
/// Finds the trace file for `run_id` by probing each entry under the root,
/// returning the first match in directory-iteration order.
fn resolve_run_path(&self, run_id: &str) -> Option<PathBuf> {
    let file_name = format!("{}.jsonl", sanitize(run_id));
    std::fs::read_dir(&self.root)
        .ok()?
        .flatten()
        .map(|agent| agent.path().join(&file_name))
        .find(|candidate| candidate.is_file())
}
pub fn agent_for_run(&self, run_id: &str) -> Option<String> {
let path = self.resolve_run_path(run_id)?;
path.parent()
.and_then(Path::file_name)
.and_then(|s| s.to_str())
.map(str::to_string)
}
/// Applies the retention policy to every agent directory and returns the
/// number of trace files removed.
///
/// Returns 0 when the root directory cannot be read. Entries under the root
/// that are not directories are ignored.
///
/// `max_age_days` may come from a user-edited config file, and both
/// `chrono::Duration::days` and `DateTime - Duration` panic on out-of-range
/// values. The value is therefore clamped to +/-1000 years before use, so a
/// bad config value cannot crash garbage collection. Values inside that
/// range behave as before.
pub fn gc(&self) -> usize {
    // About 1000 years; far inside chrono's representable range on both sides.
    const MAX_AGE_DAYS_MAGNITUDE: i64 = 365_250;

    let Ok(agent_dirs) = std::fs::read_dir(&self.root) else {
        return 0;
    };
    let age_days = self
        .retention
        .max_age_days
        .clamp(-MAX_AGE_DAYS_MAGNITUDE, MAX_AGE_DAYS_MAGNITUDE);
    let cutoff = Utc::now() - chrono::Duration::days(age_days);
    agent_dirs
        .flatten()
        .map(|agent| agent.path())
        .filter(|agent_path| agent_path.is_dir())
        .map(|agent_path| self.gc_agent_dir(&agent_path, cutoff))
        .sum()
}
/// Closes every run that is still in progress by appending an `Ended`
/// record with an `Incomplete` termination, and returns how many runs were
/// closed.
///
/// All adopted runs share one `ended_at` timestamp taken at the start of
/// the scan. The record is appended via the ids stored in the trace's
/// `Started` record. Unreadable directories, files that cannot be
/// summarized and failed appends are skipped silently.
pub fn adopt_orphans(&self) -> usize {
    let Ok(agent_dirs) = std::fs::read_dir(&self.root) else {
        return 0;
    };
    let now = Utc::now();
    let mut adopted = 0;
    for agent_path in agent_dirs
        .flatten()
        .map(|agent| agent.path())
        .filter(|path| path.is_dir())
    {
        let Ok(entries) = std::fs::read_dir(&agent_path) else {
            continue;
        };
        let trace_files = entries
            .flatten()
            .map(|entry| entry.path())
            .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("jsonl"));
        for path in trace_files {
            // Only runs without a terminal record need adopting.
            let summary = match summarize_file(&path) {
                Some(summary) if summary.status == RunStatus::InProgress => summary,
                _ => continue,
            };
            let terminal = RunRecord::Ended(car_proto::RunEnded {
                run_id: summary.run_id.clone(),
                agent_id: summary.agent_id.clone(),
                termination: RunTermination::Incomplete,
                ended_at: now,
            });
            let appended = self.append_records(&summary.agent_id, &summary.run_id, &[terminal]);
            if appended.is_ok() {
                adopted += 1;
            }
        }
    }
    adopted
}
/// Applies the retention policy to one agent directory and returns the
/// number of trace files removed.
///
/// Terminal runs are ranked newest-start first. A run is removed when its
/// rank is at or beyond `max_per_agent`, or when its end time (start time
/// if no end time is recorded) is before `age_cutoff`. In-progress runs are
/// neither ranked nor removed. Failed deletions are not counted.
fn gc_agent_dir(&self, agent_path: &Path, age_cutoff: DateTime<Utc>) -> usize {
    let Ok(entries) = std::fs::read_dir(agent_path) else {
        return 0;
    };
    let mut runs: Vec<(PathBuf, RunSummary)> = entries
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("jsonl"))
        .filter_map(|path| summarize_file(&path).map(|summary| (path, summary)))
        .collect();
    // Stable sort, newest start time first.
    runs.sort_by_key(|(_, summary)| std::cmp::Reverse(summary.started_at));

    let terminal_runs = runs
        .iter()
        .filter(|(_, summary)| summary.status != RunStatus::InProgress);
    let mut removed = 0;
    for (rank, (path, summary)) in terminal_runs.enumerate() {
        let over_count = rank >= self.retention.max_per_agent;
        let term_time = summary.ended_at.unwrap_or(summary.started_at);
        let too_old = term_time < age_cutoff;
        if (over_count || too_old) && std::fs::remove_file(path).is_ok() {
            removed += 1;
        }
    }
    removed
}
}
/// Reports whether the file at `path` is non-empty and its final byte is
/// something other than `\n`.
///
/// An empty file yields `Ok(false)`. Open, seek and read failures are
/// returned as errors.
fn last_byte_is_not_newline(path: &Path) -> std::io::Result<bool> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    // Seeking to the end returns the file length.
    if file.seek(SeekFrom::End(0))? == 0 {
        return Ok(false);
    }
    let mut tail = [0u8; 1];
    file.seek(SeekFrom::End(-1))?;
    file.read_exact(&mut tail)?;
    Ok(tail != [b'\n'])
}
/// Loads every parseable record from the JSONL file at `path`.
///
/// Returns an empty vector when the file cannot be opened. Blank lines and
/// lines that do not parse as a `RunRecord` are skipped, so one corrupt
/// line never hides the records around it. A read error ends the scan and
/// the records loaded so far are returned.
///
/// Lines are read as raw bytes rather than via `BufRead::lines()`. That
/// iterator returns an error for a line that is not valid UTF-8 (for
/// example a write torn in the middle of a multi-byte character), and
/// treating that as end-of-file would silently drop every valid record
/// after it.
fn load_records(path: &Path) -> Vec<RunRecord> {
    let Ok(file) = std::fs::File::open(path) else {
        return Vec::new();
    };
    let mut out = Vec::new();
    for line in std::io::BufReader::new(file).split(b'\n') {
        // Only genuine I/O failures surface here; bad encoding shows up as a
        // parse failure below and is skipped like any other corrupt line.
        let Ok(line) = line else { break };
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        if let Ok(rec) = serde_json::from_slice::<RunRecord>(&line) {
            out.push(rec);
        }
    }
    out
}
/// Builds a `RunSummary` from the trace file at `path`.
///
/// Returns `None` when no `Started` record could be loaded. If a record
/// kind appears more than once, the last `Started` / `Ended` wins. Status is
/// `InProgress` without an `Ended` record; otherwise it follows that
/// record's termination.
fn summarize_file(path: &Path) -> Option<RunSummary> {
    let mut started = None;
    let mut ended = None;
    let mut turn_count = 0usize;
    // Consume the records so the ones we keep are moved rather than cloned.
    for record in load_records(path) {
        match record {
            RunRecord::Started(s) => started = Some(s),
            RunRecord::Ended(e) => ended = Some(e),
            RunRecord::Turn(_) => turn_count += 1,
        }
    }
    let started = started?;
    let (status, ended_at) = match ended {
        None => (RunStatus::InProgress, None),
        Some(end) => {
            let status = match &end.termination {
                RunTermination::Outcome { .. } => RunStatus::Completed,
                RunTermination::Incomplete => RunStatus::Incomplete,
            };
            (status, Some(end.ended_at))
        }
    };
    Some(RunSummary {
        run_id: started.run_id,
        agent_id: started.agent_id,
        intent: started.intent,
        started_at: started.started_at,
        ended_at,
        status,
        turn_count,
    })
}
/// Turns an id into a single safe path component.
///
/// `/`, `\` and NUL each become `_`, then leading and trailing dots are
/// stripped. If nothing is left, `"_"` is returned so the component is
/// never empty.
fn sanitize(id: &str) -> String {
    let replaced = id.replace(['/', '\\', '\0'], "_");
    match replaced.trim_matches('.') {
        "" => String::from("_"),
        kept => kept.to_owned(),
    }
}
/// Sets the directory's permission bits to `0o700` (Unix).
#[cfg(unix)]
fn set_dir_perms(path: &Path) -> std::io::Result<()> {
    use std::fs::Permissions;
    use std::os::unix::fs::PermissionsExt;
    let owner_only = Permissions::from_mode(0o700);
    std::fs::set_permissions(path, owner_only)
}
/// No-op on non-Unix targets; always succeeds.
#[cfg(not(unix))]
fn set_dir_perms(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
/// Sets the file's permission bits to `0o600` (Unix).
#[cfg(unix)]
fn set_file_perms(path: &Path) -> std::io::Result<()> {
    use std::fs::Permissions;
    use std::os::unix::fs::PermissionsExt;
    let owner_rw = Permissions::from_mode(0o600);
    std::fs::set_permissions(path, owner_rw)
}
/// No-op on non-Unix targets; always succeeds.
#[cfg(not(unix))]
fn set_file_perms(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
/// Best-effort marking of `dir` as excluded from backups.
///
/// Writes a `.nobackup` marker file into the directory and, on macOS, also
/// sets the backup-exclusion extended attribute. Failures are ignored.
fn mark_backup_excluded(dir: &Path) {
    let marker = dir.join(".nobackup");
    let _ = std::fs::write(marker, b"car run traces - excluded from backup\n");
    #[cfg(target_os = "macos")]
    set_macos_backup_excluded(dir);
}
/// Runs `xattr -w com.apple.metadata:com_apple_backup_excludeItem 1 <dir>`
/// and ignores the outcome (macOS only).
#[cfg(target_os = "macos")]
fn set_macos_backup_excluded(dir: &Path) {
    let mut xattr = std::process::Command::new("xattr");
    xattr
        .arg("-w")
        .arg("com.apple.metadata:com_apple_backup_excludeItem")
        .arg("1")
        .arg(dir);
    // Best effort: neither spawn failures nor a non-zero exit are reported.
    let _ = xattr.output();
}
#[cfg(test)]
mod tests {
use super::*;
use car_ir::{AgentOutcome, OutcomeMetrics, OutcomeStatus};
use car_proto::{RunEnded, RunStarted, RunTurn, VerifierVerdict};
use serde_json::json;
/// Builds a `RunStore` at `root` with the default retention policy.
fn store(root: PathBuf) -> RunStore {
RunStore::new(root, RetentionConfig::default())
}
/// Builds a `RunStarted` fixture with the given ids and start time and a
/// fixed intent.
fn started(run_id: &str, agent_id: &str, when: DateTime<Utc>) -> RunStarted {
RunStarted {
run_id: run_id.to_string(),
agent_id: agent_id.to_string(),
intent: "do the thing".to_string(),
outcome_description: None,
started_at: when,
}
}
/// Builds a `RunRecord::Turn` fixture for the `drive_cli` tool, carrying
/// `prompt` both as the turn prompt and inside the parameters.
fn turn(index: usize, prompt: &str) -> RunRecord {
RunRecord::Turn(RunTurn {
index,
prompt: Some(prompt.to_string()),
tool: Some("drive_cli".to_string()),
parameters: json!({ "prompt": prompt }),
output: Some(json!({ "exit_code": 0 })),
cli_outcome: None,
verifier_verdict: VerifierVerdict::NotRun,
policy_rejected: None,
})
}
/// Builds a `RunRecord::Ended` fixture with an `Outcome` termination,
/// stamped with the current time.
fn ended(run_id: &str, agent_id: &str, status: OutcomeStatus) -> RunRecord {
let outcome = AgentOutcome {
status,
summary: "done".to_string(),
evidence: vec![],
metrics: OutcomeMetrics::default(),
timestamp: Utc::now(),
};
RunRecord::Ended(RunEnded {
run_id: run_id.to_string(),
agent_id: agent_id.to_string(),
termination: RunTermination::Outcome { status, outcome },
ended_at: Utc::now(),
})
}
/// A trace written by one store instance is fully readable through a second
/// instance opened on the same root.
#[test]
fn completed_run_readable_after_restart() {
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s1 = store(root.clone());
s1.write_started(&started("run-1", "agent-a", Utc::now()))
.unwrap();
s1.append_turns("agent-a", "run-1", &[turn(0, "first")])
.unwrap();
s1.append_records(
"agent-a",
"run-1",
&[ended("run-1", "agent-a", OutcomeStatus::Success)],
)
.unwrap();
// A fresh store on the same root stands in for a process restart.
let s2 = store(root);
let trace = s2
.get_run_trace("run-1")
.expect("trace readable after restart");
assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
assert!(matches!(trace.last(), Some(RunRecord::Ended(_))));
let turns = trace
.iter()
.filter(|r| matches!(r, RunRecord::Turn(_)))
.count();
assert_eq!(turns, 1);
}
/// Traces for different runs and agents do not mix, and lookups by run id
/// resolve to the owning agent.
#[test]
fn runs_isolated_per_agent_and_run() {
let tmp = tempfile::TempDir::new().unwrap();
let s = store(tmp.path().join("runs"));
s.write_started(&started("run-1", "agent-a", Utc::now()))
.unwrap();
s.append_turns("agent-a", "run-1", &[turn(0, "a-first")])
.unwrap();
s.write_started(&started("run-2", "agent-a", Utc::now()))
.unwrap();
s.append_turns("agent-a", "run-2", &[turn(0, "a-second")])
.unwrap();
s.write_started(&started("run-3", "agent-b", Utc::now()))
.unwrap();
s.append_turns("agent-b", "run-3", &[turn(0, "b-first")])
.unwrap();
let t1 = s.get_run_trace("run-1").unwrap();
let t2 = s.get_run_trace("run-2").unwrap();
let t3 = s.get_run_trace("run-3").unwrap();
assert_eq!(turn_prompt(&t1), "a-first");
assert_eq!(turn_prompt(&t2), "a-second");
assert_eq!(turn_prompt(&t3), "b-first");
assert_eq!(s.agent_for_run("run-1").as_deref(), Some("agent-a"));
assert_eq!(s.agent_for_run("run-3").as_deref(), Some("agent-b"));
assert_eq!(s.list_runs("agent-a").len(), 2);
assert_eq!(s.list_runs("agent-b").len(), 1);
}
/// Returns the prompt of the first `Turn` record that has one, or an empty
/// string when the trace contains none.
fn turn_prompt(trace: &[RunRecord]) -> String {
    for record in trace {
        if let RunRecord::Turn(t) = record {
            if let Some(prompt) = &t.prompt {
                return prompt.clone();
            }
        }
    }
    String::new()
}
/// On Unix, a first write leaves the trace file at 0600, the root and agent
/// directories at 0700, and a `.nobackup` marker in the root.
#[cfg(unix)]
#[test]
fn perms_are_0600_files_0700_dirs() {
use std::os::unix::fs::PermissionsExt;
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s = store(root.clone());
s.write_started(&started("run-1", "agent-a", Utc::now()))
.unwrap();
let file = root.join("agent-a").join("run-1.jsonl");
let fmode = std::fs::metadata(&file).unwrap().permissions().mode() & 0o777;
assert_eq!(fmode, 0o600, "run file must be 0600, got {:o}", fmode);
let root_mode = std::fs::metadata(&root).unwrap().permissions().mode() & 0o777;
assert_eq!(
root_mode, 0o700,
"runs/ dir must be 0700, got {:o}",
root_mode
);
let agent_mode = std::fs::metadata(root.join("agent-a"))
.unwrap()
.permissions()
.mode()
& 0o777;
assert_eq!(
agent_mode, 0o700,
"agent dir must be 0700, got {:o}",
agent_mode
);
assert!(root.join(".nobackup").exists(), ".nobackup marker written");
}
/// A run without an `Ended` record lists as `InProgress`; once an
/// `Incomplete` terminal record is appended it lists as `Incomplete`.
#[test]
fn orphan_run_status_distinguishes_inprogress_from_incomplete() {
let tmp = tempfile::TempDir::new().unwrap();
let s = store(tmp.path().join("runs"));
let stale = Utc::now() - chrono::Duration::hours(6);
s.write_started(&started("run-1", "agent-a", stale)).unwrap();
s.append_turns("agent-a", "run-1", &[turn(0, "first")])
.unwrap();
let open = &s.list_runs("agent-a")[0];
assert_eq!(open.status, RunStatus::InProgress);
let incomplete = RunRecord::Ended(RunEnded {
run_id: "run-1".to_string(),
agent_id: "agent-a".to_string(),
termination: RunTermination::Incomplete,
ended_at: Utc::now(),
});
s.append_records("agent-a", "run-1", &[incomplete]).unwrap();
let closed = &s.list_runs("agent-a")[0];
assert_eq!(closed.status, RunStatus::Incomplete);
}
/// With a cap of 3, gc removes the two oldest of five completed runs and
/// leaves the in-progress run alone.
#[test]
fn gc_evicts_beyond_per_agent_cap_but_never_in_progress() {
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s = RunStore::new(
root,
RetentionConfig {
max_per_agent: 3,
max_age_days: 30,
},
);
let base = Utc::now() - chrono::Duration::days(1);
// Five completed runs, started one minute apart: c0 is oldest, c4 newest.
for i in 0..5 {
let id = format!("c{i}");
let when = base + chrono::Duration::minutes(i);
s.write_started(&started(&id, "agent-a", when)).unwrap();
s.append_records(
"agent-a",
&id,
&[ended(&id, "agent-a", OutcomeStatus::Success)],
)
.unwrap();
}
s.write_started(&started("live", "agent-a", Utc::now()))
.unwrap();
let removed = s.gc();
assert_eq!(removed, 2, "should evict the 2 oldest completed runs");
let remaining = s.list_runs("agent-a");
assert_eq!(remaining.len(), 4);
assert!(
remaining.iter().any(|r| r.run_id == "live"),
"in-progress run must never be evicted"
);
assert!(!remaining.iter().any(|r| r.run_id == "c0"));
assert!(!remaining.iter().any(|r| r.run_id == "c1"));
}
/// A run that started and ended 40 days ago is removed under a 30-day age
/// cap, while a fresh run survives.
#[test]
fn gc_evicts_runs_older_than_age_cap() {
let tmp = tempfile::TempDir::new().unwrap();
let s = RunStore::new(
tmp.path().join("runs"),
RetentionConfig {
max_per_agent: 50,
max_age_days: 30,
},
);
let old = Utc::now() - chrono::Duration::days(40);
s.write_started(&started("old", "agent-a", old)).unwrap();
s.append_records(
"agent-a",
"old",
&[ended_at("old", "agent-a", OutcomeStatus::Success, old)],
)
.unwrap();
s.write_started(&started("fresh", "agent-a", Utc::now()))
.unwrap();
s.append_records(
"agent-a",
"fresh",
&[ended("fresh", "agent-a", OutcomeStatus::Success)],
)
.unwrap();
let removed = s.gc();
assert_eq!(removed, 1, "the 40-day-old run should be evicted");
let remaining = s.list_runs("agent-a");
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].run_id, "fresh");
}
/// An in-progress run survives gc even when it started long before the age
/// cap.
#[test]
fn gc_never_evicts_stale_in_progress_run() {
let tmp = tempfile::TempDir::new().unwrap();
let s = RunStore::new(
tmp.path().join("runs"),
RetentionConfig {
max_per_agent: 1,
max_age_days: 1,
},
);
let old = Utc::now() - chrono::Duration::days(40);
s.write_started(&started("stale-live", "agent-a", old))
.unwrap();
let removed = s.gc();
assert_eq!(removed, 0);
assert!(s
.list_runs("agent-a")
.iter()
.any(|r| r.run_id == "stale-live"));
}
/// A truncated JSON line at the end of a trace is skipped and the valid
/// records before it still load.
#[test]
fn corrupt_trailing_line_loads_prior_records() {
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s = store(root.clone());
s.write_started(&started("run-1", "agent-a", Utc::now()))
.unwrap();
s.append_turns("agent-a", "run-1", &[turn(0, "first"), turn(1, "second")])
.unwrap();
let path = root.join("agent-a").join("run-1.jsonl");
let mut f = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
// Simulate a partially written record (newline-terminated).
writeln!(f, "{{\"record\":\"turn\",\"index\":2,\"prom").unwrap();
let trace = s.get_run_trace("run-1").expect("trace still loads");
let turns = trace
.iter()
.filter(|r| matches!(r, RunRecord::Turn(_)))
.count();
assert_eq!(turns, 2, "prior valid turns load; corrupt line skipped");
assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
}
/// Lookups against a store with nothing written return empty or `None`
/// results.
#[test]
fn list_runs_empty_for_unknown_agent() {
let tmp = tempfile::TempDir::new().unwrap();
let s = store(tmp.path().join("runs"));
assert!(s.list_runs("nobody").is_empty());
assert!(s.get_run_trace("nope").is_none());
assert!(s.agent_for_run("nope").is_none());
}
/// `from_journal_dir` places the store in `runs` beside the journal
/// directory.
#[test]
fn from_journal_dir_roots_at_car_runs() {
let s = RunStore::from_journal_dir(Path::new("/home/u/.car/journals"));
assert_eq!(s.root(), Path::new("/home/u/.car/runs"));
}
/// A key present in `[runs]` overrides its default; an absent key keeps the
/// default.
#[test]
fn retention_config_reads_overrides() {
let tmp = tempfile::TempDir::new().unwrap();
std::fs::write(
tmp.path().join("config.toml"),
"[runs]\nmax_per_agent = 10\n",
)
.unwrap();
let cfg = RetentionConfig::from_car_dir(tmp.path());
assert_eq!(cfg.max_per_agent, 10);
assert_eq!(cfg.max_age_days, DEFAULT_MAX_AGE_DAYS);
}
/// Without a `config.toml`, both retention values are the defaults.
#[test]
fn retention_config_defaults_on_missing_file() {
let tmp = tempfile::TempDir::new().unwrap();
let cfg = RetentionConfig::from_car_dir(tmp.path());
assert_eq!(cfg.max_per_agent, DEFAULT_MAX_RUNS_PER_AGENT);
assert_eq!(cfg.max_age_days, DEFAULT_MAX_AGE_DAYS);
}
/// Builds a `RunRecord::Ended` fixture with an `Outcome` termination whose
/// outcome timestamp and `ended_at` are both `when`.
fn ended_at(
run_id: &str,
agent_id: &str,
status: OutcomeStatus,
when: DateTime<Utc>,
) -> RunRecord {
let outcome = AgentOutcome {
status,
summary: "done".to_string(),
evidence: vec![],
metrics: OutcomeMetrics::default(),
timestamp: when,
};
RunRecord::Ended(RunEnded {
run_id: run_id.to_string(),
agent_id: agent_id.to_string(),
termination: RunTermination::Outcome { status, outcome },
ended_at: when,
})
}
/// The age cap is measured from a run's end time: a run started 40 days ago
/// but ended yesterday survives a 30-day cap.
#[test]
fn gc_age_cap_uses_terminal_time_not_start() {
let tmp = tempfile::TempDir::new().unwrap();
let s = RunStore::new(
tmp.path().join("runs"),
RetentionConfig {
max_per_agent: 50,
max_age_days: 30,
},
);
let started_40d = Utc::now() - chrono::Duration::days(40);
let ended_1d = Utc::now() - chrono::Duration::days(1);
s.write_started(&started("long", "agent-a", started_40d))
.unwrap();
s.append_records(
"agent-a",
"long",
&[ended_at("long", "agent-a", OutcomeStatus::Success, ended_1d)],
)
.unwrap();
let removed = s.gc();
assert_eq!(
removed, 0,
"a run completed 1 day ago must survive the 30-day age cap, \
even if it started 40 days ago"
);
let remaining = s.list_runs("agent-a");
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].run_id, "long");
}
/// `adopt_orphans` turns an in-progress run into an `Incomplete` one with an
/// end time, and a second call finds nothing left to adopt.
#[test]
fn adopt_orphans_marks_crashed_inprogress_runs_incomplete() {
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s1 = store(root.clone());
s1.write_started(&started("orphan", "agent-a", Utc::now()))
.unwrap();
s1.append_turns("agent-a", "orphan", &[turn(0, "first")])
.unwrap();
assert_eq!(
s1.list_runs("agent-a")[0].status,
RunStatus::InProgress,
"precondition: orphan reads InProgress before adoption"
);
// A second store on the same root stands in for the restarted process.
let s2 = store(root);
let adopted = s2.adopt_orphans();
assert_eq!(adopted, 1, "the crash orphan should be adopted");
let after = &s2.list_runs("agent-a")[0];
assert_eq!(
after.status,
RunStatus::Incomplete,
"adopted orphan now reads Incomplete (terminal)"
);
assert!(after.ended_at.is_some(), "terminal record has an ended_at");
assert_eq!(s2.adopt_orphans(), 0);
}
/// `adopt_orphans` does not touch a run that already has a terminal record.
#[test]
fn adopt_orphans_leaves_completed_runs_alone() {
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s = store(root);
s.write_started(&started("done", "agent-a", Utc::now()))
.unwrap();
s.append_records(
"agent-a",
"done",
&[ended("done", "agent-a", OutcomeStatus::Success)],
)
.unwrap();
assert_eq!(s.adopt_orphans(), 0);
assert_eq!(s.list_runs("agent-a")[0].status, RunStatus::Completed);
}
/// A record appended after an unterminated (torn) tail still loads, because
/// the append first terminates the torn line.
#[test]
fn torn_tail_does_not_drop_following_valid_record() {
let tmp = tempfile::TempDir::new().unwrap();
let root = tmp.path().join("runs");
let s = store(root.clone());
s.write_started(&started("run-1", "agent-a", Utc::now()))
.unwrap();
s.append_turns("agent-a", "run-1", &[turn(0, "first")])
.unwrap();
let path = root.join("agent-a").join("run-1.jsonl");
{
// Write a partial record with no trailing newline.
let mut f = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
f.write_all(b"{\"record\":\"turn\",\"index\":1,\"prom").unwrap();
}
assert!(
last_byte_is_not_newline(&path).unwrap(),
"precondition: tail is torn (no trailing newline)"
);
s.append_turns("agent-a", "run-1", &[turn(2, "third")])
.unwrap();
let trace = s.get_run_trace("run-1").expect("trace loads");
let turn_prompts: Vec<String> = trace
.iter()
.filter_map(|r| match r {
RunRecord::Turn(t) => t.prompt.clone(),
_ => None,
})
.collect();
assert!(
turn_prompts.contains(&"third".to_string()),
"the valid record appended after a torn tail must survive, got {:?}",
turn_prompts
);
assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
}
}