Skip to main content

nika_event/
trace.rs

1//! NDJSON Trace Writer
2//!
3//! Writes events to newline-delimited JSON files for debugging and replay.
4
5use crate::error::Result;
6use crate::log::{Event, EventLog};
7use std::fs::{self, File};
8use std::io::{self, BufWriter, Write};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12
13use parking_lot::Mutex;
14
/// Relative path of the directory in which trace files are stored.
const TRACE_DIR: &str = ".nika/traces";
17
18/// NDJSON trace writer
19pub struct TraceWriter {
20    writer: Arc<Mutex<BufWriter<File>>>,
21    path: PathBuf,
22}
23
24impl TraceWriter {
25    /// Create a new trace writer for a generation
26    ///
27    /// # Security
28    ///
29    /// The generation_id is validated to prevent path traversal attacks.
30    /// Only alphanumeric characters, hyphens, and underscores are allowed.
31    pub fn new(generation_id: &str) -> Result<Self> {
32        // Validate generation_id to prevent path traversal
33        if generation_id.is_empty()
34            || generation_id.contains("..")
35            || generation_id.contains('/')
36            || generation_id.contains('\\')
37            || !generation_id
38                .chars()
39                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == 'T')
40        {
41            return Err(crate::error::EventError::TraceWrite(
42                std::io::Error::new(
43                    std::io::ErrorKind::InvalidInput,
44                    format!(
45                        "Invalid generation_id: must be alphanumeric with hyphens/underscores only, got: {}",
46                        generation_id
47                    ),
48                ),
49            ));
50        }
51
52        // Ensure trace directory exists
53        let trace_dir = Path::new(TRACE_DIR);
54        fs::create_dir_all(trace_dir)?;
55
56        // Create trace file
57        let filename = format!("{}.ndjson", generation_id);
58        let path = trace_dir.join(&filename);
59        let file = File::create(&path)?;
60        let writer = BufWriter::new(file);
61
62        tracing::info!(path = %path.display(), "Created trace file");
63
64        Ok(Self {
65            writer: Arc::new(Mutex::new(writer)),
66            path,
67        })
68    }
69
70    /// Write a single event to the trace file
71    pub fn write_event(&self, event: &Event) -> Result<()> {
72        let json = serde_json::to_string(event)?;
73
74        let mut writer = self.writer.lock();
75        writeln!(writer, "{}", json)?;
76        writer.flush()?;
77
78        Ok(())
79    }
80
81    /// Append a single event line and flush immediately.
82    ///
83    /// Designed for incremental writing during execution so that partial
84    /// trace data survives a crash. Returns `io::Result` for ergonomic
85    /// use with `let _ =` in hot paths.
86    pub fn append_event(&self, event: &Event) -> io::Result<()> {
87        let json = serde_json::to_string(event)
88            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
89        let mut writer = self.writer.lock();
90        writeln!(writer, "{}", json)?;
91        writer.flush()?;
92        Ok(())
93    }
94
95    /// Write all events from an EventLog
96    pub fn write_all(&self, event_log: &EventLog) -> Result<()> {
97        let events = event_log.events();
98        for event in events {
99            self.write_event(&event)?;
100        }
101        Ok(())
102    }
103
104    /// Get the trace file path
105    pub fn path(&self) -> &Path {
106        &self.path
107    }
108
109    /// Close the trace writer (flushes buffer)
110    pub fn close(&self) -> Result<()> {
111        let mut writer = self.writer.lock();
112        writer.flush()?;
113        Ok(())
114    }
115}
116
117/// Generate a unique generation ID
118///
119/// Format: `YYYY-MM-DDTHH-MM-SS-XXXX` where XXXX is random hex
120pub fn generate_generation_id() -> String {
121    use chrono::Utc;
122
123    let now = Utc::now();
124    let timestamp = now.format("%Y-%m-%dT%H-%M-%S");
125    let random: u32 = rand::random::<u32>() % 0x10000; // 0-65535 for 4 hex digits
126
127    format!("{}-{:04x}", timestamp, random)
128}
129
130/// Calculate workflow hash (for cache invalidation)
131///
132/// Uses xxh3 (fast, non-cryptographic) hash.
133/// Format: `xxh3:XXXXXXXXXXXXXXXX` (16 hex chars)
134pub fn calculate_workflow_hash(yaml: &str) -> String {
135    use xxhash_rust::xxh3::xxh3_64;
136
137    let hash = xxh3_64(yaml.as_bytes());
138    format!("xxh3:{:016x}", hash)
139}
140
141/// Prune old trace files, enforcing both `max_traces` and `retention_days`.
142///
143/// Deletes the oldest traces beyond `max_traces`, and any trace older than
144/// `retention_days`. Logs a warning when files are pruned.
145///
146/// Safe to call on every write -- performs a single directory listing and
147/// at most N unlink calls.
148pub fn prune_traces(max_traces: u32, retention_days: u32) {
149    prune_traces_in_dir(Path::new(TRACE_DIR), max_traces, retention_days);
150}
151
152/// Core pruning logic, parameterised by directory for testability.
153fn prune_traces_in_dir(trace_dir: &Path, max_traces: u32, retention_days: u32) {
154    if !trace_dir.exists() {
155        return;
156    }
157
158    let dir_iter = match fs::read_dir(trace_dir) {
159        Ok(iter) => iter,
160        Err(e) => {
161            tracing::warn!(error = %e, "Failed to read trace directory for pruning");
162            return;
163        }
164    };
165
166    // Collect .ndjson entries with their creation times
167    let mut entries: Vec<(PathBuf, Option<SystemTime>)> = Vec::new();
168
169    for entry in dir_iter {
170        let entry = match entry {
171            Ok(e) => e,
172            Err(_) => continue,
173        };
174        let path = entry.path();
175
176        if path.extension().map(|e| e == "ndjson").unwrap_or(false) {
177            let created = entry.metadata().ok().and_then(|m| m.created().ok());
178            entries.push((path, created));
179        }
180    }
181
182    // Sort newest first (None-timestamps sort last so unknown files get pruned first)
183    entries.sort_by(|a, b| b.1.cmp(&a.1));
184
185    // Pass 1: retention_days -- mark entries older than cutoff for deletion
186    let cutoff = if retention_days > 0 {
187        SystemTime::now().checked_sub(Duration::from_secs(u64::from(retention_days) * 86400))
188    } else {
189        None
190    };
191
192    let mut to_delete: Vec<PathBuf> = Vec::new();
193    let mut kept: Vec<(PathBuf, Option<SystemTime>)> = Vec::new();
194
195    for (path, created) in entries {
196        let expired = match (&cutoff, &created) {
197            (Some(cutoff_time), Some(create_time)) => create_time < cutoff_time,
198            _ => false,
199        };
200
201        if expired {
202            to_delete.push(path);
203        } else {
204            kept.push((path, created));
205        }
206    }
207
208    // Pass 2: max_traces -- from the kept entries, remove oldest beyond the limit
209    if kept.len() > max_traces as usize {
210        let excess = kept.split_off(max_traces as usize);
211        to_delete.extend(excess.into_iter().map(|(path, _)| path));
212    }
213
214    // Delete files
215    let mut pruned_count: u32 = 0;
216    for path in &to_delete {
217        if let Err(e) = fs::remove_file(path) {
218            tracing::debug!(
219                path = %path.display(),
220                error = %e,
221                "Failed to prune trace file"
222            );
223        } else {
224            pruned_count += 1;
225        }
226    }
227
228    if pruned_count > 0 {
229        tracing::debug!(
230            pruned = pruned_count,
231            max_traces = max_traces,
232            retention_days = retention_days,
233            remaining = kept.len(),
234            "Pruned old trace files"
235        );
236    }
237}
238
239/// List all trace files
240pub fn list_traces() -> Result<Vec<TraceInfo>> {
241    let trace_dir = Path::new(TRACE_DIR);
242
243    if !trace_dir.exists() {
244        return Ok(vec![]);
245    }
246
247    let mut traces = Vec::new();
248
249    for entry in fs::read_dir(trace_dir)? {
250        let entry = entry?;
251        let path = entry.path();
252
253        if path.extension().map(|e| e == "ndjson").unwrap_or(false) {
254            let metadata = entry.metadata()?;
255            let generation_id = path
256                .file_stem()
257                .and_then(|s| s.to_str())
258                .unwrap_or("unknown")
259                .to_string();
260
261            traces.push(TraceInfo {
262                generation_id,
263                path,
264                size_bytes: metadata.len(),
265                created: metadata.created().ok(),
266            });
267        }
268    }
269
270    // Sort by creation time (newest first)
271    traces.sort_by(|a, b| b.created.cmp(&a.created));
272
273    Ok(traces)
274}
275
/// Summary of one trace file, as produced by `list_traces`.
#[derive(Debug)]
pub struct TraceInfo {
    /// File stem of the trace file (`"unknown"` if it is not valid UTF-8).
    pub generation_id: String,
    /// Path of the trace file.
    pub path: PathBuf,
    /// File size in bytes.
    pub size_bytes: u64,
    /// Creation time, or `None` when it could not be determined.
    pub created: Option<SystemTime>,
}
284
#[cfg(test)]
mod tests {
    use super::*;

    // ───────────────────────────────────────────────────────────────
    // shared helpers
    // ───────────────────────────────────────────────────────────────

    /// Helper: build a `TraceWriter` over `<root>/.nika/traces/<gen_id>.ndjson`
    /// without going through `TraceWriter::new`, so nothing is written to the
    /// real trace directory. Returns the writer and the file path.
    fn writer_in(root: &Path, gen_id: &str) -> (TraceWriter, PathBuf) {
        let trace_dir = root.join(".nika/traces");
        fs::create_dir_all(&trace_dir).unwrap();

        let path = trace_dir.join(format!("{}.ndjson", gen_id));
        let file = File::create(&path).unwrap();

        let trace_writer = TraceWriter {
            writer: Arc::new(Mutex::new(BufWriter::new(file))),
            path: path.clone(),
        };
        (trace_writer, path)
    }

    /// Helper: create N empty .ndjson files in a temp dir, return the dir.
    fn make_trace_dir(count: usize) -> tempfile::TempDir {
        let tmp = tempfile::TempDir::new().unwrap();
        for i in 0..count {
            let name = format!("trace-{:04}.ndjson", i);
            fs::write(tmp.path().join(&name), "").unwrap();
            // Tiny sleep so consecutive files get distinct timestamps while
            // keeping the tests fast.
            std::thread::sleep(Duration::from_millis(10));
        }
        tmp
    }

    /// Helper: file names of every `.ndjson` entry in `dir`.
    fn ndjson_names(dir: &Path) -> Vec<String> {
        fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "ndjson")
                    .unwrap_or(false)
            })
            .map(|e| e.file_name().to_string_lossy().to_string())
            .collect()
    }

    fn count_ndjson(dir: &Path) -> usize {
        ndjson_names(dir).len()
    }

    // ───────────────────────────────────────────────────────────────
    // generation id / workflow hash tests
    // ───────────────────────────────────────────────────────────────

    #[test]
    fn test_generation_id_format() {
        let id = generate_generation_id();

        // Format: YYYY-MM-DDTHH-MM-SS-XXXX
        // 19 timestamp chars + '-' + 4 hex digits.
        assert_eq!(id.len(), 24, "unexpected id: {}", id);
        assert_eq!(id.as_bytes()[10], b'T');

        let (stamp, suffix) = id.split_at(20);
        assert!(stamp.ends_with('-'));
        assert!(suffix.chars().all(|c| c.is_ascii_hexdigit()));

        // A generated id must only use characters TraceWriter::new accepts.
        assert!(id
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_'));
    }

    #[test]
    fn test_workflow_hash() {
        let yaml = "schema: test\ntasks: []";
        let hash = calculate_workflow_hash(yaml);
        assert!(hash.starts_with("xxh3:"));
        assert_eq!(hash.len(), 21); // "xxh3:" + 16 hex chars
    }

    #[test]
    fn test_workflow_hash_deterministic() {
        let yaml = "schema: test";
        assert_eq!(calculate_workflow_hash(yaml), calculate_workflow_hash(yaml));
    }

    #[test]
    fn test_workflow_hash_different_inputs() {
        assert_ne!(calculate_workflow_hash("a"), calculate_workflow_hash("b"));
    }

    // ───────────────────────────────────────────────────────────────
    // TraceWriter tests
    // ───────────────────────────────────────────────────────────────

    #[test]
    fn test_trace_writer_creates_file() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let (trace_writer, path) = writer_in(temp_dir.path(), "test-gen-123");

        assert_eq!(trace_writer.path(), path);
        assert!(path.exists());
    }

    #[test]
    fn test_trace_writer_writes_event() {
        use crate::log::EventKind;
        use serde_json::json;

        let temp_dir = tempfile::TempDir::new().unwrap();
        let (trace_writer, path) = writer_in(temp_dir.path(), "test-write-event");

        let event = Event {
            id: 0,
            timestamp_ms: 100,
            kind: EventKind::TaskStarted {
                verb: "infer".into(),
                task_id: "test_task".into(),
                inputs: json!({}),
            },
        };

        trace_writer.write_event(&event).unwrap();
        trace_writer.close().unwrap();

        // Read back and verify
        let content = fs::read_to_string(&path).unwrap();
        assert!(content.contains("test_task"));
        assert!(content.contains("task_started"));
    }

    #[test]
    fn test_list_traces_empty_dir() {
        // Runs against the real trace directory, so the listing may or may
        // not be empty depending on filesystem state; it must not fail.
        let result = list_traces();
        assert!(result.is_ok());
    }

    #[test]
    fn test_trace_writer_rejects_path_traversal() {
        // Path traversal attempts should be rejected
        for bad_id in ["../evil", "foo/../bar", "foo/bar", "foo\\bar"] {
            assert!(
                TraceWriter::new(bad_id).is_err(),
                "expected rejection of {:?}",
                bad_id
            );
        }
    }

    #[test]
    fn test_trace_writer_rejects_empty_id() {
        let result = TraceWriter::new("");
        assert!(result.is_err());
    }

    #[test]
    fn test_trace_writer_accepts_valid_ids() {
        // Checks the character allowlist only; TraceWriter::new is not
        // called here because it would create a file in the real trace dir.
        assert!("2024-01-01T12-00-00-abc0"
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == 'T'));
    }

    // ───────────────────────────────────────────────────────────────
    // prune_traces_in_dir tests
    // ───────────────────────────────────────────────────────────────

    #[test]
    fn test_prune_noop_when_under_limit() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 100, 0);
        assert_eq!(count_ndjson(tmp.path()), 5);
    }

    #[test]
    fn test_prune_enforces_max_traces() {
        let tmp = make_trace_dir(10);
        assert_eq!(count_ndjson(tmp.path()), 10);

        prune_traces_in_dir(tmp.path(), 3, 0);
        assert_eq!(count_ndjson(tmp.path()), 3);
    }

    #[test]
    fn test_prune_keeps_newest_files() {
        let tmp = make_trace_dir(5);

        // The files are created in order 0000..0004, with 0004 being newest.
        prune_traces_in_dir(tmp.path(), 2, 0);

        let remaining = ndjson_names(tmp.path());

        assert_eq!(remaining.len(), 2);
        // The two newest should survive (0003, 0004)
        assert!(remaining.iter().any(|f| f.contains("0004")));
        assert!(remaining.iter().any(|f| f.contains("0003")));
    }

    #[test]
    fn test_prune_retention_keeps_recent_files() {
        // Freshly created files are far newer than a 7-day cutoff, so the
        // retention pass must not delete any of them.
        let tmp = make_trace_dir(3);
        prune_traces_in_dir(tmp.path(), 100, 7);
        assert_eq!(count_ndjson(tmp.path()), 3);
    }

    #[test]
    fn test_prune_nonexistent_dir_is_noop() {
        // A path inside a fresh temp dir is guaranteed not to exist,
        // unlike a hard-coded location under /tmp.
        let tmp = tempfile::TempDir::new().unwrap();
        let dir = tmp.path().join("does-not-exist");

        // Should not panic
        prune_traces_in_dir(&dir, 10, 7);
        assert!(!dir.exists());
    }

    #[test]
    fn test_prune_empty_dir_is_noop() {
        let tmp = tempfile::TempDir::new().unwrap();
        prune_traces_in_dir(tmp.path(), 5, 7);
        assert_eq!(count_ndjson(tmp.path()), 0);
    }

    #[test]
    fn test_prune_ignores_non_ndjson_files() {
        let tmp = tempfile::TempDir::new().unwrap();
        // Create some .ndjson and some other files
        for i in 0..5 {
            fs::write(tmp.path().join(format!("trace-{}.ndjson", i)), "").unwrap();
            std::thread::sleep(Duration::from_millis(10));
        }
        fs::write(tmp.path().join("notes.txt"), "keep me").unwrap();
        fs::write(tmp.path().join("data.json"), "keep me too").unwrap();

        prune_traces_in_dir(tmp.path(), 2, 0);

        // Only 2 ndjson should remain, and the non-ndjson files should be untouched
        assert_eq!(count_ndjson(tmp.path()), 2);
        assert!(tmp.path().join("notes.txt").exists());
        assert!(tmp.path().join("data.json").exists());
    }

    #[test]
    fn test_prune_max_traces_zero_deletes_all() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 0, 0);
        assert_eq!(count_ndjson(tmp.path()), 0);
    }
}
526        let tmp = make_trace_dir(5);
527        prune_traces_in_dir(tmp.path(), 0, 0);
528        assert_eq!(count_ndjson(tmp.path()), 0);
529    }
530}