Skip to main content

car_server_core/
run_store.rs

1//! Disk-backed run-trace store (agent run tracing, U3).
2//!
3//! Persists the per-run trace stream — the `RunStarted` line, each
4//! `RunTurn`, then the terminal `RunEnded`/`Incomplete` — as JSONL so a
5//! run survives daemon restarts and stays grouped by run independent of
6//! the WS connection that produced it (R4). One file per run:
7//!
8//! ```text
9//! ~/.car/runs/{agent_id}/{run_id}.jsonl
10//! ```
11//!
12//! ## Source-of-truth split
13//!
14//! This disk store is the source of truth for **replay** (U5):
15//! [`RunStore::get_run_trace`] / [`RunStore::list_runs`] read it directly
16//! and work after a restart when memory is empty. The in-memory
17//! `RunMeta.turns` buffer (U2) stays the source for the **live** stream
18//! (U4) — U3 does not remove it; it mirrors what was recorded onto disk.
19//!
20//! ## Layout, perms, backup exclusion (R14)
21//!
22//! Prompts and CLI output may carry secrets, so the `runs/` tree is
23//! created `0700` and every file `0600` (Unix). The `runs/` dir is
24//! marked backup-excluded — a `.nobackup` marker file plus, on macOS,
25//! the `com.apple.metadata:com_apple_backup_excludeItem` xattr — so Time
26//! Machine / iCloud don't silently copy plaintext traces off the box.
27//!
28//! ## Index
29//!
30//! There is no separate index file to drift: the directory tree *is* the
31//! index. `list_runs(agent_id)` scans `runs/{agent_id}/`; `run_id ->
32//! agent_id` resolves by scanning `runs/*/` for the matching
//! `{run_id}.jsonl` (U5's `runs.get_trace` takes only a `run_id`). Each
//! file's first line is its `RunStarted`, so a run's `started_at` /
//! `intent` / status are derived from the file's own records (the
//! current implementation parses the whole file), not a sidecar.
37//!
38//! ## Retention (R6)
39//!
40//! GC runs on daemon boot ([`RunStore::gc`]): per agent it keeps the **50
41//! most recent completed runs** and drops anything older than **30 days**,
42//! whichever is more restrictive. A still-in-progress run (no terminal
43//! record) is **never** evicted. Both limits are configurable via
44//! `~/.car/config.toml` (`[runs] max_per_agent` / `max_age_days`) with
45//! that restrictive default.
46//!
47//! Records are appended at turn granularity (not per token) — the same
48//! coarse boundary the in-memory buffer uses. A corrupt/partial trailing
49//! JSONL line loads the prior valid records rather than failing the whole
50//! run (the error-path test).
51
52use car_proto::{RunRecord, RunTermination};
53use chrono::{DateTime, Utc};
54use serde::{Deserialize, Serialize};
55use std::io::{BufRead, Write};
56use std::path::{Path, PathBuf};
57
/// Count cap used when nothing is configured: per agent, the 50 newest
/// finished runs are retained.
pub const DEFAULT_MAX_RUNS_PER_AGENT: usize = 50;
/// Age cap used when nothing is configured: finished runs older than 30
/// days are dropped.
pub const DEFAULT_MAX_AGE_DAYS: i64 = 30;
62
63/// Terminal/in-progress status of a run, derived from its records.
64///
65/// Distinct from the run-level `OutcomeStatus` carried inside a terminal
66/// `Outcome` — this is the coarse "what state is this run in?" the run
67/// list renders. `Incomplete` is the orphan case (no terminal record + no
68/// live harness, R5); `InProgress` is a run still being written.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum RunStatus {
72    /// No terminal record yet — still being written by a live harness.
73    InProgress,
74    /// `runs.complete` reported a terminal `AgentOutcome`. The concrete
75    /// `OutcomeStatus` lives on the `RunEnded` record itself.
76    Completed,
77    /// The harness disconnected without reporting an outcome (R5).
78    Incomplete,
79}
80
81/// One row of [`RunStore::list_runs`] — the summary U5's `runs.list`
82/// returns. Cheap to build (head + tail of the JSONL file), so listing an
83/// agent's runs doesn't load every turn.
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct RunSummary {
86    pub run_id: String,
87    pub agent_id: String,
88    pub intent: String,
89    pub started_at: DateTime<Utc>,
90    /// When the terminal record was written, if any.
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub ended_at: Option<DateTime<Utc>>,
93    pub status: RunStatus,
94    /// Number of `RunTurn` records persisted for this run.
95    pub turn_count: usize,
96}
97
/// How many finished runs the store keeps, and for how long (R6).
///
/// The `Default` impl yields the restrictive 50-per-agent / 30-day
/// policy; the `[runs]` table of `~/.car/config.toml` may override
/// either field.
#[derive(Debug, Clone, Copy)]
pub struct RetentionConfig {
    /// Maximum number of finished runs retained per agent.
    pub max_per_agent: usize,
    /// Maximum age, in days, of a finished run before it is dropped.
    pub max_age_days: i64,
}
105
106impl Default for RetentionConfig {
107    fn default() -> Self {
108        Self {
109            max_per_agent: DEFAULT_MAX_RUNS_PER_AGENT,
110            max_age_days: DEFAULT_MAX_AGE_DAYS,
111        }
112    }
113}
114
115/// `[runs]` section of `~/.car/config.toml`. Both keys optional; an
116/// absent key keeps the restrictive default.
117#[derive(Debug, Clone, Default, Deserialize)]
118struct RunsConfigFile {
119    #[serde(default)]
120    runs: RunsSection,
121}
122
123#[derive(Debug, Clone, Default, Deserialize)]
124struct RunsSection {
125    #[serde(default)]
126    max_per_agent: Option<usize>,
127    #[serde(default)]
128    max_age_days: Option<i64>,
129}
130
131impl RetentionConfig {
132    /// Load the retention policy from `<car_dir>/config.toml`'s `[runs]`
133    /// section, falling back to the restrictive default for any missing
134    /// key or an unreadable/malformed file (config errors must never make
135    /// the daemon refuse to start — same posture as the rest of `.car`).
136    pub fn from_car_dir(car_dir: &Path) -> Self {
137        let mut cfg = Self::default();
138        let path = car_dir.join("config.toml");
139        let Ok(text) = std::fs::read_to_string(&path) else {
140            return cfg;
141        };
142        let Ok(parsed) = toml::from_str::<RunsConfigFile>(&text) else {
143            return cfg;
144        };
145        if let Some(n) = parsed.runs.max_per_agent {
146            cfg.max_per_agent = n;
147        }
148        if let Some(d) = parsed.runs.max_age_days {
149            cfg.max_age_days = d;
150        }
151        cfg
152    }
153}
154
155/// JSONL run-trace store rooted at `<car_dir>/runs/`.
156///
157/// Stateless across calls — each append opens, writes, and closes the
158/// run's file. Flush points are sparse (turn granularity), so there is no
159/// long-lived file handle to manage, and a concurrently-restarting daemon
160/// always sees a consistent on-disk tail.
161#[derive(Debug, Clone)]
162pub struct RunStore {
163    /// `<car_dir>/runs` — the tree root, created `0700`.
164    root: PathBuf,
165    retention: RetentionConfig,
166}
167
168impl RunStore {
169    /// Construct a store rooted at `runs_root` (the `runs/` dir itself).
170    /// Use [`RunStore::from_journal_dir`] from the daemon, which derives
171    /// the root from the configured journal dir; this constructor is the
172    /// test/embedder seam.
173    pub fn new(runs_root: PathBuf, retention: RetentionConfig) -> Self {
174        Self {
175            root: runs_root,
176            retention,
177        }
178    }
179
180    /// Derive the store from the daemon's journal dir. The journal lives
181    /// at `~/.car/journals`, so the run store is its sibling
182    /// `~/.car/runs`; retention is read from `~/.car/config.toml`. When
183    /// the journal dir has no parent (a bare relative path), the store
184    /// falls back to `journal_dir/../runs` resolved lexically.
185    pub fn from_journal_dir(journal_dir: &Path) -> Self {
186        let car_dir = journal_dir
187            .parent()
188            .map(Path::to_path_buf)
189            .unwrap_or_else(|| PathBuf::from("."));
190        let root = car_dir.join("runs");
191        let retention = RetentionConfig::from_car_dir(&car_dir);
192        Self::new(root, retention)
193    }
194
195    /// The `runs/` tree root.
196    pub fn root(&self) -> &Path {
197        &self.root
198    }
199
200    /// Path to a run's JSONL file: `runs/{agent_id}/{run_id}.jsonl`.
201    fn run_path(&self, agent_id: &str, run_id: &str) -> PathBuf {
202        self.root
203            .join(sanitize(agent_id))
204            .join(format!("{}.jsonl", sanitize(run_id)))
205    }
206
207    /// Ensure the `runs/` root exists with `0700` perms and is marked
208    /// backup-excluded. Idempotent. Called lazily on the first append so
209    /// constructing a `RunStore` is free (no disk touch until a run is
210    /// actually recorded).
211    fn ensure_root(&self) -> std::io::Result<()> {
212        let created = !self.root.exists();
213        std::fs::create_dir_all(&self.root)?;
214        set_dir_perms(&self.root)?;
215        if created {
216            mark_backup_excluded(&self.root);
217        }
218        Ok(())
219    }
220
221    /// Ensure an agent's run dir exists `0700`.
222    fn ensure_agent_dir(&self, agent_id: &str) -> std::io::Result<PathBuf> {
223        self.ensure_root()?;
224        let dir = self.root.join(sanitize(agent_id));
225        std::fs::create_dir_all(&dir)?;
226        set_dir_perms(&dir)?;
227        Ok(dir)
228    }
229
230    /// Append one or more `RunRecord`s to a run's file, creating it `0600`
231    /// on first write. Records are written one JSONL line each, in order.
232    ///
233    /// This is the single low-level flush primitive the wiring calls at
234    /// each boundary: `RunStarted` on `runs.start`, `RunTurn`s as the
235    /// recorder produces them, and the terminal `RunEnded`/`Incomplete` on
236    /// `runs.complete`/disconnect.
237    pub fn append_records(
238        &self,
239        agent_id: &str,
240        run_id: &str,
241        records: &[RunRecord],
242    ) -> std::io::Result<()> {
243        if records.is_empty() {
244            return Ok(());
245        }
246        self.ensure_agent_dir(agent_id)?;
247        let path = self.run_path(agent_id, run_id);
248        let existed = path.exists();
249        let mut file = std::fs::OpenOptions::new()
250            .create(true)
251            .append(true)
252            .open(&path)?;
253        // FIX 6/7: build the WHOLE batch into one buffer and write it with a
254        // single `write_all`. With `O_APPEND` a single write is positioned
255        // and appended atomically relative to other appenders, so two
256        // concurrent batches for the same file can't interleave mid-record
257        // (FIX 6, defense-in-depth). A leading '\n' repairs a torn tail
258        // (FIX 7) as part of the same atomic write.
259        let mut buf: Vec<u8> = Vec::new();
260        if !existed {
261            // New file: tighten to 0600 before any secret-bearing line
262            // lands. Done after create so the mode isn't masked by umask.
263            set_file_perms(&path)?;
264        } else if last_byte_is_not_newline(&path)? {
265            // Torn tail (FIX 7): a prior append was cut short (ENOSPC/EINTR
266            // mid-line, or a crash) leaving a record with no trailing '\n'.
267            // Appending here would concatenate our valid record onto the
268            // torn one, so `load_records` would drop OUR good record too,
269            // not just the trailing garbage. Prepend a '\n' so the torn
270            // fragment becomes its own (skippable) line and our record lands
271            // intact on the next line.
272            buf.push(b'\n');
273        }
274        for rec in records {
275            let line = serde_json::to_string(rec)
276                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
277            buf.extend_from_slice(line.as_bytes());
278            buf.push(b'\n');
279        }
280        file.write_all(&buf)?;
281        Ok(())
282    }
283
284    /// Append the `RunStarted` line + create the run file (`runs.start`).
285    pub fn write_started(&self, started: &car_proto::RunStarted) -> std::io::Result<()> {
286        let rec = RunRecord::Started(started.clone());
287        self.append_records(&started.agent_id, &started.run_id, &[rec])
288    }
289
290    /// Append `RunTurn` records (the recorder's per-proposal output).
291    pub fn append_turns(
292        &self,
293        agent_id: &str,
294        run_id: &str,
295        turns: &[RunRecord],
296    ) -> std::io::Result<()> {
297        self.append_records(agent_id, run_id, turns)
298    }
299
300    /// Append the terminal `RunEnded` line (`runs.complete` or the
301    /// disconnect-`Incomplete` path).
302    pub fn write_ended(&self, ended: &car_proto::RunEnded) -> std::io::Result<()> {
303        let rec = RunRecord::Ended(ended.clone());
304        self.append_records(&ended.agent_id, &ended.run_id, &[rec])
305    }
306
307    /// Load a run's full ordered trace from disk by `run_id` — the U5
308    /// `runs.get_trace` read path. Works after a restart when memory is
309    /// empty. Resolves `run_id -> agent_id` by scanning the tree, then
310    /// reads the JSONL. A corrupt/partial trailing line is skipped, so the
311    /// prior valid records still load (never fails the whole run).
312    /// Returns `None` when no file exists for the `run_id`.
313    pub fn get_run_trace(&self, run_id: &str) -> Option<Vec<RunRecord>> {
314        let path = self.resolve_run_path(run_id)?;
315        Some(load_records(&path))
316    }
317
318    /// Load a run's trace given both keys (cheaper — no tree scan). Used
319    /// by `list_runs` internally and available to callers that already
320    /// know the owning agent.
321    pub fn get_run_trace_for(&self, agent_id: &str, run_id: &str) -> Option<Vec<RunRecord>> {
322        let path = self.run_path(agent_id, run_id);
323        if path.exists() {
324            Some(load_records(&path))
325        } else {
326            None
327        }
328    }
329
330    /// List an agent's runs newest-first — the U5 `runs.list` read path.
331    /// Each summary is built from the run file's records (head for
332    /// `RunStarted`, tail for the terminal record, count of `Turn`s).
333    /// Returns an empty Vec for an agent with no runs (the empty-state).
334    pub fn list_runs(&self, agent_id: &str) -> Vec<RunSummary> {
335        let dir = self.root.join(sanitize(agent_id));
336        let mut out = Vec::new();
337        let Ok(entries) = std::fs::read_dir(&dir) else {
338            return out;
339        };
340        for entry in entries.flatten() {
341            let path = entry.path();
342            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
343                continue;
344            }
345            if let Some(summary) = summarize_file(&path) {
346                out.push(summary);
347            }
348        }
349        // Newest first by start time.
350        out.sort_by(|a, b| b.started_at.cmp(&a.started_at));
351        out
352    }
353
354    /// Resolve `run_id -> path` by scanning `runs/*/{run_id}.jsonl`. U5's
355    /// `runs.get_trace` takes only a `run_id`, so the owning agent must be
356    /// discovered. Returns the first match (run ids are uuids — unique).
357    fn resolve_run_path(&self, run_id: &str) -> Option<PathBuf> {
358        let file_name = format!("{}.jsonl", sanitize(run_id));
359        let agent_dirs = std::fs::read_dir(&self.root).ok()?;
360        for agent in agent_dirs.flatten() {
361            let candidate = agent.path().join(&file_name);
362            if candidate.is_file() {
363                return Some(candidate);
364            }
365        }
366        None
367    }
368
369    /// Resolve the owning `agent_id` for a `run_id` from disk. Mirrors
370    /// [`Self::resolve_run_path`] but returns the agent dir name — U5's
371    /// authorization check (KTD10) needs `run_id -> agent_id` to verify
372    /// ownership before serving a trace.
373    pub fn agent_for_run(&self, run_id: &str) -> Option<String> {
374        let path = self.resolve_run_path(run_id)?;
375        path.parent()
376            .and_then(Path::file_name)
377            .and_then(|s| s.to_str())
378            .map(str::to_string)
379    }
380
381    /// Retention GC (R6) — call on daemon boot. Per agent: keep the most
382    /// recent `max_per_agent` **completed** runs and drop any completed
383    /// run older than `max_age_days`, whichever is more restrictive. An
384    /// in-progress run (no terminal record) is NEVER evicted. Returns the
385    /// number of run files removed.
386    pub fn gc(&self) -> usize {
387        let mut removed = 0;
388        let Ok(agent_dirs) = std::fs::read_dir(&self.root) else {
389            return 0;
390        };
391        let cutoff = Utc::now() - chrono::Duration::days(self.retention.max_age_days);
392        for agent in agent_dirs.flatten() {
393            let agent_path = agent.path();
394            if !agent_path.is_dir() {
395                continue;
396            }
397            removed += self.gc_agent_dir(&agent_path, cutoff);
398        }
399        removed
400    }
401
402    /// Adopt crash-orphaned runs at boot (FIX 4). A daemon crash mid-run
403    /// leaves an on-disk run with `RunStarted` (+ `Turn`s) but no terminal
404    /// `RunEnded`, so it reads `InProgress` forever and the GC — which never
405    /// evicts an in-progress run — can never reclaim it. The file leaks
406    /// across every crash.
407    ///
408    /// This runs at store construction/boot, BEFORE `gc()`. At that moment
409    /// the in-memory `runs` map is always empty, so any run that is
410    /// `InProgress` on disk cannot have a live harness writing to it — it is
411    /// necessarily a crashed prior process. We append an `Incomplete`
412    /// terminal `RunEnded` marker to adopt it, making it terminal and thus
413    /// age-GC-eligible (so a later `gc()` in the same boot can reclaim it).
414    ///
415    /// Returns the number of runs adopted. Best-effort: an unwritable file is
416    /// skipped rather than failing startup.
417    pub fn adopt_orphans(&self) -> usize {
418        let mut adopted = 0;
419        let Ok(agent_dirs) = std::fs::read_dir(&self.root) else {
420            return 0;
421        };
422        let now = Utc::now();
423        for agent in agent_dirs.flatten() {
424            let agent_path = agent.path();
425            if !agent_path.is_dir() {
426                continue;
427            }
428            let Ok(entries) = std::fs::read_dir(&agent_path) else {
429                continue;
430            };
431            for entry in entries.flatten() {
432                let path = entry.path();
433                if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
434                    continue;
435                }
436                let Some(summary) = summarize_file(&path) else {
437                    continue;
438                };
439                if summary.status != RunStatus::InProgress {
440                    continue;
441                }
442                // No terminal record + memory empty at boot => crash orphan.
443                let incomplete = RunRecord::Ended(car_proto::RunEnded {
444                    run_id: summary.run_id.clone(),
445                    agent_id: summary.agent_id.clone(),
446                    termination: RunTermination::Incomplete,
447                    ended_at: now,
448                });
449                if self
450                    .append_records(&summary.agent_id, &summary.run_id, &[incomplete])
451                    .is_ok()
452                {
453                    adopted += 1;
454                }
455            }
456        }
457        adopted
458    }
459
460    /// GC one agent's run dir against the retention caps.
461    fn gc_agent_dir(&self, agent_path: &Path, age_cutoff: DateTime<Utc>) -> usize {
462        // Collect (path, summary) for every run file, ignoring unreadable
463        // ones (a malformed file with no RunStarted can't be summarized;
464        // leave it rather than risk evicting something we can't classify).
465        let mut runs: Vec<(PathBuf, RunSummary)> = Vec::new();
466        let Ok(entries) = std::fs::read_dir(agent_path) else {
467            return 0;
468        };
469        for entry in entries.flatten() {
470            let path = entry.path();
471            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
472                continue;
473            }
474            if let Some(s) = summarize_file(&path) {
475                runs.push((path, s));
476            }
477        }
478        // Newest-first so the most recent completed runs rank first.
479        runs.sort_by(|a, b| b.1.started_at.cmp(&a.1.started_at));
480
481        let mut removed = 0;
482        // Rank among COMPLETED/Incomplete runs only — an in-progress run
483        // must not consume a keeper slot, so the count cap is measured by
484        // completed-run rank, not the combined sorted index (R6).
485        let mut completed_rank = 0usize;
486        for (path, summary) in runs.iter() {
487            // NEVER evict an in-progress run — it has no terminal record
488            // and a live harness may still be writing to it (R6).
489            if summary.status == RunStatus::InProgress {
490                continue;
491            }
492            let over_count = completed_rank >= self.retention.max_per_agent;
493            completed_rank += 1;
494            // Age the run by its TERMINAL time, not its start. A long run
495            // that started >max_age_days ago but completed recently is still
496            // a recent result and must not be evicted (FIX 2). Completed and
497            // Incomplete runs always have an `ended_at`; fall back to
498            // `started_at` only if a terminal record somehow lacks one.
499            let term_time = summary.ended_at.unwrap_or(summary.started_at);
500            let too_old = term_time < age_cutoff;
501            if over_count || too_old {
502                if std::fs::remove_file(path).is_ok() {
503                    removed += 1;
504                }
505            }
506        }
507        removed
508    }
509}
510
/// Report whether the file at `path` ends in something other than `\n`.
/// This is the sign of a torn earlier append, after which the next append
/// must start with a separating newline (FIX 7).
///
/// An empty file yields `Ok(false)`, since there is nothing to repair.
/// Only the final byte is read, located with a seek from the end.
///
/// # Errors
///
/// Propagates the I/O error if the file cannot be opened (including when
/// it does not exist), seeked, or read.
fn last_byte_is_not_newline(path: &Path) -> std::io::Result<bool> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    match file.seek(SeekFrom::End(0))? {
        0 => Ok(false),
        _ => {
            file.seek(SeekFrom::End(-1))?;
            let mut tail = [0u8; 1];
            file.read_exact(&mut tail)?;
            Ok(tail != [b'\n'])
        }
    }
}
528
529/// Load every parseable `RunRecord` from a JSONL file, in file order.
530/// Blank lines and unparseable lines (a corrupt/partial trailing line) are
531/// skipped — the prior valid records still load (error-path).
532fn load_records(path: &Path) -> Vec<RunRecord> {
533    let Ok(file) = std::fs::File::open(path) else {
534        return Vec::new();
535    };
536    let reader = std::io::BufReader::new(file);
537    let mut out = Vec::new();
538    for line in reader.lines() {
539        let Ok(line) = line else { break };
540        if line.trim().is_empty() {
541            continue;
542        }
543        if let Ok(rec) = serde_json::from_str::<RunRecord>(&line) {
544            out.push(rec);
545        }
546        // else: skip the bad line, keep going (handles a partial trailing
547        // write left by a crash mid-append).
548    }
549    out
550}
551
552/// Build a [`RunSummary`] from a run's JSONL file: `RunStarted` from the
553/// first valid record, terminal status from the last, and `turn_count`
554/// from the `Turn`s in between. Returns `None` when the file has no
555/// `RunStarted` (it can't be keyed/summarized).
556fn summarize_file(path: &Path) -> Option<RunSummary> {
557    let records = load_records(path);
558    let mut started: Option<car_proto::RunStarted> = None;
559    let mut ended: Option<car_proto::RunEnded> = None;
560    let mut turn_count = 0usize;
561    for rec in &records {
562        match rec {
563            RunRecord::Started(s) => started = Some(s.clone()),
564            RunRecord::Ended(e) => ended = Some(e.clone()),
565            RunRecord::Turn(_) => turn_count += 1,
566        }
567    }
568    let started = started?;
569    let (status, ended_at) = match &ended {
570        Some(e) => {
571            let status = match &e.termination {
572                RunTermination::Outcome { .. } => RunStatus::Completed,
573                RunTermination::Incomplete => RunStatus::Incomplete,
574            };
575            (status, Some(e.ended_at))
576        }
577        None => (RunStatus::InProgress, None),
578    };
579    Some(RunSummary {
580        run_id: started.run_id,
581        agent_id: started.agent_id,
582        intent: started.intent,
583        started_at: started.started_at,
584        ended_at,
585        status,
586        turn_count,
587    })
588}
589
/// Turn an id into a single safe path segment so a hostile
/// `agent_id`/`run_id` cannot climb out of the `runs/` tree.
///
/// Every `/`, `\` and NUL becomes `_`, then leading and trailing dots are
/// removed. If nothing is left, the result is `_`. Ids are uuids or slugs
/// in practice, so this is defense in depth.
fn sanitize(id: &str) -> String {
    let replaced = id.replace(['/', '\\', '\0'], "_");
    match replaced.trim_matches('.') {
        "" => String::from("_"),
        kept => kept.to_owned(),
    }
}
608
/// Restrict a directory to its owner (`0700`). On non-Unix targets this
/// does nothing and reports success.
#[cfg(unix)]
fn set_dir_perms(path: &Path) -> std::io::Result<()> {
    use std::fs::{self, Permissions};
    use std::os::unix::fs::PermissionsExt;
    let owner_only = Permissions::from_mode(0o700);
    fs::set_permissions(path, owner_only)
}

#[cfg(not(unix))]
fn set_dir_perms(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
620
/// Restrict a file to owner read/write (`0600`). On non-Unix targets this
/// does nothing and reports success.
#[cfg(unix)]
fn set_file_perms(path: &Path) -> std::io::Result<()> {
    use std::fs::{self, Permissions};
    use std::os::unix::fs::PermissionsExt;
    let owner_rw = Permissions::from_mode(0o600);
    fs::set_permissions(path, owner_rw)
}

#[cfg(not(unix))]
fn set_file_perms(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
632
/// Flag `dir` as not-to-be-backed-up so plaintext traces are not copied
/// off the machine by Time Machine / iCloud (R14).
///
/// Drops a `.nobackup` marker file into the directory and, when built for
/// macOS, also sets the `com.apple.metadata:com_apple_backup_excludeItem`
/// xattr. Strictly best-effort: any failure is ignored, because this is
/// hardening rather than something correctness depends on.
fn mark_backup_excluded(dir: &Path) {
    // The marker works on every platform: some backup tools honor it,
    // and it tells a human what the directory is.
    let marker = dir.join(".nobackup");
    let note: &[u8] = b"car run traces - excluded from backup\n";
    let _ = std::fs::write(marker, note);
    #[cfg(target_os = "macos")]
    set_macos_backup_excluded(dir);
}
648
/// Apply the macOS Time Machine exclusion xattr to `dir` by running the
/// `xattr` command-line tool (`xattr -w <attr> 1 <dir>`), which avoids
/// linking a native crate. The attribute is written with the string
/// value `1`. Best-effort: a failure to spawn the tool, or an
/// unsuccessful exit, is ignored.
#[cfg(target_os = "macos")]
fn set_macos_backup_excluded(dir: &Path) {
    let mut xattr = std::process::Command::new("xattr");
    xattr
        .arg("-w")
        .arg("com.apple.metadata:com_apple_backup_excludeItem")
        .arg("1")
        .arg(dir);
    let _ = xattr.output();
}
662
663#[cfg(test)]
664mod tests {
665    use super::*;
666    use car_ir::{AgentOutcome, OutcomeMetrics, OutcomeStatus};
667    use car_proto::{RunEnded, RunStarted, RunTurn, VerifierVerdict};
668    use serde_json::json;
669
670    fn store(root: PathBuf) -> RunStore {
671        RunStore::new(root, RetentionConfig::default())
672    }
673
674    fn started(run_id: &str, agent_id: &str, when: DateTime<Utc>) -> RunStarted {
675        RunStarted {
676            run_id: run_id.to_string(),
677            agent_id: agent_id.to_string(),
678            intent: "do the thing".to_string(),
679            outcome_description: None,
680            started_at: when,
681        }
682    }
683
684    fn turn(index: usize, prompt: &str) -> RunRecord {
685        RunRecord::Turn(RunTurn {
686            index,
687            prompt: Some(prompt.to_string()),
688            tool: Some("drive_cli".to_string()),
689            parameters: json!({ "prompt": prompt }),
690            output: Some(json!({ "exit_code": 0 })),
691            cli_outcome: None,
692            verifier_verdict: VerifierVerdict::NotRun,
693            policy_rejected: None,
694        })
695    }
696
697    fn ended(run_id: &str, agent_id: &str, status: OutcomeStatus) -> RunRecord {
698        let outcome = AgentOutcome {
699            status,
700            summary: "done".to_string(),
701            evidence: vec![],
702            metrics: OutcomeMetrics::default(),
703            timestamp: Utc::now(),
704        };
705        RunRecord::Ended(RunEnded {
706            run_id: run_id.to_string(),
707            agent_id: agent_id.to_string(),
708            termination: RunTermination::Outcome { status, outcome },
709            ended_at: Utc::now(),
710        })
711    }
712
713    /// A completed run is readable from a brand-new store instance — the
714    /// "simulated daemon restart" (new store, empty memory): the trace
715    /// must come back from disk (R4).
716    #[test]
717    fn completed_run_readable_after_restart() {
718        let tmp = tempfile::TempDir::new().unwrap();
719        let root = tmp.path().join("runs");
720        let s1 = store(root.clone());
721        s1.write_started(&started("run-1", "agent-a", Utc::now()))
722            .unwrap();
723        s1.append_turns("agent-a", "run-1", &[turn(0, "first")])
724            .unwrap();
725        s1.append_records(
726            "agent-a",
727            "run-1",
728            &[ended("run-1", "agent-a", OutcomeStatus::Success)],
729        )
730        .unwrap();
731
732        // Brand-new store over the same root = simulated restart with empty memory.
733        let s2 = store(root);
734        let trace = s2
735            .get_run_trace("run-1")
736            .expect("trace readable after restart");
737        assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
738        assert!(matches!(trace.last(), Some(RunRecord::Ended(_))));
739        let turns = trace
740            .iter()
741            .filter(|r| matches!(r, RunRecord::Turn(_)))
742            .count();
743        assert_eq!(turns, 1);
744    }
745
746    /// Each run persists to its own `(agent_id, run_id)` file — no
747    /// cross-contamination between runs or agents (R1).
748    #[test]
749    fn runs_isolated_per_agent_and_run() {
750        let tmp = tempfile::TempDir::new().unwrap();
751        let s = store(tmp.path().join("runs"));
752        s.write_started(&started("run-1", "agent-a", Utc::now()))
753            .unwrap();
754        s.append_turns("agent-a", "run-1", &[turn(0, "a-first")])
755            .unwrap();
756        s.write_started(&started("run-2", "agent-a", Utc::now()))
757            .unwrap();
758        s.append_turns("agent-a", "run-2", &[turn(0, "a-second")])
759            .unwrap();
760        s.write_started(&started("run-3", "agent-b", Utc::now()))
761            .unwrap();
762        s.append_turns("agent-b", "run-3", &[turn(0, "b-first")])
763            .unwrap();
764
765        // Distinct files; each holds only its own turn.
766        let t1 = s.get_run_trace("run-1").unwrap();
767        let t2 = s.get_run_trace("run-2").unwrap();
768        let t3 = s.get_run_trace("run-3").unwrap();
769        assert_eq!(turn_prompt(&t1), "a-first");
770        assert_eq!(turn_prompt(&t2), "a-second");
771        assert_eq!(turn_prompt(&t3), "b-first");
772        // run_id -> agent_id resolution works for replay authz.
773        assert_eq!(s.agent_for_run("run-1").as_deref(), Some("agent-a"));
774        assert_eq!(s.agent_for_run("run-3").as_deref(), Some("agent-b"));
775        // agent-a lists 2 runs, agent-b lists 1.
776        assert_eq!(s.list_runs("agent-a").len(), 2);
777        assert_eq!(s.list_runs("agent-b").len(), 1);
778    }
779
780    fn turn_prompt(trace: &[RunRecord]) -> String {
781        trace
782            .iter()
783            .find_map(|r| match r {
784                RunRecord::Turn(t) => t.prompt.clone(),
785                _ => None,
786            })
787            .unwrap_or_default()
788    }
789
790    /// Files are `0600` and dirs are `0700` (R14 — assert the modes).
791    #[cfg(unix)]
792    #[test]
793    fn perms_are_0600_files_0700_dirs() {
794        use std::os::unix::fs::PermissionsExt;
795        let tmp = tempfile::TempDir::new().unwrap();
796        let root = tmp.path().join("runs");
797        let s = store(root.clone());
798        s.write_started(&started("run-1", "agent-a", Utc::now()))
799            .unwrap();
800
801        let file = root.join("agent-a").join("run-1.jsonl");
802        let fmode = std::fs::metadata(&file).unwrap().permissions().mode() & 0o777;
803        assert_eq!(fmode, 0o600, "run file must be 0600, got {:o}", fmode);
804
805        let root_mode = std::fs::metadata(&root).unwrap().permissions().mode() & 0o777;
806        assert_eq!(
807            root_mode, 0o700,
808            "runs/ dir must be 0700, got {:o}",
809            root_mode
810        );
811        let agent_mode = std::fs::metadata(root.join("agent-a"))
812            .unwrap()
813            .permissions()
814            .mode()
815            & 0o777;
816        assert_eq!(
817            agent_mode, 0o700,
818            "agent dir must be 0700, got {:o}",
819            agent_mode
820        );
821
822        // The backup-exclusion marker is present.
823        assert!(root.join(".nobackup").exists(), ".nobackup marker written");
824    }
825
826    /// A run with no `RunEnded` reads `InProgress`; once the disconnect
827    /// path writes the `Incomplete` terminal it reads `Incomplete` — never
828    /// silently `Completed`. This is the R5 distinguishability the
829    /// dashboard renders.
830    #[test]
831    fn orphan_run_status_distinguishes_inprogress_from_incomplete() {
832        let tmp = tempfile::TempDir::new().unwrap();
833        let s = store(tmp.path().join("runs"));
834        // Stale start time (no harness writing — an orphan).
835        let stale = Utc::now() - chrono::Duration::hours(6);
836        s.write_started(&started("run-1", "agent-a", stale)).unwrap();
837        s.append_turns("agent-a", "run-1", &[turn(0, "first")])
838            .unwrap();
839
840        // Still open: no terminal record yet.
841        let open = &s.list_runs("agent-a")[0];
842        assert_eq!(open.status, RunStatus::InProgress);
843
844        // Disconnect path writes the Incomplete terminal.
845        let incomplete = RunRecord::Ended(RunEnded {
846            run_id: "run-1".to_string(),
847            agent_id: "agent-a".to_string(),
848            termination: RunTermination::Incomplete,
849            ended_at: Utc::now(),
850        });
851        s.append_records("agent-a", "run-1", &[incomplete]).unwrap();
852        let closed = &s.list_runs("agent-a")[0];
853        assert_eq!(closed.status, RunStatus::Incomplete);
854    }
855
856    /// Retention evicts beyond the per-agent cap, never an in-progress
857    /// run (R6).
858    #[test]
859    fn gc_evicts_beyond_per_agent_cap_but_never_in_progress() {
860        let tmp = tempfile::TempDir::new().unwrap();
861        let root = tmp.path().join("runs");
862        let s = RunStore::new(
863            root,
864            RetentionConfig {
865                max_per_agent: 3,
866                max_age_days: 30,
867            },
868        );
869        // 5 completed runs with increasing start times.
870        let base = Utc::now() - chrono::Duration::days(1);
871        for i in 0..5 {
872            let id = format!("c{i}");
873            let when = base + chrono::Duration::minutes(i);
874            s.write_started(&started(&id, "agent-a", when)).unwrap();
875            s.append_records(
876                "agent-a",
877                &id,
878                &[ended(&id, "agent-a", OutcomeStatus::Success)],
879            )
880            .unwrap();
881        }
882        // 1 in-progress run (no terminal) — must survive GC.
883        s.write_started(&started("live", "agent-a", Utc::now()))
884            .unwrap();
885
886        let removed = s.gc();
887        // Keep 3 most-recent completed; evict the 2 oldest completed. The
888        // in-progress run is never counted/evicted.
889        assert_eq!(removed, 2, "should evict the 2 oldest completed runs");
890        let remaining = s.list_runs("agent-a");
891        // 3 kept completed + the live one = 4.
892        assert_eq!(remaining.len(), 4);
893        assert!(
894            remaining.iter().any(|r| r.run_id == "live"),
895            "in-progress run must never be evicted"
896        );
897        // The two oldest (c0, c1) are gone.
898        assert!(!remaining.iter().any(|r| r.run_id == "c0"));
899        assert!(!remaining.iter().any(|r| r.run_id == "c1"));
900    }
901
902    /// Retention evicts completed runs older than the age cap (R6).
903    #[test]
904    fn gc_evicts_runs_older_than_age_cap() {
905        let tmp = tempfile::TempDir::new().unwrap();
906        let s = RunStore::new(
907            tmp.path().join("runs"),
908            RetentionConfig {
909                max_per_agent: 50,
910                max_age_days: 30,
911            },
912        );
913        // One old completed run — started AND ended 40 days ago (its
914        // terminal time is what the age cap measures, FIX 2) — and one
915        // fresh run.
916        let old = Utc::now() - chrono::Duration::days(40);
917        s.write_started(&started("old", "agent-a", old)).unwrap();
918        s.append_records(
919            "agent-a",
920            "old",
921            &[ended_at("old", "agent-a", OutcomeStatus::Success, old)],
922        )
923        .unwrap();
924        s.write_started(&started("fresh", "agent-a", Utc::now()))
925            .unwrap();
926        s.append_records(
927            "agent-a",
928            "fresh",
929            &[ended("fresh", "agent-a", OutcomeStatus::Success)],
930        )
931        .unwrap();
932
933        let removed = s.gc();
934        assert_eq!(removed, 1, "the 40-day-old run should be evicted");
935        let remaining = s.list_runs("agent-a");
936        assert_eq!(remaining.len(), 1);
937        assert_eq!(remaining[0].run_id, "fresh");
938    }
939
940    /// An old run that is still in progress is NOT evicted by the age cap
941    /// (R6 — never evict an open run, even a stale one).
942    #[test]
943    fn gc_never_evicts_stale_in_progress_run() {
944        let tmp = tempfile::TempDir::new().unwrap();
945        let s = RunStore::new(
946            tmp.path().join("runs"),
947            RetentionConfig {
948                max_per_agent: 1,
949                max_age_days: 1,
950            },
951        );
952        let old = Utc::now() - chrono::Duration::days(40);
953        // Old + in-progress (no terminal).
954        s.write_started(&started("stale-live", "agent-a", old))
955            .unwrap();
956        let removed = s.gc();
957        assert_eq!(removed, 0);
958        assert!(s
959            .list_runs("agent-a")
960            .iter()
961            .any(|r| r.run_id == "stale-live"));
962    }
963
964    /// A corrupt/partial trailing JSONL line loads the prior valid
965    /// records rather than failing the whole run (error path).
966    #[test]
967    fn corrupt_trailing_line_loads_prior_records() {
968        let tmp = tempfile::TempDir::new().unwrap();
969        let root = tmp.path().join("runs");
970        let s = store(root.clone());
971        s.write_started(&started("run-1", "agent-a", Utc::now()))
972            .unwrap();
973        s.append_turns("agent-a", "run-1", &[turn(0, "first"), turn(1, "second")])
974            .unwrap();
975
976        // Append a partial/garbage line directly (simulating a crash
977        // mid-append).
978        let path = root.join("agent-a").join("run-1.jsonl");
979        let mut f = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
980        writeln!(f, "{{\"record\":\"turn\",\"index\":2,\"prom").unwrap();
981
982        let trace = s.get_run_trace("run-1").expect("trace still loads");
983        // RunStarted + 2 valid turns; the garbage trailing line is dropped.
984        let turns = trace
985            .iter()
986            .filter(|r| matches!(r, RunRecord::Turn(_)))
987            .count();
988        assert_eq!(turns, 2, "prior valid turns load; corrupt line skipped");
989        assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
990    }
991
992    /// `list_runs` for an agent with no runs returns empty (empty-state).
993    #[test]
994    fn list_runs_empty_for_unknown_agent() {
995        let tmp = tempfile::TempDir::new().unwrap();
996        let s = store(tmp.path().join("runs"));
997        assert!(s.list_runs("nobody").is_empty());
998        assert!(s.get_run_trace("nope").is_none());
999        assert!(s.agent_for_run("nope").is_none());
1000    }
1001
1002    /// `from_journal_dir` roots the store at `<car_dir>/runs` as a sibling
1003    /// of the journal dir.
1004    #[test]
1005    fn from_journal_dir_roots_at_car_runs() {
1006        let s = RunStore::from_journal_dir(Path::new("/home/u/.car/journals"));
1007        assert_eq!(s.root(), Path::new("/home/u/.car/runs"));
1008    }
1009
1010    /// Retention config loads `[runs]` overrides from config.toml; missing
1011    /// keys keep the restrictive default.
1012    #[test]
1013    fn retention_config_reads_overrides() {
1014        let tmp = tempfile::TempDir::new().unwrap();
1015        std::fs::write(
1016            tmp.path().join("config.toml"),
1017            "[runs]\nmax_per_agent = 10\n",
1018        )
1019        .unwrap();
1020        let cfg = RetentionConfig::from_car_dir(tmp.path());
1021        assert_eq!(cfg.max_per_agent, 10);
1022        // Missing key keeps the default.
1023        assert_eq!(cfg.max_age_days, DEFAULT_MAX_AGE_DAYS);
1024    }
1025
1026    /// A missing/malformed config.toml falls back to the restrictive
1027    /// default — never refuses.
1028    #[test]
1029    fn retention_config_defaults_on_missing_file() {
1030        let tmp = tempfile::TempDir::new().unwrap();
1031        let cfg = RetentionConfig::from_car_dir(tmp.path());
1032        assert_eq!(cfg.max_per_agent, DEFAULT_MAX_RUNS_PER_AGENT);
1033        assert_eq!(cfg.max_age_days, DEFAULT_MAX_AGE_DAYS);
1034    }
1035
1036    /// Append the terminal `RunEnded` with an explicit `ended_at` (the
1037    /// disconnect/complete path uses `Utc::now()`, but GC ages by terminal
1038    /// time, so tests need to control it).
1039    fn ended_at(
1040        run_id: &str,
1041        agent_id: &str,
1042        status: OutcomeStatus,
1043        when: DateTime<Utc>,
1044    ) -> RunRecord {
1045        let outcome = AgentOutcome {
1046            status,
1047            summary: "done".to_string(),
1048            evidence: vec![],
1049            metrics: OutcomeMetrics::default(),
1050            timestamp: when,
1051        };
1052        RunRecord::Ended(RunEnded {
1053            run_id: run_id.to_string(),
1054            agent_id: agent_id.to_string(),
1055            termination: RunTermination::Outcome { status, outcome },
1056            ended_at: when,
1057        })
1058    }
1059
1060    /// FIX 2: a run that STARTED >max_age_days ago but COMPLETED recently is
1061    /// a fresh result and must NOT be evicted by the age cap. GC must age by
1062    /// the terminal time, not the start time.
1063    #[test]
1064    fn gc_age_cap_uses_terminal_time_not_start() {
1065        let tmp = tempfile::TempDir::new().unwrap();
1066        let s = RunStore::new(
1067            tmp.path().join("runs"),
1068            RetentionConfig {
1069                max_per_agent: 50,
1070                max_age_days: 30,
1071            },
1072        );
1073        // Long-running run: started 40 days ago, completed 1 day ago.
1074        let started_40d = Utc::now() - chrono::Duration::days(40);
1075        let ended_1d = Utc::now() - chrono::Duration::days(1);
1076        s.write_started(&started("long", "agent-a", started_40d))
1077            .unwrap();
1078        s.append_records(
1079            "agent-a",
1080            "long",
1081            &[ended_at("long", "agent-a", OutcomeStatus::Success, ended_1d)],
1082        )
1083        .unwrap();
1084
1085        let removed = s.gc();
1086        assert_eq!(
1087            removed, 0,
1088            "a run completed 1 day ago must survive the 30-day age cap, \
1089             even if it started 40 days ago"
1090        );
1091        let remaining = s.list_runs("agent-a");
1092        assert_eq!(remaining.len(), 1);
1093        assert_eq!(remaining[0].run_id, "long");
1094    }
1095
1096    /// FIX 4: a crash-orphaned run (RunStarted + Turn, no RunEnded) on disk
1097    /// reads `InProgress`. A fresh store's `adopt_orphans()` at boot — when
1098    /// the in-memory map is empty — appends an `Incomplete` terminal so the
1099    /// run becomes terminal (and thus age-GC-eligible), no longer leaking.
1100    #[test]
1101    fn adopt_orphans_marks_crashed_inprogress_runs_incomplete() {
1102        let tmp = tempfile::TempDir::new().unwrap();
1103        let root = tmp.path().join("runs");
1104        // Prior process wrote a start + a turn, then crashed (no terminal).
1105        let s1 = store(root.clone());
1106        s1.write_started(&started("orphan", "agent-a", Utc::now()))
1107            .unwrap();
1108        s1.append_turns("agent-a", "orphan", &[turn(0, "first")])
1109            .unwrap();
1110        assert_eq!(
1111            s1.list_runs("agent-a")[0].status,
1112            RunStatus::InProgress,
1113            "precondition: orphan reads InProgress before adoption"
1114        );
1115
1116        // Fresh store = new process boot, empty in-memory map.
1117        let s2 = store(root);
1118        let adopted = s2.adopt_orphans();
1119        assert_eq!(adopted, 1, "the crash orphan should be adopted");
1120
1121        let after = &s2.list_runs("agent-a")[0];
1122        assert_eq!(
1123            after.status,
1124            RunStatus::Incomplete,
1125            "adopted orphan now reads Incomplete (terminal)"
1126        );
1127        assert!(after.ended_at.is_some(), "terminal record has an ended_at");
1128
1129        // Idempotent: a second boot finds no orphans (already terminal).
1130        assert_eq!(s2.adopt_orphans(), 0);
1131    }
1132
1133    /// FIX 4 (corollary): a completed run is NOT adopted (it already has a
1134    /// terminal record).
1135    #[test]
1136    fn adopt_orphans_leaves_completed_runs_alone() {
1137        let tmp = tempfile::TempDir::new().unwrap();
1138        let root = tmp.path().join("runs");
1139        let s = store(root);
1140        s.write_started(&started("done", "agent-a", Utc::now()))
1141            .unwrap();
1142        s.append_records(
1143            "agent-a",
1144            "done",
1145            &[ended("done", "agent-a", OutcomeStatus::Success)],
1146        )
1147        .unwrap();
1148        assert_eq!(s.adopt_orphans(), 0);
1149        assert_eq!(s.list_runs("agent-a")[0].status, RunStatus::Completed);
1150    }
1151
1152    /// FIX 7: a torn tail (a partial line with no trailing '\n') must not
1153    /// cause the NEXT valid append to be dropped. `append_records` inserts a
1154    /// separating '\n' so the torn fragment is its own skippable line and
1155    /// the new record loads intact.
1156    #[test]
1157    fn torn_tail_does_not_drop_following_valid_record() {
1158        let tmp = tempfile::TempDir::new().unwrap();
1159        let root = tmp.path().join("runs");
1160        let s = store(root.clone());
1161        s.write_started(&started("run-1", "agent-a", Utc::now()))
1162            .unwrap();
1163        s.append_turns("agent-a", "run-1", &[turn(0, "first")])
1164            .unwrap();
1165
1166        // Simulate a torn append: a partial line with NO trailing newline.
1167        let path = root.join("agent-a").join("run-1.jsonl");
1168        {
1169            let mut f = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
1170            // No writeln! — deliberately leaves the file's last byte != '\n'.
1171            f.write_all(b"{\"record\":\"turn\",\"index\":1,\"prom").unwrap();
1172        }
1173        assert!(
1174            last_byte_is_not_newline(&path).unwrap(),
1175            "precondition: tail is torn (no trailing newline)"
1176        );
1177
1178        // Now append a FULLY VALID record via the store. Without the FIX-7
1179        // leading newline this would concatenate onto the torn fragment and
1180        // be dropped by load_records.
1181        s.append_turns("agent-a", "run-1", &[turn(2, "third")])
1182            .unwrap();
1183
1184        let trace = s.get_run_trace("run-1").expect("trace loads");
1185        // Started + turn 0 ("first") + the new valid turn ("third"). The
1186        // torn middle fragment is skipped; the valid record after it loads.
1187        let turn_prompts: Vec<String> = trace
1188            .iter()
1189            .filter_map(|r| match r {
1190                RunRecord::Turn(t) => t.prompt.clone(),
1191                _ => None,
1192            })
1193            .collect();
1194        assert!(
1195            turn_prompts.contains(&"third".to_string()),
1196            "the valid record appended after a torn tail must survive, got {:?}",
1197            turn_prompts
1198        );
1199        assert!(matches!(trace.first(), Some(RunRecord::Started(_))));
1200    }
1201}