1use crate::error::Result;
6use crate::log::{Event, EventLog};
7use std::fs::{self, File};
8use std::io::{self, BufWriter, Write};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12
13use parking_lot::Mutex;
14
/// Directory in which trace files are created, listed, and pruned.
///
/// This is a relative path, so it is resolved against the process's current
/// working directory.
const TRACE_DIR: &str = ".nika/traces";
17
/// Writes events to a newline-delimited JSON (`.ndjson`) trace file.
///
/// The buffered file handle sits behind an `Arc<Mutex<..>>`; every writing
/// method takes `&self` and locks the mutex for the duration of the write.
pub struct TraceWriter {
    /// Buffered handle to the trace file, guarded by a mutex.
    writer: Arc<Mutex<BufWriter<File>>>,
    /// Location of the trace file, as returned by [`TraceWriter::path`].
    path: PathBuf,
}
23
24impl TraceWriter {
25 pub fn new(generation_id: &str) -> Result<Self> {
32 if generation_id.is_empty()
34 || generation_id.contains("..")
35 || generation_id.contains('/')
36 || generation_id.contains('\\')
37 || !generation_id
38 .chars()
39 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == 'T')
40 {
41 return Err(crate::error::EventError::TraceWrite(
42 std::io::Error::new(
43 std::io::ErrorKind::InvalidInput,
44 format!(
45 "Invalid generation_id: must be alphanumeric with hyphens/underscores only, got: {}",
46 generation_id
47 ),
48 ),
49 ));
50 }
51
52 let trace_dir = Path::new(TRACE_DIR);
54 fs::create_dir_all(trace_dir)?;
55
56 let filename = format!("{}.ndjson", generation_id);
58 let path = trace_dir.join(&filename);
59 let file = File::create(&path)?;
60 let writer = BufWriter::new(file);
61
62 tracing::info!(path = %path.display(), "Created trace file");
63
64 Ok(Self {
65 writer: Arc::new(Mutex::new(writer)),
66 path,
67 })
68 }
69
70 pub fn write_event(&self, event: &Event) -> Result<()> {
72 let json = serde_json::to_string(event)?;
73
74 let mut writer = self.writer.lock();
75 writeln!(writer, "{}", json)?;
76 writer.flush()?;
77
78 Ok(())
79 }
80
81 pub fn append_event(&self, event: &Event) -> io::Result<()> {
87 let json = serde_json::to_string(event)
88 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
89 let mut writer = self.writer.lock();
90 writeln!(writer, "{}", json)?;
91 writer.flush()?;
92 Ok(())
93 }
94
95 pub fn write_all(&self, event_log: &EventLog) -> Result<()> {
97 let events = event_log.events();
98 for event in events {
99 self.write_event(&event)?;
100 }
101 Ok(())
102 }
103
104 pub fn path(&self) -> &Path {
106 &self.path
107 }
108
109 pub fn close(&self) -> Result<()> {
111 let mut writer = self.writer.lock();
112 writer.flush()?;
113 Ok(())
114 }
115}
116
117pub fn generate_generation_id() -> String {
121 use chrono::Utc;
122
123 let now = Utc::now();
124 let timestamp = now.format("%Y-%m-%dT%H-%M-%S");
125 let random: u32 = rand::random::<u32>() % 0x10000; format!("{}-{:04x}", timestamp, random)
128}
129
130pub fn calculate_workflow_hash(yaml: &str) -> String {
135 use xxhash_rust::xxh3::xxh3_64;
136
137 let hash = xxh3_64(yaml.as_bytes());
138 format!("xxh3:{:016x}", hash)
139}
140
141pub fn prune_traces(max_traces: u32, retention_days: u32) {
149 prune_traces_in_dir(Path::new(TRACE_DIR), max_traces, retention_days);
150}
151
152fn prune_traces_in_dir(trace_dir: &Path, max_traces: u32, retention_days: u32) {
154 if !trace_dir.exists() {
155 return;
156 }
157
158 let dir_iter = match fs::read_dir(trace_dir) {
159 Ok(iter) => iter,
160 Err(e) => {
161 tracing::warn!(error = %e, "Failed to read trace directory for pruning");
162 return;
163 }
164 };
165
166 let mut entries: Vec<(PathBuf, Option<SystemTime>)> = Vec::new();
168
169 for entry in dir_iter {
170 let entry = match entry {
171 Ok(e) => e,
172 Err(_) => continue,
173 };
174 let path = entry.path();
175
176 if path.extension().map(|e| e == "ndjson").unwrap_or(false) {
177 let created = entry.metadata().ok().and_then(|m| m.created().ok());
178 entries.push((path, created));
179 }
180 }
181
182 entries.sort_by(|a, b| b.1.cmp(&a.1));
184
185 let cutoff = if retention_days > 0 {
187 SystemTime::now().checked_sub(Duration::from_secs(u64::from(retention_days) * 86400))
188 } else {
189 None
190 };
191
192 let mut to_delete: Vec<PathBuf> = Vec::new();
193 let mut kept: Vec<(PathBuf, Option<SystemTime>)> = Vec::new();
194
195 for (path, created) in entries {
196 let expired = match (&cutoff, &created) {
197 (Some(cutoff_time), Some(create_time)) => create_time < cutoff_time,
198 _ => false,
199 };
200
201 if expired {
202 to_delete.push(path);
203 } else {
204 kept.push((path, created));
205 }
206 }
207
208 if kept.len() > max_traces as usize {
210 let excess = kept.split_off(max_traces as usize);
211 to_delete.extend(excess.into_iter().map(|(path, _)| path));
212 }
213
214 let mut pruned_count: u32 = 0;
216 for path in &to_delete {
217 if let Err(e) = fs::remove_file(path) {
218 tracing::debug!(
219 path = %path.display(),
220 error = %e,
221 "Failed to prune trace file"
222 );
223 } else {
224 pruned_count += 1;
225 }
226 }
227
228 if pruned_count > 0 {
229 tracing::debug!(
230 pruned = pruned_count,
231 max_traces = max_traces,
232 retention_days = retention_days,
233 remaining = kept.len(),
234 "Pruned old trace files"
235 );
236 }
237}
238
239pub fn list_traces() -> Result<Vec<TraceInfo>> {
241 let trace_dir = Path::new(TRACE_DIR);
242
243 if !trace_dir.exists() {
244 return Ok(vec![]);
245 }
246
247 let mut traces = Vec::new();
248
249 for entry in fs::read_dir(trace_dir)? {
250 let entry = entry?;
251 let path = entry.path();
252
253 if path.extension().map(|e| e == "ndjson").unwrap_or(false) {
254 let metadata = entry.metadata()?;
255 let generation_id = path
256 .file_stem()
257 .and_then(|s| s.to_str())
258 .unwrap_or("unknown")
259 .to_string();
260
261 traces.push(TraceInfo {
262 generation_id,
263 path,
264 size_bytes: metadata.len(),
265 created: metadata.created().ok(),
266 });
267 }
268 }
269
270 traces.sort_by(|a, b| b.created.cmp(&a.created));
272
273 Ok(traces)
274}
275
/// Summary of one trace file, as produced by [`list_traces`].
#[derive(Debug)]
pub struct TraceInfo {
    /// File name without its extension; `"unknown"` when the stem is missing
    /// or not valid UTF-8.
    pub generation_id: String,
    /// Path to the trace file.
    pub path: PathBuf,
    /// File size in bytes.
    pub size_bytes: u64,
    /// Creation time reported by the filesystem, or `None` when unavailable.
    pub created: Option<std::time::SystemTime>,
}
284
#[cfg(test)]
mod tests {
    use super::*;

    // A generated id is a 19-character timestamp plus "-xxxx", so it is longer
    // than 20 characters and contains the 'T' date/time separator.
    #[test]
    fn test_generation_id_format() {
        let id = generate_generation_id();
        assert!(id.len() > 20);
        assert!(id.contains('T'));
    }

    // The hash string carries the "xxh3:" prefix and has a fixed length.
    #[test]
    fn test_workflow_hash() {
        let yaml = "schema: test\ntasks: []";
        let hash = calculate_workflow_hash(yaml);
        assert!(hash.starts_with("xxh3:"));
        // "xxh3:" (5 characters) + 16 hex digits.
        assert_eq!(hash.len(), 21);
    }

    // Hashing the same input twice yields the same string.
    #[test]
    fn test_workflow_hash_deterministic() {
        let yaml = "schema: test";
        let hash1 = calculate_workflow_hash(yaml);
        let hash2 = calculate_workflow_hash(yaml);
        assert_eq!(hash1, hash2);
    }

    // Different inputs yield different hash strings.
    #[test]
    fn test_workflow_hash_different_inputs() {
        let hash1 = calculate_workflow_hash("a");
        let hash2 = calculate_workflow_hash("b");
        assert_ne!(hash1, hash2);
    }

    // Builds a `TraceWriter` by hand inside a temp dir (bypassing
    // `TraceWriter::new`, which always writes under `TRACE_DIR`) and checks
    // that `path()` reports the file it was given.
    #[test]
    fn test_trace_writer_creates_file() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let trace_dir = temp_dir.path().join(".nika/traces");
        fs::create_dir_all(&trace_dir).unwrap();

        let gen_id = "test-gen-123";
        let path = trace_dir.join(format!("{}.ndjson", gen_id));
        let file = File::create(&path).unwrap();
        let writer = BufWriter::new(file);

        let trace_writer = TraceWriter {
            writer: Arc::new(Mutex::new(writer)),
            path: path.clone(),
        };

        assert_eq!(trace_writer.path(), path);
    }

    // Writes one event through a hand-built `TraceWriter` and checks that the
    // serialized line on disk mentions the task id and the event kind.
    #[test]
    fn test_trace_writer_writes_event() {
        use crate::log::EventKind;
        use serde_json::json;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let trace_dir = temp_dir.path().join(".nika/traces");
        fs::create_dir_all(&trace_dir).unwrap();

        let gen_id = "test-write-event";
        let path = trace_dir.join(format!("{}.ndjson", gen_id));
        let file = File::create(&path).unwrap();
        let writer = BufWriter::new(file);

        let trace_writer = TraceWriter {
            writer: Arc::new(Mutex::new(writer)),
            path: path.clone(),
        };

        let event = Event {
            id: 0,
            timestamp_ms: 100,
            kind: EventKind::TaskStarted {
                verb: "infer".into(),
                task_id: "test_task".into(),
                inputs: json!({}),
            },
        };

        trace_writer.write_event(&event).unwrap();
        trace_writer.close().unwrap();

        // Read the file back from disk to confirm the data was flushed.
        let content = fs::read_to_string(&path).unwrap();
        assert!(content.contains("test_task"));
        assert!(content.contains("task_started"));
    }

    // `list_traces` reads the real `TRACE_DIR` relative to the test's working
    // directory, so the only thing asserted here is that listing succeeds.
    #[test]
    fn test_list_traces_empty_dir() {
        let result = list_traces();
        assert!(result.is_ok());
    }

    // Ids containing "..", '/' or '\\' must be rejected before any file is
    // created.
    #[test]
    fn test_trace_writer_rejects_path_traversal() {
        let result = TraceWriter::new("../evil");
        assert!(result.is_err());

        let result = TraceWriter::new("foo/../bar");
        assert!(result.is_err());

        let result = TraceWriter::new("foo/bar");
        assert!(result.is_err());

        let result = TraceWriter::new("foo\\bar");
        assert!(result.is_err());
    }

    // An empty id must be rejected.
    #[test]
    fn test_trace_writer_rejects_empty_id() {
        let result = TraceWriter::new("");
        assert!(result.is_err());
    }

    // NOTE(review): this exercises a copy of the character predicate rather
    // than `TraceWriter::new` itself - presumably to avoid creating a file
    // under `TRACE_DIR`; confirm that is intentional.
    #[test]
    fn test_trace_writer_accepts_valid_ids() {
        assert!("2024-01-01T12-00-00-abc0"
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == 'T'));
    }

    // Creates a temp dir holding `count` empty files named
    // `trace-0000.ndjson`, `trace-0001.ndjson`, ... in creation order.
    fn make_trace_dir(count: usize) -> tempfile::TempDir {
        let tmp = tempfile::TempDir::new().unwrap();
        for i in 0..count {
            let name = format!("trace-{:04}.ndjson", i);
            fs::write(tmp.path().join(&name), "").unwrap();
            // Presumably spaces the files out so they get distinct creation
            // timestamps, which the pruning order depends on.
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        tmp
    }

    // Counts the entries in `dir` whose extension is `ndjson`.
    fn count_ndjson(dir: &Path) -> usize {
        fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "ndjson")
                    .unwrap_or(false)
            })
            .count()
    }

    // With fewer files than `max_traces` and age pruning disabled
    // (`retention_days == 0`), nothing is deleted.
    #[test]
    fn test_prune_noop_when_under_limit() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 100, 0);
        assert_eq!(count_ndjson(tmp.path()), 5);
    }

    // With more files than `max_traces`, exactly `max_traces` files remain.
    #[test]
    fn test_prune_enforces_max_traces() {
        let tmp = make_trace_dir(10);
        assert_eq!(count_ndjson(tmp.path()), 10);

        prune_traces_in_dir(tmp.path(), 3, 0);
        assert_eq!(count_ndjson(tmp.path()), 3);
    }

    // Count-based pruning keeps the most recently created files: 0003 and
    // 0004 are the last two written by `make_trace_dir(5)`.
    #[test]
    fn test_prune_keeps_newest_files() {
        let tmp = make_trace_dir(5);

        prune_traces_in_dir(tmp.path(), 2, 0);

        let remaining: Vec<String> = fs::read_dir(tmp.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "ndjson")
                    .unwrap_or(false)
            })
            .map(|e| e.file_name().to_string_lossy().to_string())
            .collect();

        assert_eq!(remaining.len(), 2);
        assert!(remaining.iter().any(|f| f.contains("0004")));
        assert!(remaining.iter().any(|f| f.contains("0003")));
    }

    // Pruning a directory that does not exist must return without panicking;
    // there is nothing further to assert.
    #[test]
    fn test_prune_nonexistent_dir_is_noop() {
        let dir = Path::new("/tmp/nika-test-nonexistent-dir-12345");
        prune_traces_in_dir(dir, 10, 7);
    }

    // Pruning an empty directory leaves it empty.
    #[test]
    fn test_prune_empty_dir_is_noop() {
        let tmp = tempfile::TempDir::new().unwrap();
        prune_traces_in_dir(tmp.path(), 5, 7);
        assert_eq!(count_ndjson(tmp.path()), 0);
    }

    // Only `.ndjson` files are candidates for pruning; files with other
    // extensions survive even when the trace count is over the limit.
    #[test]
    fn test_prune_ignores_non_ndjson_files() {
        let tmp = tempfile::TempDir::new().unwrap();
        for i in 0..5 {
            fs::write(tmp.path().join(format!("trace-{}.ndjson", i)), "").unwrap();
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        fs::write(tmp.path().join("notes.txt"), "keep me").unwrap();
        fs::write(tmp.path().join("data.json"), "keep me too").unwrap();

        prune_traces_in_dir(tmp.path(), 2, 0);

        assert_eq!(count_ndjson(tmp.path()), 2);
        assert!(tmp.path().join("notes.txt").exists());
        assert!(tmp.path().join("data.json").exists());
    }

    // `max_traces == 0` means no trace file may remain.
    #[test]
    fn test_prune_max_traces_zero_deletes_all() {
        let tmp = make_trace_dir(5);
        prune_traces_in_dir(tmp.path(), 0, 0);
        assert_eq!(count_ndjson(tmp.path()), 0);
    }
}