1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
28#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum ToolCallStatus {
32 Pending,
34 InProgress,
36 Completed,
38 Failed,
40}
41
42#[derive(Clone, Debug, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum AgentEvent {
47 AgentMessageChunk {
48 session_id: String,
49 content: String,
50 },
51 AgentThoughtChunk {
52 session_id: String,
53 content: String,
54 },
55 ToolCall {
56 session_id: String,
57 tool_call_id: String,
58 tool_name: String,
59 kind: Option<ToolKind>,
60 status: ToolCallStatus,
61 raw_input: serde_json::Value,
62 },
63 ToolCallUpdate {
64 session_id: String,
65 tool_call_id: String,
66 tool_name: String,
67 status: ToolCallStatus,
68 raw_output: Option<serde_json::Value>,
69 error: Option<String>,
70 },
71 Plan {
72 session_id: String,
73 plan: serde_json::Value,
74 },
75 TurnStart {
76 session_id: String,
77 iteration: usize,
78 },
79 TurnEnd {
80 session_id: String,
81 iteration: usize,
82 turn_info: serde_json::Value,
83 },
84 FeedbackInjected {
85 session_id: String,
86 kind: String,
87 content: String,
88 },
89 BudgetExhausted {
93 session_id: String,
94 max_iterations: usize,
95 },
96 LoopStuck {
100 session_id: String,
101 max_nudges: usize,
102 last_iteration: usize,
103 tail_excerpt: String,
104 },
105 DaemonWatchdogTripped {
110 session_id: String,
111 attempts: usize,
112 elapsed_ms: u64,
113 },
114 SkillActivated {
118 session_id: String,
119 skill_name: String,
120 iteration: usize,
121 reason: String,
122 },
123 SkillDeactivated {
126 session_id: String,
127 skill_name: String,
128 iteration: usize,
129 },
130 SkillScopeTools {
133 session_id: String,
134 skill_name: String,
135 allowed_tools: Vec<String>,
136 },
137}
138
139impl AgentEvent {
140 pub fn session_id(&self) -> &str {
141 match self {
142 Self::AgentMessageChunk { session_id, .. }
143 | Self::AgentThoughtChunk { session_id, .. }
144 | Self::ToolCall { session_id, .. }
145 | Self::ToolCallUpdate { session_id, .. }
146 | Self::Plan { session_id, .. }
147 | Self::TurnStart { session_id, .. }
148 | Self::TurnEnd { session_id, .. }
149 | Self::FeedbackInjected { session_id, .. }
150 | Self::BudgetExhausted { session_id, .. }
151 | Self::LoopStuck { session_id, .. }
152 | Self::DaemonWatchdogTripped { session_id, .. }
153 | Self::SkillActivated { session_id, .. }
154 | Self::SkillDeactivated { session_id, .. }
155 | Self::SkillScopeTools { session_id, .. } => session_id,
156 }
157 }
158}
159
160pub trait AgentEventSink: Send + Sync {
163 fn handle_event(&self, event: &AgentEvent);
164}
165
166#[derive(Clone, Debug, Serialize, Deserialize)]
173pub struct PersistedAgentEvent {
174 pub index: u64,
178 pub emitted_at_ms: i64,
182 pub frame_depth: Option<u32>,
186 #[serde(flatten)]
188 pub event: AgentEvent,
189}
190
191pub struct JsonlEventSink {
196 state: Mutex<JsonlEventSinkState>,
197 base_path: std::path::PathBuf,
198}
199
200struct JsonlEventSinkState {
201 writer: std::io::BufWriter<std::fs::File>,
202 index: u64,
203 bytes_written: u64,
204 rotation: u32,
205}
206
207impl JsonlEventSink {
208 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
212
213 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
217 let base_path = base_path.into();
218 if let Some(parent) = base_path.parent() {
219 std::fs::create_dir_all(parent)?;
220 }
221 let file = std::fs::OpenOptions::new()
222 .create(true)
223 .truncate(true)
224 .write(true)
225 .open(&base_path)?;
226 Ok(Arc::new(Self {
227 state: Mutex::new(JsonlEventSinkState {
228 writer: std::io::BufWriter::new(file),
229 index: 0,
230 bytes_written: 0,
231 rotation: 0,
232 }),
233 base_path,
234 }))
235 }
236
237 pub fn flush(&self) -> std::io::Result<()> {
240 use std::io::Write as _;
241 self.state
242 .lock()
243 .expect("jsonl sink mutex poisoned")
244 .writer
245 .flush()
246 }
247
248 pub fn event_count(&self) -> u64 {
251 self.state.lock().expect("jsonl sink mutex poisoned").index
252 }
253
254 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
255 use std::io::Write as _;
256 if state.bytes_written < Self::ROTATE_BYTES {
257 return Ok(());
258 }
259 state.writer.flush()?;
260 state.rotation += 1;
261 let suffix = format!("-{:06}", state.rotation);
262 let rotated = self.base_path.with_file_name({
263 let stem = self
264 .base_path
265 .file_stem()
266 .and_then(|s| s.to_str())
267 .unwrap_or("event_log");
268 let ext = self
269 .base_path
270 .extension()
271 .and_then(|e| e.to_str())
272 .unwrap_or("jsonl");
273 format!("{stem}{suffix}.{ext}")
274 });
275 let file = std::fs::OpenOptions::new()
276 .create(true)
277 .truncate(true)
278 .write(true)
279 .open(&rotated)?;
280 state.writer = std::io::BufWriter::new(file);
281 state.bytes_written = 0;
282 Ok(())
283 }
284}
285
286impl AgentEventSink for JsonlEventSink {
287 fn handle_event(&self, event: &AgentEvent) {
288 use std::io::Write as _;
289 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
290 let index = state.index;
291 state.index += 1;
292 let emitted_at_ms = std::time::SystemTime::now()
293 .duration_since(std::time::UNIX_EPOCH)
294 .map(|d| d.as_millis() as i64)
295 .unwrap_or(0);
296 let envelope = PersistedAgentEvent {
297 index,
298 emitted_at_ms,
299 frame_depth: None,
300 event: event.clone(),
301 };
302 if let Ok(line) = serde_json::to_string(&envelope) {
303 let _ = state.writer.write_all(line.as_bytes());
308 let _ = state.writer.write_all(b"\n");
309 state.bytes_written += line.len() as u64 + 1;
310 let _ = self.rotate_if_needed(&mut state);
311 }
312 }
313}
314
315impl Drop for JsonlEventSink {
316 fn drop(&mut self) {
317 if let Ok(mut state) = self.state.lock() {
318 use std::io::Write as _;
319 let _ = state.writer.flush();
320 }
321 }
322}
323
324pub struct MultiSink {
326 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
327}
328
329impl MultiSink {
330 pub fn new() -> Self {
331 Self {
332 sinks: Mutex::new(Vec::new()),
333 }
334 }
335 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
336 self.sinks.lock().expect("sink mutex poisoned").push(sink);
337 }
338 pub fn len(&self) -> usize {
339 self.sinks.lock().expect("sink mutex poisoned").len()
340 }
341 pub fn is_empty(&self) -> bool {
342 self.len() == 0
343 }
344}
345
346impl Default for MultiSink {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352impl AgentEventSink for MultiSink {
353 fn handle_event(&self, event: &AgentEvent) {
354 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
360 for sink in sinks {
361 sink.handle_event(event);
362 }
363 }
364}
365
366type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
367
368fn external_sinks() -> &'static ExternalSinkRegistry {
369 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
370 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
371}
372
373pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
374 let session_id = session_id.into();
375 let mut reg = external_sinks().write().expect("sink registry poisoned");
376 reg.entry(session_id).or_default().push(sink);
377}
378
379pub fn clear_session_sinks(session_id: &str) {
383 external_sinks()
384 .write()
385 .expect("sink registry poisoned")
386 .remove(session_id);
387}
388
389pub fn reset_all_sinks() {
390 external_sinks()
391 .write()
392 .expect("sink registry poisoned")
393 .clear();
394 crate::agent_sessions::reset_session_store();
395}
396
397pub fn emit_event(event: &AgentEvent) {
401 let sinks: Vec<Arc<dyn AgentEventSink>> = {
402 let reg = external_sinks().read().expect("sink registry poisoned");
403 reg.get(event.session_id()).cloned().unwrap_or_default()
404 };
405 for sink in sinks {
406 sink.handle_event(event);
407 }
408}
409
410pub fn session_external_sink_count(session_id: &str) -> usize {
411 external_sinks()
412 .read()
413 .expect("sink registry poisoned")
414 .get(session_id)
415 .map(|v| v.len())
416 .unwrap_or(0)
417}
418
419pub fn session_closure_subscriber_count(session_id: &str) -> usize {
420 crate::agent_sessions::subscriber_count(session_id)
421}
422
423#[cfg(test)]
424mod tests {
425 use super::*;
426 use std::sync::atomic::{AtomicUsize, Ordering};
427
428 struct CountingSink(Arc<AtomicUsize>);
429 impl AgentEventSink for CountingSink {
430 fn handle_event(&self, _event: &AgentEvent) {
431 self.0.fetch_add(1, Ordering::SeqCst);
432 }
433 }
434
435 #[test]
436 fn multi_sink_fans_out_in_order() {
437 let multi = MultiSink::new();
438 let a = Arc::new(AtomicUsize::new(0));
439 let b = Arc::new(AtomicUsize::new(0));
440 multi.push(Arc::new(CountingSink(a.clone())));
441 multi.push(Arc::new(CountingSink(b.clone())));
442 let event = AgentEvent::TurnStart {
443 session_id: "s1".into(),
444 iteration: 1,
445 };
446 multi.handle_event(&event);
447 assert_eq!(a.load(Ordering::SeqCst), 1);
448 assert_eq!(b.load(Ordering::SeqCst), 1);
449 }
450
451 #[test]
452 fn session_scoped_sink_routing() {
453 reset_all_sinks();
454 let a = Arc::new(AtomicUsize::new(0));
455 let b = Arc::new(AtomicUsize::new(0));
456 register_sink("session-a", Arc::new(CountingSink(a.clone())));
457 register_sink("session-b", Arc::new(CountingSink(b.clone())));
458 emit_event(&AgentEvent::TurnStart {
459 session_id: "session-a".into(),
460 iteration: 0,
461 });
462 assert_eq!(a.load(Ordering::SeqCst), 1);
463 assert_eq!(b.load(Ordering::SeqCst), 0);
464 emit_event(&AgentEvent::TurnEnd {
465 session_id: "session-b".into(),
466 iteration: 0,
467 turn_info: serde_json::json!({}),
468 });
469 assert_eq!(a.load(Ordering::SeqCst), 1);
470 assert_eq!(b.load(Ordering::SeqCst), 1);
471 clear_session_sinks("session-a");
472 assert_eq!(session_external_sink_count("session-a"), 0);
473 assert_eq!(session_external_sink_count("session-b"), 1);
474 reset_all_sinks();
475 }
476
477 #[test]
478 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
479 use std::io::{BufRead, BufReader};
480 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
481 std::fs::create_dir_all(&dir).unwrap();
482 let path = dir.join("event_log.jsonl");
483 let sink = JsonlEventSink::open(&path).unwrap();
484 for i in 0..5 {
485 sink.handle_event(&AgentEvent::TurnStart {
486 session_id: "s".into(),
487 iteration: i,
488 });
489 }
490 assert_eq!(sink.event_count(), 5);
491 sink.flush().unwrap();
492
493 let file = std::fs::File::open(&path).unwrap();
495 let mut last_idx: i64 = -1;
496 let mut last_ts: i64 = 0;
497 for line in BufReader::new(file).lines() {
498 let line = line.unwrap();
499 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
500 let idx = val["index"].as_i64().unwrap();
501 let ts = val["emitted_at_ms"].as_i64().unwrap();
502 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
503 assert!(ts >= last_ts, "timestamps must be non-decreasing");
504 last_idx = idx;
505 last_ts = ts;
506 assert_eq!(val["type"], "turn_start");
508 }
509 assert_eq!(last_idx, 4);
510 let _ = std::fs::remove_file(&path);
511 }
512
513 #[test]
514 fn tool_call_status_serde() {
515 assert_eq!(
516 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
517 "\"pending\""
518 );
519 assert_eq!(
520 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
521 "\"in_progress\""
522 );
523 assert_eq!(
524 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
525 "\"completed\""
526 );
527 assert_eq!(
528 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
529 "\"failed\""
530 );
531 }
532}