1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
28#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum ToolCallStatus {
32 Pending,
34 InProgress,
36 Completed,
38 Failed,
40}
41
42#[derive(Clone, Debug, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum AgentEvent {
47 AgentMessageChunk {
48 session_id: String,
49 content: String,
50 },
51 AgentThoughtChunk {
52 session_id: String,
53 content: String,
54 },
55 ToolCall {
56 session_id: String,
57 tool_call_id: String,
58 tool_name: String,
59 kind: Option<ToolKind>,
60 status: ToolCallStatus,
61 raw_input: serde_json::Value,
62 },
63 ToolCallUpdate {
64 session_id: String,
65 tool_call_id: String,
66 tool_name: String,
67 status: ToolCallStatus,
68 raw_output: Option<serde_json::Value>,
69 error: Option<String>,
70 },
71 Plan {
72 session_id: String,
73 plan: serde_json::Value,
74 },
75 TurnStart {
76 session_id: String,
77 iteration: usize,
78 },
79 TurnEnd {
80 session_id: String,
81 iteration: usize,
82 turn_info: serde_json::Value,
83 },
84 FeedbackInjected {
85 session_id: String,
86 kind: String,
87 content: String,
88 },
89 BudgetExhausted {
93 session_id: String,
94 max_iterations: usize,
95 },
96 LoopStuck {
100 session_id: String,
101 max_nudges: usize,
102 last_iteration: usize,
103 tail_excerpt: String,
104 },
105 DaemonWatchdogTripped {
110 session_id: String,
111 attempts: usize,
112 elapsed_ms: u64,
113 },
114 SkillActivated {
118 session_id: String,
119 skill_name: String,
120 iteration: usize,
121 reason: String,
122 },
123 SkillDeactivated {
126 session_id: String,
127 skill_name: String,
128 iteration: usize,
129 },
130 SkillScopeTools {
133 session_id: String,
134 skill_name: String,
135 allowed_tools: Vec<String>,
136 },
137 ToolSearchQuery {
145 session_id: String,
146 tool_use_id: String,
147 name: String,
148 query: serde_json::Value,
149 strategy: String,
150 mode: String,
151 },
152 ToolSearchResult {
156 session_id: String,
157 tool_use_id: String,
158 promoted: Vec<String>,
159 strategy: String,
160 mode: String,
161 },
162}
163
164impl AgentEvent {
165 pub fn session_id(&self) -> &str {
166 match self {
167 Self::AgentMessageChunk { session_id, .. }
168 | Self::AgentThoughtChunk { session_id, .. }
169 | Self::ToolCall { session_id, .. }
170 | Self::ToolCallUpdate { session_id, .. }
171 | Self::Plan { session_id, .. }
172 | Self::TurnStart { session_id, .. }
173 | Self::TurnEnd { session_id, .. }
174 | Self::FeedbackInjected { session_id, .. }
175 | Self::BudgetExhausted { session_id, .. }
176 | Self::LoopStuck { session_id, .. }
177 | Self::DaemonWatchdogTripped { session_id, .. }
178 | Self::SkillActivated { session_id, .. }
179 | Self::SkillDeactivated { session_id, .. }
180 | Self::SkillScopeTools { session_id, .. }
181 | Self::ToolSearchQuery { session_id, .. }
182 | Self::ToolSearchResult { session_id, .. } => session_id,
183 }
184 }
185}
186
187pub trait AgentEventSink: Send + Sync {
190 fn handle_event(&self, event: &AgentEvent);
191}
192
193#[derive(Clone, Debug, Serialize, Deserialize)]
200pub struct PersistedAgentEvent {
201 pub index: u64,
205 pub emitted_at_ms: i64,
209 pub frame_depth: Option<u32>,
213 #[serde(flatten)]
215 pub event: AgentEvent,
216}
217
218pub struct JsonlEventSink {
223 state: Mutex<JsonlEventSinkState>,
224 base_path: std::path::PathBuf,
225}
226
/// Mutable state of a `JsonlEventSink`, kept behind the sink's mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the log file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index that the next handled event will receive.
    index: u64,
    /// Bytes accounted to the current file since it was opened.
    bytes_written: u64,
    /// Number used as the suffix of the most recently rotated file name.
    rotation: u32,
}
233
234impl JsonlEventSink {
235 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
239
240 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
244 let base_path = base_path.into();
245 if let Some(parent) = base_path.parent() {
246 std::fs::create_dir_all(parent)?;
247 }
248 let file = std::fs::OpenOptions::new()
249 .create(true)
250 .truncate(true)
251 .write(true)
252 .open(&base_path)?;
253 Ok(Arc::new(Self {
254 state: Mutex::new(JsonlEventSinkState {
255 writer: std::io::BufWriter::new(file),
256 index: 0,
257 bytes_written: 0,
258 rotation: 0,
259 }),
260 base_path,
261 }))
262 }
263
264 pub fn flush(&self) -> std::io::Result<()> {
267 use std::io::Write as _;
268 self.state
269 .lock()
270 .expect("jsonl sink mutex poisoned")
271 .writer
272 .flush()
273 }
274
275 pub fn event_count(&self) -> u64 {
278 self.state.lock().expect("jsonl sink mutex poisoned").index
279 }
280
281 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
282 use std::io::Write as _;
283 if state.bytes_written < Self::ROTATE_BYTES {
284 return Ok(());
285 }
286 state.writer.flush()?;
287 state.rotation += 1;
288 let suffix = format!("-{:06}", state.rotation);
289 let rotated = self.base_path.with_file_name({
290 let stem = self
291 .base_path
292 .file_stem()
293 .and_then(|s| s.to_str())
294 .unwrap_or("event_log");
295 let ext = self
296 .base_path
297 .extension()
298 .and_then(|e| e.to_str())
299 .unwrap_or("jsonl");
300 format!("{stem}{suffix}.{ext}")
301 });
302 let file = std::fs::OpenOptions::new()
303 .create(true)
304 .truncate(true)
305 .write(true)
306 .open(&rotated)?;
307 state.writer = std::io::BufWriter::new(file);
308 state.bytes_written = 0;
309 Ok(())
310 }
311}
312
313impl AgentEventSink for JsonlEventSink {
314 fn handle_event(&self, event: &AgentEvent) {
315 use std::io::Write as _;
316 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
317 let index = state.index;
318 state.index += 1;
319 let emitted_at_ms = std::time::SystemTime::now()
320 .duration_since(std::time::UNIX_EPOCH)
321 .map(|d| d.as_millis() as i64)
322 .unwrap_or(0);
323 let envelope = PersistedAgentEvent {
324 index,
325 emitted_at_ms,
326 frame_depth: None,
327 event: event.clone(),
328 };
329 if let Ok(line) = serde_json::to_string(&envelope) {
330 let _ = state.writer.write_all(line.as_bytes());
335 let _ = state.writer.write_all(b"\n");
336 state.bytes_written += line.len() as u64 + 1;
337 let _ = self.rotate_if_needed(&mut state);
338 }
339 }
340}
341
342impl Drop for JsonlEventSink {
343 fn drop(&mut self) {
344 if let Ok(mut state) = self.state.lock() {
345 use std::io::Write as _;
346 let _ = state.writer.flush();
347 }
348 }
349}
350
351pub struct MultiSink {
353 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
354}
355
356impl MultiSink {
357 pub fn new() -> Self {
358 Self {
359 sinks: Mutex::new(Vec::new()),
360 }
361 }
362 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
363 self.sinks.lock().expect("sink mutex poisoned").push(sink);
364 }
365 pub fn len(&self) -> usize {
366 self.sinks.lock().expect("sink mutex poisoned").len()
367 }
368 pub fn is_empty(&self) -> bool {
369 self.len() == 0
370 }
371}
372
373impl Default for MultiSink {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
379impl AgentEventSink for MultiSink {
380 fn handle_event(&self, event: &AgentEvent) {
381 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
387 for sink in sinks {
388 sink.handle_event(event);
389 }
390 }
391}
392
393type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
394
395fn external_sinks() -> &'static ExternalSinkRegistry {
396 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
397 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
398}
399
400pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
401 let session_id = session_id.into();
402 let mut reg = external_sinks().write().expect("sink registry poisoned");
403 reg.entry(session_id).or_default().push(sink);
404}
405
406pub fn clear_session_sinks(session_id: &str) {
410 external_sinks()
411 .write()
412 .expect("sink registry poisoned")
413 .remove(session_id);
414}
415
416pub fn reset_all_sinks() {
417 external_sinks()
418 .write()
419 .expect("sink registry poisoned")
420 .clear();
421 crate::agent_sessions::reset_session_store();
422}
423
424pub fn emit_event(event: &AgentEvent) {
428 let sinks: Vec<Arc<dyn AgentEventSink>> = {
429 let reg = external_sinks().read().expect("sink registry poisoned");
430 reg.get(event.session_id()).cloned().unwrap_or_default()
431 };
432 for sink in sinks {
433 sink.handle_event(event);
434 }
435}
436
437pub fn session_external_sink_count(session_id: &str) -> usize {
438 external_sinks()
439 .read()
440 .expect("sink registry poisoned")
441 .get(session_id)
442 .map(|v| v.len())
443 .unwrap_or(0)
444}
445
446pub fn session_closure_subscriber_count(session_id: &str) -> usize {
447 crate::agent_sessions::subscriber_count(session_id)
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test sink that bumps a shared counter once per event it receives.
    struct CountingSink(Arc<AtomicUsize>);

    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Returns a zeroed counter together with a sink wired to it.
    fn counting_sink() -> (Arc<AtomicUsize>, Arc<CountingSink>) {
        let hits = Arc::new(AtomicUsize::new(0));
        let sink = Arc::new(CountingSink(Arc::clone(&hits)));
        (hits, sink)
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let (hits_a, sink_a) = counting_sink();
        let (hits_b, sink_b) = counting_sink();

        let multi = MultiSink::new();
        multi.push(sink_a);
        multi.push(sink_b);

        multi.handle_event(&AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        });

        assert_eq!(hits_a.load(Ordering::SeqCst), 1);
        assert_eq!(hits_b.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();

        let (hits_a, sink_a) = counting_sink();
        let (hits_b, sink_b) = counting_sink();
        register_sink("session-a", sink_a);
        register_sink("session-b", sink_b);
        let seen = || (hits_a.load(Ordering::SeqCst), hits_b.load(Ordering::SeqCst));

        // An event for session-a reaches only session-a's sink.
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(seen(), (1, 0));

        // An event for session-b reaches only session-b's sink.
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(seen(), (1, 1));

        // Clearing one session leaves the other untouched.
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);

        reset_all_sinks();
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");

        let sink = JsonlEventSink::open(&path).unwrap();
        (0..5).for_each(|iteration| {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration,
            });
        });
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        let contents = std::fs::read_to_string(&path).unwrap();
        let mut previous_ts: i64 = 0;
        let mut lines_seen: i64 = 0;
        for (position, line) in contents.lines().enumerate() {
            let val: serde_json::Value = serde_json::from_str(line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, position as i64, "indices must be contiguous");
            assert!(ts >= previous_ts, "timestamps must be non-decreasing");
            assert_eq!(val["type"], "turn_start");
            previous_ts = ts;
            lines_seen += 1;
        }
        assert_eq!(lines_seen, 5);

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn tool_call_status_serde() {
        let expected = [
            (ToolCallStatus::Pending, "\"pending\""),
            (ToolCallStatus::InProgress, "\"in_progress\""),
            (ToolCallStatus::Completed, "\"completed\""),
            (ToolCallStatus::Failed, "\"failed\""),
        ];
        for (status, json) in expected {
            assert_eq!(serde_json::to_string(&status).unwrap(), json);
        }
    }
}