harn_vm/agent_events/
sinks.rs1use std::sync::{Arc, Mutex};
2
3use serde::{Deserialize, Serialize};
4
5use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
6
7use super::AgentEvent;
8
9pub trait AgentEventSink: Send + Sync {
12 fn handle_event(&self, event: &AgentEvent);
13}
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
22pub struct PersistedAgentEvent {
23 pub index: u64,
27 pub emitted_at_ms: i64,
31 pub frame_depth: Option<u32>,
35 #[serde(flatten)]
37 pub event: AgentEvent,
38}
39
40pub struct JsonlEventSink {
45 state: Mutex<JsonlEventSinkState>,
46 base_path: std::path::PathBuf,
47}
48
/// Mutable state of a `JsonlEventSink`, kept together so a single lock
/// covers the writer and its counters.
struct JsonlEventSinkState {
    /// Buffered writer over the file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index that will be assigned to the next event.
    index: u64,
    /// Bytes accounted to the current file; reset on rotation.
    bytes_written: u64,
    /// Number of rotations performed so far (0 while on the base file).
    rotation: u32,
}
55
56impl JsonlEventSink {
57 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
61
62 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
66 let base_path = base_path.into();
67 if let Some(parent) = base_path.parent() {
68 std::fs::create_dir_all(parent)?;
69 }
70 let file = std::fs::OpenOptions::new()
71 .create(true)
72 .truncate(true)
73 .write(true)
74 .open(&base_path)?;
75 Ok(Arc::new(Self {
76 state: Mutex::new(JsonlEventSinkState {
77 writer: std::io::BufWriter::new(file),
78 index: 0,
79 bytes_written: 0,
80 rotation: 0,
81 }),
82 base_path,
83 }))
84 }
85
86 pub fn flush(&self) -> std::io::Result<()> {
89 use std::io::Write as _;
90 self.state
91 .lock()
92 .expect("jsonl sink mutex poisoned")
93 .writer
94 .flush()
95 }
96
97 pub fn event_count(&self) -> u64 {
100 self.state.lock().expect("jsonl sink mutex poisoned").index
101 }
102
103 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
104 use std::io::Write as _;
105 if state.bytes_written < Self::ROTATE_BYTES {
106 return Ok(());
107 }
108 state.writer.flush()?;
109 state.rotation += 1;
110 let suffix = format!("-{:06}", state.rotation);
111 let rotated = self.base_path.with_file_name({
112 let stem = self
113 .base_path
114 .file_stem()
115 .and_then(|s| s.to_str())
116 .unwrap_or("event_log");
117 let ext = self
118 .base_path
119 .extension()
120 .and_then(|e| e.to_str())
121 .unwrap_or("jsonl");
122 format!("{stem}{suffix}.{ext}")
123 });
124 let file = std::fs::OpenOptions::new()
125 .create(true)
126 .truncate(true)
127 .write(true)
128 .open(&rotated)?;
129 state.writer = std::io::BufWriter::new(file);
130 state.bytes_written = 0;
131 Ok(())
132 }
133}
134
135pub struct EventLogSink {
140 log: Arc<AnyEventLog>,
141 topic: Topic,
142 session_id: String,
143}
144
145impl EventLogSink {
146 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
147 let session_id = session_id.into();
148 let topic = Topic::new(format!(
149 "observability.agent_events.{}",
150 crate::event_log::sanitize_topic_component(&session_id)
151 ))
152 .expect("session id should sanitize to a valid topic");
153 Arc::new(Self {
154 log,
155 topic,
156 session_id,
157 })
158 }
159}
160
161impl AgentEventSink for JsonlEventSink {
162 fn handle_event(&self, event: &AgentEvent) {
163 use std::io::Write as _;
164 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
165 let index = state.index;
166 state.index += 1;
167 let emitted_at_ms = std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .map(|d| d.as_millis() as i64)
170 .unwrap_or(0);
171 let envelope = PersistedAgentEvent {
172 index,
173 emitted_at_ms,
174 frame_depth: None,
175 event: event.clone(),
176 };
177 if let Ok(line) = serde_json::to_string(&envelope) {
178 let _ = state.writer.write_all(line.as_bytes());
183 let _ = state.writer.write_all(b"\n");
184 state.bytes_written += line.len() as u64 + 1;
185 let _ = self.rotate_if_needed(&mut state);
186 }
187 }
188}
189
190impl AgentEventSink for EventLogSink {
191 fn handle_event(&self, event: &AgentEvent) {
192 let event_json = match serde_json::to_value(event) {
193 Ok(value) => value,
194 Err(_) => return,
195 };
196 let event_kind = event_json
197 .get("type")
198 .and_then(|value| value.as_str())
199 .unwrap_or("agent_event")
200 .to_string();
201 let payload = serde_json::json!({
202 "index_hint": now_ms(),
203 "session_id": self.session_id,
204 "event": event_json,
205 });
206 let mut headers = std::collections::BTreeMap::new();
207 headers.insert("session_id".to_string(), self.session_id.clone());
208 let log = self.log.clone();
209 let topic = self.topic.clone();
210 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
211 if let Ok(handle) = tokio::runtime::Handle::try_current() {
212 handle.spawn(async move {
213 let _ = log.append(&topic, record).await;
214 });
215 } else {
216 let _ = futures::executor::block_on(log.append(&topic, record));
217 }
218 }
219}
220
221impl Drop for JsonlEventSink {
222 fn drop(&mut self) {
223 if let Ok(mut state) = self.state.lock() {
224 use std::io::Write as _;
225 let _ = state.writer.flush();
226 }
227 }
228}
229
230pub struct MultiSink {
232 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
233}
234
235impl MultiSink {
236 pub fn new() -> Self {
237 Self {
238 sinks: Mutex::new(Vec::new()),
239 }
240 }
241 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
242 self.sinks.lock().expect("sink mutex poisoned").push(sink);
243 }
244 pub fn len(&self) -> usize {
245 self.sinks.lock().expect("sink mutex poisoned").len()
246 }
247 pub fn is_empty(&self) -> bool {
248 self.len() == 0
249 }
250}
251
252impl Default for MultiSink {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
258impl AgentEventSink for MultiSink {
259 fn handle_event(&self, event: &AgentEvent) {
260 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
266 for sink in sinks {
267 sink.handle_event(event);
268 }
269 }
270}
271
272pub(super) fn now_ms() -> i64 {
273 std::time::SystemTime::now()
274 .duration_since(std::time::UNIX_EPOCH)
275 .map(|duration| duration.as_millis() as i64)
276 .unwrap_or(0)
277}