Skip to main content

car_eventlog/
lib.rs

1//! Event log with JSONL persistence for Common Agent Runtime.
2//!
3//! Append-only event log. Every runtime operation is recorded here.
4//! Supports optional JSONL journal persistence for replay and audit.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::fs::{self, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13use uuid::Uuid;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "camelCase")]
17pub struct EventLogStats {
18    pub events: usize,
19    pub spans: usize,
20    pub approx_event_bytes: usize,
21    pub approx_span_bytes: usize,
22}
23
24/// Event kinds matching the Python EventKind enum.
25#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27pub enum EventKind {
28    ProposalReceived,
29    ActionValidated,
30    ActionRejected,
31    ActionExecuting,
32    ActionSucceeded,
33    ActionFailed,
34    ActionSkipped,
35    ActionRetrying,
36    ActionDeduplicated,
37    PolicyViolation,
38    StateChanged,
39    StateSnapshot,
40    StateRollback,
41    // Skill lifecycle events (SkillRL-inspired)
42    SkillDistilled,
43    SkillEvolved,
44    SkillDeprecated,
45    EvolutionTriggered,
46    /// A provisional skill candidate passed the validation gate and was promoted
47    /// to Active, superseding its incumbent (SkillOpt-inspired — see
48    /// `docs/solutions/gated-skill-optimization.md`).
49    CandidatePromoted,
50    /// A provisional skill candidate failed the validation gate and was rejected
51    /// (recorded in the rejected-edit buffer so it isn't regenerated).
52    CandidateRejected,
53    // Memory consolidation ("dream") events
54    Consolidated,
55    // Replanning events
56    ReplanAttempted,
57    ReplanProposalReceived,
58    ReplanRejected,
59    ReplanExhausted,
60    // Voice turn telemetry — emitted by car-engine's voice_turn dispatch
61    // and the orchestrator. `data` carries `turn_id` (u64) plus
62    // event-specific fields like `text_len`, `error`, `timeout_ms`.
63    VoiceFastTurnStarted,
64    VoiceFastTurnEnded,
65    VoiceSidecarResolved,
66    VoiceSidecarFailed,
67    VoiceSidecarTimedOut,
68    VoiceTurnCancelled,
69    VoiceBridgePlayed,
70    // Per-execution caller / tenant scope (Parslee-ai/car#187 phase 3).
71    // Emitted by Runtime::execute_scoped* once per proposal when the
72    // RuntimeScope carries any identity. `data` carries `caller_id`,
73    // `tenant_id`, and `claims` — exact set depends on what the
74    // dispatcher forwarded. Audit / log analysis correlates actions
75    // back to the caller / tenant that triggered them.
76    SessionScope,
77}
78
79/// Status of a trace span.
80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
81#[serde(rename_all = "snake_case")]
82pub enum SpanStatus {
83    Ok,
84    Error,
85    Unset,
86}
87
88/// A trace span representing a unit of work.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct Span {
91    pub trace_id: String,
92    pub span_id: String,
93    pub parent_span_id: Option<String>,
94    pub name: String,
95    pub start_time: DateTime<Utc>,
96    pub end_time: Option<DateTime<Utc>>,
97    pub status: SpanStatus,
98    pub attributes: HashMap<String, Value>,
99}
100
101/// A single event in the log.
102#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
103pub struct Event {
104    pub kind: EventKind,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub action_id: Option<String>,
107    #[serde(default, skip_serializing_if = "Option::is_none")]
108    pub proposal_id: Option<String>,
109    #[serde(default)]
110    pub data: HashMap<String, Value>,
111    #[serde(default = "Utc::now")]
112    pub timestamp: DateTime<Utc>,
113}
114
115/// Append-only event log with optional JSONL journal.
116pub struct EventLog {
117    events: Vec<Event>,
118    spans: Vec<Span>,
119    journal_path: Option<PathBuf>,
120}
121
122impl EventLog {
123    pub fn new() -> Self {
124        Self {
125            events: Vec::new(),
126            spans: Vec::new(),
127            journal_path: None,
128        }
129    }
130
131    pub fn with_journal(path: PathBuf) -> Self {
132        if let Some(parent) = path.parent() {
133            let _ = fs::create_dir_all(parent);
134        }
135        Self {
136            events: Vec::new(),
137            spans: Vec::new(),
138            journal_path: Some(path),
139        }
140    }
141
142    pub fn append(
143        &mut self,
144        kind: EventKind,
145        action_id: Option<&str>,
146        proposal_id: Option<&str>,
147        data: HashMap<String, Value>,
148    ) -> &Event {
149        let event = Event {
150            kind,
151            action_id: action_id.map(|s| s.to_string()),
152            proposal_id: proposal_id.map(|s| s.to_string()),
153            data,
154            timestamp: Utc::now(),
155        };
156
157        if let Some(ref path) = self.journal_path {
158            if let Ok(json) = serde_json::to_string(&event) {
159                if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
160                    let _ = writeln!(file, "{}", json);
161                }
162            }
163        }
164
165        self.events.push(event);
166        self.events.last().unwrap()
167    }
168
169    pub fn events(&self) -> &[Event] {
170        &self.events
171    }
172
173    pub fn len(&self) -> usize {
174        self.events.len()
175    }
176
177    pub fn span_len(&self) -> usize {
178        self.spans.len()
179    }
180
181    pub fn is_empty(&self) -> bool {
182        self.events.is_empty()
183    }
184
185    pub fn stats(&self) -> EventLogStats {
186        EventLogStats {
187            events: self.events.len(),
188            spans: self.spans.len(),
189            approx_event_bytes: approx_json_bytes(&self.events),
190            approx_span_bytes: approx_json_bytes(&self.spans),
191        }
192    }
193
194    pub fn truncate_events_keep_last(&mut self, keep_last: usize) -> usize {
195        truncate_vec_keep_last(&mut self.events, keep_last)
196    }
197
198    pub fn truncate_spans_keep_last(&mut self, keep_last: usize) -> usize {
199        truncate_vec_keep_last(&mut self.spans, keep_last)
200    }
201
202    pub fn clear(&mut self) -> EventLogStats {
203        let removed = self.stats();
204        self.events.clear();
205        self.events.shrink_to_fit();
206        self.spans.clear();
207        self.spans.shrink_to_fit();
208        removed
209    }
210
211    pub fn filter(&self, kind: Option<&EventKind>, action_id: Option<&str>) -> Vec<&Event> {
212        self.events
213            .iter()
214            .filter(|e| {
215                if let Some(k) = kind {
216                    if &e.kind != k {
217                        return false;
218                    }
219                }
220                if let Some(aid) = action_id {
221                    if e.action_id.as_deref() != Some(aid) {
222                        return false;
223                    }
224                }
225                true
226            })
227            .collect()
228    }
229
230    /// Begin a new trace span. Returns the generated span_id.
231    pub fn begin_span(
232        &mut self,
233        name: &str,
234        trace_id: &str,
235        parent_span_id: Option<&str>,
236        attributes: HashMap<String, Value>,
237    ) -> String {
238        let span_id = Uuid::new_v4().to_string();
239        let span = Span {
240            trace_id: trace_id.to_string(),
241            span_id: span_id.clone(),
242            parent_span_id: parent_span_id.map(|s| s.to_string()),
243            name: name.to_string(),
244            start_time: Utc::now(),
245            end_time: None,
246            status: SpanStatus::Unset,
247            attributes,
248        };
249        self.spans.push(span);
250        span_id
251    }
252
253    /// End an open span by setting its status and end time.
254    pub fn end_span(&mut self, span_id: &str, status: SpanStatus) {
255        if let Some(span) = self.spans.iter_mut().find(|s| s.span_id == span_id) {
256            span.end_time = Some(Utc::now());
257            span.status = status;
258        }
259    }
260
261    /// Return all spans.
262    pub fn spans(&self) -> Vec<Span> {
263        self.spans.clone()
264    }
265
266    /// Export traces as OTLP-compatible JSON.
267    pub fn export_traces(&self) -> String {
268        // Group spans by trace_id
269        let mut traces: HashMap<&str, Vec<&Span>> = HashMap::new();
270        for span in &self.spans {
271            traces.entry(span.trace_id.as_str()).or_default().push(span);
272        }
273
274        let resource_spans: Vec<Value> = traces
275            .into_iter()
276            .map(|(_trace_id, spans)| {
277                let scope_spans = spans
278                    .iter()
279                    .map(|s| {
280                        let mut span_obj = serde_json::json!({
281                            "traceId": s.trace_id,
282                            "spanId": s.span_id,
283                            "name": s.name,
284                            "startTimeUnixNano": s.start_time.timestamp_nanos_opt().unwrap_or(0).to_string(),
285                            "status": {
286                                "code": match s.status {
287                                    SpanStatus::Ok => 1,
288                                    SpanStatus::Error => 2,
289                                    SpanStatus::Unset => 0,
290                                }
291                            },
292                            "attributes": s.attributes.iter().map(|(k, v)| {
293                                serde_json::json!({
294                                    "key": k,
295                                    "value": { "stringValue": v.to_string() }
296                                })
297                            }).collect::<Vec<_>>(),
298                        });
299
300                        if let Some(ref parent) = s.parent_span_id {
301                            span_obj.as_object_mut().unwrap().insert(
302                                "parentSpanId".to_string(),
303                                Value::from(parent.as_str()),
304                            );
305                        }
306                        if let Some(end) = s.end_time {
307                            span_obj.as_object_mut().unwrap().insert(
308                                "endTimeUnixNano".to_string(),
309                                Value::from(end.timestamp_nanos_opt().unwrap_or(0).to_string()),
310                            );
311                        }
312
313                        span_obj
314                    })
315                    .collect::<Vec<_>>();
316
317                serde_json::json!({
318                    "resource": {
319                        "attributes": [
320                            { "key": "service.name", "value": { "stringValue": "car-runtime" } }
321                        ]
322                    },
323                    "scopeSpans": [{
324                        "scope": { "name": "car-eventlog" },
325                        "spans": scope_spans
326                    }]
327                })
328            })
329            .collect();
330
331        serde_json::to_string(&serde_json::json!({
332            "resourceSpans": resource_spans
333        }))
334        .unwrap_or_else(|_| "{}".to_string())
335    }
336
337    /// Load an event log from a JSONL journal file.
338    pub fn load(path: &Path) -> std::io::Result<Self> {
339        let file = fs::File::open(path)?;
340        let reader = BufReader::new(file);
341        let mut events = Vec::new();
342
343        for line in reader.lines() {
344            let line = line?;
345            let line = line.trim();
346            if !line.is_empty() {
347                if let Ok(event) = serde_json::from_str::<Event>(line) {
348                    events.push(event);
349                }
350            }
351        }
352
353        Ok(Self {
354            events,
355            spans: Vec::new(),
356            journal_path: Some(path.to_path_buf()),
357        })
358    }
359}
360
361fn approx_json_bytes<T: Serialize>(value: &T) -> usize {
362    serde_json::to_vec(value)
363        .map(|bytes| bytes.len())
364        .unwrap_or(0)
365}
366
367fn truncate_vec_keep_last<T>(items: &mut Vec<T>, keep_last: usize) -> usize {
368    let len = items.len();
369    if len <= keep_last {
370        return 0;
371    }
372    let removed = len - keep_last;
373    items.drain(..removed);
374    items.shrink_to_fit();
375    removed
376}
377
378impl Default for EventLog {
379    fn default() -> Self {
380        Self::new()
381    }
382}
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387
388    #[test]
389    fn append_and_read() {
390        let mut log = EventLog::new();
391        log.append(
392            EventKind::ProposalReceived,
393            None,
394            Some("p1"),
395            [("source".to_string(), Value::from("test"))].into(),
396        );
397        assert_eq!(log.len(), 1);
398        assert_eq!(log.events()[0].kind, EventKind::ProposalReceived);
399    }
400
401    #[test]
402    fn filter_by_kind() {
403        let mut log = EventLog::new();
404        log.append(
405            EventKind::ProposalReceived,
406            None,
407            Some("p1"),
408            HashMap::new(),
409        );
410        log.append(
411            EventKind::ActionValidated,
412            Some("a1"),
413            Some("p1"),
414            HashMap::new(),
415        );
416        log.append(
417            EventKind::ActionSucceeded,
418            Some("a1"),
419            Some("p1"),
420            HashMap::new(),
421        );
422
423        let validated = log.filter(Some(&EventKind::ActionValidated), None);
424        assert_eq!(validated.len(), 1);
425    }
426
427    #[test]
428    fn filter_by_action_id() {
429        let mut log = EventLog::new();
430        log.append(EventKind::ActionValidated, Some("a1"), None, HashMap::new());
431        log.append(EventKind::ActionValidated, Some("a2"), None, HashMap::new());
432
433        let a1_events = log.filter(None, Some("a1"));
434        assert_eq!(a1_events.len(), 1);
435    }
436
437    #[test]
438    fn journal_write_and_reload() {
439        let dir = tempfile::tempdir().unwrap();
440        let journal = dir.path().join("events.jsonl");
441
442        {
443            let mut log = EventLog::with_journal(journal.clone());
444            log.append(
445                EventKind::ProposalReceived,
446                None,
447                Some("p1"),
448                HashMap::new(),
449            );
450            log.append(
451                EventKind::ActionSucceeded,
452                Some("a1"),
453                Some("p1"),
454                HashMap::new(),
455            );
456        }
457
458        assert!(journal.exists());
459
460        let reloaded = EventLog::load(&journal).unwrap();
461        assert_eq!(reloaded.len(), 2);
462        assert_eq!(reloaded.events()[0].kind, EventKind::ProposalReceived);
463        assert_eq!(reloaded.events()[1].kind, EventKind::ActionSucceeded);
464    }
465
466    #[test]
467    fn event_kind_serializes_snake_case() {
468        assert_eq!(
469            serde_json::to_string(&EventKind::ProposalReceived).unwrap(),
470            "\"proposal_received\""
471        );
472        assert_eq!(
473            serde_json::to_string(&EventKind::StateSnapshot).unwrap(),
474            "\"state_snapshot\""
475        );
476    }
477
478    #[test]
479    fn stats_truncate_and_clear_release_retained_entries() {
480        let mut log = EventLog::new();
481        for idx in 0..5 {
482            log.append(
483                EventKind::ActionSucceeded,
484                Some(&format!("a{idx}")),
485                Some("p1"),
486                [("payload".to_string(), Value::from("x".repeat(16)))].into(),
487            );
488            log.begin_span("action.tool_call", "trace", None, HashMap::new());
489        }
490
491        let stats = log.stats();
492        assert_eq!(stats.events, 5);
493        assert_eq!(stats.spans, 5);
494        assert!(stats.approx_event_bytes > 0);
495        assert!(stats.approx_span_bytes > 0);
496
497        assert_eq!(log.truncate_events_keep_last(2), 3);
498        assert_eq!(log.truncate_spans_keep_last(1), 4);
499        assert_eq!(log.len(), 2);
500        assert_eq!(log.span_len(), 1);
501        assert_eq!(log.events()[0].action_id.as_deref(), Some("a3"));
502
503        let removed = log.clear();
504        assert_eq!(removed.events, 2);
505        assert_eq!(removed.spans, 1);
506        assert_eq!(log.len(), 0);
507        assert_eq!(log.span_len(), 0);
508    }
509
510    #[test]
511    fn span_begin_end_lifecycle() {
512        let mut log = EventLog::new();
513        let trace_id = "trace-1".to_string();
514
515        let span_id = log.begin_span(
516            "test.operation",
517            &trace_id,
518            None,
519            [("key".to_string(), Value::from("value"))].into(),
520        );
521
522        let spans = log.spans();
523        assert_eq!(spans.len(), 1);
524        assert_eq!(spans[0].name, "test.operation");
525        assert_eq!(spans[0].trace_id, "trace-1");
526        assert!(spans[0].parent_span_id.is_none());
527        assert!(spans[0].end_time.is_none());
528        assert_eq!(spans[0].status, SpanStatus::Unset);
529
530        log.end_span(&span_id, SpanStatus::Ok);
531
532        let spans = log.spans();
533        assert!(spans[0].end_time.is_some());
534        assert_eq!(spans[0].status, SpanStatus::Ok);
535    }
536
537    #[test]
538    fn span_parent_child_relationship() {
539        let mut log = EventLog::new();
540        let trace_id = "trace-2".to_string();
541
542        let parent_id = log.begin_span("parent.op", &trace_id, None, HashMap::new());
543        let child_id = log.begin_span("child.op", &trace_id, Some(&parent_id), HashMap::new());
544
545        let spans = log.spans();
546        assert_eq!(spans.len(), 2);
547
548        let child = spans.iter().find(|s| s.span_id == child_id).unwrap();
549        assert_eq!(child.parent_span_id.as_deref(), Some(parent_id.as_str()));
550        assert_eq!(child.trace_id, trace_id);
551
552        let parent = spans.iter().find(|s| s.span_id == parent_id).unwrap();
553        assert!(parent.parent_span_id.is_none());
554    }
555
556    #[test]
557    fn export_traces_produces_valid_json() {
558        let mut log = EventLog::new();
559        let trace_id = "trace-3".to_string();
560
561        let root = log.begin_span(
562            "proposal.execute",
563            &trace_id,
564            None,
565            [("proposal_id".to_string(), Value::from("p1"))].into(),
566        );
567        let child = log.begin_span(
568            "action.tool_call",
569            &trace_id,
570            Some(&root),
571            [("tool".to_string(), Value::from("read_file"))].into(),
572        );
573        log.end_span(&child, SpanStatus::Ok);
574        log.end_span(&root, SpanStatus::Ok);
575
576        let json_str = log.export_traces();
577        let parsed: Value =
578            serde_json::from_str(&json_str).expect("export_traces must produce valid JSON");
579
580        let resource_spans = parsed["resourceSpans"].as_array().unwrap();
581        assert_eq!(resource_spans.len(), 1);
582
583        let scope_spans = &resource_spans[0]["scopeSpans"][0]["spans"];
584        let spans_arr = scope_spans.as_array().unwrap();
585        assert_eq!(spans_arr.len(), 2);
586
587        // Verify OTLP structure
588        for span in spans_arr {
589            assert!(span.get("traceId").is_some());
590            assert!(span.get("spanId").is_some());
591            assert!(span.get("name").is_some());
592            assert!(span.get("startTimeUnixNano").is_some());
593            assert!(span.get("endTimeUnixNano").is_some());
594            assert!(span.get("status").is_some());
595        }
596
597        // Verify the child has parentSpanId
598        let child_span = spans_arr
599            .iter()
600            .find(|s| s["name"] == "action.tool_call")
601            .unwrap();
602        assert!(child_span.get("parentSpanId").is_some());
603    }
604
605    #[test]
606    fn span_status_set_on_error() {
607        let mut log = EventLog::new();
608        let trace_id = "trace-4".to_string();
609
610        let span_id = log.begin_span("failing.op", &trace_id, None, HashMap::new());
611        log.end_span(&span_id, SpanStatus::Error);
612
613        let spans = log.spans();
614        assert_eq!(spans[0].status, SpanStatus::Error);
615        assert!(spans[0].end_time.is_some());
616    }
617}