1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54 WorkerSpawned,
55 WorkerProgressed,
56 WorkerWaitingForInput,
57 WorkerCompleted,
58 WorkerFailed,
59 WorkerCancelled,
60}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
103#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107 Pending,
109 InProgress,
111 Completed,
113 Failed,
115}
116
117impl ToolCallStatus {
118 pub const ALL: [Self; 4] = [
119 Self::Pending,
120 Self::InProgress,
121 Self::Completed,
122 Self::Failed,
123 ];
124
125 pub fn as_str(self) -> &'static str {
126 match self {
127 Self::Pending => "pending",
128 Self::InProgress => "in_progress",
129 Self::Completed => "completed",
130 Self::Failed => "failed",
131 }
132 }
133}
134
135#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
142#[serde(rename_all = "snake_case")]
143pub enum ToolCallErrorCategory {
144 SchemaValidation,
147 ToolError,
150 McpServerError,
152 HostBridgeError,
154 PermissionDenied,
157 RejectedLoop,
160 ParseAborted,
168 Timeout,
170 Network,
172 Cancelled,
174 Unknown,
176}
177
178impl ToolCallErrorCategory {
179 pub const ALL: [Self; 11] = [
180 Self::SchemaValidation,
181 Self::ToolError,
182 Self::McpServerError,
183 Self::HostBridgeError,
184 Self::PermissionDenied,
185 Self::RejectedLoop,
186 Self::ParseAborted,
187 Self::Timeout,
188 Self::Network,
189 Self::Cancelled,
190 Self::Unknown,
191 ];
192
193 pub fn as_str(self) -> &'static str {
194 match self {
195 Self::SchemaValidation => "schema_validation",
196 Self::ToolError => "tool_error",
197 Self::McpServerError => "mcp_server_error",
198 Self::HostBridgeError => "host_bridge_error",
199 Self::PermissionDenied => "permission_denied",
200 Self::RejectedLoop => "rejected_loop",
201 Self::ParseAborted => "parse_aborted",
202 Self::Timeout => "timeout",
203 Self::Network => "network",
204 Self::Cancelled => "cancelled",
205 Self::Unknown => "unknown",
206 }
207 }
208
209 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
216 use crate::value::ErrorCategory as Internal;
217 match category {
218 Internal::Timeout => Self::Timeout,
219 Internal::RateLimit
220 | Internal::Overloaded
221 | Internal::ServerError
222 | Internal::TransientNetwork => Self::Network,
223 Internal::SchemaValidation => Self::SchemaValidation,
224 Internal::ToolError => Self::ToolError,
225 Internal::ToolRejected => Self::PermissionDenied,
226 Internal::Cancelled => Self::Cancelled,
227 Internal::Auth
228 | Internal::EgressBlocked
229 | Internal::NotFound
230 | Internal::CircuitOpen
231 | Internal::BudgetExceeded
232 | Internal::Generic => Self::HostBridgeError,
233 }
234 }
235}
236
237#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
248#[serde(tag = "kind", rename_all = "snake_case")]
249pub enum ToolExecutor {
250 HarnBuiltin,
253 HostBridge,
256 McpServer { server_name: String },
260 ProviderNative,
265}
266
267#[derive(Clone, Debug, Serialize, Deserialize)]
270#[serde(tag = "type", rename_all = "snake_case")]
271pub enum AgentEvent {
272 AgentMessageChunk {
273 session_id: String,
274 content: String,
275 },
276 AgentThoughtChunk {
277 session_id: String,
278 content: String,
279 },
280 ToolCall {
281 session_id: String,
282 tool_call_id: String,
283 tool_name: String,
284 kind: Option<ToolKind>,
285 status: ToolCallStatus,
286 raw_input: serde_json::Value,
287 #[serde(default, skip_serializing_if = "Option::is_none")]
300 parsing: Option<bool>,
301 #[serde(default, skip_serializing_if = "Option::is_none")]
305 audit: Option<MutationSessionRecord>,
306 },
307 ToolCallUpdate {
308 session_id: String,
309 tool_call_id: String,
310 tool_name: String,
311 status: ToolCallStatus,
312 raw_output: Option<serde_json::Value>,
313 error: Option<String>,
314 #[serde(default, skip_serializing_if = "Option::is_none")]
322 duration_ms: Option<u64>,
323 #[serde(default, skip_serializing_if = "Option::is_none")]
327 execution_duration_ms: Option<u64>,
328 #[serde(default, skip_serializing_if = "Option::is_none")]
334 error_category: Option<ToolCallErrorCategory>,
335 #[serde(default, skip_serializing_if = "Option::is_none")]
340 executor: Option<ToolExecutor>,
341 #[serde(default, skip_serializing_if = "Option::is_none")]
350 parsing: Option<bool>,
351 #[serde(default, skip_serializing_if = "Option::is_none")]
359 raw_input: Option<serde_json::Value>,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
364 raw_input_partial: Option<String>,
365 #[serde(default, skip_serializing_if = "Option::is_none")]
370 audit: Option<MutationSessionRecord>,
371 },
372 Plan {
373 session_id: String,
374 plan: serde_json::Value,
375 },
376 TurnStart {
377 session_id: String,
378 iteration: usize,
379 },
380 TurnEnd {
381 session_id: String,
382 iteration: usize,
383 turn_info: serde_json::Value,
384 },
385 JudgeDecision {
386 session_id: String,
387 iteration: usize,
388 verdict: String,
389 reasoning: String,
390 next_step: Option<String>,
391 judge_duration_ms: u64,
392 },
393 TypedCheckpoint {
394 session_id: String,
395 checkpoint: serde_json::Value,
396 },
397 FeedbackInjected {
398 session_id: String,
399 kind: String,
400 content: String,
401 },
402 BudgetExhausted {
406 session_id: String,
407 max_iterations: usize,
408 },
409 LoopStuck {
413 session_id: String,
414 max_nudges: usize,
415 last_iteration: usize,
416 tail_excerpt: String,
417 },
418 DaemonWatchdogTripped {
423 session_id: String,
424 attempts: usize,
425 elapsed_ms: u64,
426 },
427 SkillActivated {
431 session_id: String,
432 skill_name: String,
433 iteration: usize,
434 reason: String,
435 },
436 SkillDeactivated {
439 session_id: String,
440 skill_name: String,
441 iteration: usize,
442 },
443 SkillScopeTools {
446 session_id: String,
447 skill_name: String,
448 allowed_tools: Vec<String>,
449 },
450 ToolSearchQuery {
458 session_id: String,
459 tool_use_id: String,
460 name: String,
461 query: serde_json::Value,
462 strategy: String,
463 mode: String,
464 },
465 ToolSearchResult {
469 session_id: String,
470 tool_use_id: String,
471 promoted: Vec<String>,
472 strategy: String,
473 mode: String,
474 },
475 TranscriptCompacted {
476 session_id: String,
477 mode: String,
478 strategy: String,
479 archived_messages: usize,
480 estimated_tokens_before: usize,
481 estimated_tokens_after: usize,
482 snapshot_asset_id: Option<String>,
483 },
484 Handoff {
485 session_id: String,
486 artifact_id: String,
487 handoff: Box<HandoffArtifact>,
488 },
489 FsWatch {
490 session_id: String,
491 subscription_id: String,
492 events: Vec<FsWatchEvent>,
493 },
494 WorkerUpdate {
510 session_id: String,
511 worker_id: String,
512 worker_name: String,
513 worker_task: String,
514 worker_mode: String,
515 event: WorkerEvent,
516 status: String,
517 metadata: serde_json::Value,
518 audit: Option<serde_json::Value>,
519 },
520 HitlRequested {
528 session_id: String,
529 request_id: String,
530 kind: String,
531 payload: serde_json::Value,
532 },
533 HitlResolved {
539 session_id: String,
540 request_id: String,
541 kind: String,
542 outcome: String,
543 },
544 LoopControlDecision {
551 session_id: String,
552 iteration: usize,
553 action: String,
554 old_limit: usize,
555 new_limit: usize,
556 reason: String,
557 status: String,
558 },
559 AgentLoopStallWarning {
564 session_id: String,
565 warning: serde_json::Value,
566 },
567 ToolCallAudit {
580 session_id: String,
581 tool_call_id: String,
582 tool_name: String,
583 audit: serde_json::Value,
584 },
585}
586
587impl AgentEvent {
588 pub fn session_id(&self) -> &str {
589 match self {
590 Self::AgentMessageChunk { session_id, .. }
591 | Self::AgentThoughtChunk { session_id, .. }
592 | Self::ToolCall { session_id, .. }
593 | Self::ToolCallUpdate { session_id, .. }
594 | Self::Plan { session_id, .. }
595 | Self::TurnStart { session_id, .. }
596 | Self::TurnEnd { session_id, .. }
597 | Self::JudgeDecision { session_id, .. }
598 | Self::TypedCheckpoint { session_id, .. }
599 | Self::FeedbackInjected { session_id, .. }
600 | Self::BudgetExhausted { session_id, .. }
601 | Self::LoopStuck { session_id, .. }
602 | Self::DaemonWatchdogTripped { session_id, .. }
603 | Self::SkillActivated { session_id, .. }
604 | Self::SkillDeactivated { session_id, .. }
605 | Self::SkillScopeTools { session_id, .. }
606 | Self::ToolSearchQuery { session_id, .. }
607 | Self::ToolSearchResult { session_id, .. }
608 | Self::TranscriptCompacted { session_id, .. }
609 | Self::Handoff { session_id, .. }
610 | Self::FsWatch { session_id, .. }
611 | Self::WorkerUpdate { session_id, .. }
612 | Self::HitlRequested { session_id, .. }
613 | Self::HitlResolved { session_id, .. }
614 | Self::LoopControlDecision { session_id, .. }
615 | Self::AgentLoopStallWarning { session_id, .. }
616 | Self::ToolCallAudit { session_id, .. } => session_id,
617 }
618 }
619}
620
621pub trait AgentEventSink: Send + Sync {
624 fn handle_event(&self, event: &AgentEvent);
625}
626
627#[derive(Clone, Debug, Serialize, Deserialize)]
634pub struct PersistedAgentEvent {
635 pub index: u64,
639 pub emitted_at_ms: i64,
643 pub frame_depth: Option<u32>,
647 #[serde(flatten)]
649 pub event: AgentEvent,
650}
651
652pub struct JsonlEventSink {
657 state: Mutex<JsonlEventSinkState>,
658 base_path: std::path::PathBuf,
659}
660
661struct JsonlEventSinkState {
662 writer: std::io::BufWriter<std::fs::File>,
663 index: u64,
664 bytes_written: u64,
665 rotation: u32,
666}
667
668impl JsonlEventSink {
669 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
673
674 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
678 let base_path = base_path.into();
679 if let Some(parent) = base_path.parent() {
680 std::fs::create_dir_all(parent)?;
681 }
682 let file = std::fs::OpenOptions::new()
683 .create(true)
684 .truncate(true)
685 .write(true)
686 .open(&base_path)?;
687 Ok(Arc::new(Self {
688 state: Mutex::new(JsonlEventSinkState {
689 writer: std::io::BufWriter::new(file),
690 index: 0,
691 bytes_written: 0,
692 rotation: 0,
693 }),
694 base_path,
695 }))
696 }
697
698 pub fn flush(&self) -> std::io::Result<()> {
701 use std::io::Write as _;
702 self.state
703 .lock()
704 .expect("jsonl sink mutex poisoned")
705 .writer
706 .flush()
707 }
708
709 pub fn event_count(&self) -> u64 {
712 self.state.lock().expect("jsonl sink mutex poisoned").index
713 }
714
715 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
716 use std::io::Write as _;
717 if state.bytes_written < Self::ROTATE_BYTES {
718 return Ok(());
719 }
720 state.writer.flush()?;
721 state.rotation += 1;
722 let suffix = format!("-{:06}", state.rotation);
723 let rotated = self.base_path.with_file_name({
724 let stem = self
725 .base_path
726 .file_stem()
727 .and_then(|s| s.to_str())
728 .unwrap_or("event_log");
729 let ext = self
730 .base_path
731 .extension()
732 .and_then(|e| e.to_str())
733 .unwrap_or("jsonl");
734 format!("{stem}{suffix}.{ext}")
735 });
736 let file = std::fs::OpenOptions::new()
737 .create(true)
738 .truncate(true)
739 .write(true)
740 .open(&rotated)?;
741 state.writer = std::io::BufWriter::new(file);
742 state.bytes_written = 0;
743 Ok(())
744 }
745}
746
747pub struct EventLogSink {
752 log: Arc<AnyEventLog>,
753 topic: Topic,
754 session_id: String,
755}
756
757impl EventLogSink {
758 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
759 let session_id = session_id.into();
760 let topic = Topic::new(format!(
761 "observability.agent_events.{}",
762 crate::event_log::sanitize_topic_component(&session_id)
763 ))
764 .expect("session id should sanitize to a valid topic");
765 Arc::new(Self {
766 log,
767 topic,
768 session_id,
769 })
770 }
771}
772
773impl AgentEventSink for JsonlEventSink {
774 fn handle_event(&self, event: &AgentEvent) {
775 use std::io::Write as _;
776 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
777 let index = state.index;
778 state.index += 1;
779 let emitted_at_ms = std::time::SystemTime::now()
780 .duration_since(std::time::UNIX_EPOCH)
781 .map(|d| d.as_millis() as i64)
782 .unwrap_or(0);
783 let envelope = PersistedAgentEvent {
784 index,
785 emitted_at_ms,
786 frame_depth: None,
787 event: event.clone(),
788 };
789 if let Ok(line) = serde_json::to_string(&envelope) {
790 let _ = state.writer.write_all(line.as_bytes());
795 let _ = state.writer.write_all(b"\n");
796 state.bytes_written += line.len() as u64 + 1;
797 let _ = self.rotate_if_needed(&mut state);
798 }
799 }
800}
801
802impl AgentEventSink for EventLogSink {
803 fn handle_event(&self, event: &AgentEvent) {
804 let event_json = match serde_json::to_value(event) {
805 Ok(value) => value,
806 Err(_) => return,
807 };
808 let event_kind = event_json
809 .get("type")
810 .and_then(|value| value.as_str())
811 .unwrap_or("agent_event")
812 .to_string();
813 let payload = serde_json::json!({
814 "index_hint": now_ms(),
815 "session_id": self.session_id,
816 "event": event_json,
817 });
818 let mut headers = std::collections::BTreeMap::new();
819 headers.insert("session_id".to_string(), self.session_id.clone());
820 let log = self.log.clone();
821 let topic = self.topic.clone();
822 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
823 if let Ok(handle) = tokio::runtime::Handle::try_current() {
824 handle.spawn(async move {
825 let _ = log.append(&topic, record).await;
826 });
827 } else {
828 let _ = futures::executor::block_on(log.append(&topic, record));
829 }
830 }
831}
832
833impl Drop for JsonlEventSink {
834 fn drop(&mut self) {
835 if let Ok(mut state) = self.state.lock() {
836 use std::io::Write as _;
837 let _ = state.writer.flush();
838 }
839 }
840}
841
842pub struct MultiSink {
844 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
845}
846
847impl MultiSink {
848 pub fn new() -> Self {
849 Self {
850 sinks: Mutex::new(Vec::new()),
851 }
852 }
853 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
854 self.sinks.lock().expect("sink mutex poisoned").push(sink);
855 }
856 pub fn len(&self) -> usize {
857 self.sinks.lock().expect("sink mutex poisoned").len()
858 }
859 pub fn is_empty(&self) -> bool {
860 self.len() == 0
861 }
862}
863
864impl Default for MultiSink {
865 fn default() -> Self {
866 Self::new()
867 }
868}
869
870impl AgentEventSink for MultiSink {
871 fn handle_event(&self, event: &AgentEvent) {
872 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
878 for sink in sinks {
879 sink.handle_event(event);
880 }
881 }
882}
883
884#[cfg(test)]
885#[derive(Clone)]
886struct RegisteredSink {
887 owner: std::thread::ThreadId,
888 sink: Arc<dyn AgentEventSink>,
889}
890
891#[cfg(not(test))]
892type RegisteredSink = Arc<dyn AgentEventSink>;
893
894type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
895
896fn external_sinks() -> &'static ExternalSinkRegistry {
897 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
898 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
899}
900
901pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
902 let session_id = session_id.into();
903 let mut reg = external_sinks().write().expect("sink registry poisoned");
904 #[cfg(test)]
905 let sink = RegisteredSink {
906 owner: std::thread::current().id(),
907 sink,
908 };
909 reg.entry(session_id).or_default().push(sink);
910}
911
912pub fn clear_session_sinks(session_id: &str) {
916 #[cfg(test)]
917 {
918 let owner = std::thread::current().id();
919 let mut reg = external_sinks().write().expect("sink registry poisoned");
920 if let Some(sinks) = reg.get_mut(session_id) {
921 sinks.retain(|sink| sink.owner != owner);
922 if sinks.is_empty() {
923 reg.remove(session_id);
924 }
925 }
926 }
927 #[cfg(not(test))]
928 {
929 external_sinks()
930 .write()
931 .expect("sink registry poisoned")
932 .remove(session_id);
933 }
934}
935
936pub fn reset_all_sinks() {
937 #[cfg(test)]
938 {
939 let owner = std::thread::current().id();
940 let mut reg = external_sinks().write().expect("sink registry poisoned");
941 reg.retain(|_, sinks| {
942 sinks.retain(|sink| sink.owner != owner);
943 !sinks.is_empty()
944 });
945 crate::agent_sessions::reset_session_store();
946 }
947 #[cfg(not(test))]
948 {
949 external_sinks()
950 .write()
951 .expect("sink registry poisoned")
952 .clear();
953 crate::agent_sessions::reset_session_store();
954 }
955}
956
957pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
964 if source_session_id.is_empty() || target_session_id.is_empty() {
965 return;
966 }
967 if source_session_id == target_session_id {
968 return;
969 }
970 let mut reg = external_sinks().write().expect("sink registry poisoned");
971 let Some(source_sinks) = reg.get(source_session_id).cloned() else {
972 return;
973 };
974 let target = reg.entry(target_session_id.to_string()).or_default();
975 #[cfg(test)]
976 {
977 for source in source_sinks {
978 let already_present = target
979 .iter()
980 .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
981 if !already_present {
982 target.push(source);
983 }
984 }
985 }
986 #[cfg(not(test))]
987 {
988 for source in source_sinks {
989 let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
990 if !already_present {
991 target.push(source);
992 }
993 }
994 }
995}
996
997pub fn emit_event(event: &AgentEvent) {
1001 let sinks: Vec<Arc<dyn AgentEventSink>> = {
1002 let reg = external_sinks().read().expect("sink registry poisoned");
1003 #[cfg(test)]
1004 {
1005 let owner = std::thread::current().id();
1006 reg.get(event.session_id())
1007 .map(|sinks| {
1008 sinks
1009 .iter()
1010 .filter(|sink| sink.owner == owner)
1011 .map(|sink| sink.sink.clone())
1012 .collect()
1013 })
1014 .unwrap_or_default()
1015 }
1016 #[cfg(not(test))]
1017 {
1018 reg.get(event.session_id()).cloned().unwrap_or_default()
1019 }
1020 };
1021 for sink in sinks {
1022 sink.handle_event(event);
1023 }
1024}
1025
1026fn now_ms() -> i64 {
1027 std::time::SystemTime::now()
1028 .duration_since(std::time::UNIX_EPOCH)
1029 .map(|duration| duration.as_millis() as i64)
1030 .unwrap_or(0)
1031}
1032
1033pub fn session_external_sink_count(session_id: &str) -> usize {
1034 #[cfg(test)]
1035 {
1036 let owner = std::thread::current().id();
1037 return external_sinks()
1038 .read()
1039 .expect("sink registry poisoned")
1040 .get(session_id)
1041 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1042 .unwrap_or(0);
1043 }
1044 #[cfg(not(test))]
1045 {
1046 external_sinks()
1047 .read()
1048 .expect("sink registry poisoned")
1049 .get(session_id)
1050 .map(|v| v.len())
1051 .unwrap_or(0)
1052 }
1053}
1054
1055pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1056 crate::agent_sessions::subscriber_count(session_id)
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use super::*;
1062 use std::sync::atomic::{AtomicUsize, Ordering};
1063
1064 struct CountingSink(Arc<AtomicUsize>);
1065 impl AgentEventSink for CountingSink {
1066 fn handle_event(&self, _event: &AgentEvent) {
1067 self.0.fetch_add(1, Ordering::SeqCst);
1068 }
1069 }
1070
1071 #[test]
1072 fn multi_sink_fans_out_in_order() {
1073 let multi = MultiSink::new();
1074 let a = Arc::new(AtomicUsize::new(0));
1075 let b = Arc::new(AtomicUsize::new(0));
1076 multi.push(Arc::new(CountingSink(a.clone())));
1077 multi.push(Arc::new(CountingSink(b.clone())));
1078 let event = AgentEvent::TurnStart {
1079 session_id: "s1".into(),
1080 iteration: 1,
1081 };
1082 multi.handle_event(&event);
1083 assert_eq!(a.load(Ordering::SeqCst), 1);
1084 assert_eq!(b.load(Ordering::SeqCst), 1);
1085 }
1086
1087 #[test]
1088 fn session_scoped_sink_routing() {
1089 reset_all_sinks();
1090 let a = Arc::new(AtomicUsize::new(0));
1091 let b = Arc::new(AtomicUsize::new(0));
1092 register_sink("session-a", Arc::new(CountingSink(a.clone())));
1093 register_sink("session-b", Arc::new(CountingSink(b.clone())));
1094 emit_event(&AgentEvent::TurnStart {
1095 session_id: "session-a".into(),
1096 iteration: 0,
1097 });
1098 assert_eq!(a.load(Ordering::SeqCst), 1);
1099 assert_eq!(b.load(Ordering::SeqCst), 0);
1100 emit_event(&AgentEvent::TurnEnd {
1101 session_id: "session-b".into(),
1102 iteration: 0,
1103 turn_info: serde_json::json!({}),
1104 });
1105 assert_eq!(a.load(Ordering::SeqCst), 1);
1106 assert_eq!(b.load(Ordering::SeqCst), 1);
1107 clear_session_sinks("session-a");
1108 assert_eq!(session_external_sink_count("session-a"), 0);
1109 assert_eq!(session_external_sink_count("session-b"), 1);
1110 reset_all_sinks();
1111 }
1112
1113 #[test]
1114 fn newly_opened_child_session_inherits_current_external_sinks() {
1115 reset_all_sinks();
1116 let delivered = Arc::new(AtomicUsize::new(0));
1117 register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1118 {
1119 let _guard = crate::agent_sessions::enter_current_session("outer-session");
1120 let inner = crate::agent_sessions::open_or_create(None);
1121 assert_ne!(inner, "outer-session");
1122 emit_event(&AgentEvent::TurnStart {
1123 session_id: inner,
1124 iteration: 0,
1125 });
1126 }
1127 assert_eq!(delivered.load(Ordering::SeqCst), 1);
1128 reset_all_sinks();
1129 }
1130
1131 #[test]
1132 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1133 use std::io::{BufRead, BufReader};
1134 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1135 std::fs::create_dir_all(&dir).unwrap();
1136 let path = dir.join("event_log.jsonl");
1137 let sink = JsonlEventSink::open(&path).unwrap();
1138 for i in 0..5 {
1139 sink.handle_event(&AgentEvent::TurnStart {
1140 session_id: "s".into(),
1141 iteration: i,
1142 });
1143 }
1144 assert_eq!(sink.event_count(), 5);
1145 sink.flush().unwrap();
1146
1147 let file = std::fs::File::open(&path).unwrap();
1149 let mut last_idx: i64 = -1;
1150 let mut last_ts: i64 = 0;
1151 for line in BufReader::new(file).lines() {
1152 let line = line.unwrap();
1153 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1154 let idx = val["index"].as_i64().unwrap();
1155 let ts = val["emitted_at_ms"].as_i64().unwrap();
1156 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1157 assert!(ts >= last_ts, "timestamps must be non-decreasing");
1158 last_idx = idx;
1159 last_ts = ts;
1160 assert_eq!(val["type"], "turn_start");
1162 }
1163 assert_eq!(last_idx, 4);
1164 let _ = std::fs::remove_file(&path);
1165 }
1166
1167 #[test]
1168 fn judge_decision_round_trips_through_jsonl_sink() {
1169 use std::io::{BufRead, BufReader};
1170 let dir =
1171 std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1172 std::fs::create_dir_all(&dir).unwrap();
1173 let path = dir.join("event_log.jsonl");
1174 let sink = JsonlEventSink::open(&path).unwrap();
1175 sink.handle_event(&AgentEvent::JudgeDecision {
1176 session_id: "s".into(),
1177 iteration: 2,
1178 verdict: "continue".into(),
1179 reasoning: "needs a concrete next step".into(),
1180 next_step: Some("run the verifier".into()),
1181 judge_duration_ms: 17,
1182 });
1183 sink.flush().unwrap();
1184
1185 let file = std::fs::File::open(&path).unwrap();
1186 let line = BufReader::new(file).lines().next().unwrap().unwrap();
1187 let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1188 match recovered.event {
1189 AgentEvent::JudgeDecision {
1190 session_id,
1191 iteration,
1192 verdict,
1193 reasoning,
1194 next_step,
1195 judge_duration_ms,
1196 } => {
1197 assert_eq!(session_id, "s");
1198 assert_eq!(iteration, 2);
1199 assert_eq!(verdict, "continue");
1200 assert_eq!(reasoning, "needs a concrete next step");
1201 assert_eq!(next_step.as_deref(), Some("run the verifier"));
1202 assert_eq!(judge_duration_ms, 17);
1203 }
1204 other => panic!("expected JudgeDecision, got {other:?}"),
1205 }
1206 let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1207 assert_eq!(value["type"], "judge_decision");
1208 let _ = std::fs::remove_file(&path);
1209 let _ = std::fs::remove_dir(&dir);
1210 }
1211
1212 #[test]
1213 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1214 let terminal = AgentEvent::ToolCallUpdate {
1219 session_id: "s".into(),
1220 tool_call_id: "tc-1".into(),
1221 tool_name: "read".into(),
1222 status: ToolCallStatus::Completed,
1223 raw_output: None,
1224 error: None,
1225 duration_ms: Some(42),
1226 execution_duration_ms: Some(7),
1227 error_category: None,
1228 executor: None,
1229 parsing: None,
1230
1231 raw_input: None,
1232 raw_input_partial: None,
1233 audit: None,
1234 };
1235 let value = serde_json::to_value(&terminal).unwrap();
1236 assert_eq!(value["duration_ms"], serde_json::json!(42));
1237 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1238
1239 let intermediate = AgentEvent::ToolCallUpdate {
1243 session_id: "s".into(),
1244 tool_call_id: "tc-1".into(),
1245 tool_name: "read".into(),
1246 status: ToolCallStatus::InProgress,
1247 raw_output: None,
1248 error: None,
1249 duration_ms: None,
1250 execution_duration_ms: None,
1251 error_category: None,
1252 executor: None,
1253 parsing: None,
1254
1255 raw_input: None,
1256 raw_input_partial: None,
1257 audit: None,
1258 };
1259 let value = serde_json::to_value(&intermediate).unwrap();
1260 let object = value.as_object().expect("update serializes as object");
1261 assert!(
1262 !object.contains_key("duration_ms"),
1263 "duration_ms must be omitted when None: {value}"
1264 );
1265 assert!(
1266 !object.contains_key("execution_duration_ms"),
1267 "execution_duration_ms must be omitted when None: {value}"
1268 );
1269 }
1270
1271 #[test]
1272 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1273 let raw = serde_json::json!({
1276 "type": "tool_call_update",
1277 "session_id": "s",
1278 "tool_call_id": "tc-1",
1279 "tool_name": "read",
1280 "status": "completed",
1281 "raw_output": null,
1282 "error": null,
1283 });
1284 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1285 match event {
1286 AgentEvent::ToolCallUpdate {
1287 duration_ms,
1288 execution_duration_ms,
1289 ..
1290 } => {
1291 assert!(duration_ms.is_none());
1292 assert!(execution_duration_ms.is_none());
1293 }
1294 other => panic!("expected ToolCallUpdate, got {other:?}"),
1295 }
1296 }
1297
1298 #[test]
1299 fn tool_call_status_serde() {
1300 assert_eq!(
1301 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1302 "\"pending\""
1303 );
1304 assert_eq!(
1305 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1306 "\"in_progress\""
1307 );
1308 assert_eq!(
1309 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1310 "\"completed\""
1311 );
1312 assert_eq!(
1313 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1314 "\"failed\""
1315 );
1316 }
1317
1318 #[test]
1319 fn tool_call_error_category_serializes_as_snake_case() {
1320 let pairs = [
1321 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1322 (ToolCallErrorCategory::ToolError, "tool_error"),
1323 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1324 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1325 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1326 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1327 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1328 (ToolCallErrorCategory::Timeout, "timeout"),
1329 (ToolCallErrorCategory::Network, "network"),
1330 (ToolCallErrorCategory::Cancelled, "cancelled"),
1331 (ToolCallErrorCategory::Unknown, "unknown"),
1332 ];
1333 for (variant, wire) in pairs {
1334 let encoded = serde_json::to_string(&variant).unwrap();
1335 assert_eq!(encoded, format!("\"{wire}\""));
1336 assert_eq!(variant.as_str(), wire);
1337 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1340 assert_eq!(decoded, variant);
1341 }
1342 }
1343
1344 #[test]
1345 fn tool_executor_round_trips_with_adjacent_tag() {
1346 for executor in [
1351 ToolExecutor::HarnBuiltin,
1352 ToolExecutor::HostBridge,
1353 ToolExecutor::McpServer {
1354 server_name: "linear".to_string(),
1355 },
1356 ToolExecutor::ProviderNative,
1357 ] {
1358 let json = serde_json::to_value(&executor).unwrap();
1359 let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1360 match &executor {
1361 ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1362 ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1363 ToolExecutor::McpServer { server_name } => {
1364 assert_eq!(kind, "mcp_server");
1365 assert_eq!(json["server_name"], *server_name);
1366 }
1367 ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1368 }
1369 let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1370 assert_eq!(recovered, executor);
1371 }
1372 }
1373
1374 #[test]
1375 fn tool_call_error_category_from_internal_collapses_transient_family() {
1376 use crate::value::ErrorCategory as Internal;
1377 assert_eq!(
1378 ToolCallErrorCategory::from_internal(&Internal::Timeout),
1379 ToolCallErrorCategory::Timeout
1380 );
1381 for net in [
1382 Internal::RateLimit,
1383 Internal::Overloaded,
1384 Internal::ServerError,
1385 Internal::TransientNetwork,
1386 ] {
1387 assert_eq!(
1388 ToolCallErrorCategory::from_internal(&net),
1389 ToolCallErrorCategory::Network,
1390 "{net:?} should map to Network",
1391 );
1392 }
1393 assert_eq!(
1394 ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1395 ToolCallErrorCategory::SchemaValidation
1396 );
1397 assert_eq!(
1398 ToolCallErrorCategory::from_internal(&Internal::ToolError),
1399 ToolCallErrorCategory::ToolError
1400 );
1401 assert_eq!(
1402 ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1403 ToolCallErrorCategory::PermissionDenied
1404 );
1405 assert_eq!(
1406 ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1407 ToolCallErrorCategory::Cancelled
1408 );
1409 for bridge in [
1410 Internal::Auth,
1411 Internal::EgressBlocked,
1412 Internal::NotFound,
1413 Internal::CircuitOpen,
1414 Internal::Generic,
1415 ] {
1416 assert_eq!(
1417 ToolCallErrorCategory::from_internal(&bridge),
1418 ToolCallErrorCategory::HostBridgeError,
1419 "{bridge:?} should map to HostBridgeError",
1420 );
1421 }
1422 }
1423
1424 #[test]
1425 fn tool_call_update_event_omits_error_category_when_none() {
1426 let event = AgentEvent::ToolCallUpdate {
1427 session_id: "s".into(),
1428 tool_call_id: "t".into(),
1429 tool_name: "read".into(),
1430 status: ToolCallStatus::Completed,
1431 raw_output: None,
1432 error: None,
1433 duration_ms: None,
1434 execution_duration_ms: None,
1435 error_category: None,
1436 executor: None,
1437 parsing: None,
1438
1439 raw_input: None,
1440 raw_input_partial: None,
1441 audit: None,
1442 };
1443 let v = serde_json::to_value(&event).unwrap();
1444 assert_eq!(v["type"], "tool_call_update");
1445 assert!(v.get("error_category").is_none());
1446 }
1447
1448 #[test]
1449 fn tool_call_update_event_serializes_error_category_when_set() {
1450 let event = AgentEvent::ToolCallUpdate {
1451 session_id: "s".into(),
1452 tool_call_id: "t".into(),
1453 tool_name: "read".into(),
1454 status: ToolCallStatus::Failed,
1455 raw_output: None,
1456 error: Some("missing required field".into()),
1457 duration_ms: None,
1458 execution_duration_ms: None,
1459 error_category: Some(ToolCallErrorCategory::SchemaValidation),
1460 executor: None,
1461 parsing: None,
1462
1463 raw_input: None,
1464 raw_input_partial: None,
1465 audit: None,
1466 };
1467 let v = serde_json::to_value(&event).unwrap();
1468 assert_eq!(v["error_category"], "schema_validation");
1469 assert_eq!(v["error"], "missing required field");
1470 }
1471
1472 #[test]
1473 fn tool_call_update_omits_executor_when_absent() {
1474 let event = AgentEvent::ToolCallUpdate {
1478 session_id: "s".into(),
1479 tool_call_id: "tc-1".into(),
1480 tool_name: "read".into(),
1481 status: ToolCallStatus::Completed,
1482 raw_output: None,
1483 error: None,
1484 duration_ms: None,
1485 execution_duration_ms: None,
1486 error_category: None,
1487 executor: None,
1488 parsing: None,
1489
1490 raw_input: None,
1491 raw_input_partial: None,
1492 audit: None,
1493 };
1494 let json = serde_json::to_value(&event).unwrap();
1495 assert!(json.get("executor").is_none(), "got: {json}");
1496 }
1497
1498 #[test]
1499 fn worker_event_status_strings_cover_all_variants() {
1500 assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1505 assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1506 assert_eq!(
1507 WorkerEvent::WorkerWaitingForInput.as_status(),
1508 "awaiting_input"
1509 );
1510 assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1511 assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1512 assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1513
1514 for terminal in [
1515 WorkerEvent::WorkerCompleted,
1516 WorkerEvent::WorkerFailed,
1517 WorkerEvent::WorkerCancelled,
1518 ] {
1519 assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1520 }
1521 for non_terminal in [
1522 WorkerEvent::WorkerSpawned,
1523 WorkerEvent::WorkerProgressed,
1524 WorkerEvent::WorkerWaitingForInput,
1525 ] {
1526 assert!(
1527 !non_terminal.is_terminal(),
1528 "{non_terminal:?} should not be terminal"
1529 );
1530 }
1531 }
1532
1533 #[test]
1534 fn worker_update_event_routes_through_session_keyed_sink() {
1535 reset_all_sinks();
1540 let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1541 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1542 impl AgentEventSink for CapturingSink {
1543 fn handle_event(&self, event: &AgentEvent) {
1544 self.0
1545 .lock()
1546 .expect("captured sink mutex poisoned")
1547 .push(event.clone());
1548 }
1549 }
1550 register_sink(
1551 "worker-session-1",
1552 Arc::new(CapturingSink(captured.clone())),
1553 );
1554 emit_event(&AgentEvent::WorkerUpdate {
1555 session_id: "worker-session-1".into(),
1556 worker_id: "worker_42".into(),
1557 worker_name: "review_captain".into(),
1558 worker_task: "review pr".into(),
1559 worker_mode: "delegated_stage".into(),
1560 event: WorkerEvent::WorkerWaitingForInput,
1561 status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1562 metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1563 audit: None,
1564 });
1565 emit_event(&AgentEvent::WorkerUpdate {
1567 session_id: "other-session".into(),
1568 worker_id: "w2".into(),
1569 worker_name: "n2".into(),
1570 worker_task: "t2".into(),
1571 worker_mode: "delegated_stage".into(),
1572 event: WorkerEvent::WorkerCompleted,
1573 status: "completed".into(),
1574 metadata: serde_json::json!({}),
1575 audit: None,
1576 });
1577 let received = captured.lock().unwrap().clone();
1578 assert_eq!(received.len(), 1, "got: {received:?}");
1579 match &received[0] {
1580 AgentEvent::WorkerUpdate {
1581 session_id,
1582 worker_id,
1583 event,
1584 status,
1585 ..
1586 } => {
1587 assert_eq!(session_id, "worker-session-1");
1588 assert_eq!(worker_id, "worker_42");
1589 assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1590 assert_eq!(status, "awaiting_input");
1591 }
1592 other => panic!("expected WorkerUpdate, got {other:?}"),
1593 }
1594 reset_all_sinks();
1595 }
1596
1597 #[test]
1598 fn worker_update_event_serializes_to_canonical_shape() {
1599 let event = AgentEvent::WorkerUpdate {
1605 session_id: "s".into(),
1606 worker_id: "w".into(),
1607 worker_name: "n".into(),
1608 worker_task: "t".into(),
1609 worker_mode: "delegated_stage".into(),
1610 event: WorkerEvent::WorkerProgressed,
1611 status: "progressed".into(),
1612 metadata: serde_json::json!({"started_at": "0193..."}),
1613 audit: Some(serde_json::json!({"run_id": "run_x"})),
1614 };
1615 let value = serde_json::to_value(&event).unwrap();
1616 assert_eq!(value["type"], "worker_update");
1617 assert_eq!(value["session_id"], "s");
1618 assert_eq!(value["worker_id"], "w");
1619 assert_eq!(value["status"], "progressed");
1620 assert_eq!(value["audit"]["run_id"], "run_x");
1621
1622 let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1626 match recovered {
1627 AgentEvent::WorkerUpdate {
1628 event: recovered_event,
1629 ..
1630 } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1631 other => panic!("expected WorkerUpdate, got {other:?}"),
1632 }
1633 }
1634
1635 #[test]
1636 fn tool_call_update_includes_executor_when_present() {
1637 let event = AgentEvent::ToolCallUpdate {
1638 session_id: "s".into(),
1639 tool_call_id: "tc-1".into(),
1640 tool_name: "read".into(),
1641 status: ToolCallStatus::Completed,
1642 raw_output: None,
1643 error: None,
1644 duration_ms: None,
1645 execution_duration_ms: None,
1646 error_category: None,
1647 executor: Some(ToolExecutor::McpServer {
1648 server_name: "github".into(),
1649 }),
1650 parsing: None,
1651
1652 raw_input: None,
1653 raw_input_partial: None,
1654 audit: None,
1655 };
1656 let json = serde_json::to_value(&event).unwrap();
1657 assert_eq!(json["executor"]["kind"], "mcp_server");
1658 assert_eq!(json["executor"]["server_name"], "github");
1659 }
1660
1661 #[test]
1662 fn tool_call_update_omits_audit_when_absent() {
1663 let event = AgentEvent::ToolCallUpdate {
1664 session_id: "s".into(),
1665 tool_call_id: "tc-1".into(),
1666 tool_name: "read".into(),
1667 status: ToolCallStatus::Completed,
1668 raw_output: None,
1669 error: None,
1670 duration_ms: None,
1671 execution_duration_ms: None,
1672 error_category: None,
1673 executor: None,
1674 parsing: None,
1675 raw_input: None,
1676 raw_input_partial: None,
1677 audit: None,
1678 };
1679 let json = serde_json::to_value(&event).unwrap();
1680 assert!(json.get("audit").is_none(), "got: {json}");
1681 }
1682
1683 #[test]
1684 fn tool_call_update_includes_audit_when_present() {
1685 let audit = MutationSessionRecord {
1686 session_id: "session_42".into(),
1687 run_id: Some("run_42".into()),
1688 mutation_scope: "apply_workspace".into(),
1689 execution_kind: Some("worker".into()),
1690 ..Default::default()
1691 };
1692 let event = AgentEvent::ToolCallUpdate {
1693 session_id: "s".into(),
1694 tool_call_id: "tc-1".into(),
1695 tool_name: "edit_file".into(),
1696 status: ToolCallStatus::Completed,
1697 raw_output: None,
1698 error: None,
1699 duration_ms: None,
1700 execution_duration_ms: None,
1701 error_category: None,
1702 executor: Some(ToolExecutor::HostBridge),
1703 parsing: None,
1704 raw_input: None,
1705 raw_input_partial: None,
1706 audit: Some(audit),
1707 };
1708 let json = serde_json::to_value(&event).unwrap();
1709 assert_eq!(json["audit"]["session_id"], "session_42");
1710 assert_eq!(json["audit"]["run_id"], "run_42");
1711 assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1712 assert_eq!(json["audit"]["execution_kind"], "worker");
1713 }
1714
1715 #[test]
1716 fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1717 let raw = serde_json::json!({
1718 "type": "tool_call_update",
1719 "session_id": "s",
1720 "tool_call_id": "tc-1",
1721 "tool_name": "read",
1722 "status": "completed",
1723 "raw_output": null,
1724 "error": null,
1725 });
1726 let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1727 match event {
1728 AgentEvent::ToolCallUpdate { audit, .. } => {
1729 assert!(audit.is_none());
1730 }
1731 other => panic!("expected ToolCallUpdate, got {other:?}"),
1732 }
1733 }
1734
1735 #[test]
1736 fn tool_call_audit_serializes_with_free_form_audit_payload() {
1737 let audit = serde_json::json!({
1742 "summary": "Searched codebase",
1743 "kind": "search",
1744 "consent": {"decision": "approved", "decided_by": "auto"},
1745 "layers": [{"name": "with_required_reason", "status": "ok"}],
1746 });
1747 let event = AgentEvent::ToolCallAudit {
1748 session_id: "s".into(),
1749 tool_call_id: "tc-1".into(),
1750 tool_name: "search_files".into(),
1751 audit: audit.clone(),
1752 };
1753 let json = serde_json::to_value(&event).unwrap();
1754 assert_eq!(json["type"], "tool_call_audit");
1755 assert_eq!(json["session_id"], "s");
1756 assert_eq!(json["tool_call_id"], "tc-1");
1757 assert_eq!(json["tool_name"], "search_files");
1758 assert_eq!(json["audit"], audit);
1759 }
1760
1761 #[test]
1762 fn tool_call_audit_session_id_routes_correctly() {
1763 let event = AgentEvent::ToolCallAudit {
1764 session_id: "abc".into(),
1765 tool_call_id: "tc".into(),
1766 tool_name: "read".into(),
1767 audit: serde_json::Value::Null,
1768 };
1769 assert_eq!(event.session_id(), "abc");
1770 }
1771}