1use std::path::{Path, PathBuf};
13
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16
17use oxidized_state::storage_traits::{ContentDigest, RunEvent, RunRecord};
18
19use crate::domain::{AivcsError, Result};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct RunTraceArtifact {
24 pub run_id: String,
26 pub spec_digest: String,
28 pub agent_name: String,
30 pub status: String,
32 pub created_at: DateTime<Utc>,
34 pub completed_at: Option<DateTime<Utc>>,
36 pub events: Vec<RunEvent>,
38 pub replay_digest: String,
40 pub event_count: usize,
42}
43
44impl RunTraceArtifact {
45 pub fn from_replay(record: &RunRecord, events: Vec<RunEvent>, replay_digest: String) -> Self {
47 let status = format!("{:?}", record.status);
48 Self {
49 run_id: record.run_id.to_string(),
50 spec_digest: record.spec_digest.as_str().to_string(),
51 agent_name: record.metadata.agent_name.clone(),
52 status,
53 created_at: record.created_at,
54 completed_at: record.completed_at,
55 event_count: events.len(),
56 events,
57 replay_digest,
58 }
59 }
60}
61
62pub fn write_trace_artifact(artifact: &RunTraceArtifact, dir: &Path) -> Result<PathBuf> {
69 let run_dir = dir.join(&artifact.run_id);
70 std::fs::create_dir_all(&run_dir)?;
71
72 let trace_path = run_dir.join("trace.json");
73 let digest_path = run_dir.join("trace.digest");
74
75 let json = serde_json::to_vec_pretty(artifact)?;
76 std::fs::write(&trace_path, &json)?;
77 std::fs::write(&digest_path, artifact.replay_digest.as_bytes())?;
78
79 Ok(trace_path)
80}
81
82pub fn read_trace_artifact(run_id: &str, dir: &Path) -> Result<RunTraceArtifact> {
93 let run_dir = dir.join(run_id);
94 let trace_path = run_dir.join("trace.json");
95 let digest_path = run_dir.join("trace.digest");
96
97 let json = std::fs::read(&trace_path)?;
98 let artifact: RunTraceArtifact = serde_json::from_slice(&json)?;
99
100 if digest_path.exists() {
102 let companion_digest = std::fs::read_to_string(&digest_path)?.trim().to_string();
103 if companion_digest != artifact.replay_digest {
104 return Err(AivcsError::DigestMismatch {
105 expected: companion_digest,
106 actual: artifact.replay_digest.clone(),
107 });
108 }
109 }
110
111 let events_json = serde_json::to_vec(&artifact.events)?;
113 let actual_digest = ContentDigest::from_bytes(&events_json).as_str().to_string();
114
115 if actual_digest != artifact.replay_digest {
116 return Err(AivcsError::DigestMismatch {
117 expected: artifact.replay_digest.clone(),
118 actual: actual_digest,
119 });
120 }
121
122 Ok(artifact)
123}
124
/// Rules deciding which stored run directories are kept when pruning.
///
/// Both limits are optional and may be combined; the default policy has
/// neither limit set.
#[derive(Debug, Clone, Default)]
pub struct RetentionPolicy {
    /// Runs whose `created_at` lies more than this many days before the time
    /// of pruning are removed. `None` disables age-based pruning.
    pub max_age_days: Option<u64>,
    /// Upper bound on the number of runs kept; the most recently created runs
    /// are retained. `None` disables count-based pruning.
    pub max_runs: Option<usize>,
}
133
134impl RetentionPolicy {
135 pub fn prune(&self, dir: &Path) -> Result<usize> {
145 let mut entries: Vec<(DateTime<Utc>, PathBuf)> = Vec::new();
147
148 let read_dir = match std::fs::read_dir(dir) {
149 Ok(rd) => rd,
150 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
151 Err(e) => return Err(AivcsError::Io(e)),
152 };
153
154 for entry in read_dir {
155 let entry = entry?;
156 let trace_path = entry.path().join("trace.json");
157 if !trace_path.exists() {
158 continue;
159 }
160 let json = std::fs::read(&trace_path)?;
161 if let Ok(artifact) = serde_json::from_slice::<RunTraceArtifact>(&json) {
162 entries.push((artifact.created_at, entry.path()));
163 }
164 }
165
166 entries.sort_by(|a, b| b.0.cmp(&a.0));
168
169 let mut pruned = 0usize;
170 let now = Utc::now();
171
172 if let Some(max_days) = self.max_age_days {
174 let cutoff = now - chrono::Duration::days(max_days as i64);
175 let mut kept = Vec::new();
176 for (created_at, path) in entries {
177 if created_at < cutoff {
178 std::fs::remove_dir_all(&path)?;
179 pruned += 1;
180 } else {
181 kept.push((created_at, path));
182 }
183 }
184 entries = kept;
185 }
186
187 if let Some(max_runs) = self.max_runs {
189 if entries.len() > max_runs {
190 for (_, path) in entries.drain(max_runs..) {
191 std::fs::remove_dir_all(&path)?;
192 pruned += 1;
193 }
194 }
195 }
196
197 Ok(pruned)
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use oxidized_state::storage_traits::{RunId, RunMetadata, RunStatus, RunSummary};
    use tempfile::tempdir;

    /// Builds a completed run record whose creation and completion times are both `created_at`.
    fn make_record(run_id: &str, created_at: DateTime<Utc>) -> RunRecord {
        let metadata = RunMetadata {
            git_sha: None,
            agent_name: "agent".to_string(),
            tags: serde_json::json!({}),
        };
        let summary = RunSummary {
            total_events: 1,
            final_state_digest: None,
            duration_ms: 10,
            success: true,
        };

        RunRecord {
            run_id: RunId(run_id.to_string()),
            spec_digest: ContentDigest::from_bytes(b"spec"),
            metadata,
            status: RunStatus::Completed,
            summary: Some(summary),
            created_at,
            completed_at: Some(created_at),
        }
    }

    /// A one-element event list containing a single `graph_started` event at `ts`.
    fn make_events(ts: DateTime<Utc>) -> Vec<RunEvent> {
        let started = RunEvent {
            seq: 1,
            kind: "graph_started".to_string(),
            payload: serde_json::json!({}),
            timestamp: ts,
        };
        vec![started]
    }

    /// Digest of the compact JSON encoding of `events`, i.e. the value
    /// `read_trace_artifact` recomputes during verification.
    fn digest_of(events: &[RunEvent]) -> String {
        let encoded = serde_json::to_vec(events).unwrap();
        ContentDigest::from_bytes(&encoded).as_str().to_string()
    }

    /// Writes a valid artifact for `run_id`, created at `created_at`, under `root`.
    fn store_run(root: &Path, run_id: &str, created_at: DateTime<Utc>) {
        let events = make_events(created_at);
        let digest = digest_of(&events);
        let record = make_record(run_id, created_at);
        let artifact = RunTraceArtifact::from_replay(&record, events, digest);
        write_trace_artifact(&artifact, root).expect("write");
    }

    #[test]
    fn test_write_and_read_trace_artifact_roundtrip() {
        let dir = tempdir().expect("tempdir");
        let ts = Utc::now();

        let events = make_events(ts);
        let digest = digest_of(&events);
        let artifact =
            RunTraceArtifact::from_replay(&make_record("run-abc", ts), events, digest.clone());

        let written = write_trace_artifact(&artifact, dir.path()).expect("write");
        assert!(written.exists());

        let loaded = read_trace_artifact("run-abc", dir.path()).expect("read");

        assert_eq!(loaded.run_id, "run-abc");
        assert_eq!(loaded.agent_name, "agent");
        assert_eq!(loaded.replay_digest, digest);
        assert_eq!(loaded.event_count, 1);
        assert_eq!(loaded.events.len(), 1);
    }

    #[test]
    fn test_read_trace_artifact_digest_mismatch_rejected() {
        let dir = tempdir().expect("tempdir");
        let ts = Utc::now();

        // The stored digest is deliberately bogus and does not match the events.
        let artifact =
            RunTraceArtifact::from_replay(&make_record("run-xyz", ts), make_events(ts), "a".repeat(64));

        // Write only trace.json by hand (no companion digest file), so the
        // mismatch has to be caught by the recomputed-events check.
        let run_dir = dir.path().join("run-xyz");
        std::fs::create_dir_all(&run_dir).unwrap();
        let json = serde_json::to_vec_pretty(&artifact).unwrap();
        std::fs::write(run_dir.join("trace.json"), &json).unwrap();

        let result = read_trace_artifact("run-xyz", dir.path());
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(
            matches!(err, AivcsError::DigestMismatch { .. }),
            "Expected DigestMismatch, got {:?}",
            err
        );
    }

    #[test]
    fn test_retention_policy_prunes_old_runs() {
        let dir = tempdir().expect("tempdir");
        let now = Utc::now();

        // One fresh run and two that are older than the 5-day limit below.
        for (id, days_ago) in [("run-new", 0i64), ("run-old1", 10), ("run-old2", 20)] {
            store_run(dir.path(), id, now - chrono::Duration::days(days_ago));
        }

        let policy = RetentionPolicy {
            max_age_days: Some(5),
            max_runs: None,
        };

        let pruned = policy.prune(dir.path()).expect("prune");
        assert_eq!(pruned, 2, "should prune the two old runs");

        assert!(dir.path().join("run-new").join("trace.json").exists());
        assert!(!dir.path().join("run-old1").exists());
        assert!(!dir.path().join("run-old2").exists());
    }

    #[test]
    fn test_retention_policy_max_runs() {
        let dir = tempdir().expect("tempdir");
        let now = Utc::now();

        // Four runs, each one day older than the previous.
        for (id, days_ago) in [("run-1", 0i64), ("run-2", 1), ("run-3", 2), ("run-4", 3)] {
            store_run(dir.path(), id, now - chrono::Duration::days(days_ago));
        }

        let policy = RetentionPolicy {
            max_age_days: None,
            max_runs: Some(2),
        };

        let pruned = policy.prune(dir.path()).expect("prune");
        assert_eq!(pruned, 2, "should prune 2 oldest runs");

        // The two newest survive; the two oldest are gone.
        assert!(dir.path().join("run-1").join("trace.json").exists());
        assert!(dir.path().join("run-2").join("trace.json").exists());
        assert!(!dir.path().join("run-3").exists());
        assert!(!dir.path().join("run-4").exists());
    }
}