use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
use super::AgentEvent;
/// Receiver for [`AgentEvent`]s.
///
/// Implementors must be `Send + Sync`.
pub trait AgentEventSink: Send + Sync {
/// Handles a single event borrowed from the caller. The method returns
/// nothing, so implementations have no way to report failures through it.
fn handle_event(&self, event: &AgentEvent);
}
/// Serializable envelope pairing an [`AgentEvent`] with bookkeeping metadata.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
/// Sequence number of the event; `JsonlEventSink` assigns these from a
/// per-sink counter that starts at 0.
pub index: u64,
/// Emission time in milliseconds since the Unix epoch, as populated by
/// `JsonlEventSink`.
pub emitted_at_ms: i64,
/// Optional frame depth; `JsonlEventSink` always writes `None`.
/// NOTE(review): what a "frame" is cannot be determined from this file — confirm with the producer.
pub frame_depth: Option<u32>,
/// The wrapped event. `#[serde(flatten)]` inlines its fields alongside the
/// metadata fields above instead of nesting them under an `event` key.
#[serde(flatten)]
pub event: AgentEvent,
}
/// Sink that writes to a file on disk and rolls over to numbered sibling files.
///
/// Output starts in `base_path`. Once `ROTATE_BYTES` have been accounted to the
/// current file, output continues in `<stem>-000001.<ext>`, `<stem>-000002.<ext>`, ...
/// next to it.
pub struct JsonlEventSink {
    /// Mutable writer state behind a mutex.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first log file; rotated file names are derived from it.
    base_path: std::path::PathBuf,
}

/// Mutable bookkeeping stored inside `JsonlEventSink::state`.
struct JsonlEventSinkState {
    /// Buffered writer for the file currently receiving output.
    writer: std::io::BufWriter<std::fs::File>,
    /// Running event counter; this is the value `event_count` reports.
    index: u64,
    /// Bytes accounted to the current file; compared against `ROTATE_BYTES`.
    bytes_written: u64,
    /// Number of rotations completed so far (0 while writing to `base_path`).
    rotation: u32,
}

impl JsonlEventSink {
    /// Size threshold (100 MiB) at which output moves to a new file.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Creates the file at `base_path` (truncating any existing content) and
    /// returns a sink that writes to it.
    ///
    /// Missing parent directories are created first.
    ///
    /// # Errors
    /// Returns any I/O error raised while creating the parent directories or
    /// opening the file.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let writer = Self::open_truncated(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer,
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }

    /// Flushes buffered output of the current file.
    ///
    /// # Errors
    /// Returns the I/O error reported by the underlying writer.
    ///
    /// # Panics
    /// Panics if the state mutex is poisoned.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
        state.writer.flush()
    }

    /// Returns the current value of the sink's event counter.
    ///
    /// # Panics
    /// Panics if the state mutex is poisoned.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// Switches `state` to the next numbered file once the current one has
    /// reached `ROTATE_BYTES`; does nothing below the threshold.
    ///
    /// The rotation counter and writer are only updated after the replacement
    /// file has been opened. If flushing or opening fails, `state` is left
    /// untouched, so the next attempt retries the same file name instead of
    /// skipping a sequence number.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        state.writer.flush()?;
        let next_rotation = state.rotation + 1;
        let writer = Self::open_truncated(&self.rotated_path(next_rotation))?;
        state.writer = writer;
        state.rotation = next_rotation;
        state.bytes_written = 0;
        Ok(())
    }

    /// Builds the sibling path used for rotation number `rotation`.
    ///
    /// Falls back to the stem `event_log` and the extension `jsonl` when
    /// `base_path` lacks one or it is not valid UTF-8.
    fn rotated_path(&self, rotation: u32) -> std::path::PathBuf {
        let stem = self
            .base_path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("event_log");
        let ext = self
            .base_path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("jsonl");
        self.base_path
            .with_file_name(format!("{stem}-{rotation:06}.{ext}"))
    }

    /// Opens `path` for writing, creating or truncating it, behind a `BufWriter`.
    fn open_truncated(
        path: &std::path::Path,
    ) -> std::io::Result<std::io::BufWriter<std::fs::File>> {
        std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(path)
            .map(std::io::BufWriter::new)
    }
}
/// Sink that forwards agent events to an [`AnyEventLog`] under a per-session topic.
pub struct EventLogSink {
/// Event log that records are appended to.
log: Arc<AnyEventLog>,
/// Topic used for every append; `EventLogSink::new` derives it from the session id.
topic: Topic,
/// Session identifier copied into each record's payload and headers.
session_id: String,
}
impl EventLogSink {
    /// Builds a sink that appends to `log` under a topic derived from `session_id`.
    ///
    /// The topic name is `observability.agent_events.<component>`, where the
    /// component is `session_id` passed through `sanitize_topic_component`.
    ///
    /// # Panics
    /// Panics if `Topic::new` rejects the generated name.
    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
        let session_id: String = session_id.into();
        let component = crate::event_log::sanitize_topic_component(&session_id);
        let topic_name = format!("observability.agent_events.{}", component);
        let topic =
            Topic::new(topic_name).expect("session id should sanitize to a valid topic");
        Arc::new(Self {
            log,
            topic,
            session_id,
        })
    }
}
impl AgentEventSink for JsonlEventSink {
fn handle_event(&self, event: &AgentEvent) {
use std::io::Write as _;
let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
let index = state.index;
state.index += 1;
let emitted_at_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0);
let envelope = PersistedAgentEvent {
index,
emitted_at_ms,
frame_depth: None,
event: event.clone(),
};
if let Ok(line) = serde_json::to_string(&envelope) {
let _ = state.writer.write_all(line.as_bytes());
let _ = state.writer.write_all(b"\n");
state.bytes_written += line.len() as u64 + 1;
let _ = self.rotate_if_needed(&mut state);
}
}
}
impl AgentEventSink for EventLogSink {
    /// Serializes `event` and appends it to the event log under this sink's topic.
    ///
    /// The record kind is the serialized event's `"type"` string, falling back
    /// to `"agent_event"`. Events that fail to serialize are dropped, and
    /// append errors are ignored.
    fn handle_event(&self, event: &AgentEvent) {
        let event_json = if let Ok(value) = serde_json::to_value(event) {
            value
        } else {
            return;
        };
        let event_kind = match event_json.get("type").and_then(|kind| kind.as_str()) {
            Some(kind) => kind.to_string(),
            None => "agent_event".to_string(),
        };

        let mut headers = std::collections::BTreeMap::new();
        headers.insert("session_id".to_string(), self.session_id.clone());
        let payload = serde_json::json!({
            "index_hint": now_ms(),
            "session_id": self.session_id,
            "event": event_json,
        });
        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);

        let (log, topic) = (self.log.clone(), self.topic.clone());
        match tokio::runtime::Handle::try_current() {
            // A Tokio runtime is current: hand the append to it and return
            // without waiting for the result.
            Ok(handle) => {
                handle.spawn(async move {
                    let _ = log.append(&topic, record).await;
                });
            }
            // No runtime handle: drive the append to completion on this thread.
            Err(_) => {
                let _ = futures::executor::block_on(log.append(&topic, record));
            }
        }
    }
}
impl Drop for JsonlEventSink {
    /// Flushes buffered output on drop, ignoring flush errors; skipped
    /// entirely when the state mutex is poisoned.
    fn drop(&mut self) {
        use std::io::Write as _;
        // `&mut self` guarantees exclusive access, so the state can be reached
        // without locking; a poisoned mutex still yields `Err` here.
        if let Ok(state) = self.state.get_mut() {
            let _ = state.writer.flush();
        }
    }
}
/// Sink that fans each event out to a list of registered child sinks.
pub struct MultiSink {
/// Registered child sinks, in the order they were pushed, behind a mutex.
sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
impl MultiSink {
    /// Creates a fan-out sink with no registered children.
    pub fn new() -> Self {
        let sinks = Mutex::new(Vec::new());
        Self { sinks }
    }

    /// Appends `sink` to the end of the registered list.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
        let mut registered = self.sinks.lock().expect("sink mutex poisoned");
        registered.push(sink);
    }

    /// Returns how many sinks are currently registered.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn len(&self) -> usize {
        let registered = self.sinks.lock().expect("sink mutex poisoned");
        registered.len()
    }

    /// Returns `true` when no sink has been registered.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let registered = self.sinks.lock().expect("sink mutex poisoned");
        registered.is_empty()
    }
}
impl Default for MultiSink {
fn default() -> Self {
Self::new()
}
}
impl AgentEventSink for MultiSink {
fn handle_event(&self, event: &AgentEvent) {
let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
for sink in sinks {
sink.handle_event(event);
}
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
pub(super) fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}