use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use uuid::Uuid;
/// Size summary of an `EventLog`, produced by `EventLog::stats` and also
/// returned by `EventLog::clear` (describing what was held before clearing).
///
/// Serialized with camelCase keys (for example `approxEventBytes`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EventLogStats {
/// Number of events held in memory.
pub events: usize,
/// Number of spans held in memory.
pub spans: usize,
/// Length in bytes of the JSON encoding of the in-memory event list
/// (0 if that encoding fails).
pub approx_event_bytes: usize,
/// Length in bytes of the JSON encoding of the in-memory span list
/// (0 if that encoding fails).
pub approx_span_bytes: usize,
}
/// Discriminator stored in `Event::kind`.
///
/// Each variant is serialized as its snake_case name, e.g.
/// `ProposalReceived` becomes `"proposal_received"`.
///
/// This file only records and filters kinds; it attaches no behaviour to
/// individual variants.
/// NOTE(review): the precise meaning of each variant is defined by the code
/// that emits it, which is not visible here — confirm with the emitters before
/// relying on a variant's name alone.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventKind {
ProposalReceived,
ActionValidated,
ActionRejected,
ActionExecuting,
ActionSucceeded,
ActionFailed,
ActionSkipped,
ActionRetrying,
ActionDeduplicated,
PolicyViolation,
StateChanged,
StateSnapshot,
StateRollback,
SkillDistilled,
SkillEvolved,
SkillDeprecated,
EvolutionTriggered,
Consolidated,
ReplanAttempted,
ReplanProposalReceived,
ReplanRejected,
ReplanExhausted,
VoiceFastTurnStarted,
VoiceFastTurnEnded,
VoiceSidecarResolved,
VoiceSidecarFailed,
VoiceSidecarTimedOut,
VoiceTurnCancelled,
VoiceBridgePlayed,
SessionScope,
}
/// Outcome recorded on a [`Span`].
///
/// Serialized as a snake_case string (`"ok"`, `"error"`, `"unset"`).
/// `EventLog::export_traces` maps the variants to the numeric status codes
/// `Unset = 0`, `Ok = 1`, `Error = 2`.
///
/// The enum is field-less, so it is `Copy` and `Eq` in addition to the
/// previously derived traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SpanStatus {
    /// The span was ended with a successful status.
    Ok,
    /// The span was ended with a failure status.
    Error,
    /// No status has been set; this is what `EventLog::begin_span` assigns.
    Unset,
}
/// A timed operation recorded by `EventLog::begin_span` and closed by
/// `EventLog::end_span`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Span {
/// Caller-supplied trace identifier; `EventLog::export_traces` groups spans by it.
pub trace_id: String,
/// Identifier of this span; `EventLog::begin_span` generates it as a v4 UUID string.
pub span_id: String,
/// `span_id` of the parent span, or `None` when no parent was given.
pub parent_span_id: Option<String>,
/// Caller-supplied span name.
pub name: String,
/// Time the span was begun (UTC).
pub start_time: DateTime<Utc>,
/// Time the span was ended (UTC); `None` until `EventLog::end_span` is called for it.
pub end_time: Option<DateTime<Utc>>,
/// Outcome of the span; `SpanStatus::Unset` until the span is ended.
pub status: SpanStatus,
/// Caller-supplied key/value attributes attached when the span was begun.
pub attributes: HashMap<String, Value>,
}
/// One entry in an `EventLog`; also the record format of the JSONL journal
/// (one serialized `Event` per line).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Event {
/// What happened. This is the only field that must be present when deserializing.
pub kind: EventKind,
/// Optional action identifier; omitted from the serialized form when `None`
/// and read back as `None` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub action_id: Option<String>,
/// Optional proposal identifier; omitted from the serialized form when `None`
/// and read back as `None` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub proposal_id: Option<String>,
/// Free-form payload; an absent field deserializes to an empty map.
#[serde(default)]
pub data: HashMap<String, Value>,
/// Time the event was recorded (UTC). When the field is absent on
/// deserialization it is filled with the current time, not the original one.
#[serde(default = "Utc::now")]
pub timestamp: DateTime<Utc>,
}
/// In-memory log of [`Event`]s and [`Span`]s, optionally mirrored to a JSONL
/// journal file.
///
/// Only events are written to the journal; spans live in memory only.
#[derive(Debug)]
pub struct EventLog {
    /// Events in the order they were appended (oldest first).
    events: Vec<Event>,
    /// Spans in the order they were begun (oldest first).
    spans: Vec<Span>,
    /// When set, every appended event is also written to this file as one
    /// JSON object per line.
    journal_path: Option<PathBuf>,
}
impl EventLog {
    /// Creates an empty log that lives purely in memory (no journal file).
    pub fn new() -> Self {
        Self {
            events: Vec::new(),
            spans: Vec::new(),
            journal_path: None,
        }
    }

    /// Creates an empty log that mirrors every appended event to `path`.
    ///
    /// The parent directory of `path` is created if possible. Nothing is read
    /// from an existing file; use [`EventLog::load`] for that.
    pub fn with_journal(path: PathBuf) -> Self {
        if let Some(parent) = path.parent() {
            // Best-effort: if the directory cannot be created, later journal
            // writes fail to open the file and are skipped.
            let _ = fs::create_dir_all(parent);
        }
        Self {
            events: Vec::new(),
            spans: Vec::new(),
            journal_path: Some(path),
        }
    }

    /// Records a new event stamped with the current UTC time and returns a
    /// reference to the stored entry.
    ///
    /// When a journal path is configured the event is also appended to that
    /// file as a single JSON line. Journaling is best-effort: a serialization
    /// or I/O failure is ignored and the event is still kept in memory.
    pub fn append(
        &mut self,
        kind: EventKind,
        action_id: Option<&str>,
        proposal_id: Option<&str>,
        data: HashMap<String, Value>,
    ) -> &Event {
        let event = Event {
            kind,
            action_id: action_id.map(str::to_owned),
            proposal_id: proposal_id.map(str::to_owned),
            data,
            timestamp: Utc::now(),
        };
        if let Some(path) = self.journal_path.as_deref() {
            // Deliberately discarded: the in-memory log is the source of
            // truth and this method has no error channel.
            let _ = Self::write_journal_line(path, &event);
        }
        self.events.push(event);
        self.events
            .last()
            .expect("events cannot be empty immediately after a push")
    }

    /// Appends `event` to the journal at `path` as one newline-terminated JSON object.
    fn write_journal_line(path: &Path, event: &Event) -> std::io::Result<()> {
        let mut line = serde_json::to_string(event)?;
        line.push('\n');
        let mut file = OpenOptions::new().create(true).append(true).open(path)?;
        // Hand the whole record to the file in one `write_all` call instead of
        // formatting the text and the newline separately.
        file.write_all(line.as_bytes())
    }

    /// All events currently held in memory, oldest first.
    pub fn events(&self) -> &[Event] {
        &self.events
    }

    /// Number of events currently held in memory.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Number of spans currently held in memory.
    pub fn span_len(&self) -> usize {
        self.spans.len()
    }

    /// Returns `true` when no events are held (spans are not considered).
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Counts the in-memory events and spans and measures their JSON-encoded size.
    ///
    /// Both collections are fully serialized to obtain the byte counts, so the
    /// cost grows with the size of the log.
    pub fn stats(&self) -> EventLogStats {
        EventLogStats {
            events: self.events.len(),
            spans: self.spans.len(),
            approx_event_bytes: approx_json_bytes(&self.events),
            approx_span_bytes: approx_json_bytes(&self.spans),
        }
    }

    /// Drops the oldest in-memory events so that at most `keep_last` remain.
    ///
    /// Returns the number of events removed. The journal file is not touched.
    pub fn truncate_events_keep_last(&mut self, keep_last: usize) -> usize {
        truncate_vec_keep_last(&mut self.events, keep_last)
    }

    /// Drops the oldest in-memory spans so that at most `keep_last` remain.
    ///
    /// Returns the number of spans removed.
    pub fn truncate_spans_keep_last(&mut self, keep_last: usize) -> usize {
        truncate_vec_keep_last(&mut self.spans, keep_last)
    }

    /// Removes every in-memory event and span and releases their storage.
    ///
    /// Returns the statistics captured just before clearing. The journal file
    /// is not touched.
    pub fn clear(&mut self) -> EventLogStats {
        let removed = self.stats();
        self.events.clear();
        self.events.shrink_to_fit();
        self.spans.clear();
        self.spans.shrink_to_fit();
        removed
    }

    /// Returns the events matching every criterion that is `Some`, oldest first.
    ///
    /// * `kind` - when given, only events of exactly this kind match.
    /// * `action_id` - when given, only events whose `action_id` equals it
    ///   match (events without an action id never do).
    ///
    /// Passing `None` for both returns every event.
    pub fn filter(&self, kind: Option<&EventKind>, action_id: Option<&str>) -> Vec<&Event> {
        self.events
            .iter()
            .filter(|event| match kind {
                Some(wanted) => &event.kind == wanted,
                None => true,
            })
            .filter(|event| match action_id {
                Some(wanted) => event.action_id.as_deref() == Some(wanted),
                None => true,
            })
            .collect()
    }

    /// Starts a new span and returns its freshly generated id (a v4 UUID string).
    ///
    /// The span starts at the current UTC time with no end time and
    /// [`SpanStatus::Unset`]; close it with [`EventLog::end_span`].
    pub fn begin_span(
        &mut self,
        name: &str,
        trace_id: &str,
        parent_span_id: Option<&str>,
        attributes: HashMap<String, Value>,
    ) -> String {
        let span_id = Uuid::new_v4().to_string();
        self.spans.push(Span {
            trace_id: trace_id.to_owned(),
            span_id: span_id.clone(),
            parent_span_id: parent_span_id.map(str::to_owned),
            name: name.to_owned(),
            start_time: Utc::now(),
            end_time: None,
            status: SpanStatus::Unset,
            attributes,
        });
        span_id
    }

    /// Marks the span with id `span_id` as finished now, with the given status.
    ///
    /// Does nothing if no span with that id is held (for example after it was
    /// truncated away). Calling it again overwrites the end time and status.
    pub fn end_span(&mut self, span_id: &str, status: SpanStatus) {
        if let Some(span) = self.spans.iter_mut().find(|s| s.span_id == span_id) {
            span.end_time = Some(Utc::now());
            span.status = status;
        }
    }

    /// Returns a copy of every span currently held, oldest first.
    pub fn spans(&self) -> Vec<Span> {
        self.spans.clone()
    }

    /// Serializes all spans as a JSON document shaped like an OTLP trace
    /// export (`resourceSpans` -> `scopeSpans` -> `spans`).
    ///
    /// One `resourceSpans` entry is produced per distinct `trace_id`. Entries
    /// are ordered by trace id and each span's attributes by key, so the same
    /// log always yields the same text. Within a trace, spans keep the order
    /// in which they were begun.
    ///
    /// Returns `"{}"` if the document cannot be serialized.
    pub fn export_traces(&self) -> String {
        // A BTreeMap (rather than a HashMap) gives a stable group order.
        let mut traces: std::collections::BTreeMap<&str, Vec<&Span>> =
            std::collections::BTreeMap::new();
        for span in &self.spans {
            traces.entry(span.trace_id.as_str()).or_default().push(span);
        }

        let resource_spans: Vec<Value> = traces
            .into_values()
            .map(|spans| {
                let scope_spans: Vec<Value> = spans
                    .into_iter()
                    .map(|span| Self::span_to_otlp(span))
                    .collect();
                serde_json::json!({
                    "resource": {
                        "attributes": [
                            { "key": "service.name", "value": { "stringValue": "car-runtime" } }
                        ]
                    },
                    "scopeSpans": [{
                        "scope": { "name": "car-eventlog" },
                        "spans": scope_spans
                    }]
                })
            })
            .collect();

        serde_json::to_string(&serde_json::json!({
            "resourceSpans": resource_spans
        }))
        .unwrap_or_else(|_| "{}".to_string())
    }

    /// Builds the exported JSON object for a single span.
    fn span_to_otlp(span: &Span) -> Value {
        // HashMap iteration order is arbitrary; sort by key for stable output.
        let mut entries: Vec<(&String, &Value)> = span.attributes.iter().collect();
        entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
        let attributes: Vec<Value> = entries
            .into_iter()
            .map(|(key, value)| {
                let text = match value {
                    // `Value::to_string` yields JSON text, which would wrap a
                    // string in literal quotes; export the raw string instead.
                    Value::String(raw) => raw.clone(),
                    // Every other JSON type is exported as its JSON text.
                    other => other.to_string(),
                };
                serde_json::json!({
                    "key": key,
                    "value": { "stringValue": text }
                })
            })
            .collect();

        let status_code = match span.status {
            SpanStatus::Unset => 0,
            SpanStatus::Ok => 1,
            SpanStatus::Error => 2,
        };

        let mut exported = serde_json::json!({
            "traceId": span.trace_id,
            "spanId": span.span_id,
            "name": span.name,
            // Nanosecond timestamps are emitted as decimal strings; a time
            // outside the representable range falls back to "0".
            "startTimeUnixNano": span.start_time.timestamp_nanos_opt().unwrap_or(0).to_string(),
            "status": { "code": status_code },
            "attributes": attributes,
        });
        // Optional fields are only present when the span has them.
        if let Some(parent) = span.parent_span_id.as_deref() {
            exported["parentSpanId"] = Value::from(parent);
        }
        if let Some(end) = span.end_time {
            exported["endTimeUnixNano"] =
                Value::from(end.timestamp_nanos_opt().unwrap_or(0).to_string());
        }
        exported
    }

    /// Rebuilds a log from the JSONL journal at `path`.
    ///
    /// Blank lines and lines that do not parse as an [`Event`] are skipped.
    /// The returned log has no spans and keeps journaling to the same `path`.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened or a line
    /// cannot be read.
    pub fn load(path: &Path) -> std::io::Result<Self> {
        let reader = BufReader::new(fs::File::open(path)?);
        let mut events = Vec::new();
        for line in reader.lines() {
            let line = line?;
            let record = line.trim();
            if record.is_empty() {
                continue;
            }
            // Unparseable records are dropped rather than failing the load.
            if let Ok(event) = serde_json::from_str::<Event>(record) {
                events.push(event);
            }
        }
        Ok(Self {
            events,
            spans: Vec::new(),
            journal_path: Some(path.to_path_buf()),
        })
    }
}
/// Returns the length in bytes of `value` encoded as JSON, or 0 when it
/// cannot be encoded.
fn approx_json_bytes<T: Serialize>(value: &T) -> usize {
    match serde_json::to_vec(value) {
        Ok(encoded) => encoded.len(),
        Err(_) => 0,
    }
}
/// Removes elements from the front of `items` until at most `keep_last`
/// remain, then shrinks the allocation to fit.
///
/// Returns how many elements were removed. When `items` already holds
/// `keep_last` elements or fewer it is left completely untouched.
fn truncate_vec_keep_last<T>(items: &mut Vec<T>, keep_last: usize) -> usize {
    match items.len().checked_sub(keep_last) {
        // Already at or below the limit: nothing to drop, capacity unchanged.
        None | Some(0) => 0,
        Some(excess) => {
            items.drain(..excess);
            items.shrink_to_fit();
            excess
        }
    }
}
impl Default for EventLog {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a map holding a single string entry, for payloads and attributes.
    fn one_entry(key: &str, value: &str) -> HashMap<String, Value> {
        HashMap::from([(key.to_string(), Value::from(value))])
    }

    /// An appended event is retrievable and keeps its kind.
    #[test]
    fn append_and_read() {
        let mut log = EventLog::new();
        let payload = one_entry("source", "test");
        log.append(EventKind::ProposalReceived, None, Some("p1"), payload);

        assert_eq!(log.len(), 1);
        let first = &log.events()[0];
        assert_eq!(first.kind, EventKind::ProposalReceived);
    }

    /// Filtering by kind returns only events of that kind.
    #[test]
    fn filter_by_kind() {
        let mut log = EventLog::new();
        let sequence = [
            (EventKind::ProposalReceived, None),
            (EventKind::ActionValidated, Some("a1")),
            (EventKind::ActionSucceeded, Some("a1")),
        ];
        for (kind, action_id) in sequence {
            log.append(kind, action_id, Some("p1"), HashMap::new());
        }

        let validated = log.filter(Some(&EventKind::ActionValidated), None);
        assert_eq!(validated.len(), 1);
    }

    /// Filtering by action id returns only events carrying that id.
    #[test]
    fn filter_by_action_id() {
        let mut log = EventLog::new();
        for action_id in ["a1", "a2"] {
            log.append(
                EventKind::ActionValidated,
                Some(action_id),
                None,
                HashMap::new(),
            );
        }

        let a1_events = log.filter(None, Some("a1"));
        assert_eq!(a1_events.len(), 1);
    }

    /// Events written through a journal can be read back in order.
    #[test]
    fn journal_write_and_reload() {
        let dir = tempfile::tempdir().unwrap();
        let journal = dir.path().join("events.jsonl");

        let mut writer = EventLog::with_journal(journal.clone());
        writer.append(
            EventKind::ProposalReceived,
            None,
            Some("p1"),
            HashMap::new(),
        );
        writer.append(
            EventKind::ActionSucceeded,
            Some("a1"),
            Some("p1"),
            HashMap::new(),
        );
        // Release the writing log before reading the file back.
        drop(writer);

        assert!(journal.exists());
        let reloaded = EventLog::load(&journal).unwrap();
        assert_eq!(reloaded.len(), 2);
        let loaded = reloaded.events();
        assert_eq!(loaded[0].kind, EventKind::ProposalReceived);
        assert_eq!(loaded[1].kind, EventKind::ActionSucceeded);
    }

    /// Event kinds use snake_case on the wire.
    #[test]
    fn event_kind_serializes_snake_case() {
        let expectations = [
            (EventKind::ProposalReceived, "\"proposal_received\""),
            (EventKind::StateSnapshot, "\"state_snapshot\""),
        ];
        for (kind, expected) in expectations {
            assert_eq!(serde_json::to_string(&kind).unwrap(), expected);
        }
    }

    /// Stats, truncation and clearing agree on how many entries are held.
    #[test]
    fn stats_truncate_and_clear_release_retained_entries() {
        let mut log = EventLog::new();
        for idx in 0..5 {
            let action_id = format!("a{idx}");
            let payload = HashMap::from([("payload".to_string(), Value::from("x".repeat(16)))]);
            log.append(
                EventKind::ActionSucceeded,
                Some(action_id.as_str()),
                Some("p1"),
                payload,
            );
            log.begin_span("action.tool_call", "trace", None, HashMap::new());
        }

        let before = log.stats();
        assert_eq!(before.events, 5);
        assert_eq!(before.spans, 5);
        assert!(before.approx_event_bytes > 0);
        assert!(before.approx_span_bytes > 0);

        let dropped_events = log.truncate_events_keep_last(2);
        assert_eq!(dropped_events, 3);
        let dropped_spans = log.truncate_spans_keep_last(1);
        assert_eq!(dropped_spans, 4);
        assert_eq!(log.len(), 2);
        assert_eq!(log.span_len(), 1);
        // The two survivors are the newest ones, so the first is "a3".
        assert_eq!(log.events()[0].action_id.as_deref(), Some("a3"));

        let removed = log.clear();
        assert_eq!(removed.events, 2);
        assert_eq!(removed.spans, 1);
        assert_eq!(log.len(), 0);
        assert_eq!(log.span_len(), 0);
    }

    /// A span starts open and unset, and ending it stamps time and status.
    #[test]
    fn span_begin_end_lifecycle() {
        let mut log = EventLog::new();
        let trace_id = String::from("trace-1");
        let span_id = log.begin_span(
            "test.operation",
            &trace_id,
            None,
            one_entry("key", "value"),
        );

        let open = log.spans();
        assert_eq!(open.len(), 1);
        let started = &open[0];
        assert_eq!(started.name, "test.operation");
        assert_eq!(started.trace_id, "trace-1");
        assert!(started.parent_span_id.is_none());
        assert!(started.end_time.is_none());
        assert_eq!(started.status, SpanStatus::Unset);

        log.end_span(&span_id, SpanStatus::Ok);

        let closed = log.spans();
        assert!(closed[0].end_time.is_some());
        assert_eq!(closed[0].status, SpanStatus::Ok);
    }

    /// A child span records its parent's id and shares the trace id.
    #[test]
    fn span_parent_child_relationship() {
        let mut log = EventLog::new();
        let trace_id = String::from("trace-2");
        let parent_id = log.begin_span("parent.op", &trace_id, None, HashMap::new());
        let child_id = log.begin_span("child.op", &trace_id, Some(&parent_id), HashMap::new());

        let spans = log.spans();
        assert_eq!(spans.len(), 2);

        let by_id = |id: &str| spans.iter().find(|s| s.span_id == id).unwrap();

        let child = by_id(&child_id);
        assert_eq!(child.parent_span_id.as_deref(), Some(parent_id.as_str()));
        assert_eq!(child.trace_id, trace_id);

        let parent = by_id(&parent_id);
        assert!(parent.parent_span_id.is_none());
    }

    /// The trace export parses as JSON and carries the expected span fields.
    #[test]
    fn export_traces_produces_valid_json() {
        let mut log = EventLog::new();
        let trace_id = String::from("trace-3");
        let root = log.begin_span(
            "proposal.execute",
            &trace_id,
            None,
            one_entry("proposal_id", "p1"),
        );
        let child = log.begin_span(
            "action.tool_call",
            &trace_id,
            Some(&root),
            one_entry("tool", "read_file"),
        );
        log.end_span(&child, SpanStatus::Ok);
        log.end_span(&root, SpanStatus::Ok);

        let exported = log.export_traces();
        let parsed: Value =
            serde_json::from_str(&exported).expect("export_traces must produce valid JSON");

        let resource_spans = parsed["resourceSpans"].as_array().unwrap();
        assert_eq!(resource_spans.len(), 1);

        let spans_arr = resource_spans[0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        assert_eq!(spans_arr.len(), 2);

        const REQUIRED_FIELDS: [&str; 6] = [
            "traceId",
            "spanId",
            "name",
            "startTimeUnixNano",
            "endTimeUnixNano",
            "status",
        ];
        for span in spans_arr {
            for field in REQUIRED_FIELDS {
                assert!(span.get(field).is_some());
            }
        }

        let child_span = spans_arr
            .iter()
            .find(|s| s["name"] == "action.tool_call")
            .unwrap();
        assert!(child_span.get("parentSpanId").is_some());
    }

    /// Ending a span with an error status records both status and end time.
    #[test]
    fn span_status_set_on_error() {
        let mut log = EventLog::new();
        let trace_id = String::from("trace-4");
        let span_id = log.begin_span("failing.op", &trace_id, None, HashMap::new());
        log.end_span(&span_id, SpanStatus::Error);

        let spans = log.spans();
        let failed = &spans[0];
        assert_eq!(failed.status, SpanStatus::Error);
        assert!(failed.end_time.is_some());
    }
}