1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::tool_annotations::ToolKind;
28
29#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
34pub enum WorkerEvent {
35 WorkerSpawned,
36 WorkerCompleted,
37 WorkerFailed,
38 WorkerCancelled,
39}
40
41impl WorkerEvent {
42 pub fn as_status(self) -> &'static str {
43 match self {
44 Self::WorkerSpawned => "running",
45 Self::WorkerCompleted => "completed",
46 Self::WorkerFailed => "failed",
47 Self::WorkerCancelled => "cancelled",
48 }
49 }
50
51 pub fn as_str(self) -> &'static str {
52 match self {
53 Self::WorkerSpawned => "WorkerSpawned",
54 Self::WorkerCompleted => "WorkerCompleted",
55 Self::WorkerFailed => "WorkerFailed",
56 Self::WorkerCancelled => "WorkerCancelled",
57 }
58 }
59}
60
61#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum ToolCallStatus {
65 Pending,
67 InProgress,
69 Completed,
71 Failed,
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize)]
78#[serde(tag = "type", rename_all = "snake_case")]
79pub enum AgentEvent {
80 AgentMessageChunk {
81 session_id: String,
82 content: String,
83 },
84 AgentThoughtChunk {
85 session_id: String,
86 content: String,
87 },
88 ToolCall {
89 session_id: String,
90 tool_call_id: String,
91 tool_name: String,
92 kind: Option<ToolKind>,
93 status: ToolCallStatus,
94 raw_input: serde_json::Value,
95 },
96 ToolCallUpdate {
97 session_id: String,
98 tool_call_id: String,
99 tool_name: String,
100 status: ToolCallStatus,
101 raw_output: Option<serde_json::Value>,
102 error: Option<String>,
103 },
104 Plan {
105 session_id: String,
106 plan: serde_json::Value,
107 },
108 TurnStart {
109 session_id: String,
110 iteration: usize,
111 },
112 TurnEnd {
113 session_id: String,
114 iteration: usize,
115 turn_info: serde_json::Value,
116 },
117 FeedbackInjected {
118 session_id: String,
119 kind: String,
120 content: String,
121 },
122 BudgetExhausted {
126 session_id: String,
127 max_iterations: usize,
128 },
129 LoopStuck {
133 session_id: String,
134 max_nudges: usize,
135 last_iteration: usize,
136 tail_excerpt: String,
137 },
138 DaemonWatchdogTripped {
143 session_id: String,
144 attempts: usize,
145 elapsed_ms: u64,
146 },
147 SkillActivated {
151 session_id: String,
152 skill_name: String,
153 iteration: usize,
154 reason: String,
155 },
156 SkillDeactivated {
159 session_id: String,
160 skill_name: String,
161 iteration: usize,
162 },
163 SkillScopeTools {
166 session_id: String,
167 skill_name: String,
168 allowed_tools: Vec<String>,
169 },
170 ToolSearchQuery {
178 session_id: String,
179 tool_use_id: String,
180 name: String,
181 query: serde_json::Value,
182 strategy: String,
183 mode: String,
184 },
185 ToolSearchResult {
189 session_id: String,
190 tool_use_id: String,
191 promoted: Vec<String>,
192 strategy: String,
193 mode: String,
194 },
195 TranscriptCompacted {
196 session_id: String,
197 mode: String,
198 strategy: String,
199 archived_messages: usize,
200 estimated_tokens_before: usize,
201 estimated_tokens_after: usize,
202 snapshot_asset_id: Option<String>,
203 },
204}
205
206impl AgentEvent {
207 pub fn session_id(&self) -> &str {
208 match self {
209 Self::AgentMessageChunk { session_id, .. }
210 | Self::AgentThoughtChunk { session_id, .. }
211 | Self::ToolCall { session_id, .. }
212 | Self::ToolCallUpdate { session_id, .. }
213 | Self::Plan { session_id, .. }
214 | Self::TurnStart { session_id, .. }
215 | Self::TurnEnd { session_id, .. }
216 | Self::FeedbackInjected { session_id, .. }
217 | Self::BudgetExhausted { session_id, .. }
218 | Self::LoopStuck { session_id, .. }
219 | Self::DaemonWatchdogTripped { session_id, .. }
220 | Self::SkillActivated { session_id, .. }
221 | Self::SkillDeactivated { session_id, .. }
222 | Self::SkillScopeTools { session_id, .. }
223 | Self::ToolSearchQuery { session_id, .. }
224 | Self::ToolSearchResult { session_id, .. }
225 | Self::TranscriptCompacted { session_id, .. } => session_id,
226 }
227 }
228}
229
230pub trait AgentEventSink: Send + Sync {
233 fn handle_event(&self, event: &AgentEvent);
234}
235
236#[derive(Clone, Debug, Serialize, Deserialize)]
243pub struct PersistedAgentEvent {
244 pub index: u64,
248 pub emitted_at_ms: i64,
252 pub frame_depth: Option<u32>,
256 #[serde(flatten)]
258 pub event: AgentEvent,
259}
260
261pub struct JsonlEventSink {
266 state: Mutex<JsonlEventSinkState>,
267 base_path: std::path::PathBuf,
268}
269
270struct JsonlEventSinkState {
271 writer: std::io::BufWriter<std::fs::File>,
272 index: u64,
273 bytes_written: u64,
274 rotation: u32,
275}
276
277impl JsonlEventSink {
278 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
282
283 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
287 let base_path = base_path.into();
288 if let Some(parent) = base_path.parent() {
289 std::fs::create_dir_all(parent)?;
290 }
291 let file = std::fs::OpenOptions::new()
292 .create(true)
293 .truncate(true)
294 .write(true)
295 .open(&base_path)?;
296 Ok(Arc::new(Self {
297 state: Mutex::new(JsonlEventSinkState {
298 writer: std::io::BufWriter::new(file),
299 index: 0,
300 bytes_written: 0,
301 rotation: 0,
302 }),
303 base_path,
304 }))
305 }
306
307 pub fn flush(&self) -> std::io::Result<()> {
310 use std::io::Write as _;
311 self.state
312 .lock()
313 .expect("jsonl sink mutex poisoned")
314 .writer
315 .flush()
316 }
317
318 pub fn event_count(&self) -> u64 {
321 self.state.lock().expect("jsonl sink mutex poisoned").index
322 }
323
324 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
325 use std::io::Write as _;
326 if state.bytes_written < Self::ROTATE_BYTES {
327 return Ok(());
328 }
329 state.writer.flush()?;
330 state.rotation += 1;
331 let suffix = format!("-{:06}", state.rotation);
332 let rotated = self.base_path.with_file_name({
333 let stem = self
334 .base_path
335 .file_stem()
336 .and_then(|s| s.to_str())
337 .unwrap_or("event_log");
338 let ext = self
339 .base_path
340 .extension()
341 .and_then(|e| e.to_str())
342 .unwrap_or("jsonl");
343 format!("{stem}{suffix}.{ext}")
344 });
345 let file = std::fs::OpenOptions::new()
346 .create(true)
347 .truncate(true)
348 .write(true)
349 .open(&rotated)?;
350 state.writer = std::io::BufWriter::new(file);
351 state.bytes_written = 0;
352 Ok(())
353 }
354}
355
356pub struct EventLogSink {
361 log: Arc<AnyEventLog>,
362 topic: Topic,
363 session_id: String,
364}
365
366impl EventLogSink {
367 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
368 let session_id = session_id.into();
369 let topic = Topic::new(format!(
370 "observability.agent_events.{}",
371 crate::event_log::sanitize_topic_component(&session_id)
372 ))
373 .expect("session id should sanitize to a valid topic");
374 Arc::new(Self {
375 log,
376 topic,
377 session_id,
378 })
379 }
380}
381
382impl AgentEventSink for JsonlEventSink {
383 fn handle_event(&self, event: &AgentEvent) {
384 use std::io::Write as _;
385 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
386 let index = state.index;
387 state.index += 1;
388 let emitted_at_ms = std::time::SystemTime::now()
389 .duration_since(std::time::UNIX_EPOCH)
390 .map(|d| d.as_millis() as i64)
391 .unwrap_or(0);
392 let envelope = PersistedAgentEvent {
393 index,
394 emitted_at_ms,
395 frame_depth: None,
396 event: event.clone(),
397 };
398 if let Ok(line) = serde_json::to_string(&envelope) {
399 let _ = state.writer.write_all(line.as_bytes());
404 let _ = state.writer.write_all(b"\n");
405 state.bytes_written += line.len() as u64 + 1;
406 let _ = self.rotate_if_needed(&mut state);
407 }
408 }
409}
410
411impl AgentEventSink for EventLogSink {
412 fn handle_event(&self, event: &AgentEvent) {
413 let event_json = match serde_json::to_value(event) {
414 Ok(value) => value,
415 Err(_) => return,
416 };
417 let event_kind = event_json
418 .get("type")
419 .and_then(|value| value.as_str())
420 .unwrap_or("agent_event")
421 .to_string();
422 let payload = serde_json::json!({
423 "index_hint": now_ms(),
424 "session_id": self.session_id,
425 "event": event_json,
426 });
427 let mut headers = std::collections::BTreeMap::new();
428 headers.insert("session_id".to_string(), self.session_id.clone());
429 let log = self.log.clone();
430 let topic = self.topic.clone();
431 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
432 if let Ok(handle) = tokio::runtime::Handle::try_current() {
433 handle.spawn(async move {
434 let _ = log.append(&topic, record).await;
435 });
436 } else {
437 let _ = futures::executor::block_on(log.append(&topic, record));
438 }
439 }
440}
441
442impl Drop for JsonlEventSink {
443 fn drop(&mut self) {
444 if let Ok(mut state) = self.state.lock() {
445 use std::io::Write as _;
446 let _ = state.writer.flush();
447 }
448 }
449}
450
451pub struct MultiSink {
453 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
454}
455
456impl MultiSink {
457 pub fn new() -> Self {
458 Self {
459 sinks: Mutex::new(Vec::new()),
460 }
461 }
462 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
463 self.sinks.lock().expect("sink mutex poisoned").push(sink);
464 }
465 pub fn len(&self) -> usize {
466 self.sinks.lock().expect("sink mutex poisoned").len()
467 }
468 pub fn is_empty(&self) -> bool {
469 self.len() == 0
470 }
471}
472
473impl Default for MultiSink {
474 fn default() -> Self {
475 Self::new()
476 }
477}
478
479impl AgentEventSink for MultiSink {
480 fn handle_event(&self, event: &AgentEvent) {
481 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
487 for sink in sinks {
488 sink.handle_event(event);
489 }
490 }
491}
492
493type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
494
495fn external_sinks() -> &'static ExternalSinkRegistry {
496 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
497 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
498}
499
500pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
501 let session_id = session_id.into();
502 let mut reg = external_sinks().write().expect("sink registry poisoned");
503 reg.entry(session_id).or_default().push(sink);
504}
505
506pub fn clear_session_sinks(session_id: &str) {
510 external_sinks()
511 .write()
512 .expect("sink registry poisoned")
513 .remove(session_id);
514}
515
516pub fn reset_all_sinks() {
517 external_sinks()
518 .write()
519 .expect("sink registry poisoned")
520 .clear();
521 crate::agent_sessions::reset_session_store();
522}
523
524pub fn emit_event(event: &AgentEvent) {
528 let sinks: Vec<Arc<dyn AgentEventSink>> = {
529 let reg = external_sinks().read().expect("sink registry poisoned");
530 reg.get(event.session_id()).cloned().unwrap_or_default()
531 };
532 for sink in sinks {
533 sink.handle_event(event);
534 }
535}
536
537fn now_ms() -> i64 {
538 std::time::SystemTime::now()
539 .duration_since(std::time::UNIX_EPOCH)
540 .map(|duration| duration.as_millis() as i64)
541 .unwrap_or(0)
542}
543
544pub fn session_external_sink_count(session_id: &str) -> usize {
545 external_sinks()
546 .read()
547 .expect("sink registry poisoned")
548 .get(session_id)
549 .map(|v| v.len())
550 .unwrap_or(0)
551}
552
553pub fn session_closure_subscriber_count(session_id: &str) -> usize {
554 crate::agent_sessions::subscriber_count(session_id)
555}
556
557#[cfg(test)]
558mod tests {
559 use super::*;
560 use std::sync::atomic::{AtomicUsize, Ordering};
561
562 struct CountingSink(Arc<AtomicUsize>);
563 impl AgentEventSink for CountingSink {
564 fn handle_event(&self, _event: &AgentEvent) {
565 self.0.fetch_add(1, Ordering::SeqCst);
566 }
567 }
568
569 #[test]
570 fn multi_sink_fans_out_in_order() {
571 let multi = MultiSink::new();
572 let a = Arc::new(AtomicUsize::new(0));
573 let b = Arc::new(AtomicUsize::new(0));
574 multi.push(Arc::new(CountingSink(a.clone())));
575 multi.push(Arc::new(CountingSink(b.clone())));
576 let event = AgentEvent::TurnStart {
577 session_id: "s1".into(),
578 iteration: 1,
579 };
580 multi.handle_event(&event);
581 assert_eq!(a.load(Ordering::SeqCst), 1);
582 assert_eq!(b.load(Ordering::SeqCst), 1);
583 }
584
585 #[test]
586 fn session_scoped_sink_routing() {
587 reset_all_sinks();
588 let a = Arc::new(AtomicUsize::new(0));
589 let b = Arc::new(AtomicUsize::new(0));
590 register_sink("session-a", Arc::new(CountingSink(a.clone())));
591 register_sink("session-b", Arc::new(CountingSink(b.clone())));
592 emit_event(&AgentEvent::TurnStart {
593 session_id: "session-a".into(),
594 iteration: 0,
595 });
596 assert_eq!(a.load(Ordering::SeqCst), 1);
597 assert_eq!(b.load(Ordering::SeqCst), 0);
598 emit_event(&AgentEvent::TurnEnd {
599 session_id: "session-b".into(),
600 iteration: 0,
601 turn_info: serde_json::json!({}),
602 });
603 assert_eq!(a.load(Ordering::SeqCst), 1);
604 assert_eq!(b.load(Ordering::SeqCst), 1);
605 clear_session_sinks("session-a");
606 assert_eq!(session_external_sink_count("session-a"), 0);
607 assert_eq!(session_external_sink_count("session-b"), 1);
608 reset_all_sinks();
609 }
610
611 #[test]
612 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
613 use std::io::{BufRead, BufReader};
614 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
615 std::fs::create_dir_all(&dir).unwrap();
616 let path = dir.join("event_log.jsonl");
617 let sink = JsonlEventSink::open(&path).unwrap();
618 for i in 0..5 {
619 sink.handle_event(&AgentEvent::TurnStart {
620 session_id: "s".into(),
621 iteration: i,
622 });
623 }
624 assert_eq!(sink.event_count(), 5);
625 sink.flush().unwrap();
626
627 let file = std::fs::File::open(&path).unwrap();
629 let mut last_idx: i64 = -1;
630 let mut last_ts: i64 = 0;
631 for line in BufReader::new(file).lines() {
632 let line = line.unwrap();
633 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
634 let idx = val["index"].as_i64().unwrap();
635 let ts = val["emitted_at_ms"].as_i64().unwrap();
636 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
637 assert!(ts >= last_ts, "timestamps must be non-decreasing");
638 last_idx = idx;
639 last_ts = ts;
640 assert_eq!(val["type"], "turn_start");
642 }
643 assert_eq!(last_idx, 4);
644 let _ = std::fs::remove_file(&path);
645 }
646
647 #[test]
648 fn tool_call_status_serde() {
649 assert_eq!(
650 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
651 "\"pending\""
652 );
653 assert_eq!(
654 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
655 "\"in_progress\""
656 );
657 assert_eq!(
658 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
659 "\"completed\""
660 );
661 assert_eq!(
662 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
663 "\"failed\""
664 );
665 }
666}