Skip to main content

lumen_runtime/trace/
store.rs

1//! JSONL trace file writer with hash-chaining.
2
3use crate::trace::events::{TraceEvent, TraceEventKind};
4use crate::trace::hasher::{canonical_json, sha256_hash};
5use chrono::Utc;
6use serde_json::json;
7use std::fs::{self, File, OpenOptions};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
/// Append-only JSONL trace writer that links every event into a hash chain.
///
/// Each run gets its own `<trace_dir>/<run_id>.jsonl` file. Every event carries
/// the hash of the previous event (`prev_hash`) plus its own hash computed by
/// [`compute_event_hash`], so [`verify_event_chain`] can detect edits,
/// reordering, or dropped events after the fact.
#[derive(Debug)]
pub struct TraceStore {
    /// Directory holding one `<run_id>.jsonl` file per run.
    trace_dir: PathBuf,
    /// Id of the most recently started run; empty before the first `start_run`.
    current_run_id: String,
    /// Output file for the active run. `None` before any run, after `end_run`,
    /// or when the run's file could not be opened (events are then hashed but
    /// not persisted).
    current_file: Option<File>,
    /// Sequence number of the last emitted event; the first event of a run is 1.
    seq: u64,
    /// Hash of the last emitted event, or [`TRACE_GENESIS_HASH`] at run start.
    prev_hash: String,
    /// Caller-supplied document hash stamped onto every event of the run.
    doc_hash: String,
}

/// Sentinel `prev_hash` for the first event of every run (not a real digest).
const TRACE_GENESIS_HASH: &str = "sha256:genesis";
21
22impl TraceStore {
23    pub fn new(base_dir: &Path) -> Self {
24        let trace_dir = base_dir.join("trace");
25        fs::create_dir_all(&trace_dir).ok();
26        Self {
27            trace_dir,
28            current_run_id: String::new(),
29            current_file: None,
30            seq: 0,
31            prev_hash: TRACE_GENESIS_HASH.to_string(),
32            doc_hash: String::new(),
33        }
34    }
35
36    pub fn start_run(&mut self, doc_hash: &str) -> String {
37        let run_id = uuid::Uuid::new_v4().to_string();
38        self.current_run_id = run_id.clone();
39        self.doc_hash = doc_hash.to_string();
40        self.seq = 0;
41        self.prev_hash = TRACE_GENESIS_HASH.to_string();
42
43        let path = self.trace_dir.join(format!("{}.jsonl", &run_id));
44        self.current_file = OpenOptions::new()
45            .create(true)
46            .truncate(true)
47            .write(true)
48            .open(path)
49            .ok();
50
51        self.emit_event(TraceEventKind::RunStart, None, None);
52        run_id
53    }
54
55    pub fn end_run(&mut self) {
56        self.emit_event(TraceEventKind::RunEnd, None, None);
57        self.current_file = None;
58    }
59
60    pub fn cell_start(&mut self, cell_name: &str) {
61        self.emit_event(TraceEventKind::CellStart, Some(cell_name.to_string()), None);
62    }
63
64    pub fn cell_end(&mut self, cell_name: &str) {
65        self.emit_event(TraceEventKind::CellEnd, Some(cell_name.to_string()), None);
66    }
67
68    pub fn call_enter(&mut self, cell_name: &str) {
69        self.emit_event(TraceEventKind::CallEnter, Some(cell_name.to_string()), None);
70    }
71
72    pub fn call_exit(&mut self, cell_name: &str, result_type: &str) {
73        let mut event = self.make_event(TraceEventKind::CallExit);
74        event.cell = Some(cell_name.to_string());
75        event.details = Some(json!({ "result_type": result_type }));
76        self.write_event(&mut event);
77    }
78
79    pub fn vm_step(&mut self, cell: &str, ip: usize, opcode: &str) {
80        let mut event = self.make_event(TraceEventKind::VmStep);
81        event.cell = Some(cell.to_string());
82        event.details = Some(json!({ "ip": ip, "opcode": opcode }));
83        self.write_event(&mut event);
84    }
85
86    #[allow(clippy::too_many_arguments)]
87    pub fn tool_call(
88        &mut self,
89        cell: &str,
90        tool_id: &str,
91        tool_version: &str,
92        latency_ms: u64,
93        cached: bool,
94        success: bool,
95        message: Option<&str>,
96    ) {
97        let mut event = self.make_event(TraceEventKind::ToolCall);
98        event.cell = Some(cell.to_string());
99        event.tool_id = Some(tool_id.to_string());
100        event.tool_version = Some(tool_version.to_string());
101        event.latency_ms = Some(latency_ms);
102        event.cached = Some(cached);
103        event.details = Some(json!({ "success": success }));
104        event.message = message.map(ToString::to_string);
105        self.write_event(&mut event);
106    }
107
108    pub fn schema_validate(&mut self, cell: &str, schema: &str, valid: bool) {
109        let mut event = self.make_event(TraceEventKind::SchemaValidate);
110        event.cell = Some(cell.to_string());
111        event.details = Some(json!({ "schema": schema, "valid": valid }));
112        self.write_event(&mut event);
113    }
114
115    pub fn error(&mut self, cell: Option<&str>, message: &str) {
116        let mut event = self.make_event(TraceEventKind::Error);
117        event.cell = cell.map(|c| c.to_string());
118        event.message = Some(message.to_string());
119        self.write_event(&mut event);
120    }
121
122    pub fn run_id(&self) -> &str {
123        &self.current_run_id
124    }
125
126    fn emit_event(&mut self, kind: TraceEventKind, cell: Option<String>, message: Option<String>) {
127        let mut event = self.make_event(kind);
128        event.cell = cell;
129        event.message = message;
130        self.write_event(&mut event);
131    }
132
133    fn make_event(&mut self, kind: TraceEventKind) -> TraceEvent {
134        self.seq += 1;
135
136        TraceEvent {
137            seq: self.seq,
138            kind,
139            prev_hash: self.prev_hash.clone(),
140            hash: String::new(),
141            timestamp: Utc::now(),
142            doc_hash: self.doc_hash.clone(),
143            cell: None,
144            tool_id: None,
145            tool_version: None,
146            inputs_hash: None,
147            outputs_hash: None,
148            policy_hash: None,
149            latency_ms: None,
150            cached: None,
151            details: None,
152            message: None,
153        }
154    }
155
156    fn write_event(&mut self, event: &mut TraceEvent) {
157        event.hash = compute_event_hash(event);
158        self.prev_hash = event.hash.clone();
159        if let Some(ref mut file) = self.current_file {
160            if let Ok(json) = serde_json::to_string(event) {
161                writeln!(file, "{}", json).ok();
162            }
163        }
164    }
165}
166
167fn kind_str(kind: &TraceEventKind) -> &'static str {
168    match kind {
169        TraceEventKind::RunStart => "run_start",
170        TraceEventKind::CellStart => "cell_start",
171        TraceEventKind::CellEnd => "cell_end",
172        TraceEventKind::CallEnter => "call_enter",
173        TraceEventKind::CallExit => "call_exit",
174        TraceEventKind::VmStep => "vm_step",
175        TraceEventKind::ToolCall => "tool_call",
176        TraceEventKind::SchemaValidate => "schema_validate",
177        TraceEventKind::Error => "error",
178        TraceEventKind::RunEnd => "run_end",
179    }
180}
181
182fn event_payload(event: &TraceEvent) -> serde_json::Value {
183    json!({
184        "seq": event.seq,
185        "kind": kind_str(&event.kind),
186        "prev_hash": &event.prev_hash,
187        "doc_hash": &event.doc_hash,
188        "cell": &event.cell,
189        "tool_id": &event.tool_id,
190        "tool_version": &event.tool_version,
191        "inputs_hash": &event.inputs_hash,
192        "outputs_hash": &event.outputs_hash,
193        "policy_hash": &event.policy_hash,
194        "latency_ms": event.latency_ms,
195        "cached": event.cached,
196        "details": &event.details,
197        "message": &event.message,
198    })
199}
200
201pub fn compute_event_hash(event: &TraceEvent) -> String {
202    let canonical = canonical_json(&event_payload(event));
203    sha256_hash(&canonical)
204}
205
206pub fn verify_event_chain(events: &[TraceEvent]) -> Result<(), String> {
207    let mut expected_seq = 1_u64;
208    let mut expected_prev = TRACE_GENESIS_HASH.to_string();
209
210    for event in events {
211        if event.seq != expected_seq {
212            return Err(format!(
213                "trace sequence mismatch at seq {} (expected {})",
214                event.seq, expected_seq
215            ));
216        }
217        if event.prev_hash != expected_prev {
218            return Err(format!(
219                "trace hash chain mismatch at seq {} (expected prev '{}', got '{}')",
220                event.seq, expected_prev, event.prev_hash
221            ));
222        }
223        let expected_hash = compute_event_hash(event);
224        if event.hash != expected_hash {
225            return Err(format!(
226                "trace event hash mismatch at seq {} (expected '{}', got '{}')",
227                event.seq, expected_hash, event.hash
228            ));
229        }
230        expected_seq += 1;
231        expected_prev = event.hash.clone();
232    }
233
234    Ok(())
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};

    /// Scratch directory under the system temp dir that is removed on drop.
    ///
    /// Cleanup runs even when an assertion fails mid-test, so failures do not
    /// leak trace files.
    struct TempDir(PathBuf);

    impl TempDir {
        fn new(prefix: &str) -> Self {
            let path = std::env::temp_dir().join(format!("{}-{}", prefix, uuid::Uuid::new_v4()));
            fs::create_dir_all(&path).expect("test temp dir should be created");
            TempDir(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort: panicking in `drop` while already unwinding would abort.
            fs::remove_dir_all(&self.0).ok();
        }
    }

    /// Reads back and deserializes every event written for `run_id`.
    fn read_events(base_dir: &Path, run_id: &str) -> Vec<TraceEvent> {
        let path = base_dir.join("trace").join(format!("{}.jsonl", run_id));
        let content = fs::read_to_string(&path).expect("trace file should be readable");
        content
            .lines()
            .map(|line| serde_json::from_str(line).expect("trace event should deserialize"))
            .collect()
    }

    /// Returns `details[key]` for an event, if present.
    fn detail<'a>(event: &'a TraceEvent, key: &str) -> Option<&'a serde_json::Value> {
        event.details.as_ref().and_then(|d| d.get(key))
    }

    /// Writes a short run (start, cell start, one VM step, end) and reads it back.
    fn short_run(tmp: &TempDir) -> Vec<TraceEvent> {
        let mut store = TraceStore::new(tmp.path());
        let run_id = store.start_run("doc-123");
        store.cell_start("main");
        store.vm_step("main", 1, "LoadK");
        store.end_run();
        read_events(tmp.path(), &run_id)
    }

    #[test]
    fn trace_store_emits_structured_vm_events() {
        let tmp = TempDir::new("lumen-trace-store-test");

        let mut store = TraceStore::new(tmp.path());
        let run_id = store.start_run("doc-123");
        store.cell_start("main");
        store.call_enter("main");
        store.vm_step("main", 7, "ToolCall");
        store.tool_call("main", "http.get", "1.0.0", 12, false, true, None);
        store.schema_validate("main", "String", true);
        store.call_exit("main", "String");
        store.cell_end("main");
        store.end_run();

        let events = read_events(tmp.path(), &run_id);

        let kinds: Vec<TraceEventKind> = events.iter().map(|event| event.kind.clone()).collect();
        assert_eq!(
            kinds,
            vec![
                TraceEventKind::RunStart,
                TraceEventKind::CellStart,
                TraceEventKind::CallEnter,
                TraceEventKind::VmStep,
                TraceEventKind::ToolCall,
                TraceEventKind::SchemaValidate,
                TraceEventKind::CallExit,
                TraceEventKind::CellEnd,
                TraceEventKind::RunEnd,
            ]
        );

        let step = events
            .iter()
            .find(|event| event.kind == TraceEventKind::VmStep)
            .expect("vm_step event should exist");
        assert_eq!(step.cell.as_deref(), Some("main"));
        assert_eq!(detail(step, "ip").and_then(|v| v.as_u64()), Some(7));
        assert_eq!(detail(step, "opcode").and_then(|v| v.as_str()), Some("ToolCall"));

        let tool = events
            .iter()
            .find(|event| event.kind == TraceEventKind::ToolCall)
            .expect("tool_call event should exist");
        assert_eq!(tool.tool_id.as_deref(), Some("http.get"));
        assert_eq!(tool.tool_version.as_deref(), Some("1.0.0"));
        assert_eq!(tool.latency_ms, Some(12));
        assert_eq!(tool.cached, Some(false));
        assert_eq!(detail(tool, "success").and_then(|v| v.as_bool()), Some(true));

        let schema = events
            .iter()
            .find(|event| event.kind == TraceEventKind::SchemaValidate)
            .expect("schema_validate event should exist");
        assert_eq!(detail(schema, "schema").and_then(|v| v.as_str()), Some("String"));
        assert_eq!(detail(schema, "valid").and_then(|v| v.as_bool()), Some(true));

        verify_event_chain(&events).expect("generated trace should pass chain verification");
    }

    #[test]
    fn compute_event_hash_is_stable_across_timestamp_but_sensitive_to_payload() {
        let mut event = TraceEvent {
            seq: 2,
            kind: TraceEventKind::VmStep,
            prev_hash: "sha256:prev".to_string(),
            hash: String::new(),
            timestamp: Utc
                .timestamp_opt(1_700_000_000, 0)
                .single()
                .expect("timestamp should be valid"),
            doc_hash: "doc-123".to_string(),
            cell: Some("main".to_string()),
            tool_id: None,
            tool_version: None,
            inputs_hash: None,
            outputs_hash: None,
            policy_hash: None,
            latency_ms: None,
            cached: None,
            details: Some(json!({"ip": 1, "opcode": "LoadK"})),
            message: None,
        };

        let hash_a = compute_event_hash(&event);
        event.timestamp = Utc
            .timestamp_opt(1_700_000_001, 0)
            .single()
            .expect("timestamp should be valid");
        let hash_b = compute_event_hash(&event);
        assert_eq!(hash_a, hash_b, "hash should ignore wall-clock timestamp");

        event.message = Some("tampered".to_string());
        let hash_c = compute_event_hash(&event);
        assert_ne!(hash_a, hash_c, "hash should change when payload changes");
    }

    #[test]
    fn verify_event_chain_rejects_tampered_event_payload() {
        let tmp = TempDir::new("lumen-trace-store-verify-test");
        let mut events = short_run(&tmp);

        verify_event_chain(&events).expect("fresh events should pass verification");

        let step = events
            .iter_mut()
            .find(|event| event.kind == TraceEventKind::VmStep)
            .expect("vm step event should exist");
        step.details = Some(json!({"ip": 999, "opcode": "LoadK"}));

        let err = verify_event_chain(&events).expect_err("tampered payload should be rejected");
        assert!(
            err.contains("trace event hash mismatch"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn verify_event_chain_rejects_dropped_event() {
        let tmp = TempDir::new("lumen-trace-store-drop-test");
        let mut events = short_run(&tmp);
        assert!(events.len() >= 3, "short run should emit several events");

        // Removing an event from the middle leaves a gap in the sequence numbers.
        events.remove(1);

        let err = verify_event_chain(&events).expect_err("dropped event should be rejected");
        assert!(
            err.contains("trace sequence mismatch"),
            "unexpected error: {}",
            err
        );
    }
}