use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use oxidized_state::storage_traits::{ContentDigest, RunEvent, RunRecord};
use crate::domain::{AivcsError, Result};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunTraceArtifact {
pub run_id: String,
pub spec_digest: String,
pub agent_name: String,
pub status: String,
pub created_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub events: Vec<RunEvent>,
pub replay_digest: String,
pub event_count: usize,
}
impl RunTraceArtifact {
pub fn from_replay(record: &RunRecord, events: Vec<RunEvent>, replay_digest: String) -> Self {
let status = format!("{:?}", record.status);
Self {
run_id: record.run_id.to_string(),
spec_digest: record.spec_digest.as_str().to_string(),
agent_name: record.metadata.agent_name.clone(),
status,
created_at: record.created_at,
completed_at: record.completed_at,
event_count: events.len(),
events,
replay_digest,
}
}
}
pub fn write_trace_artifact(artifact: &RunTraceArtifact, dir: &Path) -> Result<PathBuf> {
let run_dir = dir.join(&artifact.run_id);
std::fs::create_dir_all(&run_dir)?;
let trace_path = run_dir.join("trace.json");
let digest_path = run_dir.join("trace.digest");
let json = serde_json::to_vec_pretty(artifact)?;
std::fs::write(&trace_path, &json)?;
std::fs::write(&digest_path, artifact.replay_digest.as_bytes())?;
Ok(trace_path)
}
pub fn read_trace_artifact(run_id: &str, dir: &Path) -> Result<RunTraceArtifact> {
let run_dir = dir.join(run_id);
let trace_path = run_dir.join("trace.json");
let digest_path = run_dir.join("trace.digest");
let json = std::fs::read(&trace_path)?;
let artifact: RunTraceArtifact = serde_json::from_slice(&json)?;
if digest_path.exists() {
let companion_digest = std::fs::read_to_string(&digest_path)?.trim().to_string();
if companion_digest != artifact.replay_digest {
return Err(AivcsError::DigestMismatch {
expected: companion_digest,
actual: artifact.replay_digest.clone(),
});
}
}
let events_json = serde_json::to_vec(&artifact.events)?;
let actual_digest = ContentDigest::from_bytes(&events_json).as_str().to_string();
if actual_digest != artifact.replay_digest {
return Err(AivcsError::DigestMismatch {
expected: artifact.replay_digest.clone(),
actual: actual_digest,
});
}
Ok(artifact)
}
#[derive(Debug, Clone, Default)]
pub struct RetentionPolicy {
pub max_age_days: Option<u64>,
pub max_runs: Option<usize>,
}
impl RetentionPolicy {
pub fn prune(&self, dir: &Path) -> Result<usize> {
let mut entries: Vec<(DateTime<Utc>, PathBuf)> = Vec::new();
let read_dir = match std::fs::read_dir(dir) {
Ok(rd) => rd,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
Err(e) => return Err(AivcsError::Io(e)),
};
for entry in read_dir {
let entry = entry?;
let trace_path = entry.path().join("trace.json");
if !trace_path.exists() {
continue;
}
let json = std::fs::read(&trace_path)?;
if let Ok(artifact) = serde_json::from_slice::<RunTraceArtifact>(&json) {
entries.push((artifact.created_at, entry.path()));
}
}
entries.sort_by(|a, b| b.0.cmp(&a.0));
let mut pruned = 0usize;
let now = Utc::now();
if let Some(max_days) = self.max_age_days {
let cutoff = now - chrono::Duration::days(max_days as i64);
let mut kept = Vec::new();
for (created_at, path) in entries {
if created_at < cutoff {
std::fs::remove_dir_all(&path)?;
pruned += 1;
} else {
kept.push((created_at, path));
}
}
entries = kept;
}
if let Some(max_runs) = self.max_runs {
if entries.len() > max_runs {
for (_, path) in entries.drain(max_runs..) {
std::fs::remove_dir_all(&path)?;
pruned += 1;
}
}
}
Ok(pruned)
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use oxidized_state::storage_traits::{RunId, RunMetadata, RunStatus, RunSummary};
use tempfile::tempdir;
fn make_record(run_id: &str, created_at: DateTime<Utc>) -> RunRecord {
RunRecord {
run_id: RunId(run_id.to_string()),
spec_digest: ContentDigest::from_bytes(b"spec"),
metadata: RunMetadata {
git_sha: None,
agent_name: "agent".to_string(),
tags: serde_json::json!({}),
},
status: RunStatus::Completed,
summary: Some(RunSummary {
total_events: 1,
final_state_digest: None,
duration_ms: 10,
success: true,
}),
created_at,
completed_at: Some(created_at),
}
}
fn make_events(ts: DateTime<Utc>) -> Vec<RunEvent> {
vec![RunEvent {
seq: 1,
kind: "graph_started".to_string(),
payload: serde_json::json!({}),
timestamp: ts,
}]
}
#[test]
fn test_write_and_read_trace_artifact_roundtrip() {
let dir = tempdir().expect("tempdir");
let ts = Utc::now();
let events = make_events(ts);
let events_json = serde_json::to_vec(&events).unwrap();
let digest = ContentDigest::from_bytes(&events_json).as_str().to_string();
let record = make_record("run-abc", ts);
let artifact = RunTraceArtifact::from_replay(&record, events.clone(), digest.clone());
let path = write_trace_artifact(&artifact, dir.path()).expect("write");
assert!(path.exists());
let loaded = read_trace_artifact("run-abc", dir.path()).expect("read");
assert_eq!(loaded.run_id, "run-abc");
assert_eq!(loaded.agent_name, "agent");
assert_eq!(loaded.replay_digest, digest);
assert_eq!(loaded.event_count, 1);
assert_eq!(loaded.events.len(), 1);
}
#[test]
fn test_read_trace_artifact_digest_mismatch_rejected() {
let dir = tempdir().expect("tempdir");
let ts = Utc::now();
let events = make_events(ts);
let record = make_record("run-xyz", ts);
let artifact = RunTraceArtifact::from_replay(&record, events, "a".repeat(64));
let run_dir = dir.path().join("run-xyz");
std::fs::create_dir_all(&run_dir).unwrap();
let json = serde_json::to_vec_pretty(&artifact).unwrap();
std::fs::write(run_dir.join("trace.json"), &json).unwrap();
let result = read_trace_artifact("run-xyz", dir.path());
assert!(result.is_err());
match result.unwrap_err() {
AivcsError::DigestMismatch { .. } => {}
other => panic!("Expected DigestMismatch, got {:?}", other),
}
}
#[test]
fn test_retention_policy_prunes_old_runs() {
let dir = tempdir().expect("tempdir");
let now = Utc::now();
for (id, days_ago) in [("run-new", 0i64), ("run-old1", 10), ("run-old2", 20)] {
let ts = now - chrono::Duration::days(days_ago);
let events = make_events(ts);
let events_json = serde_json::to_vec(&events).unwrap();
let digest = ContentDigest::from_bytes(&events_json).as_str().to_string();
let record = make_record(id, ts);
let artifact = RunTraceArtifact::from_replay(&record, events, digest);
write_trace_artifact(&artifact, dir.path()).expect("write");
}
let policy = RetentionPolicy {
max_age_days: Some(5),
max_runs: None,
};
let pruned = policy.prune(dir.path()).expect("prune");
assert_eq!(pruned, 2, "should prune the two old runs");
assert!(dir.path().join("run-new").join("trace.json").exists());
assert!(!dir.path().join("run-old1").exists());
assert!(!dir.path().join("run-old2").exists());
}
#[test]
fn test_retention_policy_max_runs() {
let dir = tempdir().expect("tempdir");
let now = Utc::now();
for (id, days_ago) in [("run-1", 0i64), ("run-2", 1), ("run-3", 2), ("run-4", 3)] {
let ts = now - chrono::Duration::days(days_ago);
let events = make_events(ts);
let events_json = serde_json::to_vec(&events).unwrap();
let digest = ContentDigest::from_bytes(&events_json).as_str().to_string();
let record = make_record(id, ts);
let artifact = RunTraceArtifact::from_replay(&record, events, digest);
write_trace_artifact(&artifact, dir.path()).expect("write");
}
let policy = RetentionPolicy {
max_age_days: None,
max_runs: Some(2),
};
let pruned = policy.prune(dir.path()).expect("prune");
assert_eq!(pruned, 2, "should prune 2 oldest runs");
assert!(dir.path().join("run-1").join("trace.json").exists());
assert!(dir.path().join("run-2").join("trace.json").exists());
assert!(!dir.path().join("run-3").exists());
assert!(!dir.path().join("run-4").exists());
}
}