Skip to main content

car_eventlog/
lib.rs

1//! Event log with JSONL persistence for Common Agent Runtime.
2//!
3//! Append-only event log. Every runtime operation is recorded here.
4//! Supports optional JSONL journal persistence for replay and audit.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::fs::{self, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13use uuid::Uuid;
14
/// Point-in-time size summary of an [`EventLog`].
///
/// Produced by `EventLog::stats` and also returned by `EventLog::clear`
/// (describing what was held just before clearing). Serialized with
/// camelCase keys, e.g. `approxEventBytes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EventLogStats {
    /// Number of events currently held in memory.
    pub events: usize,
    /// Number of spans currently held in memory.
    pub spans: usize,
    /// Byte length of the in-memory events when serialized as JSON
    /// (0 if serialization fails).
    pub approx_event_bytes: usize,
    /// Byte length of the in-memory spans when serialized as JSON
    /// (0 if serialization fails).
    pub approx_span_bytes: usize,
}
23
/// Event kinds matching the Python EventKind enum.
///
/// Each variant is serialized as its snake_case name, e.g.
/// `ProposalReceived` becomes `"proposal_received"`; this is the value
/// stored in the `kind` field of every journal line.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventKind {
    ProposalReceived,
    ActionValidated,
    ActionRejected,
    ActionExecuting,
    ActionSucceeded,
    ActionFailed,
    ActionSkipped,
    ActionRetrying,
    ActionDeduplicated,
    PolicyViolation,
    StateChanged,
    StateSnapshot,
    StateRollback,
    // Skill lifecycle events (SkillRL-inspired)
    SkillDistilled,
    SkillEvolved,
    SkillDeprecated,
    EvolutionTriggered,
    // Memory consolidation ("dream") events
    Consolidated,
    // Replanning events
    ReplanAttempted,
    ReplanProposalReceived,
    ReplanRejected,
    ReplanExhausted,
    // Voice turn telemetry — emitted by car-engine's voice_turn dispatch
    // and the orchestrator. `data` carries `turn_id` (u64) plus
    // event-specific fields like `text_len`, `error`, `timeout_ms`.
    VoiceFastTurnStarted,
    VoiceFastTurnEnded,
    VoiceSidecarResolved,
    VoiceSidecarFailed,
    VoiceSidecarTimedOut,
    VoiceTurnCancelled,
    VoiceBridgePlayed,
    // Per-execution caller / tenant scope (Parslee-ai/car#187 phase 3).
    // Emitted by Runtime::execute_scoped* once per proposal when the
    // RuntimeScope carries any identity. `data` carries `caller_id`,
    // `tenant_id`, and `claims` — exact set depends on what the
    // dispatcher forwarded. Audit / log analysis correlates actions
    // back to the caller / tenant that triggered them.
    SessionScope,
}
71
/// Status of a trace span.
///
/// Serialized as a snake_case string. `EventLog::export_traces` maps the
/// variants to numeric status codes: `Unset` = 0, `Ok` = 1, `Error` = 2.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SpanStatus {
    /// The span finished and was reported as successful.
    Ok,
    /// The span finished and was reported as failed.
    Error,
    /// No outcome recorded yet; the status every span starts with in
    /// `EventLog::begin_span`.
    Unset,
}
80
/// A trace span representing a unit of work.
///
/// Spans are created by `EventLog::begin_span` and closed by
/// `EventLog::end_span`. They live only in memory: `EventLog::append`
/// journals events, not spans, and `EventLog::load` starts with no spans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Span {
    /// Caller-supplied identifier of the trace this span belongs to.
    pub trace_id: String,
    /// Identifier of this span; `begin_span` generates it as a v4 UUID string.
    pub span_id: String,
    /// `span_id` of the enclosing span, or `None` when no parent was given.
    pub parent_span_id: Option<String>,
    /// Name of the operation, e.g. `"action.tool_call"`.
    pub name: String,
    /// Wall-clock time (UTC) at which `begin_span` was called.
    pub start_time: DateTime<Utc>,
    /// Wall-clock time (UTC) at which `end_span` was called; `None` while open.
    pub end_time: Option<DateTime<Utc>>,
    /// Outcome of the span; `Unset` until `end_span` assigns one.
    pub status: SpanStatus,
    /// Free-form key/value metadata supplied when the span was opened.
    pub attributes: HashMap<String, Value>,
}
93
/// A single event in the log.
///
/// This is also the schema of one journal line: each event is written as a
/// single JSON object and parsed back by `EventLog::load`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Event {
    /// What happened.
    pub kind: EventKind,
    /// Action this event refers to, if any. Omitted from the JSON when
    /// `None`; a missing key deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub action_id: Option<String>,
    /// Proposal this event refers to, if any. Omitted from the JSON when
    /// `None`; a missing key deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub proposal_id: Option<String>,
    /// Event-specific payload. A missing key deserializes to an empty map.
    #[serde(default)]
    pub data: HashMap<String, Value>,
    /// When the event was recorded (UTC). If the key is missing during
    /// deserialization it is filled with the current time, not the time the
    /// event originally occurred.
    #[serde(default = "Utc::now")]
    pub timestamp: DateTime<Utc>,
}
107
/// Append-only event log with optional JSONL journal.
///
/// Events and spans are kept in memory in insertion order. When a journal
/// path is set, every appended event is additionally written to that file as
/// one JSON object per line; spans are never journaled.
pub struct EventLog {
    /// Events in the order they were appended (oldest first).
    events: Vec<Event>,
    /// Spans in the order they were opened (oldest first).
    spans: Vec<Span>,
    /// JSONL file that `append` writes to; `None` for a purely in-memory log.
    journal_path: Option<PathBuf>,
}
114
115impl EventLog {
116    pub fn new() -> Self {
117        Self {
118            events: Vec::new(),
119            spans: Vec::new(),
120            journal_path: None,
121        }
122    }
123
124    pub fn with_journal(path: PathBuf) -> Self {
125        if let Some(parent) = path.parent() {
126            let _ = fs::create_dir_all(parent);
127        }
128        Self {
129            events: Vec::new(),
130            spans: Vec::new(),
131            journal_path: Some(path),
132        }
133    }
134
135    pub fn append(
136        &mut self,
137        kind: EventKind,
138        action_id: Option<&str>,
139        proposal_id: Option<&str>,
140        data: HashMap<String, Value>,
141    ) -> &Event {
142        let event = Event {
143            kind,
144            action_id: action_id.map(|s| s.to_string()),
145            proposal_id: proposal_id.map(|s| s.to_string()),
146            data,
147            timestamp: Utc::now(),
148        };
149
150        if let Some(ref path) = self.journal_path {
151            if let Ok(json) = serde_json::to_string(&event) {
152                if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
153                    let _ = writeln!(file, "{}", json);
154                }
155            }
156        }
157
158        self.events.push(event);
159        self.events.last().unwrap()
160    }
161
162    pub fn events(&self) -> &[Event] {
163        &self.events
164    }
165
166    pub fn len(&self) -> usize {
167        self.events.len()
168    }
169
170    pub fn span_len(&self) -> usize {
171        self.spans.len()
172    }
173
174    pub fn is_empty(&self) -> bool {
175        self.events.is_empty()
176    }
177
178    pub fn stats(&self) -> EventLogStats {
179        EventLogStats {
180            events: self.events.len(),
181            spans: self.spans.len(),
182            approx_event_bytes: approx_json_bytes(&self.events),
183            approx_span_bytes: approx_json_bytes(&self.spans),
184        }
185    }
186
187    pub fn truncate_events_keep_last(&mut self, keep_last: usize) -> usize {
188        truncate_vec_keep_last(&mut self.events, keep_last)
189    }
190
191    pub fn truncate_spans_keep_last(&mut self, keep_last: usize) -> usize {
192        truncate_vec_keep_last(&mut self.spans, keep_last)
193    }
194
195    pub fn clear(&mut self) -> EventLogStats {
196        let removed = self.stats();
197        self.events.clear();
198        self.events.shrink_to_fit();
199        self.spans.clear();
200        self.spans.shrink_to_fit();
201        removed
202    }
203
204    pub fn filter(&self, kind: Option<&EventKind>, action_id: Option<&str>) -> Vec<&Event> {
205        self.events
206            .iter()
207            .filter(|e| {
208                if let Some(k) = kind {
209                    if &e.kind != k {
210                        return false;
211                    }
212                }
213                if let Some(aid) = action_id {
214                    if e.action_id.as_deref() != Some(aid) {
215                        return false;
216                    }
217                }
218                true
219            })
220            .collect()
221    }
222
223    /// Begin a new trace span. Returns the generated span_id.
224    pub fn begin_span(
225        &mut self,
226        name: &str,
227        trace_id: &str,
228        parent_span_id: Option<&str>,
229        attributes: HashMap<String, Value>,
230    ) -> String {
231        let span_id = Uuid::new_v4().to_string();
232        let span = Span {
233            trace_id: trace_id.to_string(),
234            span_id: span_id.clone(),
235            parent_span_id: parent_span_id.map(|s| s.to_string()),
236            name: name.to_string(),
237            start_time: Utc::now(),
238            end_time: None,
239            status: SpanStatus::Unset,
240            attributes,
241        };
242        self.spans.push(span);
243        span_id
244    }
245
246    /// End an open span by setting its status and end time.
247    pub fn end_span(&mut self, span_id: &str, status: SpanStatus) {
248        if let Some(span) = self.spans.iter_mut().find(|s| s.span_id == span_id) {
249            span.end_time = Some(Utc::now());
250            span.status = status;
251        }
252    }
253
254    /// Return all spans.
255    pub fn spans(&self) -> Vec<Span> {
256        self.spans.clone()
257    }
258
259    /// Export traces as OTLP-compatible JSON.
260    pub fn export_traces(&self) -> String {
261        // Group spans by trace_id
262        let mut traces: HashMap<&str, Vec<&Span>> = HashMap::new();
263        for span in &self.spans {
264            traces.entry(span.trace_id.as_str()).or_default().push(span);
265        }
266
267        let resource_spans: Vec<Value> = traces
268            .into_iter()
269            .map(|(_trace_id, spans)| {
270                let scope_spans = spans
271                    .iter()
272                    .map(|s| {
273                        let mut span_obj = serde_json::json!({
274                            "traceId": s.trace_id,
275                            "spanId": s.span_id,
276                            "name": s.name,
277                            "startTimeUnixNano": s.start_time.timestamp_nanos_opt().unwrap_or(0).to_string(),
278                            "status": {
279                                "code": match s.status {
280                                    SpanStatus::Ok => 1,
281                                    SpanStatus::Error => 2,
282                                    SpanStatus::Unset => 0,
283                                }
284                            },
285                            "attributes": s.attributes.iter().map(|(k, v)| {
286                                serde_json::json!({
287                                    "key": k,
288                                    "value": { "stringValue": v.to_string() }
289                                })
290                            }).collect::<Vec<_>>(),
291                        });
292
293                        if let Some(ref parent) = s.parent_span_id {
294                            span_obj.as_object_mut().unwrap().insert(
295                                "parentSpanId".to_string(),
296                                Value::from(parent.as_str()),
297                            );
298                        }
299                        if let Some(end) = s.end_time {
300                            span_obj.as_object_mut().unwrap().insert(
301                                "endTimeUnixNano".to_string(),
302                                Value::from(end.timestamp_nanos_opt().unwrap_or(0).to_string()),
303                            );
304                        }
305
306                        span_obj
307                    })
308                    .collect::<Vec<_>>();
309
310                serde_json::json!({
311                    "resource": {
312                        "attributes": [
313                            { "key": "service.name", "value": { "stringValue": "car-runtime" } }
314                        ]
315                    },
316                    "scopeSpans": [{
317                        "scope": { "name": "car-eventlog" },
318                        "spans": scope_spans
319                    }]
320                })
321            })
322            .collect();
323
324        serde_json::to_string(&serde_json::json!({
325            "resourceSpans": resource_spans
326        }))
327        .unwrap_or_else(|_| "{}".to_string())
328    }
329
330    /// Load an event log from a JSONL journal file.
331    pub fn load(path: &Path) -> std::io::Result<Self> {
332        let file = fs::File::open(path)?;
333        let reader = BufReader::new(file);
334        let mut events = Vec::new();
335
336        for line in reader.lines() {
337            let line = line?;
338            let line = line.trim();
339            if !line.is_empty() {
340                if let Ok(event) = serde_json::from_str::<Event>(line) {
341                    events.push(event);
342                }
343            }
344        }
345
346        Ok(Self {
347            events,
348            spans: Vec::new(),
349            journal_path: Some(path.to_path_buf()),
350        })
351    }
352}
353
354fn approx_json_bytes<T: Serialize>(value: &T) -> usize {
355    serde_json::to_vec(value)
356        .map(|bytes| bytes.len())
357        .unwrap_or(0)
358}
359
/// Removes elements from the front of `items` until at most `keep_last`
/// remain, then releases the spare capacity.
///
/// Returns the number of elements removed. When nothing needs removing the
/// vector, including its capacity, is left untouched.
fn truncate_vec_keep_last<T>(items: &mut Vec<T>, keep_last: usize) -> usize {
    let excess = items.len().saturating_sub(keep_last);
    if excess > 0 {
        items.drain(..excess);
        items.shrink_to_fit();
    }
    excess
}
370
371impl Default for EventLog {
372    fn default() -> Self {
373        Self::new()
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty payload / attribute map used where the content is irrelevant.
    fn no_data() -> HashMap<String, Value> {
        HashMap::new()
    }

    #[test]
    fn append_and_read() {
        let mut log = EventLog::new();
        let data = HashMap::from([("source".to_string(), Value::from("test"))]);
        log.append(EventKind::ProposalReceived, None, Some("p1"), data);

        assert_eq!(log.len(), 1);
        assert_eq!(log.events()[0].kind, EventKind::ProposalReceived);
    }

    #[test]
    fn filter_by_kind() {
        let mut log = EventLog::new();
        for (kind, action_id) in [
            (EventKind::ProposalReceived, None),
            (EventKind::ActionValidated, Some("a1")),
            (EventKind::ActionSucceeded, Some("a1")),
        ] {
            log.append(kind, action_id, Some("p1"), no_data());
        }

        let validated = log.filter(Some(&EventKind::ActionValidated), None);
        assert_eq!(validated.len(), 1);
    }

    #[test]
    fn filter_by_action_id() {
        let mut log = EventLog::new();
        for action_id in ["a1", "a2"] {
            log.append(EventKind::ActionValidated, Some(action_id), None, no_data());
        }

        let a1_events = log.filter(None, Some("a1"));
        assert_eq!(a1_events.len(), 1);
    }

    #[test]
    fn journal_write_and_reload() {
        let dir = tempfile::tempdir().unwrap();
        let journal = dir.path().join("events.jsonl");

        // Write two events through a journaled log, then drop it.
        {
            let mut log = EventLog::with_journal(journal.clone());
            log.append(EventKind::ProposalReceived, None, Some("p1"), no_data());
            log.append(
                EventKind::ActionSucceeded,
                Some("a1"),
                Some("p1"),
                no_data(),
            );
        }

        assert!(journal.exists());

        // A fresh log loaded from the file sees both events in order.
        let reloaded = EventLog::load(&journal).unwrap();
        assert_eq!(reloaded.len(), 2);
        let kinds: Vec<&EventKind> = reloaded.events().iter().map(|e| &e.kind).collect();
        assert_eq!(kinds[0], &EventKind::ProposalReceived);
        assert_eq!(kinds[1], &EventKind::ActionSucceeded);
    }

    #[test]
    fn event_kind_serializes_snake_case() {
        let cases = [
            (EventKind::ProposalReceived, "\"proposal_received\""),
            (EventKind::StateSnapshot, "\"state_snapshot\""),
        ];
        for (kind, expected) in cases {
            assert_eq!(serde_json::to_string(&kind).unwrap(), expected);
        }
    }

    #[test]
    fn stats_truncate_and_clear_release_retained_entries() {
        let mut log = EventLog::new();
        for idx in 0..5 {
            let action_id = format!("a{idx}");
            let payload = HashMap::from([("payload".to_string(), Value::from("x".repeat(16)))]);
            log.append(
                EventKind::ActionSucceeded,
                Some(action_id.as_str()),
                Some("p1"),
                payload,
            );
            log.begin_span("action.tool_call", "trace", None, no_data());
        }

        let stats = log.stats();
        assert_eq!(stats.events, 5);
        assert_eq!(stats.spans, 5);
        assert!(stats.approx_event_bytes > 0);
        assert!(stats.approx_span_bytes > 0);

        // Truncation reports what it removed and keeps the newest entries.
        assert_eq!(log.truncate_events_keep_last(2), 3);
        assert_eq!(log.truncate_spans_keep_last(1), 4);
        assert_eq!(log.len(), 2);
        assert_eq!(log.span_len(), 1);
        assert_eq!(log.events()[0].action_id.as_deref(), Some("a3"));

        // Clearing reports the pre-clear counts and leaves nothing behind.
        let removed = log.clear();
        assert_eq!(removed.events, 2);
        assert_eq!(removed.spans, 1);
        assert_eq!(log.len(), 0);
        assert_eq!(log.span_len(), 0);
    }

    #[test]
    fn span_begin_end_lifecycle() {
        let mut log = EventLog::new();
        let attributes = HashMap::from([("key".to_string(), Value::from("value"))]);

        let span_id = log.begin_span("test.operation", "trace-1", None, attributes);

        // Freshly opened: present, unparented, open, status unset.
        let before = log.spans();
        assert_eq!(before.len(), 1);
        let opened = &before[0];
        assert_eq!(opened.name, "test.operation");
        assert_eq!(opened.trace_id, "trace-1");
        assert!(opened.parent_span_id.is_none());
        assert!(opened.end_time.is_none());
        assert_eq!(opened.status, SpanStatus::Unset);

        log.end_span(&span_id, SpanStatus::Ok);

        // Closed: end time recorded and status applied.
        let after = log.spans();
        assert!(after[0].end_time.is_some());
        assert_eq!(after[0].status, SpanStatus::Ok);
    }

    #[test]
    fn span_parent_child_relationship() {
        let mut log = EventLog::new();
        let trace_id = String::from("trace-2");

        let parent_id = log.begin_span("parent.op", &trace_id, None, no_data());
        let child_id = log.begin_span("child.op", &trace_id, Some(&parent_id), no_data());

        let spans = log.spans();
        assert_eq!(spans.len(), 2);

        let by_id = |wanted: &str| spans.iter().find(|s| s.span_id == wanted).unwrap();

        let child = by_id(&child_id);
        assert_eq!(child.parent_span_id.as_deref(), Some(parent_id.as_str()));
        assert_eq!(child.trace_id, trace_id);

        let parent = by_id(&parent_id);
        assert!(parent.parent_span_id.is_none());
    }

    #[test]
    fn export_traces_produces_valid_json() {
        let mut log = EventLog::new();
        let trace_id = String::from("trace-3");

        let root = log.begin_span(
            "proposal.execute",
            &trace_id,
            None,
            HashMap::from([("proposal_id".to_string(), Value::from("p1"))]),
        );
        let child = log.begin_span(
            "action.tool_call",
            &trace_id,
            Some(&root),
            HashMap::from([("tool".to_string(), Value::from("read_file"))]),
        );
        log.end_span(&child, SpanStatus::Ok);
        log.end_span(&root, SpanStatus::Ok);

        let json_str = log.export_traces();
        let parsed: Value =
            serde_json::from_str(&json_str).expect("export_traces must produce valid JSON");

        let resource_spans = parsed["resourceSpans"].as_array().unwrap();
        assert_eq!(resource_spans.len(), 1);

        let spans_arr = resource_spans[0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        assert_eq!(spans_arr.len(), 2);

        // Verify OTLP structure
        for span in spans_arr {
            assert!(span.get("traceId").is_some());
            assert!(span.get("spanId").is_some());
            assert!(span.get("name").is_some());
            assert!(span.get("startTimeUnixNano").is_some());
            assert!(span.get("endTimeUnixNano").is_some());
            assert!(span.get("status").is_some());
        }

        // Verify the child has parentSpanId
        let child_span = spans_arr
            .iter()
            .find(|s| s["name"] == "action.tool_call")
            .unwrap();
        assert!(child_span.get("parentSpanId").is_some());
    }

    #[test]
    fn span_status_set_on_error() {
        let mut log = EventLog::new();

        let span_id = log.begin_span("failing.op", "trace-4", None, no_data());
        log.end_span(&span_id, SpanStatus::Error);

        let spans = log.spans();
        let failed = &spans[0];
        assert_eq!(failed.status, SpanStatus::Error);
        assert!(failed.end_time.is_some());
    }
}
610}