1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::fs::{self, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13use uuid::Uuid;
14
/// Size summary of an [`EventLog`]'s in-memory contents.
///
/// Serialized with camelCase field names (`approxEventBytes`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EventLogStats {
    /// Number of events currently held in memory.
    pub events: usize,
    /// Number of spans currently held in memory.
    pub spans: usize,
    /// Byte length of the in-memory events when serialized as JSON (0 if serialization fails).
    pub approx_event_bytes: usize,
    /// Byte length of the in-memory spans when serialized as JSON (0 if serialization fails).
    pub approx_span_bytes: usize,
}
23
/// Discriminator stored on every [`Event`].
///
/// Variants serialize as snake_case strings (for example `ProposalReceived`
/// becomes `"proposal_received"`), so renaming a variant changes the journal format.
///
/// NOTE(review): the precise meaning of each variant is defined by the code that
/// emits it, which is not visible in this file — confirm against the emitters
/// before relying on a name alone.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventKind {
    // Proposal / action lifecycle.
    ProposalReceived,
    ActionValidated,
    ActionRejected,
    ActionExecuting,
    ActionSucceeded,
    ActionFailed,
    ActionSkipped,
    ActionRetrying,
    ActionDeduplicated,
    PolicyViolation,
    // State tracking.
    StateChanged,
    StateSnapshot,
    StateRollback,
    // Skill / evolution bookkeeping.
    SkillDistilled,
    SkillEvolved,
    SkillDeprecated,
    EvolutionTriggered,
    CandidatePromoted,
    CandidateRejected,
    Consolidated,
    // Replanning.
    ReplanAttempted,
    ReplanProposalReceived,
    ReplanRejected,
    ReplanExhausted,
    // Voice pipeline.
    VoiceFastTurnStarted,
    VoiceFastTurnEnded,
    VoiceSidecarResolved,
    VoiceSidecarFailed,
    VoiceSidecarTimedOut,
    VoiceTurnCancelled,
    VoiceBridgePlayed,
    // Session.
    SessionScope,
}
78
/// Outcome recorded on a [`Span`].
///
/// Serialized as a snake_case string. `EventLog::export_traces` maps the
/// variants to the numeric status codes `Ok => 1`, `Error => 2`, `Unset => 0`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SpanStatus {
    /// The spanned operation completed successfully.
    Ok,
    /// The spanned operation failed.
    Error,
    /// No outcome recorded; this is the status a span has when it is first begun.
    Unset,
}
87
/// A timed unit of work belonging to a trace, optionally nested under a parent span.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Span {
    /// Identifier of the trace this span belongs to (supplied by the caller of `begin_span`).
    pub trace_id: String,
    /// Identifier of this span; `EventLog::begin_span` generates it as a v4 UUID string.
    pub span_id: String,
    /// `span_id` of the enclosing span, or `None` for a root span.
    pub parent_span_id: Option<String>,
    /// Caller-chosen name of the operation.
    pub name: String,
    /// UTC time at which the span was begun.
    pub start_time: DateTime<Utc>,
    /// UTC time at which the span was ended; `None` while it is still open.
    pub end_time: Option<DateTime<Utc>>,
    /// Outcome of the span; `SpanStatus::Unset` until `EventLog::end_span` sets it.
    pub status: SpanStatus,
    /// Arbitrary JSON attributes attached when the span was begun.
    pub attributes: HashMap<String, Value>,
}
100
/// A single timestamped entry in the [`EventLog`].
///
/// This is also the journal record format: one JSON-serialized `Event` per line.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Event {
    /// What happened.
    pub kind: EventKind,
    /// Action the event refers to, if any. Omitted from JSON when `None`;
    /// a missing field deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub action_id: Option<String>,
    /// Proposal the event refers to, if any. Omitted from JSON when `None`;
    /// a missing field deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub proposal_id: Option<String>,
    /// Free-form JSON payload. A missing field deserializes to an empty map.
    #[serde(default)]
    pub data: HashMap<String, Value>,
    /// UTC time of the event. A record without this field is stamped with the
    /// current time at deserialization, not with the time it was originally written.
    #[serde(default = "Utc::now")]
    pub timestamp: DateTime<Utc>,
}
114
/// In-memory log of events and spans, optionally mirrored to a JSON Lines journal file.
pub struct EventLog {
    /// Events in append order (oldest first).
    events: Vec<Event>,
    /// Spans in the order they were begun.
    spans: Vec<Span>,
    /// Journal file that each appended event is also written to; `None` disables journaling.
    /// Spans are never written to the journal.
    journal_path: Option<PathBuf>,
}
121
122impl EventLog {
123 pub fn new() -> Self {
124 Self {
125 events: Vec::new(),
126 spans: Vec::new(),
127 journal_path: None,
128 }
129 }
130
131 pub fn with_journal(path: PathBuf) -> Self {
132 if let Some(parent) = path.parent() {
133 let _ = fs::create_dir_all(parent);
134 }
135 Self {
136 events: Vec::new(),
137 spans: Vec::new(),
138 journal_path: Some(path),
139 }
140 }
141
142 pub fn append(
143 &mut self,
144 kind: EventKind,
145 action_id: Option<&str>,
146 proposal_id: Option<&str>,
147 data: HashMap<String, Value>,
148 ) -> &Event {
149 let event = Event {
150 kind,
151 action_id: action_id.map(|s| s.to_string()),
152 proposal_id: proposal_id.map(|s| s.to_string()),
153 data,
154 timestamp: Utc::now(),
155 };
156
157 if let Some(ref path) = self.journal_path {
158 if let Ok(json) = serde_json::to_string(&event) {
159 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
160 let _ = writeln!(file, "{}", json);
161 }
162 }
163 }
164
165 self.events.push(event);
166 self.events.last().unwrap()
167 }
168
169 pub fn events(&self) -> &[Event] {
170 &self.events
171 }
172
173 pub fn len(&self) -> usize {
174 self.events.len()
175 }
176
177 pub fn span_len(&self) -> usize {
178 self.spans.len()
179 }
180
181 pub fn is_empty(&self) -> bool {
182 self.events.is_empty()
183 }
184
185 pub fn stats(&self) -> EventLogStats {
186 EventLogStats {
187 events: self.events.len(),
188 spans: self.spans.len(),
189 approx_event_bytes: approx_json_bytes(&self.events),
190 approx_span_bytes: approx_json_bytes(&self.spans),
191 }
192 }
193
194 pub fn truncate_events_keep_last(&mut self, keep_last: usize) -> usize {
195 truncate_vec_keep_last(&mut self.events, keep_last)
196 }
197
198 pub fn truncate_spans_keep_last(&mut self, keep_last: usize) -> usize {
199 truncate_vec_keep_last(&mut self.spans, keep_last)
200 }
201
202 pub fn clear(&mut self) -> EventLogStats {
203 let removed = self.stats();
204 self.events.clear();
205 self.events.shrink_to_fit();
206 self.spans.clear();
207 self.spans.shrink_to_fit();
208 removed
209 }
210
211 pub fn filter(&self, kind: Option<&EventKind>, action_id: Option<&str>) -> Vec<&Event> {
212 self.events
213 .iter()
214 .filter(|e| {
215 if let Some(k) = kind {
216 if &e.kind != k {
217 return false;
218 }
219 }
220 if let Some(aid) = action_id {
221 if e.action_id.as_deref() != Some(aid) {
222 return false;
223 }
224 }
225 true
226 })
227 .collect()
228 }
229
230 pub fn begin_span(
232 &mut self,
233 name: &str,
234 trace_id: &str,
235 parent_span_id: Option<&str>,
236 attributes: HashMap<String, Value>,
237 ) -> String {
238 let span_id = Uuid::new_v4().to_string();
239 let span = Span {
240 trace_id: trace_id.to_string(),
241 span_id: span_id.clone(),
242 parent_span_id: parent_span_id.map(|s| s.to_string()),
243 name: name.to_string(),
244 start_time: Utc::now(),
245 end_time: None,
246 status: SpanStatus::Unset,
247 attributes,
248 };
249 self.spans.push(span);
250 span_id
251 }
252
253 pub fn end_span(&mut self, span_id: &str, status: SpanStatus) {
255 if let Some(span) = self.spans.iter_mut().find(|s| s.span_id == span_id) {
256 span.end_time = Some(Utc::now());
257 span.status = status;
258 }
259 }
260
261 pub fn spans(&self) -> Vec<Span> {
263 self.spans.clone()
264 }
265
266 pub fn export_traces(&self) -> String {
268 let mut traces: HashMap<&str, Vec<&Span>> = HashMap::new();
270 for span in &self.spans {
271 traces.entry(span.trace_id.as_str()).or_default().push(span);
272 }
273
274 let resource_spans: Vec<Value> = traces
275 .into_iter()
276 .map(|(_trace_id, spans)| {
277 let scope_spans = spans
278 .iter()
279 .map(|s| {
280 let mut span_obj = serde_json::json!({
281 "traceId": s.trace_id,
282 "spanId": s.span_id,
283 "name": s.name,
284 "startTimeUnixNano": s.start_time.timestamp_nanos_opt().unwrap_or(0).to_string(),
285 "status": {
286 "code": match s.status {
287 SpanStatus::Ok => 1,
288 SpanStatus::Error => 2,
289 SpanStatus::Unset => 0,
290 }
291 },
292 "attributes": s.attributes.iter().map(|(k, v)| {
293 serde_json::json!({
294 "key": k,
295 "value": { "stringValue": v.to_string() }
296 })
297 }).collect::<Vec<_>>(),
298 });
299
300 if let Some(ref parent) = s.parent_span_id {
301 span_obj.as_object_mut().unwrap().insert(
302 "parentSpanId".to_string(),
303 Value::from(parent.as_str()),
304 );
305 }
306 if let Some(end) = s.end_time {
307 span_obj.as_object_mut().unwrap().insert(
308 "endTimeUnixNano".to_string(),
309 Value::from(end.timestamp_nanos_opt().unwrap_or(0).to_string()),
310 );
311 }
312
313 span_obj
314 })
315 .collect::<Vec<_>>();
316
317 serde_json::json!({
318 "resource": {
319 "attributes": [
320 { "key": "service.name", "value": { "stringValue": "car-runtime" } }
321 ]
322 },
323 "scopeSpans": [{
324 "scope": { "name": "car-eventlog" },
325 "spans": scope_spans
326 }]
327 })
328 })
329 .collect();
330
331 serde_json::to_string(&serde_json::json!({
332 "resourceSpans": resource_spans
333 }))
334 .unwrap_or_else(|_| "{}".to_string())
335 }
336
337 pub fn load(path: &Path) -> std::io::Result<Self> {
339 let file = fs::File::open(path)?;
340 let reader = BufReader::new(file);
341 let mut events = Vec::new();
342
343 for line in reader.lines() {
344 let line = line?;
345 let line = line.trim();
346 if !line.is_empty() {
347 if let Ok(event) = serde_json::from_str::<Event>(line) {
348 events.push(event);
349 }
350 }
351 }
352
353 Ok(Self {
354 events,
355 spans: Vec::new(),
356 journal_path: Some(path.to_path_buf()),
357 })
358 }
359}
360
361fn approx_json_bytes<T: Serialize>(value: &T) -> usize {
362 serde_json::to_vec(value)
363 .map(|bytes| bytes.len())
364 .unwrap_or(0)
365}
366
/// Removes elements from the front of `items` until at most `keep_last` remain.
///
/// Returns how many elements were removed. When anything is removed the
/// vector's spare capacity is released as well; when `items` already holds
/// `keep_last` or fewer elements it is left completely untouched.
fn truncate_vec_keep_last<T>(items: &mut Vec<T>, keep_last: usize) -> usize {
    match items.len().checked_sub(keep_last) {
        // Already within the limit (or exactly at it): nothing to do.
        None | Some(0) => 0,
        Some(excess) => {
            // Dropping the drain iterator removes the leading `excess` elements.
            drop(items.drain(..excess));
            items.shrink_to_fit();
            excess
        }
    }
}
377
378impl Default for EventLog {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
// Unit tests for the event log: appending, filtering, journaling, retention, and spans.
#[cfg(test)]
mod tests {
    use super::*;

    // A single appended event is counted and readable back with its kind.
    #[test]
    fn append_and_read() {
        let mut log = EventLog::new();
        log.append(
            EventKind::ProposalReceived,
            None,
            Some("p1"),
            [("source".to_string(), Value::from("test"))].into(),
        );
        assert_eq!(log.len(), 1);
        assert_eq!(log.events()[0].kind, EventKind::ProposalReceived);
    }

    // Filtering by kind alone returns only the events of that kind.
    #[test]
    fn filter_by_kind() {
        let mut log = EventLog::new();
        log.append(
            EventKind::ProposalReceived,
            None,
            Some("p1"),
            HashMap::new(),
        );
        log.append(
            EventKind::ActionValidated,
            Some("a1"),
            Some("p1"),
            HashMap::new(),
        );
        log.append(
            EventKind::ActionSucceeded,
            Some("a1"),
            Some("p1"),
            HashMap::new(),
        );

        let validated = log.filter(Some(&EventKind::ActionValidated), None);
        assert_eq!(validated.len(), 1);
    }

    // Filtering by action id alone returns only the events for that action.
    #[test]
    fn filter_by_action_id() {
        let mut log = EventLog::new();
        log.append(EventKind::ActionValidated, Some("a1"), None, HashMap::new());
        log.append(EventKind::ActionValidated, Some("a2"), None, HashMap::new());

        let a1_events = log.filter(None, Some("a1"));
        assert_eq!(a1_events.len(), 1);
    }

    // Events written through a journaled log can be reloaded from the file in order.
    #[test]
    fn journal_write_and_reload() {
        let dir = tempfile::tempdir().unwrap();
        let journal = dir.path().join("events.jsonl");

        // Scope the writer so it is dropped before the journal is read back.
        {
            let mut log = EventLog::with_journal(journal.clone());
            log.append(
                EventKind::ProposalReceived,
                None,
                Some("p1"),
                HashMap::new(),
            );
            log.append(
                EventKind::ActionSucceeded,
                Some("a1"),
                Some("p1"),
                HashMap::new(),
            );
        }

        assert!(journal.exists());

        let reloaded = EventLog::load(&journal).unwrap();
        assert_eq!(reloaded.len(), 2);
        assert_eq!(reloaded.events()[0].kind, EventKind::ProposalReceived);
        assert_eq!(reloaded.events()[1].kind, EventKind::ActionSucceeded);
    }

    // Pins the wire format of `EventKind`: variants serialize as snake_case strings.
    #[test]
    fn event_kind_serializes_snake_case() {
        assert_eq!(
            serde_json::to_string(&EventKind::ProposalReceived).unwrap(),
            "\"proposal_received\""
        );
        assert_eq!(
            serde_json::to_string(&EventKind::StateSnapshot).unwrap(),
            "\"state_snapshot\""
        );
    }

    // Stats report counts and non-zero sizes; truncation keeps the newest entries;
    // clear reports what it removed and leaves the log empty.
    #[test]
    fn stats_truncate_and_clear_release_retained_entries() {
        let mut log = EventLog::new();
        for idx in 0..5 {
            log.append(
                EventKind::ActionSucceeded,
                Some(&format!("a{idx}")),
                Some("p1"),
                [("payload".to_string(), Value::from("x".repeat(16)))].into(),
            );
            log.begin_span("action.tool_call", "trace", None, HashMap::new());
        }

        let stats = log.stats();
        assert_eq!(stats.events, 5);
        assert_eq!(stats.spans, 5);
        assert!(stats.approx_event_bytes > 0);
        assert!(stats.approx_span_bytes > 0);

        assert_eq!(log.truncate_events_keep_last(2), 3);
        assert_eq!(log.truncate_spans_keep_last(1), 4);
        assert_eq!(log.len(), 2);
        assert_eq!(log.span_len(), 1);
        // Of a0..a4, keeping the last two leaves a3 as the oldest survivor.
        assert_eq!(log.events()[0].action_id.as_deref(), Some("a3"));

        let removed = log.clear();
        assert_eq!(removed.events, 2);
        assert_eq!(removed.spans, 1);
        assert_eq!(log.len(), 0);
        assert_eq!(log.span_len(), 0);
    }

    // A span starts open with `Unset` status and gains an end time and status when ended.
    #[test]
    fn span_begin_end_lifecycle() {
        let mut log = EventLog::new();
        let trace_id = "trace-1".to_string();

        let span_id = log.begin_span(
            "test.operation",
            &trace_id,
            None,
            [("key".to_string(), Value::from("value"))].into(),
        );

        let spans = log.spans();
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].name, "test.operation");
        assert_eq!(spans[0].trace_id, "trace-1");
        assert!(spans[0].parent_span_id.is_none());
        assert!(spans[0].end_time.is_none());
        assert_eq!(spans[0].status, SpanStatus::Unset);

        log.end_span(&span_id, SpanStatus::Ok);

        // `spans()` returns a snapshot, so it must be fetched again to see the update.
        let spans = log.spans();
        assert!(spans[0].end_time.is_some());
        assert_eq!(spans[0].status, SpanStatus::Ok);
    }

    // A child span records its parent's id and shares the trace id; the root has no parent.
    #[test]
    fn span_parent_child_relationship() {
        let mut log = EventLog::new();
        let trace_id = "trace-2".to_string();

        let parent_id = log.begin_span("parent.op", &trace_id, None, HashMap::new());
        let child_id = log.begin_span("child.op", &trace_id, Some(&parent_id), HashMap::new());

        let spans = log.spans();
        assert_eq!(spans.len(), 2);

        let child = spans.iter().find(|s| s.span_id == child_id).unwrap();
        assert_eq!(child.parent_span_id.as_deref(), Some(parent_id.as_str()));
        assert_eq!(child.trace_id, trace_id);

        let parent = spans.iter().find(|s| s.span_id == parent_id).unwrap();
        assert!(parent.parent_span_id.is_none());
    }

    // The trace export parses as JSON, groups both spans under one trace, and
    // carries the expected per-span fields.
    #[test]
    fn export_traces_produces_valid_json() {
        let mut log = EventLog::new();
        let trace_id = "trace-3".to_string();

        let root = log.begin_span(
            "proposal.execute",
            &trace_id,
            None,
            [("proposal_id".to_string(), Value::from("p1"))].into(),
        );
        let child = log.begin_span(
            "action.tool_call",
            &trace_id,
            Some(&root),
            [("tool".to_string(), Value::from("read_file"))].into(),
        );
        log.end_span(&child, SpanStatus::Ok);
        log.end_span(&root, SpanStatus::Ok);

        let json_str = log.export_traces();
        let parsed: Value =
            serde_json::from_str(&json_str).expect("export_traces must produce valid JSON");

        // Both spans share one trace id, so there is exactly one resourceSpans entry.
        let resource_spans = parsed["resourceSpans"].as_array().unwrap();
        assert_eq!(resource_spans.len(), 1);

        let scope_spans = &resource_spans[0]["scopeSpans"][0]["spans"];
        let spans_arr = scope_spans.as_array().unwrap();
        assert_eq!(spans_arr.len(), 2);

        // Both spans were ended, so the optional end time must be present too.
        for span in spans_arr {
            assert!(span.get("traceId").is_some());
            assert!(span.get("spanId").is_some());
            assert!(span.get("name").is_some());
            assert!(span.get("startTimeUnixNano").is_some());
            assert!(span.get("endTimeUnixNano").is_some());
            assert!(span.get("status").is_some());
        }

        // Only the child carries a parent reference.
        let child_span = spans_arr
            .iter()
            .find(|s| s["name"] == "action.tool_call")
            .unwrap();
        assert!(child_span.get("parentSpanId").is_some());
    }

    // Ending a span with `Error` records that status along with an end time.
    #[test]
    fn span_status_set_on_error() {
        let mut log = EventLog::new();
        let trace_id = "trace-4".to_string();

        let span_id = log.begin_span("failing.op", &trace_id, None, HashMap::new());
        log.end_span(&span_id, SpanStatus::Error);

        let spans = log.spans();
        assert_eq!(spans[0].status, SpanStatus::Error);
        assert!(spans[0].end_time.is_some());
    }
}