1use car_proto::{RunRecord, RunTermination};
53use chrono::{DateTime, Utc};
54use serde::{Deserialize, Serialize};
55use std::io::{BufRead, Write};
56use std::path::{Path, PathBuf};
57
58pub const DEFAULT_MAX_RUNS_PER_AGENT: usize = 50;
60pub const DEFAULT_MAX_AGE_DAYS: i64 = 30;
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum RunStatus {
72 InProgress,
74 Completed,
77 Incomplete,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct RunSummary {
86 pub run_id: String,
87 pub agent_id: String,
88 pub intent: String,
89 pub started_at: DateTime<Utc>,
90 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub ended_at: Option<DateTime<Utc>>,
93 pub status: RunStatus,
94 pub turn_count: usize,
96}
97
98#[derive(Debug, Clone, Copy)]
101pub struct RetentionConfig {
102 pub max_per_agent: usize,
103 pub max_age_days: i64,
104}
105
106impl Default for RetentionConfig {
107 fn default() -> Self {
108 Self {
109 max_per_agent: DEFAULT_MAX_RUNS_PER_AGENT,
110 max_age_days: DEFAULT_MAX_AGE_DAYS,
111 }
112 }
113}
114
115#[derive(Debug, Clone, Default, Deserialize)]
118struct RunsConfigFile {
119 #[serde(default)]
120 runs: RunsSection,
121}
122
123#[derive(Debug, Clone, Default, Deserialize)]
124struct RunsSection {
125 #[serde(default)]
126 max_per_agent: Option<usize>,
127 #[serde(default)]
128 max_age_days: Option<i64>,
129}
130
131impl RetentionConfig {
132 pub fn from_car_dir(car_dir: &Path) -> Self {
137 let mut cfg = Self::default();
138 let path = car_dir.join("config.toml");
139 let Ok(text) = std::fs::read_to_string(&path) else {
140 return cfg;
141 };
142 let Ok(parsed) = toml::from_str::<RunsConfigFile>(&text) else {
143 return cfg;
144 };
145 if let Some(n) = parsed.runs.max_per_agent {
146 cfg.max_per_agent = n;
147 }
148 if let Some(d) = parsed.runs.max_age_days {
149 cfg.max_age_days = d;
150 }
151 cfg
152 }
153}
154
155#[derive(Debug, Clone)]
162pub struct RunStore {
163 root: PathBuf,
165 retention: RetentionConfig,
166}
167
168impl RunStore {
169 pub fn new(runs_root: PathBuf, retention: RetentionConfig) -> Self {
174 Self {
175 root: runs_root,
176 retention,
177 }
178 }
179
180 pub fn from_journal_dir(journal_dir: &Path) -> Self {
186 let car_dir = journal_dir
187 .parent()
188 .map(Path::to_path_buf)
189 .unwrap_or_else(|| PathBuf::from("."));
190 let root = car_dir.join("runs");
191 let retention = RetentionConfig::from_car_dir(&car_dir);
192 Self::new(root, retention)
193 }
194
195 pub fn root(&self) -> &Path {
197 &self.root
198 }
199
200 fn run_path(&self, agent_id: &str, run_id: &str) -> PathBuf {
202 self.root
203 .join(sanitize(agent_id))
204 .join(format!("{}.jsonl", sanitize(run_id)))
205 }
206
207 fn ensure_root(&self) -> std::io::Result<()> {
212 let created = !self.root.exists();
213 std::fs::create_dir_all(&self.root)?;
214 set_dir_perms(&self.root)?;
215 if created {
216 mark_backup_excluded(&self.root);
217 }
218 Ok(())
219 }
220
221 fn ensure_agent_dir(&self, agent_id: &str) -> std::io::Result<PathBuf> {
223 self.ensure_root()?;
224 let dir = self.root.join(sanitize(agent_id));
225 std::fs::create_dir_all(&dir)?;
226 set_dir_perms(&dir)?;
227 Ok(dir)
228 }
229
230 pub fn append_records(
238 &self,
239 agent_id: &str,
240 run_id: &str,
241 records: &[RunRecord],
242 ) -> std::io::Result<()> {
243 if records.is_empty() {
244 return Ok(());
245 }
246 self.ensure_agent_dir(agent_id)?;
247 let path = self.run_path(agent_id, run_id);
248 let existed = path.exists();
249 let mut file = std::fs::OpenOptions::new()
250 .create(true)
251 .append(true)
252 .open(&path)?;
253 let mut buf: Vec<u8> = Vec::new();
260 if !existed {
261 set_file_perms(&path)?;
264 } else if last_byte_is_not_newline(&path)? {
265 buf.push(b'\n');
273 }
274 for rec in records {
275 let line = serde_json::to_string(rec)
276 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
277 buf.extend_from_slice(line.as_bytes());
278 buf.push(b'\n');
279 }
280 file.write_all(&buf)?;
281 Ok(())
282 }
283
284 pub fn write_started(&self, started: &car_proto::RunStarted) -> std::io::Result<()> {
286 let rec = RunRecord::Started(started.clone());
287 self.append_records(&started.agent_id, &started.run_id, &[rec])
288 }
289
290 pub fn append_turns(
292 &self,
293 agent_id: &str,
294 run_id: &str,
295 turns: &[RunRecord],
296 ) -> std::io::Result<()> {
297 self.append_records(agent_id, run_id, turns)
298 }
299
300 pub fn write_ended(&self, ended: &car_proto::RunEnded) -> std::io::Result<()> {
303 let rec = RunRecord::Ended(ended.clone());
304 self.append_records(&ended.agent_id, &ended.run_id, &[rec])
305 }
306
307 pub fn get_run_trace(&self, run_id: &str) -> Option<Vec<RunRecord>> {
314 let path = self.resolve_run_path(run_id)?;
315 Some(load_records(&path))
316 }
317
318 pub fn get_run_trace_for(&self, agent_id: &str, run_id: &str) -> Option<Vec<RunRecord>> {
322 let path = self.run_path(agent_id, run_id);
323 if path.exists() {
324 Some(load_records(&path))
325 } else {
326 None
327 }
328 }
329
330 pub fn list_runs(&self, agent_id: &str) -> Vec<RunSummary> {
335 let dir = self.root.join(sanitize(agent_id));
336 let mut out = Vec::new();
337 let Ok(entries) = std::fs::read_dir(&dir) else {
338 return out;
339 };
340 for entry in entries.flatten() {
341 let path = entry.path();
342 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
343 continue;
344 }
345 if let Some(summary) = summarize_file(&path) {
346 out.push(summary);
347 }
348 }
349 out.sort_by(|a, b| b.started_at.cmp(&a.started_at));
351 out
352 }
353
354 fn resolve_run_path(&self, run_id: &str) -> Option<PathBuf> {
358 let file_name = format!("{}.jsonl", sanitize(run_id));
359 let agent_dirs = std::fs::read_dir(&self.root).ok()?;
360 for agent in agent_dirs.flatten() {
361 let candidate = agent.path().join(&file_name);
362 if candidate.is_file() {
363 return Some(candidate);
364 }
365 }
366 None
367 }
368
369 pub fn agent_for_run(&self, run_id: &str) -> Option<String> {
374 let path = self.resolve_run_path(run_id)?;
375 path.parent()
376 .and_then(Path::file_name)
377 .and_then(|s| s.to_str())
378 .map(str::to_string)
379 }
380
381 pub fn gc(&self) -> usize {
387 let mut removed = 0;
388 let Ok(agent_dirs) = std::fs::read_dir(&self.root) else {
389 return 0;
390 };
391 let cutoff = Utc::now() - chrono::Duration::days(self.retention.max_age_days);
392 for agent in agent_dirs.flatten() {
393 let agent_path = agent.path();
394 if !agent_path.is_dir() {
395 continue;
396 }
397 removed += self.gc_agent_dir(&agent_path, cutoff);
398 }
399 removed
400 }
401
402 pub fn adopt_orphans(&self) -> usize {
418 let mut adopted = 0;
419 let Ok(agent_dirs) = std::fs::read_dir(&self.root) else {
420 return 0;
421 };
422 let now = Utc::now();
423 for agent in agent_dirs.flatten() {
424 let agent_path = agent.path();
425 if !agent_path.is_dir() {
426 continue;
427 }
428 let Ok(entries) = std::fs::read_dir(&agent_path) else {
429 continue;
430 };
431 for entry in entries.flatten() {
432 let path = entry.path();
433 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
434 continue;
435 }
436 let Some(summary) = summarize_file(&path) else {
437 continue;
438 };
439 if summary.status != RunStatus::InProgress {
440 continue;
441 }
442 let incomplete = RunRecord::Ended(car_proto::RunEnded {
444 run_id: summary.run_id.clone(),
445 agent_id: summary.agent_id.clone(),
446 termination: RunTermination::Incomplete,
447 ended_at: now,
448 });
449 if self
450 .append_records(&summary.agent_id, &summary.run_id, &[incomplete])
451 .is_ok()
452 {
453 adopted += 1;
454 }
455 }
456 }
457 adopted
458 }
459
460 fn gc_agent_dir(&self, agent_path: &Path, age_cutoff: DateTime<Utc>) -> usize {
462 let mut runs: Vec<(PathBuf, RunSummary)> = Vec::new();
466 let Ok(entries) = std::fs::read_dir(agent_path) else {
467 return 0;
468 };
469 for entry in entries.flatten() {
470 let path = entry.path();
471 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
472 continue;
473 }
474 if let Some(s) = summarize_file(&path) {
475 runs.push((path, s));
476 }
477 }
478 runs.sort_by(|a, b| b.1.started_at.cmp(&a.1.started_at));
480
481 let mut removed = 0;
482 let mut completed_rank = 0usize;
486 for (path, summary) in runs.iter() {
487 if summary.status == RunStatus::InProgress {
490 continue;
491 }
492 let over_count = completed_rank >= self.retention.max_per_agent;
493 completed_rank += 1;
494 let term_time = summary.ended_at.unwrap_or(summary.started_at);
500 let too_old = term_time < age_cutoff;
501 if over_count || too_old {
502 if std::fs::remove_file(path).is_ok() {
503 removed += 1;
504 }
505 }
506 }
507 removed
508 }
509}
510
511fn last_byte_is_not_newline(path: &Path) -> std::io::Result<bool> {
517 use std::io::{Read, Seek, SeekFrom};
518 let mut f = std::fs::File::open(path)?;
519 let len = f.seek(SeekFrom::End(0))?;
520 if len == 0 {
521 return Ok(false);
522 }
523 f.seek(SeekFrom::End(-1))?;
524 let mut buf = [0u8; 1];
525 f.read_exact(&mut buf)?;
526 Ok(buf[0] != b'\n')
527}
528
529fn load_records(path: &Path) -> Vec<RunRecord> {
533 let Ok(file) = std::fs::File::open(path) else {
534 return Vec::new();
535 };
536 let reader = std::io::BufReader::new(file);
537 let mut out = Vec::new();
538 for line in reader.lines() {
539 let Ok(line) = line else { break };
540 if line.trim().is_empty() {
541 continue;
542 }
543 if let Ok(rec) = serde_json::from_str::<RunRecord>(&line) {
544 out.push(rec);
545 }
546 }
549 out
550}
551
552fn summarize_file(path: &Path) -> Option<RunSummary> {
557 let records = load_records(path);
558 let mut started: Option<car_proto::RunStarted> = None;
559 let mut ended: Option<car_proto::RunEnded> = None;
560 let mut turn_count = 0usize;
561 for rec in &records {
562 match rec {
563 RunRecord::Started(s) => started = Some(s.clone()),
564 RunRecord::Ended(e) => ended = Some(e.clone()),
565 RunRecord::Turn(_) => turn_count += 1,
566 }
567 }
568 let started = started?;
569 let (status, ended_at) = match &ended {
570 Some(e) => {
571 let status = match &e.termination {
572 RunTermination::Outcome { .. } => RunStatus::Completed,
573 RunTermination::Incomplete => RunStatus::Incomplete,
574 };
575 (status, Some(e.ended_at))
576 }
577 None => (RunStatus::InProgress, None),
578 };
579 Some(RunSummary {
580 run_id: started.run_id,
581 agent_id: started.agent_id,
582 intent: started.intent,
583 started_at: started.started_at,
584 ended_at,
585 status,
586 turn_count,
587 })
588}
589
590fn sanitize(id: &str) -> String {
594 let cleaned: String = id
595 .chars()
596 .map(|c| match c {
597 '/' | '\\' | '\0' => '_',
598 c => c,
599 })
600 .collect();
601 let trimmed = cleaned.trim_matches('.');
602 if trimmed.is_empty() {
603 "_".to_string()
604 } else {
605 trimmed.to_string()
606 }
607}
608
609#[cfg(unix)]
611fn set_dir_perms(path: &Path) -> std::io::Result<()> {
612 use std::os::unix::fs::PermissionsExt;
613 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o700))
614}
615
616#[cfg(not(unix))]
617fn set_dir_perms(_path: &Path) -> std::io::Result<()> {
618 Ok(())
619}
620
621#[cfg(unix)]
623fn set_file_perms(path: &Path) -> std::io::Result<()> {
624 use std::os::unix::fs::PermissionsExt;
625 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))
626}
627
628#[cfg(not(unix))]
629fn set_file_perms(_path: &Path) -> std::io::Result<()> {
630 Ok(())
631}
632
633fn mark_backup_excluded(dir: &Path) {
639 let _ = std::fs::write(
642 dir.join(".nobackup"),
643 b"car run traces - excluded from backup\n",
644 );
645 #[cfg(target_os = "macos")]
646 set_macos_backup_excluded(dir);
647}
648
649#[cfg(target_os = "macos")]
654fn set_macos_backup_excluded(dir: &Path) {
655 let _ = std::process::Command::new("xattr")
658 .args(["-w", "com.apple.metadata:com_apple_backup_excludeItem", "1"])
659 .arg(dir)
660 .output();
661}
662
663#[cfg(test)]
664mod tests {
665 use super::*;
666 use car_ir::{AgentOutcome, OutcomeMetrics, OutcomeStatus};
667 use car_proto::{RunEnded, RunStarted, RunTurn, VerifierVerdict};
668 use serde_json::json;
669
670 fn store(root: PathBuf) -> RunStore {
671 RunStore::new(root, RetentionConfig::default())
672 }
673
674 fn started(run_id: &str, agent_id: &str, when: DateTime<Utc>) -> RunStarted {
675 RunStarted {
676 run_id: run_id.to_string(),
677 agent_id: agent_id.to_string(),
678 intent: "do the thing".to_string(),
679 outcome_description: None,
680 started_at: when,
681 }
682 }
683
684 fn turn(index: usize, prompt: &str) -> RunRecord {
685 RunRecord::Turn(RunTurn {
686 index,
687 prompt: Some(prompt.to_string()),
688 tool: Some("drive_cli".to_string()),
689 parameters: json!({ "prompt": prompt }),
690 output: Some(json!({ "exit_code": 0 })),
691 cli_outcome: None,
692 verifier_verdict: VerifierVerdict::NotRun,
693 policy_rejected: None,
694 })
695 }
696
697 fn ended(run_id: &str, agent_id: &str, status: OutcomeStatus) -> RunRecord {
698 let outcome = AgentOutcome {
699 status,
700 summary: "done".to_string(),
701 evidence: vec![],
702 metrics: OutcomeMetrics::default(),
703 timestamp: Utc::now(),
704 };
705 RunRecord::Ended(RunEnded {
706 run_id: run_id.to_string(),
707 agent_id: agent_id.to_string(),
708 termination: RunTermination::Outcome { status, outcome },
709 ended_at: Utc::now(),
710 })
711 }
712
713 #[test]
717 fn completed_run_readable_after_restart() {
718 let tmp = tempfile::TempDir::new().unwrap();
719 let root = tmp.path().join("runs");
720 let s1 = store(root.clone());
721 s1.write_started(&started("run-1", "agent-a", Utc::now()))
722 .unwrap();
723 s1.append_turns("agent-a", "run-1", &[turn(0, "first")])
724 .unwrap();
725 s1.append_records(
726 "agent-a",
727 "run-1",
728 &[ended("run-1", "agent-a", OutcomeStatus::Success)],
729 )
730 .unwrap();
731
732 let s2 = store(root);
734 let trace = s2
735 .get_run_trace("run-1")
736 .expect("trace readable after restart");
737 assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
738 assert!(matches!(trace.last(), Some(RunRecord::Ended(_))));
739 let turns = trace
740 .iter()
741 .filter(|r| matches!(r, RunRecord::Turn(_)))
742 .count();
743 assert_eq!(turns, 1);
744 }
745
746 #[test]
749 fn runs_isolated_per_agent_and_run() {
750 let tmp = tempfile::TempDir::new().unwrap();
751 let s = store(tmp.path().join("runs"));
752 s.write_started(&started("run-1", "agent-a", Utc::now()))
753 .unwrap();
754 s.append_turns("agent-a", "run-1", &[turn(0, "a-first")])
755 .unwrap();
756 s.write_started(&started("run-2", "agent-a", Utc::now()))
757 .unwrap();
758 s.append_turns("agent-a", "run-2", &[turn(0, "a-second")])
759 .unwrap();
760 s.write_started(&started("run-3", "agent-b", Utc::now()))
761 .unwrap();
762 s.append_turns("agent-b", "run-3", &[turn(0, "b-first")])
763 .unwrap();
764
765 let t1 = s.get_run_trace("run-1").unwrap();
767 let t2 = s.get_run_trace("run-2").unwrap();
768 let t3 = s.get_run_trace("run-3").unwrap();
769 assert_eq!(turn_prompt(&t1), "a-first");
770 assert_eq!(turn_prompt(&t2), "a-second");
771 assert_eq!(turn_prompt(&t3), "b-first");
772 assert_eq!(s.agent_for_run("run-1").as_deref(), Some("agent-a"));
774 assert_eq!(s.agent_for_run("run-3").as_deref(), Some("agent-b"));
775 assert_eq!(s.list_runs("agent-a").len(), 2);
777 assert_eq!(s.list_runs("agent-b").len(), 1);
778 }
779
780 fn turn_prompt(trace: &[RunRecord]) -> String {
781 trace
782 .iter()
783 .find_map(|r| match r {
784 RunRecord::Turn(t) => t.prompt.clone(),
785 _ => None,
786 })
787 .unwrap_or_default()
788 }
789
790 #[cfg(unix)]
792 #[test]
793 fn perms_are_0600_files_0700_dirs() {
794 use std::os::unix::fs::PermissionsExt;
795 let tmp = tempfile::TempDir::new().unwrap();
796 let root = tmp.path().join("runs");
797 let s = store(root.clone());
798 s.write_started(&started("run-1", "agent-a", Utc::now()))
799 .unwrap();
800
801 let file = root.join("agent-a").join("run-1.jsonl");
802 let fmode = std::fs::metadata(&file).unwrap().permissions().mode() & 0o777;
803 assert_eq!(fmode, 0o600, "run file must be 0600, got {:o}", fmode);
804
805 let root_mode = std::fs::metadata(&root).unwrap().permissions().mode() & 0o777;
806 assert_eq!(
807 root_mode, 0o700,
808 "runs/ dir must be 0700, got {:o}",
809 root_mode
810 );
811 let agent_mode = std::fs::metadata(root.join("agent-a"))
812 .unwrap()
813 .permissions()
814 .mode()
815 & 0o777;
816 assert_eq!(
817 agent_mode, 0o700,
818 "agent dir must be 0700, got {:o}",
819 agent_mode
820 );
821
822 assert!(root.join(".nobackup").exists(), ".nobackup marker written");
824 }
825
826 #[test]
831 fn orphan_run_status_distinguishes_inprogress_from_incomplete() {
832 let tmp = tempfile::TempDir::new().unwrap();
833 let s = store(tmp.path().join("runs"));
834 let stale = Utc::now() - chrono::Duration::hours(6);
836 s.write_started(&started("run-1", "agent-a", stale)).unwrap();
837 s.append_turns("agent-a", "run-1", &[turn(0, "first")])
838 .unwrap();
839
840 let open = &s.list_runs("agent-a")[0];
842 assert_eq!(open.status, RunStatus::InProgress);
843
844 let incomplete = RunRecord::Ended(RunEnded {
846 run_id: "run-1".to_string(),
847 agent_id: "agent-a".to_string(),
848 termination: RunTermination::Incomplete,
849 ended_at: Utc::now(),
850 });
851 s.append_records("agent-a", "run-1", &[incomplete]).unwrap();
852 let closed = &s.list_runs("agent-a")[0];
853 assert_eq!(closed.status, RunStatus::Incomplete);
854 }
855
856 #[test]
859 fn gc_evicts_beyond_per_agent_cap_but_never_in_progress() {
860 let tmp = tempfile::TempDir::new().unwrap();
861 let root = tmp.path().join("runs");
862 let s = RunStore::new(
863 root,
864 RetentionConfig {
865 max_per_agent: 3,
866 max_age_days: 30,
867 },
868 );
869 let base = Utc::now() - chrono::Duration::days(1);
871 for i in 0..5 {
872 let id = format!("c{i}");
873 let when = base + chrono::Duration::minutes(i);
874 s.write_started(&started(&id, "agent-a", when)).unwrap();
875 s.append_records(
876 "agent-a",
877 &id,
878 &[ended(&id, "agent-a", OutcomeStatus::Success)],
879 )
880 .unwrap();
881 }
882 s.write_started(&started("live", "agent-a", Utc::now()))
884 .unwrap();
885
886 let removed = s.gc();
887 assert_eq!(removed, 2, "should evict the 2 oldest completed runs");
890 let remaining = s.list_runs("agent-a");
891 assert_eq!(remaining.len(), 4);
893 assert!(
894 remaining.iter().any(|r| r.run_id == "live"),
895 "in-progress run must never be evicted"
896 );
897 assert!(!remaining.iter().any(|r| r.run_id == "c0"));
899 assert!(!remaining.iter().any(|r| r.run_id == "c1"));
900 }
901
902 #[test]
904 fn gc_evicts_runs_older_than_age_cap() {
905 let tmp = tempfile::TempDir::new().unwrap();
906 let s = RunStore::new(
907 tmp.path().join("runs"),
908 RetentionConfig {
909 max_per_agent: 50,
910 max_age_days: 30,
911 },
912 );
913 let old = Utc::now() - chrono::Duration::days(40);
917 s.write_started(&started("old", "agent-a", old)).unwrap();
918 s.append_records(
919 "agent-a",
920 "old",
921 &[ended_at("old", "agent-a", OutcomeStatus::Success, old)],
922 )
923 .unwrap();
924 s.write_started(&started("fresh", "agent-a", Utc::now()))
925 .unwrap();
926 s.append_records(
927 "agent-a",
928 "fresh",
929 &[ended("fresh", "agent-a", OutcomeStatus::Success)],
930 )
931 .unwrap();
932
933 let removed = s.gc();
934 assert_eq!(removed, 1, "the 40-day-old run should be evicted");
935 let remaining = s.list_runs("agent-a");
936 assert_eq!(remaining.len(), 1);
937 assert_eq!(remaining[0].run_id, "fresh");
938 }
939
940 #[test]
943 fn gc_never_evicts_stale_in_progress_run() {
944 let tmp = tempfile::TempDir::new().unwrap();
945 let s = RunStore::new(
946 tmp.path().join("runs"),
947 RetentionConfig {
948 max_per_agent: 1,
949 max_age_days: 1,
950 },
951 );
952 let old = Utc::now() - chrono::Duration::days(40);
953 s.write_started(&started("stale-live", "agent-a", old))
955 .unwrap();
956 let removed = s.gc();
957 assert_eq!(removed, 0);
958 assert!(s
959 .list_runs("agent-a")
960 .iter()
961 .any(|r| r.run_id == "stale-live"));
962 }
963
964 #[test]
967 fn corrupt_trailing_line_loads_prior_records() {
968 let tmp = tempfile::TempDir::new().unwrap();
969 let root = tmp.path().join("runs");
970 let s = store(root.clone());
971 s.write_started(&started("run-1", "agent-a", Utc::now()))
972 .unwrap();
973 s.append_turns("agent-a", "run-1", &[turn(0, "first"), turn(1, "second")])
974 .unwrap();
975
976 let path = root.join("agent-a").join("run-1.jsonl");
979 let mut f = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
980 writeln!(f, "{{\"record\":\"turn\",\"index\":2,\"prom").unwrap();
981
982 let trace = s.get_run_trace("run-1").expect("trace still loads");
983 let turns = trace
985 .iter()
986 .filter(|r| matches!(r, RunRecord::Turn(_)))
987 .count();
988 assert_eq!(turns, 2, "prior valid turns load; corrupt line skipped");
989 assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
990 }
991
992 #[test]
994 fn list_runs_empty_for_unknown_agent() {
995 let tmp = tempfile::TempDir::new().unwrap();
996 let s = store(tmp.path().join("runs"));
997 assert!(s.list_runs("nobody").is_empty());
998 assert!(s.get_run_trace("nope").is_none());
999 assert!(s.agent_for_run("nope").is_none());
1000 }
1001
1002 #[test]
1005 fn from_journal_dir_roots_at_car_runs() {
1006 let s = RunStore::from_journal_dir(Path::new("/home/u/.car/journals"));
1007 assert_eq!(s.root(), Path::new("/home/u/.car/runs"));
1008 }
1009
1010 #[test]
1013 fn retention_config_reads_overrides() {
1014 let tmp = tempfile::TempDir::new().unwrap();
1015 std::fs::write(
1016 tmp.path().join("config.toml"),
1017 "[runs]\nmax_per_agent = 10\n",
1018 )
1019 .unwrap();
1020 let cfg = RetentionConfig::from_car_dir(tmp.path());
1021 assert_eq!(cfg.max_per_agent, 10);
1022 assert_eq!(cfg.max_age_days, DEFAULT_MAX_AGE_DAYS);
1024 }
1025
1026 #[test]
1029 fn retention_config_defaults_on_missing_file() {
1030 let tmp = tempfile::TempDir::new().unwrap();
1031 let cfg = RetentionConfig::from_car_dir(tmp.path());
1032 assert_eq!(cfg.max_per_agent, DEFAULT_MAX_RUNS_PER_AGENT);
1033 assert_eq!(cfg.max_age_days, DEFAULT_MAX_AGE_DAYS);
1034 }
1035
1036 fn ended_at(
1040 run_id: &str,
1041 agent_id: &str,
1042 status: OutcomeStatus,
1043 when: DateTime<Utc>,
1044 ) -> RunRecord {
1045 let outcome = AgentOutcome {
1046 status,
1047 summary: "done".to_string(),
1048 evidence: vec![],
1049 metrics: OutcomeMetrics::default(),
1050 timestamp: when,
1051 };
1052 RunRecord::Ended(RunEnded {
1053 run_id: run_id.to_string(),
1054 agent_id: agent_id.to_string(),
1055 termination: RunTermination::Outcome { status, outcome },
1056 ended_at: when,
1057 })
1058 }
1059
1060 #[test]
1064 fn gc_age_cap_uses_terminal_time_not_start() {
1065 let tmp = tempfile::TempDir::new().unwrap();
1066 let s = RunStore::new(
1067 tmp.path().join("runs"),
1068 RetentionConfig {
1069 max_per_agent: 50,
1070 max_age_days: 30,
1071 },
1072 );
1073 let started_40d = Utc::now() - chrono::Duration::days(40);
1075 let ended_1d = Utc::now() - chrono::Duration::days(1);
1076 s.write_started(&started("long", "agent-a", started_40d))
1077 .unwrap();
1078 s.append_records(
1079 "agent-a",
1080 "long",
1081 &[ended_at("long", "agent-a", OutcomeStatus::Success, ended_1d)],
1082 )
1083 .unwrap();
1084
1085 let removed = s.gc();
1086 assert_eq!(
1087 removed, 0,
1088 "a run completed 1 day ago must survive the 30-day age cap, \
1089 even if it started 40 days ago"
1090 );
1091 let remaining = s.list_runs("agent-a");
1092 assert_eq!(remaining.len(), 1);
1093 assert_eq!(remaining[0].run_id, "long");
1094 }
1095
1096 #[test]
1101 fn adopt_orphans_marks_crashed_inprogress_runs_incomplete() {
1102 let tmp = tempfile::TempDir::new().unwrap();
1103 let root = tmp.path().join("runs");
1104 let s1 = store(root.clone());
1106 s1.write_started(&started("orphan", "agent-a", Utc::now()))
1107 .unwrap();
1108 s1.append_turns("agent-a", "orphan", &[turn(0, "first")])
1109 .unwrap();
1110 assert_eq!(
1111 s1.list_runs("agent-a")[0].status,
1112 RunStatus::InProgress,
1113 "precondition: orphan reads InProgress before adoption"
1114 );
1115
1116 let s2 = store(root);
1118 let adopted = s2.adopt_orphans();
1119 assert_eq!(adopted, 1, "the crash orphan should be adopted");
1120
1121 let after = &s2.list_runs("agent-a")[0];
1122 assert_eq!(
1123 after.status,
1124 RunStatus::Incomplete,
1125 "adopted orphan now reads Incomplete (terminal)"
1126 );
1127 assert!(after.ended_at.is_some(), "terminal record has an ended_at");
1128
1129 assert_eq!(s2.adopt_orphans(), 0);
1131 }
1132
1133 #[test]
1136 fn adopt_orphans_leaves_completed_runs_alone() {
1137 let tmp = tempfile::TempDir::new().unwrap();
1138 let root = tmp.path().join("runs");
1139 let s = store(root);
1140 s.write_started(&started("done", "agent-a", Utc::now()))
1141 .unwrap();
1142 s.append_records(
1143 "agent-a",
1144 "done",
1145 &[ended("done", "agent-a", OutcomeStatus::Success)],
1146 )
1147 .unwrap();
1148 assert_eq!(s.adopt_orphans(), 0);
1149 assert_eq!(s.list_runs("agent-a")[0].status, RunStatus::Completed);
1150 }
1151
1152 #[test]
1157 fn torn_tail_does_not_drop_following_valid_record() {
1158 let tmp = tempfile::TempDir::new().unwrap();
1159 let root = tmp.path().join("runs");
1160 let s = store(root.clone());
1161 s.write_started(&started("run-1", "agent-a", Utc::now()))
1162 .unwrap();
1163 s.append_turns("agent-a", "run-1", &[turn(0, "first")])
1164 .unwrap();
1165
1166 let path = root.join("agent-a").join("run-1.jsonl");
1168 {
1169 let mut f = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
1170 f.write_all(b"{\"record\":\"turn\",\"index\":1,\"prom").unwrap();
1172 }
1173 assert!(
1174 last_byte_is_not_newline(&path).unwrap(),
1175 "precondition: tail is torn (no trailing newline)"
1176 );
1177
1178 s.append_turns("agent-a", "run-1", &[turn(2, "third")])
1182 .unwrap();
1183
1184 let trace = s.get_run_trace("run-1").expect("trace loads");
1185 let turn_prompts: Vec<String> = trace
1188 .iter()
1189 .filter_map(|r| match r {
1190 RunRecord::Turn(t) => t.prompt.clone(),
1191 _ => None,
1192 })
1193 .collect();
1194 assert!(
1195 turn_prompts.contains(&"third".to_string()),
1196 "the valid record appended after a torn tail must survive, got {:?}",
1197 turn_prompts
1198 );
1199 assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
1200 }
1201}