1use crate::trace::events::{TraceEvent, TraceEventKind};
4use crate::trace::hasher::{canonical_json, sha256_hash};
5use chrono::Utc;
6use serde_json::json;
7use std::fs::{self, File, OpenOptions};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
/// Writes hash-chained trace events for one run at a time to a JSON Lines file.
pub struct TraceStore {
    /// Directory that holds one `<run_id>.jsonl` file per run.
    trace_dir: PathBuf,
    /// Id of the most recently started run; empty until `start_run` is called.
    current_run_id: String,
    /// Handle of the active run's trace file; `None` when no run is active or
    /// the file could not be opened.
    current_file: Option<File>,
    /// Sequence number given to the last event created; reset when a run starts.
    seq: u64,
    /// Hash of the last event processed, recorded as `prev_hash` of the next one.
    prev_hash: String,
    /// Document hash passed to `start_run`, copied into every event of the run.
    doc_hash: String,
}
19
/// `prev_hash` value carried by the first event of every run, where no earlier
/// event exists to link to.
const TRACE_GENESIS_HASH: &str = "sha256:genesis";
21
22impl TraceStore {
23 pub fn new(base_dir: &Path) -> Self {
24 let trace_dir = base_dir.join("trace");
25 fs::create_dir_all(&trace_dir).ok();
26 Self {
27 trace_dir,
28 current_run_id: String::new(),
29 current_file: None,
30 seq: 0,
31 prev_hash: TRACE_GENESIS_HASH.to_string(),
32 doc_hash: String::new(),
33 }
34 }
35
36 pub fn start_run(&mut self, doc_hash: &str) -> String {
37 let run_id = uuid::Uuid::new_v4().to_string();
38 self.current_run_id = run_id.clone();
39 self.doc_hash = doc_hash.to_string();
40 self.seq = 0;
41 self.prev_hash = TRACE_GENESIS_HASH.to_string();
42
43 let path = self.trace_dir.join(format!("{}.jsonl", &run_id));
44 self.current_file = OpenOptions::new()
45 .create(true)
46 .truncate(true)
47 .write(true)
48 .open(path)
49 .ok();
50
51 self.emit_event(TraceEventKind::RunStart, None, None);
52 run_id
53 }
54
55 pub fn end_run(&mut self) {
56 self.emit_event(TraceEventKind::RunEnd, None, None);
57 self.current_file = None;
58 }
59
60 pub fn cell_start(&mut self, cell_name: &str) {
61 self.emit_event(TraceEventKind::CellStart, Some(cell_name.to_string()), None);
62 }
63
64 pub fn cell_end(&mut self, cell_name: &str) {
65 self.emit_event(TraceEventKind::CellEnd, Some(cell_name.to_string()), None);
66 }
67
68 pub fn call_enter(&mut self, cell_name: &str) {
69 self.emit_event(TraceEventKind::CallEnter, Some(cell_name.to_string()), None);
70 }
71
72 pub fn call_exit(&mut self, cell_name: &str, result_type: &str) {
73 let mut event = self.make_event(TraceEventKind::CallExit);
74 event.cell = Some(cell_name.to_string());
75 event.details = Some(json!({ "result_type": result_type }));
76 self.write_event(&mut event);
77 }
78
79 pub fn vm_step(&mut self, cell: &str, ip: usize, opcode: &str) {
80 let mut event = self.make_event(TraceEventKind::VmStep);
81 event.cell = Some(cell.to_string());
82 event.details = Some(json!({ "ip": ip, "opcode": opcode }));
83 self.write_event(&mut event);
84 }
85
86 #[allow(clippy::too_many_arguments)]
87 pub fn tool_call(
88 &mut self,
89 cell: &str,
90 tool_id: &str,
91 tool_version: &str,
92 latency_ms: u64,
93 cached: bool,
94 success: bool,
95 message: Option<&str>,
96 ) {
97 let mut event = self.make_event(TraceEventKind::ToolCall);
98 event.cell = Some(cell.to_string());
99 event.tool_id = Some(tool_id.to_string());
100 event.tool_version = Some(tool_version.to_string());
101 event.latency_ms = Some(latency_ms);
102 event.cached = Some(cached);
103 event.details = Some(json!({ "success": success }));
104 event.message = message.map(ToString::to_string);
105 self.write_event(&mut event);
106 }
107
108 pub fn schema_validate(&mut self, cell: &str, schema: &str, valid: bool) {
109 let mut event = self.make_event(TraceEventKind::SchemaValidate);
110 event.cell = Some(cell.to_string());
111 event.details = Some(json!({ "schema": schema, "valid": valid }));
112 self.write_event(&mut event);
113 }
114
115 pub fn error(&mut self, cell: Option<&str>, message: &str) {
116 let mut event = self.make_event(TraceEventKind::Error);
117 event.cell = cell.map(|c| c.to_string());
118 event.message = Some(message.to_string());
119 self.write_event(&mut event);
120 }
121
122 pub fn run_id(&self) -> &str {
123 &self.current_run_id
124 }
125
126 fn emit_event(&mut self, kind: TraceEventKind, cell: Option<String>, message: Option<String>) {
127 let mut event = self.make_event(kind);
128 event.cell = cell;
129 event.message = message;
130 self.write_event(&mut event);
131 }
132
133 fn make_event(&mut self, kind: TraceEventKind) -> TraceEvent {
134 self.seq += 1;
135
136 TraceEvent {
137 seq: self.seq,
138 kind,
139 prev_hash: self.prev_hash.clone(),
140 hash: String::new(),
141 timestamp: Utc::now(),
142 doc_hash: self.doc_hash.clone(),
143 cell: None,
144 tool_id: None,
145 tool_version: None,
146 inputs_hash: None,
147 outputs_hash: None,
148 policy_hash: None,
149 latency_ms: None,
150 cached: None,
151 details: None,
152 message: None,
153 }
154 }
155
156 fn write_event(&mut self, event: &mut TraceEvent) {
157 event.hash = compute_event_hash(event);
158 self.prev_hash = event.hash.clone();
159 if let Some(ref mut file) = self.current_file {
160 if let Ok(json) = serde_json::to_string(event) {
161 writeln!(file, "{}", json).ok();
162 }
163 }
164 }
165}
166
167fn kind_str(kind: &TraceEventKind) -> &'static str {
168 match kind {
169 TraceEventKind::RunStart => "run_start",
170 TraceEventKind::CellStart => "cell_start",
171 TraceEventKind::CellEnd => "cell_end",
172 TraceEventKind::CallEnter => "call_enter",
173 TraceEventKind::CallExit => "call_exit",
174 TraceEventKind::VmStep => "vm_step",
175 TraceEventKind::ToolCall => "tool_call",
176 TraceEventKind::SchemaValidate => "schema_validate",
177 TraceEventKind::Error => "error",
178 TraceEventKind::RunEnd => "run_end",
179 }
180}
181
182fn event_payload(event: &TraceEvent) -> serde_json::Value {
183 json!({
184 "seq": event.seq,
185 "kind": kind_str(&event.kind),
186 "prev_hash": &event.prev_hash,
187 "doc_hash": &event.doc_hash,
188 "cell": &event.cell,
189 "tool_id": &event.tool_id,
190 "tool_version": &event.tool_version,
191 "inputs_hash": &event.inputs_hash,
192 "outputs_hash": &event.outputs_hash,
193 "policy_hash": &event.policy_hash,
194 "latency_ms": event.latency_ms,
195 "cached": event.cached,
196 "details": &event.details,
197 "message": &event.message,
198 })
199}
200
201pub fn compute_event_hash(event: &TraceEvent) -> String {
202 let canonical = canonical_json(&event_payload(event));
203 sha256_hash(&canonical)
204}
205
206pub fn verify_event_chain(events: &[TraceEvent]) -> Result<(), String> {
207 let mut expected_seq = 1_u64;
208 let mut expected_prev = TRACE_GENESIS_HASH.to_string();
209
210 for event in events {
211 if event.seq != expected_seq {
212 return Err(format!(
213 "trace sequence mismatch at seq {} (expected {})",
214 event.seq, expected_seq
215 ));
216 }
217 if event.prev_hash != expected_prev {
218 return Err(format!(
219 "trace hash chain mismatch at seq {} (expected prev '{}', got '{}')",
220 event.seq, expected_prev, event.prev_hash
221 ));
222 }
223 let expected_hash = compute_event_hash(event);
224 if event.hash != expected_hash {
225 return Err(format!(
226 "trace event hash mismatch at seq {} (expected '{}', got '{}')",
227 event.seq, expected_hash, event.hash
228 ));
229 }
230 expected_seq += 1;
231 expected_prev = event.hash.clone();
232 }
233
234 Ok(())
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};

    /// Creates a uniquely named scratch directory under the system temp dir.
    fn fresh_base_dir(prefix: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!("{}-{}", prefix, uuid::Uuid::new_v4()));
        fs::create_dir_all(&dir).expect("test temp dir should be created");
        dir
    }

    /// Reads back every event stored in the trace file of `run_id`.
    fn load_events(base_dir: &Path, run_id: &str) -> Vec<TraceEvent> {
        let path = base_dir.join("trace").join(format!("{}.jsonl", run_id));
        let content = fs::read_to_string(&path).expect("trace file should be readable");
        content
            .lines()
            .map(|line| serde_json::from_str(line).expect("trace event should deserialize"))
            .collect()
    }

    /// Returns the first event of the given kind, if any.
    fn first_of_kind(events: &[TraceEvent], kind: TraceEventKind) -> Option<&TraceEvent> {
        events.iter().find(|event| event.kind == kind)
    }

    /// Looks up `key` in the event's details object.
    fn detail<'a>(event: &'a TraceEvent, key: &str) -> Option<&'a serde_json::Value> {
        event.details.as_ref().and_then(|d| d.get(key))
    }

    /// Builds a UTC timestamp from whole seconds since the Unix epoch.
    fn utc_seconds(secs: i64) -> chrono::DateTime<Utc> {
        Utc.timestamp_opt(secs, 0)
            .single()
            .expect("timestamp should be valid")
    }

    #[test]
    fn trace_store_emits_structured_vm_events() {
        let base_dir = fresh_base_dir("lumen-trace-store-test");

        let mut store = TraceStore::new(&base_dir);
        let run_id = store.start_run("doc-123");
        store.cell_start("main");
        store.call_enter("main");
        store.vm_step("main", 7, "ToolCall");
        store.tool_call("main", "http.get", "1.0.0", 12, false, true, None);
        store.schema_validate("main", "String", true);
        store.call_exit("main", "String");
        store.cell_end("main");
        store.end_run();

        let events = load_events(&base_dir, &run_id);

        // Events must come back in exactly the order they were emitted.
        let expected_kinds = vec![
            TraceEventKind::RunStart,
            TraceEventKind::CellStart,
            TraceEventKind::CallEnter,
            TraceEventKind::VmStep,
            TraceEventKind::ToolCall,
            TraceEventKind::SchemaValidate,
            TraceEventKind::CallExit,
            TraceEventKind::CellEnd,
            TraceEventKind::RunEnd,
        ];
        let kinds: Vec<TraceEventKind> = events.iter().map(|event| event.kind.clone()).collect();
        assert_eq!(kinds, expected_kinds);

        let step =
            first_of_kind(&events, TraceEventKind::VmStep).expect("vm_step event should exist");
        assert_eq!(step.cell.as_deref(), Some("main"));
        assert_eq!(detail(step, "ip").and_then(|v| v.as_u64()), Some(7));
        assert_eq!(
            detail(step, "opcode").and_then(|v| v.as_str()),
            Some("ToolCall")
        );

        let tool =
            first_of_kind(&events, TraceEventKind::ToolCall).expect("tool_call event should exist");
        assert_eq!(tool.tool_id.as_deref(), Some("http.get"));
        assert_eq!(tool.tool_version.as_deref(), Some("1.0.0"));
        assert_eq!(tool.latency_ms, Some(12));
        assert_eq!(tool.cached, Some(false));
        assert_eq!(
            detail(tool, "success").and_then(|v| v.as_bool()),
            Some(true)
        );

        let schema = first_of_kind(&events, TraceEventKind::SchemaValidate)
            .expect("schema_validate event should exist");
        assert_eq!(
            detail(schema, "schema").and_then(|v| v.as_str()),
            Some("String")
        );
        assert_eq!(
            detail(schema, "valid").and_then(|v| v.as_bool()),
            Some(true)
        );

        verify_event_chain(&events).expect("generated trace should pass chain verification");

        fs::remove_dir_all(&base_dir).expect("test temp dir should be removed");
    }

    #[test]
    fn compute_event_hash_is_stable_across_timestamp_but_sensitive_to_payload() {
        let mut event = TraceEvent {
            seq: 2,
            kind: TraceEventKind::VmStep,
            prev_hash: "sha256:prev".to_string(),
            hash: String::new(),
            timestamp: utc_seconds(1_700_000_000),
            doc_hash: "doc-123".to_string(),
            cell: Some("main".to_string()),
            tool_id: None,
            tool_version: None,
            inputs_hash: None,
            outputs_hash: None,
            policy_hash: None,
            latency_ms: None,
            cached: None,
            details: Some(json!({"ip": 1, "opcode": "LoadK"})),
            message: None,
        };

        let hash_a = compute_event_hash(&event);

        // Only the wall-clock time moves; the hash must not.
        event.timestamp = utc_seconds(1_700_000_001);
        let hash_b = compute_event_hash(&event);
        assert_eq!(hash_a, hash_b, "hash should ignore wall-clock timestamp");

        // A payload field changes; the hash must follow.
        event.message = Some("tampered".to_string());
        let hash_c = compute_event_hash(&event);
        assert_ne!(hash_a, hash_c, "hash should change when payload changes");
    }

    #[test]
    fn verify_event_chain_rejects_tampered_event_payload() {
        let base_dir = fresh_base_dir("lumen-trace-store-verify-test");

        let mut store = TraceStore::new(&base_dir);
        let run_id = store.start_run("doc-123");
        store.cell_start("main");
        store.vm_step("main", 1, "LoadK");
        store.end_run();

        let mut events = load_events(&base_dir, &run_id);
        verify_event_chain(&events).expect("fresh events should pass verification");

        // Alter the payload of one event without touching its stored hash.
        let step = events
            .iter_mut()
            .find(|event| event.kind == TraceEventKind::VmStep)
            .expect("vm step event should exist");
        step.details = Some(json!({"ip": 999, "opcode": "LoadK"}));

        let err = verify_event_chain(&events).expect_err("tampered payload should be rejected");
        assert!(
            err.contains("trace event hash mismatch"),
            "unexpected error: {}",
            err
        );

        fs::remove_dir_all(&base_dir).expect("test temp dir should be removed");
    }
}