Skip to main content

harn_vm/agent_events/
sinks.rs

1use std::sync::{Arc, Mutex};
2
3use serde::{Deserialize, Serialize};
4
5use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
6
7use super::AgentEvent;
8
9/// External consumers of the event stream (e.g. the harn-cli ACP server,
10/// which translates events into JSON-RPC notifications).
11pub trait AgentEventSink: Send + Sync {
12    fn handle_event(&self, event: &AgentEvent);
13}
14
15/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
16/// `AgentEvent` with monotonic index + timestamp + frame depth so
17/// replay engines can reconstruct paused state at any event index,
18/// and scrubber UIs can bucket events by time. The envelope is the
19/// on-disk shape; the wire format for live consumers is still the
20/// raw `AgentEvent` so existing sinks don't churn.
21#[derive(Clone, Debug, Serialize, Deserialize)]
22pub struct PersistedAgentEvent {
23    /// Monotonic per-session index starting at 0. Unique within a
24    /// session; gaps never happen even under load because the sink
25    /// owns the counter under a mutex.
26    pub index: u64,
27    /// Milliseconds since the Unix epoch, captured when the sink
28    /// received the event. Not the event's emission time — that
29    /// would require threading a clock through every emit site.
30    pub emitted_at_ms: i64,
31    /// Call-stack depth at the moment of emission, when the caller
32    /// can supply it. `None` for events emitted from a context where
33    /// the VM frame stack isn't available.
34    pub frame_depth: Option<u32>,
35    /// The raw event, flattened so `jq '.type'` works as expected.
36    #[serde(flatten)]
37    pub event: AgentEvent,
38}
39
/// Append-only JSON Lines sink for one session's event stream (#103).
///
/// Each session gets its own writer. Once the active file has received
/// [`JsonlEventSink::ROTATE_BYTES`] bytes, later records go to a sibling
/// file with a numbered suffix. The limit is 100 MB today; long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires.
pub struct JsonlEventSink {
    /// Path of the first segment. Rotated segment names are derived from it.
    base_path: std::path::PathBuf,
    /// Writer and counters, kept behind one lock so that assigning an index
    /// and writing the record that carries it cannot interleave with
    /// another writer.
    state: Mutex<JsonlEventSinkState>,
}

/// Mutable state of a [`JsonlEventSink`], guarded by the sink's mutex.
struct JsonlEventSinkState {
    /// Index the next record will receive.
    index: u64,
    /// Segment currently being written: 0 is `base_path` itself, N is the
    /// `-NNNNNN` suffixed file.
    rotation: u32,
    /// Bytes written to the current segment since it was opened.
    bytes_written: u64,
    /// Buffered writer over the current segment.
    writer: std::io::BufWriter<std::fs::File>,
}
55
56impl JsonlEventSink {
57    /// Hard cap past which the current file rotates to a numbered
58    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
59    /// sessions don't produce unreadable multi-GB logs.
60    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
61
62    /// Open a new sink writing to `base_path`. Creates parent dirs
63    /// if missing. Overwrites an existing file so each fresh session
64    /// starts from index 0.
65    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
66        let base_path = base_path.into();
67        if let Some(parent) = base_path.parent() {
68            std::fs::create_dir_all(parent)?;
69        }
70        let file = std::fs::OpenOptions::new()
71            .create(true)
72            .truncate(true)
73            .write(true)
74            .open(&base_path)?;
75        Ok(Arc::new(Self {
76            state: Mutex::new(JsonlEventSinkState {
77                writer: std::io::BufWriter::new(file),
78                index: 0,
79                bytes_written: 0,
80                rotation: 0,
81            }),
82            base_path,
83        }))
84    }
85
86    /// Flush any buffered writes. Called on session shutdown; the
87    /// Drop impl calls this too but on early panic it may not run.
88    pub fn flush(&self) -> std::io::Result<()> {
89        use std::io::Write as _;
90        self.state
91            .lock()
92            .expect("jsonl sink mutex poisoned")
93            .writer
94            .flush()
95    }
96
97    /// Current event index — primarily for tests and the "how many
98    /// events are in this run" run-record summary.
99    pub fn event_count(&self) -> u64 {
100        self.state.lock().expect("jsonl sink mutex poisoned").index
101    }
102
103    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
104        use std::io::Write as _;
105        if state.bytes_written < Self::ROTATE_BYTES {
106            return Ok(());
107        }
108        state.writer.flush()?;
109        state.rotation += 1;
110        let suffix = format!("-{:06}", state.rotation);
111        let rotated = self.base_path.with_file_name({
112            let stem = self
113                .base_path
114                .file_stem()
115                .and_then(|s| s.to_str())
116                .unwrap_or("event_log");
117            let ext = self
118                .base_path
119                .extension()
120                .and_then(|e| e.to_str())
121                .unwrap_or("jsonl");
122            format!("{stem}{suffix}.{ext}")
123        });
124        let file = std::fs::OpenOptions::new()
125            .create(true)
126            .truncate(true)
127            .write(true)
128            .open(&rotated)?;
129        state.writer = std::io::BufWriter::new(file);
130        state.bytes_written = 0;
131        Ok(())
132    }
133}
134
135/// Event-log-backed sink for a single session's agent event stream.
136/// Uses the generalized append-only event log when one is installed for
137/// the current VM thread and falls back to `JsonlEventSink` only for
138/// older env-driven workflows.
139pub struct EventLogSink {
140    log: Arc<AnyEventLog>,
141    topic: Topic,
142    session_id: String,
143}
144
145impl EventLogSink {
146    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
147        let session_id = session_id.into();
148        let topic = Topic::new(format!(
149            "observability.agent_events.{}",
150            crate::event_log::sanitize_topic_component(&session_id)
151        ))
152        .expect("session id should sanitize to a valid topic");
153        Arc::new(Self {
154            log,
155            topic,
156            session_id,
157        })
158    }
159}
160
161impl AgentEventSink for JsonlEventSink {
162    fn handle_event(&self, event: &AgentEvent) {
163        use std::io::Write as _;
164        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
165        let index = state.index;
166        state.index += 1;
167        let emitted_at_ms = std::time::SystemTime::now()
168            .duration_since(std::time::UNIX_EPOCH)
169            .map(|d| d.as_millis() as i64)
170            .unwrap_or(0);
171        let envelope = PersistedAgentEvent {
172            index,
173            emitted_at_ms,
174            frame_depth: None,
175            event: event.clone(),
176        };
177        if let Ok(line) = serde_json::to_string(&envelope) {
178            // One line, newline-terminated — JSON Lines spec.
179            // Errors here are swallowed on purpose; a failing write
180            // must never crash the agent loop, and the run record
181            // itself is a secondary artifact.
182            let _ = state.writer.write_all(line.as_bytes());
183            let _ = state.writer.write_all(b"\n");
184            state.bytes_written += line.len() as u64 + 1;
185            let _ = self.rotate_if_needed(&mut state);
186        }
187    }
188}
189
190impl AgentEventSink for EventLogSink {
191    fn handle_event(&self, event: &AgentEvent) {
192        let event_json = match serde_json::to_value(event) {
193            Ok(value) => value,
194            Err(_) => return,
195        };
196        let event_kind = event_json
197            .get("type")
198            .and_then(|value| value.as_str())
199            .unwrap_or("agent_event")
200            .to_string();
201        let payload = serde_json::json!({
202            "index_hint": now_ms(),
203            "session_id": self.session_id,
204            "event": event_json,
205        });
206        let mut headers = std::collections::BTreeMap::new();
207        headers.insert("session_id".to_string(), self.session_id.clone());
208        let log = self.log.clone();
209        let topic = self.topic.clone();
210        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
211        if let Ok(handle) = tokio::runtime::Handle::try_current() {
212            handle.spawn(async move {
213                let _ = log.append(&topic, record).await;
214            });
215        } else {
216            let _ = futures::executor::block_on(log.append(&topic, record));
217        }
218    }
219}
220
221impl Drop for JsonlEventSink {
222    fn drop(&mut self) {
223        if let Ok(mut state) = self.state.lock() {
224            use std::io::Write as _;
225            let _ = state.writer.flush();
226        }
227    }
228}
229
230/// Fan-out helper for composing multiple external sinks.
231pub struct MultiSink {
232    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
233}
234
235impl MultiSink {
236    pub fn new() -> Self {
237        Self {
238            sinks: Mutex::new(Vec::new()),
239        }
240    }
241    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
242        self.sinks.lock().expect("sink mutex poisoned").push(sink);
243    }
244    pub fn len(&self) -> usize {
245        self.sinks.lock().expect("sink mutex poisoned").len()
246    }
247    pub fn is_empty(&self) -> bool {
248        self.len() == 0
249    }
250}
251
252impl Default for MultiSink {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
258impl AgentEventSink for MultiSink {
259    fn handle_event(&self, event: &AgentEvent) {
260        // Deliberate: snapshot then release the lock before invoking sink
261        // callbacks. Sinks can re-enter the event system (e.g. a host
262        // sink that logs to another AgentEvent path), so holding the
263        // mutex across the callback would risk self-deadlock. Arc clones
264        // are refcount bumps — cheap.
265        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
266        for sink in sinks {
267            sink.handle_event(event);
268        }
269    }
270}
271
272pub(super) fn now_ms() -> i64 {
273    std::time::SystemTime::now()
274        .duration_since(std::time::UNIX_EPOCH)
275        .map(|duration| duration.as_millis() as i64)
276        .unwrap_or(0)
277}