use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{BTreeMap, HashMap};
use std::fs::{self, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use uuid::Uuid;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "camelCase")]
17pub struct EventLogStats {
18 pub events: usize,
19 pub spans: usize,
20 pub approx_event_bytes: usize,
21 pub approx_span_bytes: usize,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27pub enum EventKind {
28 ProposalReceived,
29 ActionValidated,
30 ActionRejected,
31 ActionExecuting,
32 ActionSucceeded,
33 ActionFailed,
34 ActionSkipped,
35 ActionRetrying,
36 ActionDeduplicated,
37 PolicyViolation,
38 StateChanged,
39 StateSnapshot,
40 StateRollback,
41 SkillDistilled,
43 SkillEvolved,
44 SkillDeprecated,
45 EvolutionTriggered,
46 Consolidated,
48 ReplanAttempted,
50 ReplanProposalReceived,
51 ReplanRejected,
52 ReplanExhausted,
53 VoiceFastTurnStarted,
57 VoiceFastTurnEnded,
58 VoiceSidecarResolved,
59 VoiceSidecarFailed,
60 VoiceSidecarTimedOut,
61 VoiceTurnCancelled,
62 VoiceBridgePlayed,
63 SessionScope,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
74#[serde(rename_all = "snake_case")]
75pub enum SpanStatus {
76 Ok,
77 Error,
78 Unset,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct Span {
84 pub trace_id: String,
85 pub span_id: String,
86 pub parent_span_id: Option<String>,
87 pub name: String,
88 pub start_time: DateTime<Utc>,
89 pub end_time: Option<DateTime<Utc>>,
90 pub status: SpanStatus,
91 pub attributes: HashMap<String, Value>,
92}
93
94#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
96pub struct Event {
97 pub kind: EventKind,
98 #[serde(default, skip_serializing_if = "Option::is_none")]
99 pub action_id: Option<String>,
100 #[serde(default, skip_serializing_if = "Option::is_none")]
101 pub proposal_id: Option<String>,
102 #[serde(default)]
103 pub data: HashMap<String, Value>,
104 #[serde(default = "Utc::now")]
105 pub timestamp: DateTime<Utc>,
106}
107
108pub struct EventLog {
110 events: Vec<Event>,
111 spans: Vec<Span>,
112 journal_path: Option<PathBuf>,
113}
114
115impl EventLog {
116 pub fn new() -> Self {
117 Self {
118 events: Vec::new(),
119 spans: Vec::new(),
120 journal_path: None,
121 }
122 }
123
124 pub fn with_journal(path: PathBuf) -> Self {
125 if let Some(parent) = path.parent() {
126 let _ = fs::create_dir_all(parent);
127 }
128 Self {
129 events: Vec::new(),
130 spans: Vec::new(),
131 journal_path: Some(path),
132 }
133 }
134
135 pub fn append(
136 &mut self,
137 kind: EventKind,
138 action_id: Option<&str>,
139 proposal_id: Option<&str>,
140 data: HashMap<String, Value>,
141 ) -> &Event {
142 let event = Event {
143 kind,
144 action_id: action_id.map(|s| s.to_string()),
145 proposal_id: proposal_id.map(|s| s.to_string()),
146 data,
147 timestamp: Utc::now(),
148 };
149
150 if let Some(ref path) = self.journal_path {
151 if let Ok(json) = serde_json::to_string(&event) {
152 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
153 let _ = writeln!(file, "{}", json);
154 }
155 }
156 }
157
158 self.events.push(event);
159 self.events.last().unwrap()
160 }
161
162 pub fn events(&self) -> &[Event] {
163 &self.events
164 }
165
166 pub fn len(&self) -> usize {
167 self.events.len()
168 }
169
170 pub fn span_len(&self) -> usize {
171 self.spans.len()
172 }
173
174 pub fn is_empty(&self) -> bool {
175 self.events.is_empty()
176 }
177
178 pub fn stats(&self) -> EventLogStats {
179 EventLogStats {
180 events: self.events.len(),
181 spans: self.spans.len(),
182 approx_event_bytes: approx_json_bytes(&self.events),
183 approx_span_bytes: approx_json_bytes(&self.spans),
184 }
185 }
186
187 pub fn truncate_events_keep_last(&mut self, keep_last: usize) -> usize {
188 truncate_vec_keep_last(&mut self.events, keep_last)
189 }
190
191 pub fn truncate_spans_keep_last(&mut self, keep_last: usize) -> usize {
192 truncate_vec_keep_last(&mut self.spans, keep_last)
193 }
194
195 pub fn clear(&mut self) -> EventLogStats {
196 let removed = self.stats();
197 self.events.clear();
198 self.events.shrink_to_fit();
199 self.spans.clear();
200 self.spans.shrink_to_fit();
201 removed
202 }
203
204 pub fn filter(&self, kind: Option<&EventKind>, action_id: Option<&str>) -> Vec<&Event> {
205 self.events
206 .iter()
207 .filter(|e| {
208 if let Some(k) = kind {
209 if &e.kind != k {
210 return false;
211 }
212 }
213 if let Some(aid) = action_id {
214 if e.action_id.as_deref() != Some(aid) {
215 return false;
216 }
217 }
218 true
219 })
220 .collect()
221 }
222
223 pub fn begin_span(
225 &mut self,
226 name: &str,
227 trace_id: &str,
228 parent_span_id: Option<&str>,
229 attributes: HashMap<String, Value>,
230 ) -> String {
231 let span_id = Uuid::new_v4().to_string();
232 let span = Span {
233 trace_id: trace_id.to_string(),
234 span_id: span_id.clone(),
235 parent_span_id: parent_span_id.map(|s| s.to_string()),
236 name: name.to_string(),
237 start_time: Utc::now(),
238 end_time: None,
239 status: SpanStatus::Unset,
240 attributes,
241 };
242 self.spans.push(span);
243 span_id
244 }
245
246 pub fn end_span(&mut self, span_id: &str, status: SpanStatus) {
248 if let Some(span) = self.spans.iter_mut().find(|s| s.span_id == span_id) {
249 span.end_time = Some(Utc::now());
250 span.status = status;
251 }
252 }
253
254 pub fn spans(&self) -> Vec<Span> {
256 self.spans.clone()
257 }
258
259 pub fn export_traces(&self) -> String {
261 let mut traces: HashMap<&str, Vec<&Span>> = HashMap::new();
263 for span in &self.spans {
264 traces.entry(span.trace_id.as_str()).or_default().push(span);
265 }
266
267 let resource_spans: Vec<Value> = traces
268 .into_iter()
269 .map(|(_trace_id, spans)| {
270 let scope_spans = spans
271 .iter()
272 .map(|s| {
273 let mut span_obj = serde_json::json!({
274 "traceId": s.trace_id,
275 "spanId": s.span_id,
276 "name": s.name,
277 "startTimeUnixNano": s.start_time.timestamp_nanos_opt().unwrap_or(0).to_string(),
278 "status": {
279 "code": match s.status {
280 SpanStatus::Ok => 1,
281 SpanStatus::Error => 2,
282 SpanStatus::Unset => 0,
283 }
284 },
285 "attributes": s.attributes.iter().map(|(k, v)| {
286 serde_json::json!({
287 "key": k,
288 "value": { "stringValue": v.to_string() }
289 })
290 }).collect::<Vec<_>>(),
291 });
292
293 if let Some(ref parent) = s.parent_span_id {
294 span_obj.as_object_mut().unwrap().insert(
295 "parentSpanId".to_string(),
296 Value::from(parent.as_str()),
297 );
298 }
299 if let Some(end) = s.end_time {
300 span_obj.as_object_mut().unwrap().insert(
301 "endTimeUnixNano".to_string(),
302 Value::from(end.timestamp_nanos_opt().unwrap_or(0).to_string()),
303 );
304 }
305
306 span_obj
307 })
308 .collect::<Vec<_>>();
309
310 serde_json::json!({
311 "resource": {
312 "attributes": [
313 { "key": "service.name", "value": { "stringValue": "car-runtime" } }
314 ]
315 },
316 "scopeSpans": [{
317 "scope": { "name": "car-eventlog" },
318 "spans": scope_spans
319 }]
320 })
321 })
322 .collect();
323
324 serde_json::to_string(&serde_json::json!({
325 "resourceSpans": resource_spans
326 }))
327 .unwrap_or_else(|_| "{}".to_string())
328 }
329
330 pub fn load(path: &Path) -> std::io::Result<Self> {
332 let file = fs::File::open(path)?;
333 let reader = BufReader::new(file);
334 let mut events = Vec::new();
335
336 for line in reader.lines() {
337 let line = line?;
338 let line = line.trim();
339 if !line.is_empty() {
340 if let Ok(event) = serde_json::from_str::<Event>(line) {
341 events.push(event);
342 }
343 }
344 }
345
346 Ok(Self {
347 events,
348 spans: Vec::new(),
349 journal_path: Some(path.to_path_buf()),
350 })
351 }
352}
353
354fn approx_json_bytes<T: Serialize>(value: &T) -> usize {
355 serde_json::to_vec(value)
356 .map(|bytes| bytes.len())
357 .unwrap_or(0)
358}
359
/// Removes the oldest (front) elements of `items` so that at most `keep_last` remain.
///
/// Returns the number of elements removed. When anything is removed, the
/// vector's spare capacity is released as well; otherwise `items` is untouched.
fn truncate_vec_keep_last<T>(items: &mut Vec<T>, keep_last: usize) -> usize {
    let removed = items.len().saturating_sub(keep_last);
    if removed > 0 {
        items.drain(..removed);
        items.shrink_to_fit();
    }
    removed
}
370
371impl Default for EventLog {
372 fn default() -> Self {
373 Self::new()
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// A single appended event is stored and readable.
    #[test]
    fn append_and_read() {
        let mut log = EventLog::new();
        log.append(
            EventKind::ProposalReceived,
            None,
            Some("p1"),
            [("source".to_string(), Value::from("test"))].into(),
        );
        assert_eq!(log.len(), 1);
        assert_eq!(log.events()[0].kind, EventKind::ProposalReceived);
    }

    /// Filtering by kind returns only events of that kind.
    #[test]
    fn filter_by_kind() {
        let mut log = EventLog::new();
        log.append(
            EventKind::ProposalReceived,
            None,
            Some("p1"),
            HashMap::new(),
        );
        log.append(
            EventKind::ActionValidated,
            Some("a1"),
            Some("p1"),
            HashMap::new(),
        );
        log.append(
            EventKind::ActionSucceeded,
            Some("a1"),
            Some("p1"),
            HashMap::new(),
        );

        let validated = log.filter(Some(&EventKind::ActionValidated), None);
        assert_eq!(validated.len(), 1);
    }

    /// Filtering by action id returns only events for that action.
    #[test]
    fn filter_by_action_id() {
        let mut log = EventLog::new();
        log.append(EventKind::ActionValidated, Some("a1"), None, HashMap::new());
        log.append(EventKind::ActionValidated, Some("a2"), None, HashMap::new());

        let a1_events = log.filter(None, Some("a1"));
        assert_eq!(a1_events.len(), 1);
    }

    /// Events written through a journal-backed log can be reloaded in order.
    #[test]
    fn journal_write_and_reload() {
        let dir = tempfile::tempdir().unwrap();
        let journal = dir.path().join("events.jsonl");

        {
            let mut log = EventLog::with_journal(journal.clone());
            log.append(
                EventKind::ProposalReceived,
                None,
                Some("p1"),
                HashMap::new(),
            );
            log.append(
                EventKind::ActionSucceeded,
                Some("a1"),
                Some("p1"),
                HashMap::new(),
            );
        }

        assert!(journal.exists());

        let reloaded = EventLog::load(&journal).unwrap();
        assert_eq!(reloaded.len(), 2);
        assert_eq!(reloaded.events()[0].kind, EventKind::ProposalReceived);
        assert_eq!(reloaded.events()[1].kind, EventKind::ActionSucceeded);
    }

    /// Event kinds use snake_case on the wire.
    #[test]
    fn event_kind_serializes_snake_case() {
        assert_eq!(
            serde_json::to_string(&EventKind::ProposalReceived).unwrap(),
            "\"proposal_received\""
        );
        assert_eq!(
            serde_json::to_string(&EventKind::StateSnapshot).unwrap(),
            "\"state_snapshot\""
        );
    }

    /// Stats reflect the contents; truncation keeps the newest entries; clear empties the log.
    #[test]
    fn stats_truncate_and_clear_release_retained_entries() {
        let mut log = EventLog::new();
        for idx in 0..5 {
            log.append(
                EventKind::ActionSucceeded,
                Some(&format!("a{idx}")),
                Some("p1"),
                [("payload".to_string(), Value::from("x".repeat(16)))].into(),
            );
            log.begin_span("action.tool_call", "trace", None, HashMap::new());
        }

        let stats = log.stats();
        assert_eq!(stats.events, 5);
        assert_eq!(stats.spans, 5);
        assert!(stats.approx_event_bytes > 0);
        assert!(stats.approx_span_bytes > 0);

        assert_eq!(log.truncate_events_keep_last(2), 3);
        assert_eq!(log.truncate_spans_keep_last(1), 4);
        assert_eq!(log.len(), 2);
        assert_eq!(log.span_len(), 1);
        assert_eq!(log.events()[0].action_id.as_deref(), Some("a3"));

        let removed = log.clear();
        assert_eq!(removed.events, 2);
        assert_eq!(removed.spans, 1);
        assert_eq!(log.len(), 0);
        assert_eq!(log.span_len(), 0);
    }

    /// A span starts open and unset, and ending it records end time and status.
    #[test]
    fn span_begin_end_lifecycle() {
        let mut log = EventLog::new();
        let trace_id = "trace-1".to_string();

        let span_id = log.begin_span(
            "test.operation",
            &trace_id,
            None,
            [("key".to_string(), Value::from("value"))].into(),
        );

        let spans = log.spans();
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].name, "test.operation");
        assert_eq!(spans[0].trace_id, "trace-1");
        assert!(spans[0].parent_span_id.is_none());
        assert!(spans[0].end_time.is_none());
        assert_eq!(spans[0].status, SpanStatus::Unset);

        log.end_span(&span_id, SpanStatus::Ok);

        let spans = log.spans();
        assert!(spans[0].end_time.is_some());
        assert_eq!(spans[0].status, SpanStatus::Ok);
    }

    /// A child span records its parent's id; a root span has no parent.
    #[test]
    fn span_parent_child_relationship() {
        let mut log = EventLog::new();
        let trace_id = "trace-2".to_string();

        let parent_id = log.begin_span("parent.op", &trace_id, None, HashMap::new());
        let child_id = log.begin_span("child.op", &trace_id, Some(&parent_id), HashMap::new());

        let spans = log.spans();
        assert_eq!(spans.len(), 2);

        let child = spans.iter().find(|s| s.span_id == child_id).unwrap();
        assert_eq!(child.parent_span_id.as_deref(), Some(parent_id.as_str()));
        assert_eq!(child.trace_id, trace_id);

        let parent = spans.iter().find(|s| s.span_id == parent_id).unwrap();
        assert!(parent.parent_span_id.is_none());
    }

    /// The trace export parses as JSON and carries the expected fields per span.
    #[test]
    fn export_traces_produces_valid_json() {
        let mut log = EventLog::new();
        let trace_id = "trace-3".to_string();

        let root = log.begin_span(
            "proposal.execute",
            &trace_id,
            None,
            [("proposal_id".to_string(), Value::from("p1"))].into(),
        );
        let child = log.begin_span(
            "action.tool_call",
            &trace_id,
            Some(&root),
            [("tool".to_string(), Value::from("read_file"))].into(),
        );
        log.end_span(&child, SpanStatus::Ok);
        log.end_span(&root, SpanStatus::Ok);

        let json_str = log.export_traces();
        let parsed: Value =
            serde_json::from_str(&json_str).expect("export_traces must produce valid JSON");

        let resource_spans = parsed["resourceSpans"].as_array().unwrap();
        assert_eq!(resource_spans.len(), 1);

        let scope_spans = &resource_spans[0]["scopeSpans"][0]["spans"];
        let spans_arr = scope_spans.as_array().unwrap();
        assert_eq!(spans_arr.len(), 2);

        for span in spans_arr {
            assert!(span.get("traceId").is_some());
            assert!(span.get("spanId").is_some());
            assert!(span.get("name").is_some());
            assert!(span.get("startTimeUnixNano").is_some());
            assert!(span.get("endTimeUnixNano").is_some());
            assert!(span.get("status").is_some());
        }

        let child_span = spans_arr
            .iter()
            .find(|s| s["name"] == "action.tool_call")
            .unwrap();
        assert!(child_span.get("parentSpanId").is_some());
    }

    /// Ending a span with an error status records that status and an end time.
    #[test]
    fn span_status_set_on_error() {
        let mut log = EventLog::new();
        let trace_id = "trace-4".to_string();

        let span_id = log.begin_span("failing.op", &trace_id, None, HashMap::new());
        log.end_span(&span_id, SpanStatus::Error);

        let spans = log.spans();
        assert_eq!(spans[0].status, SpanStatus::Error);
        assert!(spans[0].end_time.is_some());
    }
}