harn_vm/agent_events/
sinks.rs1use std::sync::{Arc, Mutex};
2
3use serde::{Deserialize, Serialize};
4
5use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
6
7use super::AgentEvent;
8
/// Receiver for [`AgentEvent`]s.
///
/// Implementors must be `Send + Sync`; the sinks in this file are shared as
/// `Arc<dyn AgentEventSink>`.
pub trait AgentEventSink: Send + Sync {
    /// Handles a single event.
    ///
    /// The method returns nothing, so an implementation has no way to report
    /// a failure back to the caller.
    fn handle_event(&self, event: &AgentEvent);
}
14
/// Envelope that pairs an [`AgentEvent`] with bookkeeping metadata for
/// persistence. `JsonlEventSink` serializes one of these per handled event.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Sequence number of the event. `JsonlEventSink` fills it from a
    /// per-sink counter that starts at 0.
    pub index: u64,
    /// Emission timestamp in milliseconds. `JsonlEventSink` uses wall-clock
    /// time since the Unix epoch (0 if the clock reads before the epoch).
    pub emitted_at_ms: i64,
    /// Optional frame depth.
    /// NOTE(review): `JsonlEventSink` always writes `None`; the exact meaning
    /// of this field is not visible in this file — confirm with the producer.
    pub frame_depth: Option<u32>,
    /// The wrapped event. `#[serde(flatten)]` inlines its fields into the
    /// envelope instead of nesting them under an `event` key.
    #[serde(flatten)]
    pub event: AgentEvent,
}
39
/// Event sink that appends events, one JSON document per line, to a file on
/// disk and switches to a numbered sibling file once the current one grows
/// past [`JsonlEventSink::ROTATE_BYTES`].
pub struct JsonlEventSink {
    /// Writer and counters, guarded so the sink can be shared across threads.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first log file; rotated files derive their names from it.
    base_path: std::path::PathBuf,
}

/// Mutable bookkeeping behind [`JsonlEventSink`]'s mutex.
struct JsonlEventSinkState {
    /// Buffered writer for the file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index that will be assigned to the next event.
    index: u64,
    /// Bytes written to the current file since it was opened.
    bytes_written: u64,
    /// Number of successful rotations so far (0 while writing `base_path`).
    rotation: u32,
}

impl JsonlEventSink {
    /// Size threshold (100 MiB) at which the sink rotates to a new file.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Opens a sink writing to `base_path`.
    ///
    /// Missing parent directories are created. An existing file at
    /// `base_path` is truncated.
    ///
    /// # Errors
    /// Returns any I/O error raised while creating the parent directories or
    /// opening the file.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer: std::io::BufWriter::new(file),
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }

    /// Flushes buffered bytes to the current file.
    ///
    /// # Errors
    /// Returns the I/O error reported by the underlying writer.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        self.state
            .lock()
            .expect("jsonl sink mutex poisoned")
            .writer
            .flush()
    }

    /// Returns the sink's running event index, i.e. the index the next event
    /// will receive.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// Switches to a fresh file once the current one has reached
    /// [`Self::ROTATE_BYTES`]; otherwise does nothing.
    ///
    /// The new file is named `<stem>-<NNNNNN>.<ext>` next to `base_path`,
    /// with `event_log` / `jsonl` as fallbacks when the base path lacks a
    /// UTF-8 stem or extension.
    ///
    /// # Errors
    /// Returns the I/O error from flushing the current writer or opening the
    /// next file. On error `state` is left untouched, so the same rotation
    /// number is retried on the next call.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        state.writer.flush()?;
        // Work with a candidate number and commit it only after the open
        // succeeds; a failed attempt must not burn a rotation number.
        let next_rotation = state.rotation + 1;
        let suffix = format!("-{next_rotation:06}");
        let rotated = self.base_path.with_file_name({
            let stem = self
                .base_path
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("event_log");
            let ext = self
                .base_path
                .extension()
                .and_then(|e| e.to_str())
                .unwrap_or("jsonl");
            format!("{stem}{suffix}.{ext}")
        });
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&rotated)?;
        state.writer = std::io::BufWriter::new(file);
        state.rotation = next_rotation;
        state.bytes_written = 0;
        Ok(())
    }
}
134
/// Event sink that appends agent events to a shared event log under a
/// per-session topic.
pub struct EventLogSink {
    /// Event log that records are appended to.
    log: Arc<AnyEventLog>,
    /// Topic the records are appended under; derived from the session id in
    /// `EventLogSink::new`.
    topic: Topic,
    /// Session id attached to every record's payload and headers.
    session_id: String,
}
144
145impl EventLogSink {
146 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
147 let session_id = session_id.into();
148 let topic = Topic::new(format!(
149 "observability.agent_events.{}",
150 crate::event_log::sanitize_topic_component(&session_id)
151 ))
152 .expect("session id should sanitize to a valid topic");
153 Arc::new(Self {
154 log,
155 topic,
156 session_id,
157 })
158 }
159}
160
161impl AgentEventSink for JsonlEventSink {
162 fn handle_event(&self, event: &AgentEvent) {
163 use std::io::Write as _;
164 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
165 let index = state.index;
166 state.index += 1;
167 let emitted_at_ms = std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .map(|d| d.as_millis() as i64)
170 .unwrap_or(0);
171 let envelope = PersistedAgentEvent {
172 index,
173 emitted_at_ms,
174 frame_depth: None,
175 event: event.clone(),
176 };
177 let Ok(mut envelope_json) = serde_json::to_value(&envelope) else {
178 return;
179 };
180 crate::redact::current_policy().redact_json_in_place(&mut envelope_json);
181 if let Ok(line) = serde_json::to_string(&envelope_json) {
182 if state
187 .writer
188 .write_all(line.as_bytes())
189 .and_then(|_| state.writer.write_all(b"\n"))
190 .is_ok()
191 {
192 state.bytes_written += line.len() as u64 + 1;
193 let _ = state.writer.flush();
194 let _ = self.rotate_if_needed(&mut state);
195 }
196 }
197 }
198}
199
200impl AgentEventSink for EventLogSink {
201 fn handle_event(&self, event: &AgentEvent) {
202 let event_json = match serde_json::to_value(event) {
203 Ok(value) => value,
204 Err(_) => return,
205 };
206 let event_kind = event_json
207 .get("type")
208 .and_then(|value| value.as_str())
209 .unwrap_or("agent_event")
210 .to_string();
211 let payload = serde_json::json!({
212 "index_hint": now_ms(),
213 "session_id": self.session_id,
214 "event": event_json,
215 });
216 let mut headers = std::collections::BTreeMap::new();
217 headers.insert("session_id".to_string(), self.session_id.clone());
218 let log = self.log.clone();
219 let topic = self.topic.clone();
220 let mut record = EventLogRecord::new(event_kind, payload).with_headers(headers);
221 record.redact_in_place(&crate::redact::current_policy());
222 if let Ok(handle) = tokio::runtime::Handle::try_current() {
223 handle.spawn(async move {
224 let _ = log.append(&topic, record).await;
225 });
226 } else {
227 let _ = futures::executor::block_on(log.append(&topic, record));
228 }
229 }
230}
231
232impl Drop for JsonlEventSink {
233 fn drop(&mut self) {
234 if let Ok(mut state) = self.state.lock() {
235 use std::io::Write as _;
236 let _ = state.writer.flush();
237 }
238 }
239}
240
/// Sink that forwards every event to a list of registered sinks.
pub struct MultiSink {
    /// Registered sinks, behind a mutex so they can be added through `&self`.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
245
246impl MultiSink {
247 pub fn new() -> Self {
248 Self {
249 sinks: Mutex::new(Vec::new()),
250 }
251 }
252 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
253 self.sinks.lock().expect("sink mutex poisoned").push(sink);
254 }
255 pub fn len(&self) -> usize {
256 self.sinks.lock().expect("sink mutex poisoned").len()
257 }
258 pub fn is_empty(&self) -> bool {
259 self.len() == 0
260 }
261}
262
263impl Default for MultiSink {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269impl AgentEventSink for MultiSink {
270 fn handle_event(&self, event: &AgentEvent) {
271 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
277 for sink in sinks {
278 sink.handle_event(event);
279 }
280 }
281}
282
283pub(super) fn now_ms() -> i64 {
284 std::time::SystemTime::now()
285 .duration_since(std::time::UNIX_EPOCH)
286 .map(|duration| duration.as_millis() as i64)
287 .unwrap_or(0)
288}