use crate::error::Result;
use crate::event::{Event, EventLog};
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use parking_lot::Mutex;
/// Directory (relative to the process working directory) where trace files live.
const TRACE_DIR: &str = ".nika/traces";
/// Streams workflow events to an NDJSON trace file (one JSON object per line).
pub struct TraceWriter {
    // Buffered file handle behind Arc<Mutex<…>> so the writer can be shared
    // and appended to from multiple owners without interleaving lines.
    writer: Arc<Mutex<BufWriter<File>>>,
    // Location of the trace file backing `writer`.
    path: PathBuf,
}
impl TraceWriter {
    /// Create (truncating) the trace file `.nika/traces/<generation_id>.ndjson`.
    ///
    /// The id is validated against a strict allowlist (ASCII alphanumerics,
    /// `-`, `_`) so a caller-supplied id can never escape the trace directory.
    ///
    /// # Errors
    /// Returns `NikaError::ValidationError` for a malformed id, or an I/O
    /// error if the directory or file cannot be created.
    pub fn new(generation_id: &str) -> Result<Self> {
        // Defense in depth: the allowlist below already excludes `.`, `/` and
        // `\`, but the explicit traversal checks keep the intent obvious.
        // Note: no special case for 'T' is needed — it is ASCII alphanumeric,
        // so timestamp-style ids ("…T…") pass the allowlist as-is.
        if generation_id.is_empty()
            || generation_id.contains("..")
            || generation_id.contains('/')
            || generation_id.contains('\\')
            || !generation_id
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
        {
            return Err(crate::error::NikaError::ValidationError {
                reason: format!(
                    "Invalid generation_id: must be alphanumeric with hyphens/underscores only, got: {}",
                    generation_id
                ),
            });
        }
        let trace_dir = Path::new(TRACE_DIR);
        fs::create_dir_all(trace_dir)?;
        let filename = format!("{}.ndjson", generation_id);
        let path = trace_dir.join(&filename);
        let file = File::create(&path)?;
        let writer = BufWriter::new(file);
        tracing::info!(path = %path.display(), "Created trace file");
        Ok(Self {
            writer: Arc::new(Mutex::new(writer)),
            path,
        })
    }

    /// Serialize `event` as a single JSON line and flush immediately, so the
    /// trace on disk stays complete even if the process crashes mid-run.
    pub fn write_event(&self, event: &Event) -> Result<()> {
        let json = serde_json::to_string(event)?;
        let mut writer = self.writer.lock();
        writeln!(writer, "{}", json)?;
        writer.flush()?;
        Ok(())
    }

    /// Write every event currently held by `event_log`, in order.
    pub fn write_all(&self, event_log: &EventLog) -> Result<()> {
        for event in event_log.events() {
            self.write_event(&event)?;
        }
        Ok(())
    }

    /// Path of the backing trace file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Flush any buffered output; the file handle itself closes on drop.
    pub fn close(&self) -> Result<()> {
        let mut writer = self.writer.lock();
        writer.flush()?;
        Ok(())
    }
}
/// Build a unique generation id: a UTC second-resolution timestamp
/// (`YYYY-MM-DDTHH-MM-SS`) plus a 4-hex-digit random suffix that
/// disambiguates runs started within the same second.
pub fn generate_generation_id() -> String {
    use chrono::Utc;
    let suffix = rand::random::<u32>() & 0xFFFF;
    format!("{}-{:04x}", Utc::now().format("%Y-%m-%dT%H-%M-%S"), suffix)
}
/// Fingerprint a workflow definition with XXH3-64, rendered as `xxh3:`
/// followed by exactly 16 zero-padded lowercase hex digits.
pub fn calculate_workflow_hash(yaml: &str) -> String {
    use xxhash_rust::xxh3::xxh3_64;
    let digest = xxh3_64(yaml.as_bytes());
    format!("xxh3:{:016x}", digest)
}
/// Prune old trace files in the default trace directory (`.nika/traces`).
///
/// Best-effort: all failures are logged and swallowed, never propagated.
pub fn prune_traces(max_traces: u32, retention_days: u32) {
    prune_traces_in_dir(Path::new(TRACE_DIR), max_traces, retention_days);
}
/// Prune `.ndjson` trace files in `trace_dir`.
///
/// Files older than `retention_days` days are deleted first (0 disables the
/// age check), then only the newest `max_traces` of the remainder are kept.
/// Best-effort: every failure is logged and swallowed so pruning can never
/// abort the surrounding run.
fn prune_traces_in_dir(trace_dir: &Path, max_traces: u32, retention_days: u32) {
    if !trace_dir.exists() {
        return;
    }
    let dir_iter = match fs::read_dir(trace_dir) {
        Ok(iter) => iter,
        Err(e) => {
            tracing::warn!(error = %e, "Failed to read trace directory for pruning");
            return;
        }
    };
    // Collect (path, timestamp) for every .ndjson file. Fall back to mtime on
    // filesystems that do not report a creation time (common on Linux), so
    // ordering stays meaningful instead of collapsing to None.
    let mut entries: Vec<(PathBuf, Option<SystemTime>)> = Vec::new();
    for entry in dir_iter {
        let entry = match entry {
            Ok(e) => e,
            Err(_) => continue,
        };
        let path = entry.path();
        if path.extension().map(|e| e == "ndjson").unwrap_or(false) {
            let created = entry
                .metadata()
                .ok()
                .and_then(|m| m.created().or_else(|_| m.modified()).ok());
            entries.push((path, created));
        }
    }
    // Newest first. Entries with no timestamp (None) sort last, i.e. they are
    // treated as oldest and pruned first.
    entries.sort_by(|a, b| b.1.cmp(&a.1));
    // Age cutoff; None when retention is disabled.
    let cutoff = if retention_days > 0 {
        SystemTime::now().checked_sub(Duration::from_secs(u64::from(retention_days) * 86400))
    } else {
        None
    };
    let mut to_delete: Vec<PathBuf> = Vec::new();
    let mut kept: Vec<(PathBuf, Option<SystemTime>)> = Vec::new();
    for (path, created) in entries {
        // Files with unknown timestamps are never considered expired; they can
        // still fall to the max_traces cap below.
        let expired = match (&cutoff, &created) {
            (Some(cutoff_time), Some(create_time)) => create_time < cutoff_time,
            _ => false,
        };
        if expired {
            to_delete.push(path);
        } else {
            kept.push((path, created));
        }
    }
    // Enforce the count cap on whatever survived the age check; `kept` is
    // newest-first, so split_off drops the oldest excess.
    if kept.len() > max_traces as usize {
        let excess = kept.split_off(max_traces as usize);
        to_delete.extend(excess.into_iter().map(|(path, _)| path));
    }
    let mut pruned_count: u32 = 0;
    for path in &to_delete {
        if let Err(e) = fs::remove_file(path) {
            tracing::debug!(
                path = %path.display(),
                error = %e,
                "Failed to prune trace file"
            );
        } else {
            pruned_count += 1;
        }
    }
    if pruned_count > 0 {
        tracing::debug!(
            pruned = pruned_count,
            max_traces = max_traces,
            retention_days = retention_days,
            remaining = kept.len(),
            "Pruned old trace files"
        );
    }
}
/// List every `.ndjson` trace in the default trace directory, newest first.
///
/// Returns an empty list when the directory does not exist.
///
/// # Errors
/// Propagates I/O errors from reading the directory or per-file metadata.
pub fn list_traces() -> Result<Vec<TraceInfo>> {
    let trace_dir = Path::new(TRACE_DIR);
    if !trace_dir.exists() {
        return Ok(vec![]);
    }
    let mut traces = Vec::new();
    for entry in fs::read_dir(trace_dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().map(|e| e == "ndjson").unwrap_or(false) {
            let metadata = entry.metadata()?;
            // The file stem (name without `.ndjson`) is the generation id.
            let generation_id = path
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("unknown")
                .to_string();
            traces.push(TraceInfo {
                generation_id,
                path,
                size_bytes: metadata.len(),
                // Fall back to mtime on filesystems that lack a creation
                // time (common on Linux), so entries still sort by age.
                created: metadata.created().or_else(|_| metadata.modified()).ok(),
            });
        }
    }
    // Newest first; entries with no timestamp (None) sort last.
    traces.sort_by(|a, b| b.created.cmp(&a.created));
    Ok(traces)
}
/// Summary of a single on-disk trace file.
#[derive(Debug, Clone)]
pub struct TraceInfo {
    /// Generation id, derived from the file stem of `<id>.ndjson`.
    pub generation_id: String,
    /// Full path to the trace file.
    pub path: PathBuf,
    /// File size in bytes.
    pub size_bytes: u64,
    /// File timestamp, when the filesystem reports one.
    pub created: Option<SystemTime>,
}
#[cfg(test)]
mod tests {
    use super::*;

    // Generated ids embed a second-resolution timestamp plus a hex suffix, so
    // they are comfortably longer than 20 chars and contain the 'T' separator.
    #[test]
    fn test_generation_id_format() {
        let id = generate_generation_id();
        assert!(id.len() > 20);
        assert!(id.contains('T'));
    }

    // "xxh3:" prefix (5 chars) + 16 hex digits = 21 characters total.
    #[test]
    fn test_workflow_hash() {
        let yaml = "schema: test\ntasks: []";
        let hash = calculate_workflow_hash(yaml);
        assert!(hash.starts_with("xxh3:"));
        assert_eq!(hash.len(), 21);
    }

    // Same input must always produce the same hash.
    #[test]
    fn test_workflow_hash_deterministic() {
        let yaml = "schema: test";
        let hash1 = calculate_workflow_hash(yaml);
        let hash2 = calculate_workflow_hash(yaml);
        assert_eq!(hash1, hash2);
    }

    // Different inputs should (for these values, do) produce different hashes.
    #[test]
    fn test_workflow_hash_different_inputs() {
        let hash1 = calculate_workflow_hash("a");
        let hash2 = calculate_workflow_hash("b");
        assert_ne!(hash1, hash2);
    }

    // Builds a TraceWriter by hand (bypassing `new`, which writes to the
    // real .nika/traces dir) and checks `path()` reflects the stored path.
    #[test]
    fn test_trace_writer_creates_file() {
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let trace_dir = temp_dir.path().join(".nika/traces");
        fs::create_dir_all(&trace_dir).unwrap();
        let gen_id = "test-gen-123";
        let path = trace_dir.join(format!("{}.ndjson", gen_id));
        let file = File::create(&path).unwrap();
        let writer = BufWriter::new(file);
        let trace_writer = TraceWriter {
            writer: Arc::new(Mutex::new(writer)),
            path: path.clone(),
        };
        assert_eq!(trace_writer.path(), path);
    }

    // A written event must appear on disk as serialized JSON containing the
    // task id and the snake_case event kind tag.
    #[test]
    fn test_trace_writer_writes_event() {
        use crate::event::EventKind;
        use serde_json::json;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let trace_dir = temp_dir.path().join(".nika/traces");
        fs::create_dir_all(&trace_dir).unwrap();
        let gen_id = "test-write-event";
        let path = trace_dir.join(format!("{}.ndjson", gen_id));
        let file = File::create(&path).unwrap();
        let writer = BufWriter::new(file);
        let trace_writer = TraceWriter {
            writer: Arc::new(Mutex::new(writer)),
            path: path.clone(),
        };
        let event = Event {
            id: 0,
            timestamp_ms: 100,
            kind: EventKind::TaskStarted {
                verb: "infer".into(),
                task_id: "test_task".into(),
                inputs: json!({}),
            },
        };
        trace_writer.write_event(&event).unwrap();
        trace_writer.close().unwrap();
        let content = fs::read_to_string(&path).unwrap();
        assert!(content.contains("test_task"));
        assert!(content.contains("task_started"));
    }

    // NOTE(review): this runs against the real relative TRACE_DIR, so it only
    // asserts that listing does not error, not that the result is empty.
    #[test]
    fn test_list_traces_empty_dir() {
        let result = list_traces();
        assert!(result.is_ok());
    }

    // Ids containing traversal sequences or path separators must be rejected.
    #[test]
    fn test_trace_writer_rejects_path_traversal() {
        let result = TraceWriter::new("../evil");
        assert!(result.is_err());
        let result = TraceWriter::new("foo/../bar");
        assert!(result.is_err());
        let result = TraceWriter::new("foo/bar");
        assert!(result.is_err());
        let result = TraceWriter::new("foo\\bar");
        assert!(result.is_err());
    }

    #[test]
    fn test_trace_writer_rejects_empty_id() {
        let result = TraceWriter::new("");
        assert!(result.is_err());
    }

    // Checks the validation predicate itself accepts a generated-style id.
    #[test]
    fn test_trace_writer_accepts_valid_ids() {
        assert!("2024-01-01T12-00-00-abc0"
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == 'T'));
    }

    // Creates `count` empty .ndjson files, sleeping between writes so their
    // filesystem timestamps differ and age-based ordering is deterministic.
    fn make_trace_dir(count: usize) -> tempfile::TempDir {
        let tmp = tempfile::TempDir::new().unwrap();
        for i in 0..count {
            let name = format!("trace-{:04}.ndjson", i);
            fs::write(tmp.path().join(&name), "").unwrap();
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        tmp
    }

    // Counts the .ndjson files remaining in `dir`.
    fn count_ndjson(dir: &Path) -> usize {
        fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "ndjson")
                    .unwrap_or(false)
            })
            .count()
    }

    #[test]
    fn test_prune_noop_when_under_limit() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 100, 0);
        assert_eq!(count_ndjson(tmp.path()), 5);
    }

    #[test]
    fn test_prune_enforces_max_traces() {
        let tmp = make_trace_dir(10);
        assert_eq!(count_ndjson(tmp.path()), 10);
        prune_traces_in_dir(tmp.path(), 3, 0);
        assert_eq!(count_ndjson(tmp.path()), 3);
    }

    // The two most recently created files (0003, 0004) must survive pruning.
    #[test]
    fn test_prune_keeps_newest_files() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 2, 0);
        let remaining: Vec<String> = fs::read_dir(tmp.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "ndjson")
                    .unwrap_or(false)
            })
            .map(|e| e.file_name().to_string_lossy().to_string())
            .collect();
        assert_eq!(remaining.len(), 2);
        assert!(remaining.iter().any(|f| f.contains("0004")));
        assert!(remaining.iter().any(|f| f.contains("0003")));
    }

    // Pruning a directory that does not exist must silently do nothing.
    #[test]
    fn test_prune_nonexistent_dir_is_noop() {
        let dir = Path::new("/tmp/nika-test-nonexistent-dir-12345");
        prune_traces_in_dir(dir, 10, 7);
    }

    #[test]
    fn test_prune_empty_dir_is_noop() {
        let tmp = tempfile::TempDir::new().unwrap();
        prune_traces_in_dir(tmp.path(), 5, 7);
        assert_eq!(count_ndjson(tmp.path()), 0);
    }

    // Only .ndjson files are eligible for pruning; other files are untouched.
    #[test]
    fn test_prune_ignores_non_ndjson_files() {
        let tmp = tempfile::TempDir::new().unwrap();
        for i in 0..5 {
            fs::write(tmp.path().join(format!("trace-{}.ndjson", i)), "").unwrap();
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        fs::write(tmp.path().join("notes.txt"), "keep me").unwrap();
        fs::write(tmp.path().join("data.json"), "keep me too").unwrap();
        prune_traces_in_dir(tmp.path(), 2, 0);
        assert_eq!(count_ndjson(tmp.path()), 2);
        assert!(tmp.path().join("notes.txt").exists());
        assert!(tmp.path().join("data.json").exists());
    }

    // max_traces == 0 means "keep nothing": every trace file is removed.
    #[test]
    fn test_prune_max_traces_zero_deletes_all() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 0, 0);
        assert_eq!(count_ndjson(tmp.path()), 0);
    }
}