1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::HandoffArtifact;
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
46pub enum WorkerEvent {
47 WorkerSpawned,
48 WorkerCompleted,
49 WorkerFailed,
50 WorkerCancelled,
51}
52
53impl WorkerEvent {
54 pub fn as_status(self) -> &'static str {
55 match self {
56 Self::WorkerSpawned => "running",
57 Self::WorkerCompleted => "completed",
58 Self::WorkerFailed => "failed",
59 Self::WorkerCancelled => "cancelled",
60 }
61 }
62
63 pub fn as_str(self) -> &'static str {
64 match self {
65 Self::WorkerSpawned => "WorkerSpawned",
66 Self::WorkerCompleted => "WorkerCompleted",
67 Self::WorkerFailed => "WorkerFailed",
68 Self::WorkerCancelled => "WorkerCancelled",
69 }
70 }
71}
72
73#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub enum ToolCallStatus {
77 Pending,
79 InProgress,
81 Completed,
83 Failed,
85}
86
87#[derive(Clone, Debug, Serialize, Deserialize)]
90#[serde(tag = "type", rename_all = "snake_case")]
91pub enum AgentEvent {
92 AgentMessageChunk {
93 session_id: String,
94 content: String,
95 },
96 AgentThoughtChunk {
97 session_id: String,
98 content: String,
99 },
100 ToolCall {
101 session_id: String,
102 tool_call_id: String,
103 tool_name: String,
104 kind: Option<ToolKind>,
105 status: ToolCallStatus,
106 raw_input: serde_json::Value,
107 },
108 ToolCallUpdate {
109 session_id: String,
110 tool_call_id: String,
111 tool_name: String,
112 status: ToolCallStatus,
113 raw_output: Option<serde_json::Value>,
114 error: Option<String>,
115 #[serde(default, skip_serializing_if = "Option::is_none")]
123 duration_ms: Option<u64>,
124 #[serde(default, skip_serializing_if = "Option::is_none")]
128 execution_duration_ms: Option<u64>,
129 },
130 Plan {
131 session_id: String,
132 plan: serde_json::Value,
133 },
134 TurnStart {
135 session_id: String,
136 iteration: usize,
137 },
138 TurnEnd {
139 session_id: String,
140 iteration: usize,
141 turn_info: serde_json::Value,
142 },
143 FeedbackInjected {
144 session_id: String,
145 kind: String,
146 content: String,
147 },
148 BudgetExhausted {
152 session_id: String,
153 max_iterations: usize,
154 },
155 LoopStuck {
159 session_id: String,
160 max_nudges: usize,
161 last_iteration: usize,
162 tail_excerpt: String,
163 },
164 DaemonWatchdogTripped {
169 session_id: String,
170 attempts: usize,
171 elapsed_ms: u64,
172 },
173 SkillActivated {
177 session_id: String,
178 skill_name: String,
179 iteration: usize,
180 reason: String,
181 },
182 SkillDeactivated {
185 session_id: String,
186 skill_name: String,
187 iteration: usize,
188 },
189 SkillScopeTools {
192 session_id: String,
193 skill_name: String,
194 allowed_tools: Vec<String>,
195 },
196 ToolSearchQuery {
204 session_id: String,
205 tool_use_id: String,
206 name: String,
207 query: serde_json::Value,
208 strategy: String,
209 mode: String,
210 },
211 ToolSearchResult {
215 session_id: String,
216 tool_use_id: String,
217 promoted: Vec<String>,
218 strategy: String,
219 mode: String,
220 },
221 TranscriptCompacted {
222 session_id: String,
223 mode: String,
224 strategy: String,
225 archived_messages: usize,
226 estimated_tokens_before: usize,
227 estimated_tokens_after: usize,
228 snapshot_asset_id: Option<String>,
229 },
230 Handoff {
231 session_id: String,
232 artifact_id: String,
233 handoff: Box<HandoffArtifact>,
234 },
235 FsWatch {
236 session_id: String,
237 subscription_id: String,
238 events: Vec<FsWatchEvent>,
239 },
240}
241
242impl AgentEvent {
243 pub fn session_id(&self) -> &str {
244 match self {
245 Self::AgentMessageChunk { session_id, .. }
246 | Self::AgentThoughtChunk { session_id, .. }
247 | Self::ToolCall { session_id, .. }
248 | Self::ToolCallUpdate { session_id, .. }
249 | Self::Plan { session_id, .. }
250 | Self::TurnStart { session_id, .. }
251 | Self::TurnEnd { session_id, .. }
252 | Self::FeedbackInjected { session_id, .. }
253 | Self::BudgetExhausted { session_id, .. }
254 | Self::LoopStuck { session_id, .. }
255 | Self::DaemonWatchdogTripped { session_id, .. }
256 | Self::SkillActivated { session_id, .. }
257 | Self::SkillDeactivated { session_id, .. }
258 | Self::SkillScopeTools { session_id, .. }
259 | Self::ToolSearchQuery { session_id, .. }
260 | Self::ToolSearchResult { session_id, .. }
261 | Self::TranscriptCompacted { session_id, .. }
262 | Self::Handoff { session_id, .. }
263 | Self::FsWatch { session_id, .. } => session_id,
264 }
265 }
266}
267
268pub trait AgentEventSink: Send + Sync {
271 fn handle_event(&self, event: &AgentEvent);
272}
273
274#[derive(Clone, Debug, Serialize, Deserialize)]
281pub struct PersistedAgentEvent {
282 pub index: u64,
286 pub emitted_at_ms: i64,
290 pub frame_depth: Option<u32>,
294 #[serde(flatten)]
296 pub event: AgentEvent,
297}
298
299pub struct JsonlEventSink {
304 state: Mutex<JsonlEventSinkState>,
305 base_path: std::path::PathBuf,
306}
307
308struct JsonlEventSinkState {
309 writer: std::io::BufWriter<std::fs::File>,
310 index: u64,
311 bytes_written: u64,
312 rotation: u32,
313}
314
315impl JsonlEventSink {
316 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
320
321 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
325 let base_path = base_path.into();
326 if let Some(parent) = base_path.parent() {
327 std::fs::create_dir_all(parent)?;
328 }
329 let file = std::fs::OpenOptions::new()
330 .create(true)
331 .truncate(true)
332 .write(true)
333 .open(&base_path)?;
334 Ok(Arc::new(Self {
335 state: Mutex::new(JsonlEventSinkState {
336 writer: std::io::BufWriter::new(file),
337 index: 0,
338 bytes_written: 0,
339 rotation: 0,
340 }),
341 base_path,
342 }))
343 }
344
345 pub fn flush(&self) -> std::io::Result<()> {
348 use std::io::Write as _;
349 self.state
350 .lock()
351 .expect("jsonl sink mutex poisoned")
352 .writer
353 .flush()
354 }
355
356 pub fn event_count(&self) -> u64 {
359 self.state.lock().expect("jsonl sink mutex poisoned").index
360 }
361
362 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
363 use std::io::Write as _;
364 if state.bytes_written < Self::ROTATE_BYTES {
365 return Ok(());
366 }
367 state.writer.flush()?;
368 state.rotation += 1;
369 let suffix = format!("-{:06}", state.rotation);
370 let rotated = self.base_path.with_file_name({
371 let stem = self
372 .base_path
373 .file_stem()
374 .and_then(|s| s.to_str())
375 .unwrap_or("event_log");
376 let ext = self
377 .base_path
378 .extension()
379 .and_then(|e| e.to_str())
380 .unwrap_or("jsonl");
381 format!("{stem}{suffix}.{ext}")
382 });
383 let file = std::fs::OpenOptions::new()
384 .create(true)
385 .truncate(true)
386 .write(true)
387 .open(&rotated)?;
388 state.writer = std::io::BufWriter::new(file);
389 state.bytes_written = 0;
390 Ok(())
391 }
392}
393
394pub struct EventLogSink {
399 log: Arc<AnyEventLog>,
400 topic: Topic,
401 session_id: String,
402}
403
404impl EventLogSink {
405 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
406 let session_id = session_id.into();
407 let topic = Topic::new(format!(
408 "observability.agent_events.{}",
409 crate::event_log::sanitize_topic_component(&session_id)
410 ))
411 .expect("session id should sanitize to a valid topic");
412 Arc::new(Self {
413 log,
414 topic,
415 session_id,
416 })
417 }
418}
419
420impl AgentEventSink for JsonlEventSink {
421 fn handle_event(&self, event: &AgentEvent) {
422 use std::io::Write as _;
423 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
424 let index = state.index;
425 state.index += 1;
426 let emitted_at_ms = std::time::SystemTime::now()
427 .duration_since(std::time::UNIX_EPOCH)
428 .map(|d| d.as_millis() as i64)
429 .unwrap_or(0);
430 let envelope = PersistedAgentEvent {
431 index,
432 emitted_at_ms,
433 frame_depth: None,
434 event: event.clone(),
435 };
436 if let Ok(line) = serde_json::to_string(&envelope) {
437 let _ = state.writer.write_all(line.as_bytes());
442 let _ = state.writer.write_all(b"\n");
443 state.bytes_written += line.len() as u64 + 1;
444 let _ = self.rotate_if_needed(&mut state);
445 }
446 }
447}
448
449impl AgentEventSink for EventLogSink {
450 fn handle_event(&self, event: &AgentEvent) {
451 let event_json = match serde_json::to_value(event) {
452 Ok(value) => value,
453 Err(_) => return,
454 };
455 let event_kind = event_json
456 .get("type")
457 .and_then(|value| value.as_str())
458 .unwrap_or("agent_event")
459 .to_string();
460 let payload = serde_json::json!({
461 "index_hint": now_ms(),
462 "session_id": self.session_id,
463 "event": event_json,
464 });
465 let mut headers = std::collections::BTreeMap::new();
466 headers.insert("session_id".to_string(), self.session_id.clone());
467 let log = self.log.clone();
468 let topic = self.topic.clone();
469 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
470 if let Ok(handle) = tokio::runtime::Handle::try_current() {
471 handle.spawn(async move {
472 let _ = log.append(&topic, record).await;
473 });
474 } else {
475 let _ = futures::executor::block_on(log.append(&topic, record));
476 }
477 }
478}
479
480impl Drop for JsonlEventSink {
481 fn drop(&mut self) {
482 if let Ok(mut state) = self.state.lock() {
483 use std::io::Write as _;
484 let _ = state.writer.flush();
485 }
486 }
487}
488
489pub struct MultiSink {
491 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
492}
493
494impl MultiSink {
495 pub fn new() -> Self {
496 Self {
497 sinks: Mutex::new(Vec::new()),
498 }
499 }
500 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
501 self.sinks.lock().expect("sink mutex poisoned").push(sink);
502 }
503 pub fn len(&self) -> usize {
504 self.sinks.lock().expect("sink mutex poisoned").len()
505 }
506 pub fn is_empty(&self) -> bool {
507 self.len() == 0
508 }
509}
510
511impl Default for MultiSink {
512 fn default() -> Self {
513 Self::new()
514 }
515}
516
517impl AgentEventSink for MultiSink {
518 fn handle_event(&self, event: &AgentEvent) {
519 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
525 for sink in sinks {
526 sink.handle_event(event);
527 }
528 }
529}
530
531type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
532
533fn external_sinks() -> &'static ExternalSinkRegistry {
534 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
535 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
536}
537
538pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
539 let session_id = session_id.into();
540 let mut reg = external_sinks().write().expect("sink registry poisoned");
541 reg.entry(session_id).or_default().push(sink);
542}
543
544pub fn clear_session_sinks(session_id: &str) {
548 external_sinks()
549 .write()
550 .expect("sink registry poisoned")
551 .remove(session_id);
552}
553
554pub fn reset_all_sinks() {
555 external_sinks()
556 .write()
557 .expect("sink registry poisoned")
558 .clear();
559 crate::agent_sessions::reset_session_store();
560}
561
562pub fn emit_event(event: &AgentEvent) {
566 let sinks: Vec<Arc<dyn AgentEventSink>> = {
567 let reg = external_sinks().read().expect("sink registry poisoned");
568 reg.get(event.session_id()).cloned().unwrap_or_default()
569 };
570 for sink in sinks {
571 sink.handle_event(event);
572 }
573}
574
575fn now_ms() -> i64 {
576 std::time::SystemTime::now()
577 .duration_since(std::time::UNIX_EPOCH)
578 .map(|duration| duration.as_millis() as i64)
579 .unwrap_or(0)
580}
581
582pub fn session_external_sink_count(session_id: &str) -> usize {
583 external_sinks()
584 .read()
585 .expect("sink registry poisoned")
586 .get(session_id)
587 .map(|v| v.len())
588 .unwrap_or(0)
589}
590
591pub fn session_closure_subscriber_count(session_id: &str) -> usize {
592 crate::agent_sessions::subscriber_count(session_id)
593}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use std::sync::atomic::{AtomicUsize, Ordering};
599
600 struct CountingSink(Arc<AtomicUsize>);
601 impl AgentEventSink for CountingSink {
602 fn handle_event(&self, _event: &AgentEvent) {
603 self.0.fetch_add(1, Ordering::SeqCst);
604 }
605 }
606
607 #[test]
608 fn multi_sink_fans_out_in_order() {
609 let multi = MultiSink::new();
610 let a = Arc::new(AtomicUsize::new(0));
611 let b = Arc::new(AtomicUsize::new(0));
612 multi.push(Arc::new(CountingSink(a.clone())));
613 multi.push(Arc::new(CountingSink(b.clone())));
614 let event = AgentEvent::TurnStart {
615 session_id: "s1".into(),
616 iteration: 1,
617 };
618 multi.handle_event(&event);
619 assert_eq!(a.load(Ordering::SeqCst), 1);
620 assert_eq!(b.load(Ordering::SeqCst), 1);
621 }
622
623 #[test]
624 fn session_scoped_sink_routing() {
625 reset_all_sinks();
626 let a = Arc::new(AtomicUsize::new(0));
627 let b = Arc::new(AtomicUsize::new(0));
628 register_sink("session-a", Arc::new(CountingSink(a.clone())));
629 register_sink("session-b", Arc::new(CountingSink(b.clone())));
630 emit_event(&AgentEvent::TurnStart {
631 session_id: "session-a".into(),
632 iteration: 0,
633 });
634 assert_eq!(a.load(Ordering::SeqCst), 1);
635 assert_eq!(b.load(Ordering::SeqCst), 0);
636 emit_event(&AgentEvent::TurnEnd {
637 session_id: "session-b".into(),
638 iteration: 0,
639 turn_info: serde_json::json!({}),
640 });
641 assert_eq!(a.load(Ordering::SeqCst), 1);
642 assert_eq!(b.load(Ordering::SeqCst), 1);
643 clear_session_sinks("session-a");
644 assert_eq!(session_external_sink_count("session-a"), 0);
645 assert_eq!(session_external_sink_count("session-b"), 1);
646 reset_all_sinks();
647 }
648
649 #[test]
650 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
651 use std::io::{BufRead, BufReader};
652 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
653 std::fs::create_dir_all(&dir).unwrap();
654 let path = dir.join("event_log.jsonl");
655 let sink = JsonlEventSink::open(&path).unwrap();
656 for i in 0..5 {
657 sink.handle_event(&AgentEvent::TurnStart {
658 session_id: "s".into(),
659 iteration: i,
660 });
661 }
662 assert_eq!(sink.event_count(), 5);
663 sink.flush().unwrap();
664
665 let file = std::fs::File::open(&path).unwrap();
667 let mut last_idx: i64 = -1;
668 let mut last_ts: i64 = 0;
669 for line in BufReader::new(file).lines() {
670 let line = line.unwrap();
671 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
672 let idx = val["index"].as_i64().unwrap();
673 let ts = val["emitted_at_ms"].as_i64().unwrap();
674 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
675 assert!(ts >= last_ts, "timestamps must be non-decreasing");
676 last_idx = idx;
677 last_ts = ts;
678 assert_eq!(val["type"], "turn_start");
680 }
681 assert_eq!(last_idx, 4);
682 let _ = std::fs::remove_file(&path);
683 }
684
685 #[test]
686 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
687 let terminal = AgentEvent::ToolCallUpdate {
692 session_id: "s".into(),
693 tool_call_id: "tc-1".into(),
694 tool_name: "read".into(),
695 status: ToolCallStatus::Completed,
696 raw_output: None,
697 error: None,
698 duration_ms: Some(42),
699 execution_duration_ms: Some(7),
700 };
701 let value = serde_json::to_value(&terminal).unwrap();
702 assert_eq!(value["duration_ms"], serde_json::json!(42));
703 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
704
705 let intermediate = AgentEvent::ToolCallUpdate {
709 session_id: "s".into(),
710 tool_call_id: "tc-1".into(),
711 tool_name: "read".into(),
712 status: ToolCallStatus::InProgress,
713 raw_output: None,
714 error: None,
715 duration_ms: None,
716 execution_duration_ms: None,
717 };
718 let value = serde_json::to_value(&intermediate).unwrap();
719 let object = value.as_object().expect("update serializes as object");
720 assert!(
721 !object.contains_key("duration_ms"),
722 "duration_ms must be omitted when None: {value}"
723 );
724 assert!(
725 !object.contains_key("execution_duration_ms"),
726 "execution_duration_ms must be omitted when None: {value}"
727 );
728 }
729
730 #[test]
731 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
732 let raw = serde_json::json!({
735 "type": "tool_call_update",
736 "session_id": "s",
737 "tool_call_id": "tc-1",
738 "tool_name": "read",
739 "status": "completed",
740 "raw_output": null,
741 "error": null,
742 });
743 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
744 match event {
745 AgentEvent::ToolCallUpdate {
746 duration_ms,
747 execution_duration_ms,
748 ..
749 } => {
750 assert!(duration_ms.is_none());
751 assert!(execution_duration_ms.is_none());
752 }
753 other => panic!("expected ToolCallUpdate, got {other:?}"),
754 }
755 }
756
757 #[test]
758 fn tool_call_status_serde() {
759 assert_eq!(
760 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
761 "\"pending\""
762 );
763 assert_eq!(
764 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
765 "\"in_progress\""
766 );
767 assert_eq!(
768 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
769 "\"completed\""
770 );
771 assert_eq!(
772 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
773 "\"failed\""
774 );
775 }
776}