1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::HandoffArtifact;
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
35pub enum WorkerEvent {
36 WorkerSpawned,
37 WorkerCompleted,
38 WorkerFailed,
39 WorkerCancelled,
40}
41
42impl WorkerEvent {
43 pub fn as_status(self) -> &'static str {
44 match self {
45 Self::WorkerSpawned => "running",
46 Self::WorkerCompleted => "completed",
47 Self::WorkerFailed => "failed",
48 Self::WorkerCancelled => "cancelled",
49 }
50 }
51
52 pub fn as_str(self) -> &'static str {
53 match self {
54 Self::WorkerSpawned => "WorkerSpawned",
55 Self::WorkerCompleted => "WorkerCompleted",
56 Self::WorkerFailed => "WorkerFailed",
57 Self::WorkerCancelled => "WorkerCancelled",
58 }
59 }
60}
61
62#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub enum ToolCallStatus {
66 Pending,
68 InProgress,
70 Completed,
72 Failed,
74}
75
76#[derive(Clone, Debug, Serialize, Deserialize)]
79#[serde(tag = "type", rename_all = "snake_case")]
80pub enum AgentEvent {
81 AgentMessageChunk {
82 session_id: String,
83 content: String,
84 },
85 AgentThoughtChunk {
86 session_id: String,
87 content: String,
88 },
89 ToolCall {
90 session_id: String,
91 tool_call_id: String,
92 tool_name: String,
93 kind: Option<ToolKind>,
94 status: ToolCallStatus,
95 raw_input: serde_json::Value,
96 },
97 ToolCallUpdate {
98 session_id: String,
99 tool_call_id: String,
100 tool_name: String,
101 status: ToolCallStatus,
102 raw_output: Option<serde_json::Value>,
103 error: Option<String>,
104 },
105 Plan {
106 session_id: String,
107 plan: serde_json::Value,
108 },
109 TurnStart {
110 session_id: String,
111 iteration: usize,
112 },
113 TurnEnd {
114 session_id: String,
115 iteration: usize,
116 turn_info: serde_json::Value,
117 },
118 FeedbackInjected {
119 session_id: String,
120 kind: String,
121 content: String,
122 },
123 BudgetExhausted {
127 session_id: String,
128 max_iterations: usize,
129 },
130 LoopStuck {
134 session_id: String,
135 max_nudges: usize,
136 last_iteration: usize,
137 tail_excerpt: String,
138 },
139 DaemonWatchdogTripped {
144 session_id: String,
145 attempts: usize,
146 elapsed_ms: u64,
147 },
148 SkillActivated {
152 session_id: String,
153 skill_name: String,
154 iteration: usize,
155 reason: String,
156 },
157 SkillDeactivated {
160 session_id: String,
161 skill_name: String,
162 iteration: usize,
163 },
164 SkillScopeTools {
167 session_id: String,
168 skill_name: String,
169 allowed_tools: Vec<String>,
170 },
171 ToolSearchQuery {
179 session_id: String,
180 tool_use_id: String,
181 name: String,
182 query: serde_json::Value,
183 strategy: String,
184 mode: String,
185 },
186 ToolSearchResult {
190 session_id: String,
191 tool_use_id: String,
192 promoted: Vec<String>,
193 strategy: String,
194 mode: String,
195 },
196 TranscriptCompacted {
197 session_id: String,
198 mode: String,
199 strategy: String,
200 archived_messages: usize,
201 estimated_tokens_before: usize,
202 estimated_tokens_after: usize,
203 snapshot_asset_id: Option<String>,
204 },
205 Handoff {
206 session_id: String,
207 artifact_id: String,
208 handoff: Box<HandoffArtifact>,
209 },
210}
211
212impl AgentEvent {
213 pub fn session_id(&self) -> &str {
214 match self {
215 Self::AgentMessageChunk { session_id, .. }
216 | Self::AgentThoughtChunk { session_id, .. }
217 | Self::ToolCall { session_id, .. }
218 | Self::ToolCallUpdate { session_id, .. }
219 | Self::Plan { session_id, .. }
220 | Self::TurnStart { session_id, .. }
221 | Self::TurnEnd { session_id, .. }
222 | Self::FeedbackInjected { session_id, .. }
223 | Self::BudgetExhausted { session_id, .. }
224 | Self::LoopStuck { session_id, .. }
225 | Self::DaemonWatchdogTripped { session_id, .. }
226 | Self::SkillActivated { session_id, .. }
227 | Self::SkillDeactivated { session_id, .. }
228 | Self::SkillScopeTools { session_id, .. }
229 | Self::ToolSearchQuery { session_id, .. }
230 | Self::ToolSearchResult { session_id, .. }
231 | Self::TranscriptCompacted { session_id, .. }
232 | Self::Handoff { session_id, .. } => session_id,
233 }
234 }
235}
236
237pub trait AgentEventSink: Send + Sync {
240 fn handle_event(&self, event: &AgentEvent);
241}
242
243#[derive(Clone, Debug, Serialize, Deserialize)]
250pub struct PersistedAgentEvent {
251 pub index: u64,
255 pub emitted_at_ms: i64,
259 pub frame_depth: Option<u32>,
263 #[serde(flatten)]
265 pub event: AgentEvent,
266}
267
268pub struct JsonlEventSink {
273 state: Mutex<JsonlEventSinkState>,
274 base_path: std::path::PathBuf,
275}
276
/// Mutable state of a [`JsonlEventSink`], kept behind its mutex.
struct JsonlEventSinkState {
    /// Buffered writer for the file currently being written.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index that will be assigned to the next event.
    index: u64,
    /// Bytes accounted to the current file since it was opened.
    bytes_written: u64,
    /// Number of rotations performed so far (0 while on the base file).
    rotation: u32,
}
283
284impl JsonlEventSink {
285 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
289
290 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
294 let base_path = base_path.into();
295 if let Some(parent) = base_path.parent() {
296 std::fs::create_dir_all(parent)?;
297 }
298 let file = std::fs::OpenOptions::new()
299 .create(true)
300 .truncate(true)
301 .write(true)
302 .open(&base_path)?;
303 Ok(Arc::new(Self {
304 state: Mutex::new(JsonlEventSinkState {
305 writer: std::io::BufWriter::new(file),
306 index: 0,
307 bytes_written: 0,
308 rotation: 0,
309 }),
310 base_path,
311 }))
312 }
313
314 pub fn flush(&self) -> std::io::Result<()> {
317 use std::io::Write as _;
318 self.state
319 .lock()
320 .expect("jsonl sink mutex poisoned")
321 .writer
322 .flush()
323 }
324
325 pub fn event_count(&self) -> u64 {
328 self.state.lock().expect("jsonl sink mutex poisoned").index
329 }
330
331 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
332 use std::io::Write as _;
333 if state.bytes_written < Self::ROTATE_BYTES {
334 return Ok(());
335 }
336 state.writer.flush()?;
337 state.rotation += 1;
338 let suffix = format!("-{:06}", state.rotation);
339 let rotated = self.base_path.with_file_name({
340 let stem = self
341 .base_path
342 .file_stem()
343 .and_then(|s| s.to_str())
344 .unwrap_or("event_log");
345 let ext = self
346 .base_path
347 .extension()
348 .and_then(|e| e.to_str())
349 .unwrap_or("jsonl");
350 format!("{stem}{suffix}.{ext}")
351 });
352 let file = std::fs::OpenOptions::new()
353 .create(true)
354 .truncate(true)
355 .write(true)
356 .open(&rotated)?;
357 state.writer = std::io::BufWriter::new(file);
358 state.bytes_written = 0;
359 Ok(())
360 }
361}
362
363pub struct EventLogSink {
368 log: Arc<AnyEventLog>,
369 topic: Topic,
370 session_id: String,
371}
372
373impl EventLogSink {
374 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
375 let session_id = session_id.into();
376 let topic = Topic::new(format!(
377 "observability.agent_events.{}",
378 crate::event_log::sanitize_topic_component(&session_id)
379 ))
380 .expect("session id should sanitize to a valid topic");
381 Arc::new(Self {
382 log,
383 topic,
384 session_id,
385 })
386 }
387}
388
389impl AgentEventSink for JsonlEventSink {
390 fn handle_event(&self, event: &AgentEvent) {
391 use std::io::Write as _;
392 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
393 let index = state.index;
394 state.index += 1;
395 let emitted_at_ms = std::time::SystemTime::now()
396 .duration_since(std::time::UNIX_EPOCH)
397 .map(|d| d.as_millis() as i64)
398 .unwrap_or(0);
399 let envelope = PersistedAgentEvent {
400 index,
401 emitted_at_ms,
402 frame_depth: None,
403 event: event.clone(),
404 };
405 if let Ok(line) = serde_json::to_string(&envelope) {
406 let _ = state.writer.write_all(line.as_bytes());
411 let _ = state.writer.write_all(b"\n");
412 state.bytes_written += line.len() as u64 + 1;
413 let _ = self.rotate_if_needed(&mut state);
414 }
415 }
416}
417
418impl AgentEventSink for EventLogSink {
419 fn handle_event(&self, event: &AgentEvent) {
420 let event_json = match serde_json::to_value(event) {
421 Ok(value) => value,
422 Err(_) => return,
423 };
424 let event_kind = event_json
425 .get("type")
426 .and_then(|value| value.as_str())
427 .unwrap_or("agent_event")
428 .to_string();
429 let payload = serde_json::json!({
430 "index_hint": now_ms(),
431 "session_id": self.session_id,
432 "event": event_json,
433 });
434 let mut headers = std::collections::BTreeMap::new();
435 headers.insert("session_id".to_string(), self.session_id.clone());
436 let log = self.log.clone();
437 let topic = self.topic.clone();
438 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
439 if let Ok(handle) = tokio::runtime::Handle::try_current() {
440 handle.spawn(async move {
441 let _ = log.append(&topic, record).await;
442 });
443 } else {
444 let _ = futures::executor::block_on(log.append(&topic, record));
445 }
446 }
447}
448
449impl Drop for JsonlEventSink {
450 fn drop(&mut self) {
451 if let Ok(mut state) = self.state.lock() {
452 use std::io::Write as _;
453 let _ = state.writer.flush();
454 }
455 }
456}
457
458pub struct MultiSink {
460 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
461}
462
463impl MultiSink {
464 pub fn new() -> Self {
465 Self {
466 sinks: Mutex::new(Vec::new()),
467 }
468 }
469 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
470 self.sinks.lock().expect("sink mutex poisoned").push(sink);
471 }
472 pub fn len(&self) -> usize {
473 self.sinks.lock().expect("sink mutex poisoned").len()
474 }
475 pub fn is_empty(&self) -> bool {
476 self.len() == 0
477 }
478}
479
480impl Default for MultiSink {
481 fn default() -> Self {
482 Self::new()
483 }
484}
485
486impl AgentEventSink for MultiSink {
487 fn handle_event(&self, event: &AgentEvent) {
488 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
494 for sink in sinks {
495 sink.handle_event(event);
496 }
497 }
498}
499
500type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
501
502fn external_sinks() -> &'static ExternalSinkRegistry {
503 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
504 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
505}
506
507pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
508 let session_id = session_id.into();
509 let mut reg = external_sinks().write().expect("sink registry poisoned");
510 reg.entry(session_id).or_default().push(sink);
511}
512
513pub fn clear_session_sinks(session_id: &str) {
517 external_sinks()
518 .write()
519 .expect("sink registry poisoned")
520 .remove(session_id);
521}
522
523pub fn reset_all_sinks() {
524 external_sinks()
525 .write()
526 .expect("sink registry poisoned")
527 .clear();
528 crate::agent_sessions::reset_session_store();
529}
530
531pub fn emit_event(event: &AgentEvent) {
535 let sinks: Vec<Arc<dyn AgentEventSink>> = {
536 let reg = external_sinks().read().expect("sink registry poisoned");
537 reg.get(event.session_id()).cloned().unwrap_or_default()
538 };
539 for sink in sinks {
540 sink.handle_event(event);
541 }
542}
543
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock is set before the epoch. The millisecond
/// count is clamped to `i64::MAX` before narrowing, so an out-of-range value
/// saturates instead of being silently truncated by the cast.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => {
            // `as_millis` is a `u128`; after the clamp the cast is lossless.
            let millis = elapsed.as_millis().min(i64::MAX as u128);
            millis as i64
        }
        Err(_) => 0,
    }
}
550
551pub fn session_external_sink_count(session_id: &str) -> usize {
552 external_sinks()
553 .read()
554 .expect("sink registry poisoned")
555 .get(session_id)
556 .map(|v| v.len())
557 .unwrap_or(0)
558}
559
560pub fn session_closure_subscriber_count(session_id: &str) -> usize {
561 crate::agent_sessions::subscriber_count(session_id)
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that counts the events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Sink that appends its label to a shared list on every event, so the
    /// order in which sinks are invoked can be observed.
    struct RecordingSink {
        label: &'static str,
        seen: Arc<Mutex<Vec<&'static str>>>,
    }
    impl AgentEventSink for RecordingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.seen
                .lock()
                .expect("recording mutex poisoned")
                .push(self.label);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        let seen = Arc::new(Mutex::new(Vec::new()));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(RecordingSink {
            label: "first",
            seen: seen.clone(),
        }));
        multi.push(Arc::new(CountingSink(b.clone())));
        multi.push(Arc::new(RecordingSink {
            label: "second",
            seen: seen.clone(),
        }));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        // Sinks must be invoked in the order they were pushed.
        assert_eq!(*seen.lock().unwrap(), vec!["first", "second"]);
    }

    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        let _ = std::fs::remove_file(&path);
        // Best-effort cleanup: `remove_dir` only succeeds on an empty
        // directory, so files placed there by anything else are left alone.
        let _ = std::fs::remove_dir(&dir);
    }

    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}