Skip to main content

aivcs_core/
trace_artifact.rs

//! Run trace artifact persistence and retention policy.
//!
//! A [`RunTraceArtifact`] is a self-contained, content-verified record of a
//! completed run. It includes the event sequence, a SHA-256 replay digest,
//! and provenance fields from the [`RunRecord`].
//!
//! Artifacts are written to `<dir>/<run_id>/trace.json` with a companion
//! `<dir>/<run_id>/trace.digest` file for integrity checks.
//!
//! [`RetentionPolicy`] can prune an artifact directory by age or count.
11
12use std::path::{Path, PathBuf};
13
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16
17use oxidized_state::storage_traits::{ContentDigest, RunEvent, RunRecord};
18
19use crate::domain::{AivcsError, Result};
20
21/// A self-contained, integrity-checked record of a completed run.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct RunTraceArtifact {
24    /// The run identifier.
25    pub run_id: String,
26    /// Hex string of the spec digest recorded at run creation.
27    pub spec_digest: String,
28    /// Agent name from the run metadata.
29    pub agent_name: String,
30    /// Final status string (`"Completed"` or `"Failed"`).
31    pub status: String,
32    /// When the run was created.
33    pub created_at: DateTime<Utc>,
34    /// When the run reached a terminal state, if known.
35    pub completed_at: Option<DateTime<Utc>>,
36    /// All events in seq order.
37    pub events: Vec<RunEvent>,
38    /// SHA-256 hex digest of `serde_json::to_vec(&events)`.
39    pub replay_digest: String,
40    /// Number of events.
41    pub event_count: usize,
42}
43
44impl RunTraceArtifact {
45    /// Construct a `RunTraceArtifact` from a run record, event list, and pre-computed digest.
46    pub fn from_replay(record: &RunRecord, events: Vec<RunEvent>, replay_digest: String) -> Self {
47        let status = format!("{:?}", record.status);
48        Self {
49            run_id: record.run_id.to_string(),
50            spec_digest: record.spec_digest.as_str().to_string(),
51            agent_name: record.metadata.agent_name.clone(),
52            status,
53            created_at: record.created_at,
54            completed_at: record.completed_at,
55            event_count: events.len(),
56            events,
57            replay_digest,
58        }
59    }
60}
61
62/// Write a `RunTraceArtifact` to `<dir>/<run_id>/trace.json`.
63///
64/// Also writes `<dir>/<run_id>/trace.digest` containing the replay digest for
65/// out-of-band verification.
66///
67/// Returns the path to `trace.json`.
68pub fn write_trace_artifact(artifact: &RunTraceArtifact, dir: &Path) -> Result<PathBuf> {
69    let run_dir = dir.join(&artifact.run_id);
70    std::fs::create_dir_all(&run_dir)?;
71
72    let trace_path = run_dir.join("trace.json");
73    let digest_path = run_dir.join("trace.digest");
74
75    let json = serde_json::to_vec_pretty(artifact)?;
76    std::fs::write(&trace_path, &json)?;
77    std::fs::write(&digest_path, artifact.replay_digest.as_bytes())?;
78
79    Ok(trace_path)
80}
81
82/// Read and integrity-verify a `RunTraceArtifact` from `<dir>/<run_id>/trace.json`.
83///
84/// Performs two integrity checks:
85/// 1. Reads `trace.digest` (companion file) and compares it to the `replay_digest`
86///    stored inside `trace.json`. Detects out-of-band tampering of `trace.json`
87///    when the companion digest file was not also updated.
88/// 2. Recomputes the SHA-256 digest of the event list and compares it to the
89///    stored `replay_digest`. Detects in-place event tampering.
90///
91/// Returns `AivcsError::DigestMismatch` if either check fails.
92pub fn read_trace_artifact(run_id: &str, dir: &Path) -> Result<RunTraceArtifact> {
93    let run_dir = dir.join(run_id);
94    let trace_path = run_dir.join("trace.json");
95    let digest_path = run_dir.join("trace.digest");
96
97    let json = std::fs::read(&trace_path)?;
98    let artifact: RunTraceArtifact = serde_json::from_slice(&json)?;
99
100    // Check 1: verify companion trace.digest matches the JSON's replay_digest
101    if digest_path.exists() {
102        let companion_digest = std::fs::read_to_string(&digest_path)?.trim().to_string();
103        if companion_digest != artifact.replay_digest {
104            return Err(AivcsError::DigestMismatch {
105                expected: companion_digest,
106                actual: artifact.replay_digest.clone(),
107            });
108        }
109    }
110
111    // Check 2: re-derive the digest from events and verify it matches
112    let events_json = serde_json::to_vec(&artifact.events)?;
113    let actual_digest = ContentDigest::from_bytes(&events_json).as_str().to_string();
114
115    if actual_digest != artifact.replay_digest {
116        return Err(AivcsError::DigestMismatch {
117            expected: artifact.replay_digest.clone(),
118            actual: actual_digest,
119        });
120    }
121
122    Ok(artifact)
123}
124
/// Rules for pruning a directory of run trace artifacts.
///
/// Both limits are optional; the `Default` policy sets neither.
#[derive(Debug, Clone, Default)]
pub struct RetentionPolicy {
    /// Age limit in days: runs older than this are removed. `None` disables
    /// the age limit.
    pub max_age_days: Option<u64>,
    /// Upper bound on the number of runs kept, preferring the newest. `None`
    /// disables the count limit.
    pub max_runs: Option<usize>,
}
133
134impl RetentionPolicy {
135    /// Scan `<dir>/*/trace.json`, apply retention rules, and delete runs that
136    /// exceed the policy.
137    ///
138    /// Returns the number of pruned entries.
139    ///
140    /// Rules are applied in order:
141    /// 1. Age: runs with `created_at` older than `max_age_days` are deleted.
142    /// 2. Count: after age pruning, if more than `max_runs` remain, the oldest
143    ///    are deleted until the count limit is satisfied.
144    pub fn prune(&self, dir: &Path) -> Result<usize> {
145        // Collect all run artifact directories that contain a trace.json
146        let mut entries: Vec<(DateTime<Utc>, PathBuf)> = Vec::new();
147
148        let read_dir = match std::fs::read_dir(dir) {
149            Ok(rd) => rd,
150            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
151            Err(e) => return Err(AivcsError::Io(e)),
152        };
153
154        for entry in read_dir {
155            let entry = entry?;
156            let trace_path = entry.path().join("trace.json");
157            if !trace_path.exists() {
158                continue;
159            }
160            let json = std::fs::read(&trace_path)?;
161            if let Ok(artifact) = serde_json::from_slice::<RunTraceArtifact>(&json) {
162                entries.push((artifact.created_at, entry.path()));
163            }
164        }
165
166        // Sort by created_at descending (newest first) for count-based pruning
167        entries.sort_by(|a, b| b.0.cmp(&a.0));
168
169        let mut pruned = 0usize;
170        let now = Utc::now();
171
172        // Age-based pruning
173        if let Some(max_days) = self.max_age_days {
174            let cutoff = now - chrono::Duration::days(max_days as i64);
175            let mut kept = Vec::new();
176            for (created_at, path) in entries {
177                if created_at < cutoff {
178                    std::fs::remove_dir_all(&path)?;
179                    pruned += 1;
180                } else {
181                    kept.push((created_at, path));
182                }
183            }
184            entries = kept;
185        }
186
187        // Count-based pruning (entries is already newest-first)
188        if let Some(max_runs) = self.max_runs {
189            if entries.len() > max_runs {
190                for (_, path) in entries.drain(max_runs..) {
191                    std::fs::remove_dir_all(&path)?;
192                    pruned += 1;
193                }
194            }
195        }
196
197        Ok(pruned)
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use oxidized_state::storage_traits::{RunId, RunMetadata, RunStatus, RunSummary};
    use tempfile::tempdir;

    /// Build a completed run record for `run_id` whose creation and completion
    /// times are both `created_at`.
    fn make_record(run_id: &str, created_at: DateTime<Utc>) -> RunRecord {
        let metadata = RunMetadata {
            git_sha: None,
            agent_name: "agent".to_string(),
            tags: serde_json::json!({}),
        };
        let summary = RunSummary {
            total_events: 1,
            final_state_digest: None,
            duration_ms: 10,
            success: true,
        };

        RunRecord {
            run_id: RunId(run_id.to_string()),
            spec_digest: ContentDigest::from_bytes(b"spec"),
            metadata,
            status: RunStatus::Completed,
            summary: Some(summary),
            created_at,
            completed_at: Some(created_at),
        }
    }

    /// A one-element event list stamped with `ts`.
    fn make_events(ts: DateTime<Utc>) -> Vec<RunEvent> {
        let started = RunEvent {
            seq: 1,
            kind: "graph_started".to_string(),
            payload: serde_json::json!({}),
            timestamp: ts,
        };
        vec![started]
    }

    /// Digest of the compact JSON encoding of `events`, computed the same way
    /// `read_trace_artifact` recomputes it.
    fn digest_of(events: &[RunEvent]) -> String {
        let encoded = serde_json::to_vec(events).unwrap();
        ContentDigest::from_bytes(&encoded).as_str().to_string()
    }

    /// Write a valid artifact for `run_id`, created at `ts`, under `dir`.
    fn write_run(dir: &Path, run_id: &str, ts: DateTime<Utc>) {
        let events = make_events(ts);
        let digest = digest_of(&events);
        let record = make_record(run_id, ts);
        let artifact = RunTraceArtifact::from_replay(&record, events, digest);
        write_trace_artifact(&artifact, dir).expect("write");
    }

    #[test]
    fn test_write_and_read_trace_artifact_roundtrip() {
        let dir = tempdir().expect("tempdir");
        let ts = Utc::now();

        let events = make_events(ts);
        let digest = digest_of(&events);
        let artifact =
            RunTraceArtifact::from_replay(&make_record("run-abc", ts), events, digest.clone());

        let path = write_trace_artifact(&artifact, dir.path()).expect("write");
        assert!(path.exists());

        let loaded = read_trace_artifact("run-abc", dir.path()).expect("read");

        assert_eq!(loaded.run_id, "run-abc");
        assert_eq!(loaded.agent_name, "agent");
        assert_eq!(loaded.replay_digest, digest);
        assert_eq!(loaded.event_count, 1);
        assert_eq!(loaded.events.len(), 1);
    }

    #[test]
    fn test_read_trace_artifact_digest_mismatch_rejected() {
        let dir = tempdir().expect("tempdir");
        let ts = Utc::now();

        // Pair real events with a digest that cannot match them.
        let bogus_digest = "a".repeat(64);
        let artifact =
            RunTraceArtifact::from_replay(&make_record("run-xyz", ts), make_events(ts), bogus_digest);

        // Write trace.json by hand, without a companion trace.digest, so the
        // mismatch has to be caught by recomputing the event digest.
        let run_dir = dir.path().join("run-xyz");
        std::fs::create_dir_all(&run_dir).unwrap();
        let json = serde_json::to_vec_pretty(&artifact).unwrap();
        std::fs::write(run_dir.join("trace.json"), &json).unwrap();

        let result = read_trace_artifact("run-xyz", dir.path());
        assert!(result.is_err());
        let err = result.unwrap_err();
        if !matches!(err, AivcsError::DigestMismatch { .. }) {
            panic!("Expected DigestMismatch, got {:?}", err);
        }
    }

    #[test]
    fn test_retention_policy_prunes_old_runs() {
        let dir = tempdir().expect("tempdir");
        let now = Utc::now();

        // Three runs: one recent, two older than the age limit below.
        for (id, days_ago) in [("run-new", 0i64), ("run-old1", 10), ("run-old2", 20)] {
            write_run(dir.path(), id, now - chrono::Duration::days(days_ago));
        }

        let policy = RetentionPolicy {
            max_age_days: Some(5),
            max_runs: None,
        };

        let pruned = policy.prune(dir.path()).expect("prune");
        assert_eq!(pruned, 2, "should prune the two old runs");

        // Only the recent run survives.
        assert!(dir.path().join("run-new").join("trace.json").exists());
        assert!(!dir.path().join("run-old1").exists());
        assert!(!dir.path().join("run-old2").exists());
    }

    #[test]
    fn test_retention_policy_max_runs() {
        let dir = tempdir().expect("tempdir");
        let now = Utc::now();

        // Four runs, each one day older than the previous.
        for (id, days_ago) in [("run-1", 0i64), ("run-2", 1), ("run-3", 2), ("run-4", 3)] {
            write_run(dir.path(), id, now - chrono::Duration::days(days_ago));
        }

        let policy = RetentionPolicy {
            max_age_days: None,
            max_runs: Some(2),
        };

        let pruned = policy.prune(dir.path()).expect("prune");
        assert_eq!(pruned, 2, "should prune 2 oldest runs");

        // The two newest (run-1, run-2) stay; the two oldest are gone.
        assert!(dir.path().join("run-1").join("trace.json").exists());
        assert!(dir.path().join("run-2").join("trace.json").exists());
        assert!(!dir.path().join("run-3").exists());
        assert!(!dir.path().join("run-4").exists());
    }
}
345}