1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54 WorkerSpawned,
55 WorkerProgressed,
56 WorkerWaitingForInput,
57 WorkerCompleted,
58 WorkerFailed,
59 WorkerCancelled,
60}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
103#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107 Pending,
109 InProgress,
111 Completed,
113 Failed,
115}
116
117impl ToolCallStatus {
118 pub const ALL: [Self; 4] = [
119 Self::Pending,
120 Self::InProgress,
121 Self::Completed,
122 Self::Failed,
123 ];
124
125 pub fn as_str(self) -> &'static str {
126 match self {
127 Self::Pending => "pending",
128 Self::InProgress => "in_progress",
129 Self::Completed => "completed",
130 Self::Failed => "failed",
131 }
132 }
133}
134
135#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
142#[serde(rename_all = "snake_case")]
143pub enum ToolCallErrorCategory {
144 SchemaValidation,
147 ToolError,
150 McpServerError,
152 HostBridgeError,
154 PermissionDenied,
157 RejectedLoop,
160 ParseAborted,
168 Timeout,
170 Network,
172 Cancelled,
174 Unknown,
176}
177
178impl ToolCallErrorCategory {
179 pub const ALL: [Self; 11] = [
180 Self::SchemaValidation,
181 Self::ToolError,
182 Self::McpServerError,
183 Self::HostBridgeError,
184 Self::PermissionDenied,
185 Self::RejectedLoop,
186 Self::ParseAborted,
187 Self::Timeout,
188 Self::Network,
189 Self::Cancelled,
190 Self::Unknown,
191 ];
192
193 pub fn as_str(self) -> &'static str {
194 match self {
195 Self::SchemaValidation => "schema_validation",
196 Self::ToolError => "tool_error",
197 Self::McpServerError => "mcp_server_error",
198 Self::HostBridgeError => "host_bridge_error",
199 Self::PermissionDenied => "permission_denied",
200 Self::RejectedLoop => "rejected_loop",
201 Self::ParseAborted => "parse_aborted",
202 Self::Timeout => "timeout",
203 Self::Network => "network",
204 Self::Cancelled => "cancelled",
205 Self::Unknown => "unknown",
206 }
207 }
208
209 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
216 use crate::value::ErrorCategory as Internal;
217 match category {
218 Internal::Timeout => Self::Timeout,
219 Internal::RateLimit
220 | Internal::Overloaded
221 | Internal::ServerError
222 | Internal::TransientNetwork => Self::Network,
223 Internal::SchemaValidation => Self::SchemaValidation,
224 Internal::ToolError => Self::ToolError,
225 Internal::ToolRejected => Self::PermissionDenied,
226 Internal::Cancelled => Self::Cancelled,
227 Internal::Auth
228 | Internal::EgressBlocked
229 | Internal::NotFound
230 | Internal::CircuitOpen
231 | Internal::BudgetExceeded
232 | Internal::Generic => Self::HostBridgeError,
233 }
234 }
235}
236
237#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
248#[serde(tag = "kind", rename_all = "snake_case")]
249pub enum ToolExecutor {
250 HarnBuiltin,
253 HostBridge,
256 McpServer { server_name: String },
260 ProviderNative,
265}
266
267#[derive(Clone, Debug, Serialize, Deserialize)]
270#[serde(tag = "type", rename_all = "snake_case")]
271pub enum AgentEvent {
272 AgentMessageChunk {
273 session_id: String,
274 content: String,
275 },
276 AgentThoughtChunk {
277 session_id: String,
278 content: String,
279 },
280 ToolCall {
281 session_id: String,
282 tool_call_id: String,
283 tool_name: String,
284 kind: Option<ToolKind>,
285 status: ToolCallStatus,
286 raw_input: serde_json::Value,
287 #[serde(default, skip_serializing_if = "Option::is_none")]
300 parsing: Option<bool>,
301 #[serde(default, skip_serializing_if = "Option::is_none")]
305 audit: Option<MutationSessionRecord>,
306 },
307 ToolCallUpdate {
308 session_id: String,
309 tool_call_id: String,
310 tool_name: String,
311 status: ToolCallStatus,
312 raw_output: Option<serde_json::Value>,
313 error: Option<String>,
314 #[serde(default, skip_serializing_if = "Option::is_none")]
322 duration_ms: Option<u64>,
323 #[serde(default, skip_serializing_if = "Option::is_none")]
327 execution_duration_ms: Option<u64>,
328 #[serde(default, skip_serializing_if = "Option::is_none")]
334 error_category: Option<ToolCallErrorCategory>,
335 #[serde(default, skip_serializing_if = "Option::is_none")]
340 executor: Option<ToolExecutor>,
341 #[serde(default, skip_serializing_if = "Option::is_none")]
350 parsing: Option<bool>,
351 #[serde(default, skip_serializing_if = "Option::is_none")]
359 raw_input: Option<serde_json::Value>,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
364 raw_input_partial: Option<String>,
365 #[serde(default, skip_serializing_if = "Option::is_none")]
370 audit: Option<MutationSessionRecord>,
371 },
372 Plan {
373 session_id: String,
374 plan: serde_json::Value,
375 },
376 TurnStart {
377 session_id: String,
378 iteration: usize,
379 },
380 TurnEnd {
381 session_id: String,
382 iteration: usize,
383 turn_info: serde_json::Value,
384 },
385 JudgeDecision {
386 session_id: String,
387 iteration: usize,
388 verdict: String,
389 reasoning: String,
390 next_step: Option<String>,
391 judge_duration_ms: u64,
392 },
393 TypedCheckpoint {
394 session_id: String,
395 checkpoint: serde_json::Value,
396 },
397 FeedbackInjected {
398 session_id: String,
399 kind: String,
400 content: String,
401 },
402 BudgetExhausted {
406 session_id: String,
407 max_iterations: usize,
408 },
409 LoopStuck {
413 session_id: String,
414 max_nudges: usize,
415 last_iteration: usize,
416 tail_excerpt: String,
417 },
418 DaemonWatchdogTripped {
423 session_id: String,
424 attempts: usize,
425 elapsed_ms: u64,
426 },
427 SkillActivated {
431 session_id: String,
432 skill_name: String,
433 iteration: usize,
434 reason: String,
435 },
436 SkillDeactivated {
439 session_id: String,
440 skill_name: String,
441 iteration: usize,
442 },
443 SkillScopeTools {
446 session_id: String,
447 skill_name: String,
448 allowed_tools: Vec<String>,
449 },
450 ToolSearchQuery {
458 session_id: String,
459 tool_use_id: String,
460 name: String,
461 query: serde_json::Value,
462 strategy: String,
463 mode: String,
464 },
465 ToolSearchResult {
469 session_id: String,
470 tool_use_id: String,
471 promoted: Vec<String>,
472 strategy: String,
473 mode: String,
474 },
475 TranscriptCompacted {
476 session_id: String,
477 mode: String,
478 strategy: String,
479 archived_messages: usize,
480 estimated_tokens_before: usize,
481 estimated_tokens_after: usize,
482 snapshot_asset_id: Option<String>,
483 },
484 Handoff {
485 session_id: String,
486 artifact_id: String,
487 handoff: Box<HandoffArtifact>,
488 },
489 FsWatch {
490 session_id: String,
491 subscription_id: String,
492 events: Vec<FsWatchEvent>,
493 },
494 WorkerUpdate {
510 session_id: String,
511 worker_id: String,
512 worker_name: String,
513 worker_task: String,
514 worker_mode: String,
515 event: WorkerEvent,
516 status: String,
517 metadata: serde_json::Value,
518 audit: Option<serde_json::Value>,
519 },
520 HitlRequested {
528 session_id: String,
529 request_id: String,
530 kind: String,
531 payload: serde_json::Value,
532 },
533 HitlResolved {
539 session_id: String,
540 request_id: String,
541 kind: String,
542 outcome: String,
543 },
544 LoopControlDecision {
551 session_id: String,
552 iteration: usize,
553 action: String,
554 old_limit: usize,
555 new_limit: usize,
556 reason: String,
557 status: String,
558 },
559 AgentLoopStallWarning {
564 session_id: String,
565 warning: serde_json::Value,
566 },
567 ToolCallAudit {
580 session_id: String,
581 tool_call_id: String,
582 tool_name: String,
583 audit: serde_json::Value,
584 },
585 CacheHit {
593 session_id: String,
594 key: String,
595 backend: String,
596 namespace: String,
597 payload: serde_json::Value,
598 },
599 CacheMiss {
605 session_id: String,
606 key: String,
607 backend: String,
608 namespace: String,
609 payload: serde_json::Value,
610 },
611}
612
613impl AgentEvent {
614 pub fn session_id(&self) -> &str {
615 match self {
616 Self::AgentMessageChunk { session_id, .. }
617 | Self::AgentThoughtChunk { session_id, .. }
618 | Self::ToolCall { session_id, .. }
619 | Self::ToolCallUpdate { session_id, .. }
620 | Self::Plan { session_id, .. }
621 | Self::TurnStart { session_id, .. }
622 | Self::TurnEnd { session_id, .. }
623 | Self::JudgeDecision { session_id, .. }
624 | Self::TypedCheckpoint { session_id, .. }
625 | Self::FeedbackInjected { session_id, .. }
626 | Self::BudgetExhausted { session_id, .. }
627 | Self::LoopStuck { session_id, .. }
628 | Self::DaemonWatchdogTripped { session_id, .. }
629 | Self::SkillActivated { session_id, .. }
630 | Self::SkillDeactivated { session_id, .. }
631 | Self::SkillScopeTools { session_id, .. }
632 | Self::ToolSearchQuery { session_id, .. }
633 | Self::ToolSearchResult { session_id, .. }
634 | Self::TranscriptCompacted { session_id, .. }
635 | Self::Handoff { session_id, .. }
636 | Self::FsWatch { session_id, .. }
637 | Self::WorkerUpdate { session_id, .. }
638 | Self::HitlRequested { session_id, .. }
639 | Self::HitlResolved { session_id, .. }
640 | Self::LoopControlDecision { session_id, .. }
641 | Self::AgentLoopStallWarning { session_id, .. }
642 | Self::ToolCallAudit { session_id, .. }
643 | Self::CacheHit { session_id, .. }
644 | Self::CacheMiss { session_id, .. } => session_id,
645 }
646 }
647}
648
649pub trait AgentEventSink: Send + Sync {
652 fn handle_event(&self, event: &AgentEvent);
653}
654
655#[derive(Clone, Debug, Serialize, Deserialize)]
662pub struct PersistedAgentEvent {
663 pub index: u64,
667 pub emitted_at_ms: i64,
671 pub frame_depth: Option<u32>,
675 #[serde(flatten)]
677 pub event: AgentEvent,
678}
679
680pub struct JsonlEventSink {
685 state: Mutex<JsonlEventSinkState>,
686 base_path: std::path::PathBuf,
687}
688
689struct JsonlEventSinkState {
690 writer: std::io::BufWriter<std::fs::File>,
691 index: u64,
692 bytes_written: u64,
693 rotation: u32,
694}
695
696impl JsonlEventSink {
697 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
701
702 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
706 let base_path = base_path.into();
707 if let Some(parent) = base_path.parent() {
708 std::fs::create_dir_all(parent)?;
709 }
710 let file = std::fs::OpenOptions::new()
711 .create(true)
712 .truncate(true)
713 .write(true)
714 .open(&base_path)?;
715 Ok(Arc::new(Self {
716 state: Mutex::new(JsonlEventSinkState {
717 writer: std::io::BufWriter::new(file),
718 index: 0,
719 bytes_written: 0,
720 rotation: 0,
721 }),
722 base_path,
723 }))
724 }
725
726 pub fn flush(&self) -> std::io::Result<()> {
729 use std::io::Write as _;
730 self.state
731 .lock()
732 .expect("jsonl sink mutex poisoned")
733 .writer
734 .flush()
735 }
736
737 pub fn event_count(&self) -> u64 {
740 self.state.lock().expect("jsonl sink mutex poisoned").index
741 }
742
743 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
744 use std::io::Write as _;
745 if state.bytes_written < Self::ROTATE_BYTES {
746 return Ok(());
747 }
748 state.writer.flush()?;
749 state.rotation += 1;
750 let suffix = format!("-{:06}", state.rotation);
751 let rotated = self.base_path.with_file_name({
752 let stem = self
753 .base_path
754 .file_stem()
755 .and_then(|s| s.to_str())
756 .unwrap_or("event_log");
757 let ext = self
758 .base_path
759 .extension()
760 .and_then(|e| e.to_str())
761 .unwrap_or("jsonl");
762 format!("{stem}{suffix}.{ext}")
763 });
764 let file = std::fs::OpenOptions::new()
765 .create(true)
766 .truncate(true)
767 .write(true)
768 .open(&rotated)?;
769 state.writer = std::io::BufWriter::new(file);
770 state.bytes_written = 0;
771 Ok(())
772 }
773}
774
775pub struct EventLogSink {
780 log: Arc<AnyEventLog>,
781 topic: Topic,
782 session_id: String,
783}
784
785impl EventLogSink {
786 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
787 let session_id = session_id.into();
788 let topic = Topic::new(format!(
789 "observability.agent_events.{}",
790 crate::event_log::sanitize_topic_component(&session_id)
791 ))
792 .expect("session id should sanitize to a valid topic");
793 Arc::new(Self {
794 log,
795 topic,
796 session_id,
797 })
798 }
799}
800
801impl AgentEventSink for JsonlEventSink {
802 fn handle_event(&self, event: &AgentEvent) {
803 use std::io::Write as _;
804 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
805 let index = state.index;
806 state.index += 1;
807 let emitted_at_ms = std::time::SystemTime::now()
808 .duration_since(std::time::UNIX_EPOCH)
809 .map(|d| d.as_millis() as i64)
810 .unwrap_or(0);
811 let envelope = PersistedAgentEvent {
812 index,
813 emitted_at_ms,
814 frame_depth: None,
815 event: event.clone(),
816 };
817 if let Ok(line) = serde_json::to_string(&envelope) {
818 let _ = state.writer.write_all(line.as_bytes());
823 let _ = state.writer.write_all(b"\n");
824 state.bytes_written += line.len() as u64 + 1;
825 let _ = self.rotate_if_needed(&mut state);
826 }
827 }
828}
829
830impl AgentEventSink for EventLogSink {
831 fn handle_event(&self, event: &AgentEvent) {
832 let event_json = match serde_json::to_value(event) {
833 Ok(value) => value,
834 Err(_) => return,
835 };
836 let event_kind = event_json
837 .get("type")
838 .and_then(|value| value.as_str())
839 .unwrap_or("agent_event")
840 .to_string();
841 let payload = serde_json::json!({
842 "index_hint": now_ms(),
843 "session_id": self.session_id,
844 "event": event_json,
845 });
846 let mut headers = std::collections::BTreeMap::new();
847 headers.insert("session_id".to_string(), self.session_id.clone());
848 let log = self.log.clone();
849 let topic = self.topic.clone();
850 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
851 if let Ok(handle) = tokio::runtime::Handle::try_current() {
852 handle.spawn(async move {
853 let _ = log.append(&topic, record).await;
854 });
855 } else {
856 let _ = futures::executor::block_on(log.append(&topic, record));
857 }
858 }
859}
860
861impl Drop for JsonlEventSink {
862 fn drop(&mut self) {
863 if let Ok(mut state) = self.state.lock() {
864 use std::io::Write as _;
865 let _ = state.writer.flush();
866 }
867 }
868}
869
870pub struct MultiSink {
872 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
873}
874
875impl MultiSink {
876 pub fn new() -> Self {
877 Self {
878 sinks: Mutex::new(Vec::new()),
879 }
880 }
881 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
882 self.sinks.lock().expect("sink mutex poisoned").push(sink);
883 }
884 pub fn len(&self) -> usize {
885 self.sinks.lock().expect("sink mutex poisoned").len()
886 }
887 pub fn is_empty(&self) -> bool {
888 self.len() == 0
889 }
890}
891
892impl Default for MultiSink {
893 fn default() -> Self {
894 Self::new()
895 }
896}
897
898impl AgentEventSink for MultiSink {
899 fn handle_event(&self, event: &AgentEvent) {
900 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
906 for sink in sinks {
907 sink.handle_event(event);
908 }
909 }
910}
911
912#[cfg(test)]
913#[derive(Clone)]
914struct RegisteredSink {
915 owner: std::thread::ThreadId,
916 sink: Arc<dyn AgentEventSink>,
917}
918
919#[cfg(not(test))]
920type RegisteredSink = Arc<dyn AgentEventSink>;
921
922type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
923
924fn external_sinks() -> &'static ExternalSinkRegistry {
925 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
926 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
927}
928
929pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
930 let session_id = session_id.into();
931 let mut reg = external_sinks().write().expect("sink registry poisoned");
932 #[cfg(test)]
933 let sink = RegisteredSink {
934 owner: std::thread::current().id(),
935 sink,
936 };
937 reg.entry(session_id).or_default().push(sink);
938}
939
940pub fn clear_session_sinks(session_id: &str) {
944 #[cfg(test)]
945 {
946 let owner = std::thread::current().id();
947 let mut reg = external_sinks().write().expect("sink registry poisoned");
948 if let Some(sinks) = reg.get_mut(session_id) {
949 sinks.retain(|sink| sink.owner != owner);
950 if sinks.is_empty() {
951 reg.remove(session_id);
952 }
953 }
954 }
955 #[cfg(not(test))]
956 {
957 external_sinks()
958 .write()
959 .expect("sink registry poisoned")
960 .remove(session_id);
961 }
962}
963
964pub fn reset_all_sinks() {
965 #[cfg(test)]
966 {
967 let owner = std::thread::current().id();
968 let mut reg = external_sinks().write().expect("sink registry poisoned");
969 reg.retain(|_, sinks| {
970 sinks.retain(|sink| sink.owner != owner);
971 !sinks.is_empty()
972 });
973 crate::agent_sessions::reset_session_store();
974 }
975 #[cfg(not(test))]
976 {
977 external_sinks()
978 .write()
979 .expect("sink registry poisoned")
980 .clear();
981 crate::agent_sessions::reset_session_store();
982 }
983}
984
985pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
992 if source_session_id.is_empty() || target_session_id.is_empty() {
993 return;
994 }
995 if source_session_id == target_session_id {
996 return;
997 }
998 let mut reg = external_sinks().write().expect("sink registry poisoned");
999 let Some(source_sinks) = reg.get(source_session_id).cloned() else {
1000 return;
1001 };
1002 let target = reg.entry(target_session_id.to_string()).or_default();
1003 #[cfg(test)]
1004 {
1005 for source in source_sinks {
1006 let already_present = target
1007 .iter()
1008 .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
1009 if !already_present {
1010 target.push(source);
1011 }
1012 }
1013 }
1014 #[cfg(not(test))]
1015 {
1016 for source in source_sinks {
1017 let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
1018 if !already_present {
1019 target.push(source);
1020 }
1021 }
1022 }
1023}
1024
1025pub fn emit_event(event: &AgentEvent) {
1029 let sinks: Vec<Arc<dyn AgentEventSink>> = {
1030 let reg = external_sinks().read().expect("sink registry poisoned");
1031 #[cfg(test)]
1032 {
1033 let owner = std::thread::current().id();
1034 reg.get(event.session_id())
1035 .map(|sinks| {
1036 sinks
1037 .iter()
1038 .filter(|sink| sink.owner == owner)
1039 .map(|sink| sink.sink.clone())
1040 .collect()
1041 })
1042 .unwrap_or_default()
1043 }
1044 #[cfg(not(test))]
1045 {
1046 reg.get(event.session_id()).cloned().unwrap_or_default()
1047 }
1048 };
1049 for sink in sinks {
1050 sink.handle_event(event);
1051 }
1052}
1053
1054fn now_ms() -> i64 {
1055 std::time::SystemTime::now()
1056 .duration_since(std::time::UNIX_EPOCH)
1057 .map(|duration| duration.as_millis() as i64)
1058 .unwrap_or(0)
1059}
1060
1061pub fn session_external_sink_count(session_id: &str) -> usize {
1062 #[cfg(test)]
1063 {
1064 let owner = std::thread::current().id();
1065 return external_sinks()
1066 .read()
1067 .expect("sink registry poisoned")
1068 .get(session_id)
1069 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1070 .unwrap_or(0);
1071 }
1072 #[cfg(not(test))]
1073 {
1074 external_sinks()
1075 .read()
1076 .expect("sink registry poisoned")
1077 .get(session_id)
1078 .map(|v| v.len())
1079 .unwrap_or(0)
1080 }
1081}
1082
1083pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1084 crate::agent_sessions::subscriber_count(session_id)
1085}
1086
1087#[cfg(test)]
1088mod tests {
1089 use super::*;
1090 use std::sync::atomic::{AtomicUsize, Ordering};
1091
1092 struct CountingSink(Arc<AtomicUsize>);
1093 impl AgentEventSink for CountingSink {
1094 fn handle_event(&self, _event: &AgentEvent) {
1095 self.0.fetch_add(1, Ordering::SeqCst);
1096 }
1097 }
1098
1099 #[test]
1100 fn multi_sink_fans_out_in_order() {
1101 let multi = MultiSink::new();
1102 let a = Arc::new(AtomicUsize::new(0));
1103 let b = Arc::new(AtomicUsize::new(0));
1104 multi.push(Arc::new(CountingSink(a.clone())));
1105 multi.push(Arc::new(CountingSink(b.clone())));
1106 let event = AgentEvent::TurnStart {
1107 session_id: "s1".into(),
1108 iteration: 1,
1109 };
1110 multi.handle_event(&event);
1111 assert_eq!(a.load(Ordering::SeqCst), 1);
1112 assert_eq!(b.load(Ordering::SeqCst), 1);
1113 }
1114
1115 #[test]
1116 fn session_scoped_sink_routing() {
1117 reset_all_sinks();
1118 let a = Arc::new(AtomicUsize::new(0));
1119 let b = Arc::new(AtomicUsize::new(0));
1120 register_sink("session-a", Arc::new(CountingSink(a.clone())));
1121 register_sink("session-b", Arc::new(CountingSink(b.clone())));
1122 emit_event(&AgentEvent::TurnStart {
1123 session_id: "session-a".into(),
1124 iteration: 0,
1125 });
1126 assert_eq!(a.load(Ordering::SeqCst), 1);
1127 assert_eq!(b.load(Ordering::SeqCst), 0);
1128 emit_event(&AgentEvent::TurnEnd {
1129 session_id: "session-b".into(),
1130 iteration: 0,
1131 turn_info: serde_json::json!({}),
1132 });
1133 assert_eq!(a.load(Ordering::SeqCst), 1);
1134 assert_eq!(b.load(Ordering::SeqCst), 1);
1135 clear_session_sinks("session-a");
1136 assert_eq!(session_external_sink_count("session-a"), 0);
1137 assert_eq!(session_external_sink_count("session-b"), 1);
1138 reset_all_sinks();
1139 }
1140
1141 #[test]
1142 fn newly_opened_child_session_inherits_current_external_sinks() {
1143 reset_all_sinks();
1144 let delivered = Arc::new(AtomicUsize::new(0));
1145 register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1146 {
1147 let _guard = crate::agent_sessions::enter_current_session("outer-session");
1148 let inner = crate::agent_sessions::open_or_create(None);
1149 assert_ne!(inner, "outer-session");
1150 emit_event(&AgentEvent::TurnStart {
1151 session_id: inner,
1152 iteration: 0,
1153 });
1154 }
1155 assert_eq!(delivered.load(Ordering::SeqCst), 1);
1156 reset_all_sinks();
1157 }
1158
1159 #[test]
1160 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1161 use std::io::{BufRead, BufReader};
1162 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1163 std::fs::create_dir_all(&dir).unwrap();
1164 let path = dir.join("event_log.jsonl");
1165 let sink = JsonlEventSink::open(&path).unwrap();
1166 for i in 0..5 {
1167 sink.handle_event(&AgentEvent::TurnStart {
1168 session_id: "s".into(),
1169 iteration: i,
1170 });
1171 }
1172 assert_eq!(sink.event_count(), 5);
1173 sink.flush().unwrap();
1174
1175 let file = std::fs::File::open(&path).unwrap();
1177 let mut last_idx: i64 = -1;
1178 let mut last_ts: i64 = 0;
1179 for line in BufReader::new(file).lines() {
1180 let line = line.unwrap();
1181 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1182 let idx = val["index"].as_i64().unwrap();
1183 let ts = val["emitted_at_ms"].as_i64().unwrap();
1184 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1185 assert!(ts >= last_ts, "timestamps must be non-decreasing");
1186 last_idx = idx;
1187 last_ts = ts;
1188 assert_eq!(val["type"], "turn_start");
1190 }
1191 assert_eq!(last_idx, 4);
1192 let _ = std::fs::remove_file(&path);
1193 }
1194
1195 #[test]
1196 fn judge_decision_round_trips_through_jsonl_sink() {
1197 use std::io::{BufRead, BufReader};
1198 let dir =
1199 std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1200 std::fs::create_dir_all(&dir).unwrap();
1201 let path = dir.join("event_log.jsonl");
1202 let sink = JsonlEventSink::open(&path).unwrap();
1203 sink.handle_event(&AgentEvent::JudgeDecision {
1204 session_id: "s".into(),
1205 iteration: 2,
1206 verdict: "continue".into(),
1207 reasoning: "needs a concrete next step".into(),
1208 next_step: Some("run the verifier".into()),
1209 judge_duration_ms: 17,
1210 });
1211 sink.flush().unwrap();
1212
1213 let file = std::fs::File::open(&path).unwrap();
1214 let line = BufReader::new(file).lines().next().unwrap().unwrap();
1215 let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1216 match recovered.event {
1217 AgentEvent::JudgeDecision {
1218 session_id,
1219 iteration,
1220 verdict,
1221 reasoning,
1222 next_step,
1223 judge_duration_ms,
1224 } => {
1225 assert_eq!(session_id, "s");
1226 assert_eq!(iteration, 2);
1227 assert_eq!(verdict, "continue");
1228 assert_eq!(reasoning, "needs a concrete next step");
1229 assert_eq!(next_step.as_deref(), Some("run the verifier"));
1230 assert_eq!(judge_duration_ms, 17);
1231 }
1232 other => panic!("expected JudgeDecision, got {other:?}"),
1233 }
1234 let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1235 assert_eq!(value["type"], "judge_decision");
1236 let _ = std::fs::remove_file(&path);
1237 let _ = std::fs::remove_dir(&dir);
1238 }
1239
1240 #[test]
1241 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1242 let terminal = AgentEvent::ToolCallUpdate {
1247 session_id: "s".into(),
1248 tool_call_id: "tc-1".into(),
1249 tool_name: "read".into(),
1250 status: ToolCallStatus::Completed,
1251 raw_output: None,
1252 error: None,
1253 duration_ms: Some(42),
1254 execution_duration_ms: Some(7),
1255 error_category: None,
1256 executor: None,
1257 parsing: None,
1258
1259 raw_input: None,
1260 raw_input_partial: None,
1261 audit: None,
1262 };
1263 let value = serde_json::to_value(&terminal).unwrap();
1264 assert_eq!(value["duration_ms"], serde_json::json!(42));
1265 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1266
1267 let intermediate = AgentEvent::ToolCallUpdate {
1271 session_id: "s".into(),
1272 tool_call_id: "tc-1".into(),
1273 tool_name: "read".into(),
1274 status: ToolCallStatus::InProgress,
1275 raw_output: None,
1276 error: None,
1277 duration_ms: None,
1278 execution_duration_ms: None,
1279 error_category: None,
1280 executor: None,
1281 parsing: None,
1282
1283 raw_input: None,
1284 raw_input_partial: None,
1285 audit: None,
1286 };
1287 let value = serde_json::to_value(&intermediate).unwrap();
1288 let object = value.as_object().expect("update serializes as object");
1289 assert!(
1290 !object.contains_key("duration_ms"),
1291 "duration_ms must be omitted when None: {value}"
1292 );
1293 assert!(
1294 !object.contains_key("execution_duration_ms"),
1295 "execution_duration_ms must be omitted when None: {value}"
1296 );
1297 }
1298
1299 #[test]
1300 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1301 let raw = serde_json::json!({
1304 "type": "tool_call_update",
1305 "session_id": "s",
1306 "tool_call_id": "tc-1",
1307 "tool_name": "read",
1308 "status": "completed",
1309 "raw_output": null,
1310 "error": null,
1311 });
1312 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1313 match event {
1314 AgentEvent::ToolCallUpdate {
1315 duration_ms,
1316 execution_duration_ms,
1317 ..
1318 } => {
1319 assert!(duration_ms.is_none());
1320 assert!(execution_duration_ms.is_none());
1321 }
1322 other => panic!("expected ToolCallUpdate, got {other:?}"),
1323 }
1324 }
1325
1326 #[test]
1327 fn tool_call_status_serde() {
1328 assert_eq!(
1329 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1330 "\"pending\""
1331 );
1332 assert_eq!(
1333 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1334 "\"in_progress\""
1335 );
1336 assert_eq!(
1337 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1338 "\"completed\""
1339 );
1340 assert_eq!(
1341 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1342 "\"failed\""
1343 );
1344 }
1345
1346 #[test]
1347 fn tool_call_error_category_serializes_as_snake_case() {
1348 let pairs = [
1349 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1350 (ToolCallErrorCategory::ToolError, "tool_error"),
1351 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1352 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1353 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1354 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1355 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1356 (ToolCallErrorCategory::Timeout, "timeout"),
1357 (ToolCallErrorCategory::Network, "network"),
1358 (ToolCallErrorCategory::Cancelled, "cancelled"),
1359 (ToolCallErrorCategory::Unknown, "unknown"),
1360 ];
1361 for (variant, wire) in pairs {
1362 let encoded = serde_json::to_string(&variant).unwrap();
1363 assert_eq!(encoded, format!("\"{wire}\""));
1364 assert_eq!(variant.as_str(), wire);
1365 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1368 assert_eq!(decoded, variant);
1369 }
1370 }
1371
1372 #[test]
1373 fn tool_executor_round_trips_with_adjacent_tag() {
1374 for executor in [
1379 ToolExecutor::HarnBuiltin,
1380 ToolExecutor::HostBridge,
1381 ToolExecutor::McpServer {
1382 server_name: "linear".to_string(),
1383 },
1384 ToolExecutor::ProviderNative,
1385 ] {
1386 let json = serde_json::to_value(&executor).unwrap();
1387 let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1388 match &executor {
1389 ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1390 ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1391 ToolExecutor::McpServer { server_name } => {
1392 assert_eq!(kind, "mcp_server");
1393 assert_eq!(json["server_name"], *server_name);
1394 }
1395 ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1396 }
1397 let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1398 assert_eq!(recovered, executor);
1399 }
1400 }
1401
1402 #[test]
1403 fn tool_call_error_category_from_internal_collapses_transient_family() {
1404 use crate::value::ErrorCategory as Internal;
1405 assert_eq!(
1406 ToolCallErrorCategory::from_internal(&Internal::Timeout),
1407 ToolCallErrorCategory::Timeout
1408 );
1409 for net in [
1410 Internal::RateLimit,
1411 Internal::Overloaded,
1412 Internal::ServerError,
1413 Internal::TransientNetwork,
1414 ] {
1415 assert_eq!(
1416 ToolCallErrorCategory::from_internal(&net),
1417 ToolCallErrorCategory::Network,
1418 "{net:?} should map to Network",
1419 );
1420 }
1421 assert_eq!(
1422 ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1423 ToolCallErrorCategory::SchemaValidation
1424 );
1425 assert_eq!(
1426 ToolCallErrorCategory::from_internal(&Internal::ToolError),
1427 ToolCallErrorCategory::ToolError
1428 );
1429 assert_eq!(
1430 ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1431 ToolCallErrorCategory::PermissionDenied
1432 );
1433 assert_eq!(
1434 ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1435 ToolCallErrorCategory::Cancelled
1436 );
1437 for bridge in [
1438 Internal::Auth,
1439 Internal::EgressBlocked,
1440 Internal::NotFound,
1441 Internal::CircuitOpen,
1442 Internal::Generic,
1443 ] {
1444 assert_eq!(
1445 ToolCallErrorCategory::from_internal(&bridge),
1446 ToolCallErrorCategory::HostBridgeError,
1447 "{bridge:?} should map to HostBridgeError",
1448 );
1449 }
1450 }
1451
1452 #[test]
1453 fn tool_call_update_event_omits_error_category_when_none() {
1454 let event = AgentEvent::ToolCallUpdate {
1455 session_id: "s".into(),
1456 tool_call_id: "t".into(),
1457 tool_name: "read".into(),
1458 status: ToolCallStatus::Completed,
1459 raw_output: None,
1460 error: None,
1461 duration_ms: None,
1462 execution_duration_ms: None,
1463 error_category: None,
1464 executor: None,
1465 parsing: None,
1466
1467 raw_input: None,
1468 raw_input_partial: None,
1469 audit: None,
1470 };
1471 let v = serde_json::to_value(&event).unwrap();
1472 assert_eq!(v["type"], "tool_call_update");
1473 assert!(v.get("error_category").is_none());
1474 }
1475
1476 #[test]
1477 fn tool_call_update_event_serializes_error_category_when_set() {
1478 let event = AgentEvent::ToolCallUpdate {
1479 session_id: "s".into(),
1480 tool_call_id: "t".into(),
1481 tool_name: "read".into(),
1482 status: ToolCallStatus::Failed,
1483 raw_output: None,
1484 error: Some("missing required field".into()),
1485 duration_ms: None,
1486 execution_duration_ms: None,
1487 error_category: Some(ToolCallErrorCategory::SchemaValidation),
1488 executor: None,
1489 parsing: None,
1490
1491 raw_input: None,
1492 raw_input_partial: None,
1493 audit: None,
1494 };
1495 let v = serde_json::to_value(&event).unwrap();
1496 assert_eq!(v["error_category"], "schema_validation");
1497 assert_eq!(v["error"], "missing required field");
1498 }
1499
1500 #[test]
1501 fn tool_call_update_omits_executor_when_absent() {
1502 let event = AgentEvent::ToolCallUpdate {
1506 session_id: "s".into(),
1507 tool_call_id: "tc-1".into(),
1508 tool_name: "read".into(),
1509 status: ToolCallStatus::Completed,
1510 raw_output: None,
1511 error: None,
1512 duration_ms: None,
1513 execution_duration_ms: None,
1514 error_category: None,
1515 executor: None,
1516 parsing: None,
1517
1518 raw_input: None,
1519 raw_input_partial: None,
1520 audit: None,
1521 };
1522 let json = serde_json::to_value(&event).unwrap();
1523 assert!(json.get("executor").is_none(), "got: {json}");
1524 }
1525
1526 #[test]
1527 fn worker_event_status_strings_cover_all_variants() {
1528 assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1533 assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1534 assert_eq!(
1535 WorkerEvent::WorkerWaitingForInput.as_status(),
1536 "awaiting_input"
1537 );
1538 assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1539 assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1540 assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1541
1542 for terminal in [
1543 WorkerEvent::WorkerCompleted,
1544 WorkerEvent::WorkerFailed,
1545 WorkerEvent::WorkerCancelled,
1546 ] {
1547 assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1548 }
1549 for non_terminal in [
1550 WorkerEvent::WorkerSpawned,
1551 WorkerEvent::WorkerProgressed,
1552 WorkerEvent::WorkerWaitingForInput,
1553 ] {
1554 assert!(
1555 !non_terminal.is_terminal(),
1556 "{non_terminal:?} should not be terminal"
1557 );
1558 }
1559 }
1560
1561 #[test]
1562 fn worker_update_event_routes_through_session_keyed_sink() {
1563 reset_all_sinks();
1568 let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1569 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1570 impl AgentEventSink for CapturingSink {
1571 fn handle_event(&self, event: &AgentEvent) {
1572 self.0
1573 .lock()
1574 .expect("captured sink mutex poisoned")
1575 .push(event.clone());
1576 }
1577 }
1578 register_sink(
1579 "worker-session-1",
1580 Arc::new(CapturingSink(captured.clone())),
1581 );
1582 emit_event(&AgentEvent::WorkerUpdate {
1583 session_id: "worker-session-1".into(),
1584 worker_id: "worker_42".into(),
1585 worker_name: "review_captain".into(),
1586 worker_task: "review pr".into(),
1587 worker_mode: "delegated_stage".into(),
1588 event: WorkerEvent::WorkerWaitingForInput,
1589 status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1590 metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1591 audit: None,
1592 });
1593 emit_event(&AgentEvent::WorkerUpdate {
1595 session_id: "other-session".into(),
1596 worker_id: "w2".into(),
1597 worker_name: "n2".into(),
1598 worker_task: "t2".into(),
1599 worker_mode: "delegated_stage".into(),
1600 event: WorkerEvent::WorkerCompleted,
1601 status: "completed".into(),
1602 metadata: serde_json::json!({}),
1603 audit: None,
1604 });
1605 let received = captured.lock().unwrap().clone();
1606 assert_eq!(received.len(), 1, "got: {received:?}");
1607 match &received[0] {
1608 AgentEvent::WorkerUpdate {
1609 session_id,
1610 worker_id,
1611 event,
1612 status,
1613 ..
1614 } => {
1615 assert_eq!(session_id, "worker-session-1");
1616 assert_eq!(worker_id, "worker_42");
1617 assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1618 assert_eq!(status, "awaiting_input");
1619 }
1620 other => panic!("expected WorkerUpdate, got {other:?}"),
1621 }
1622 reset_all_sinks();
1623 }
1624
1625 #[test]
1626 fn worker_update_event_serializes_to_canonical_shape() {
1627 let event = AgentEvent::WorkerUpdate {
1633 session_id: "s".into(),
1634 worker_id: "w".into(),
1635 worker_name: "n".into(),
1636 worker_task: "t".into(),
1637 worker_mode: "delegated_stage".into(),
1638 event: WorkerEvent::WorkerProgressed,
1639 status: "progressed".into(),
1640 metadata: serde_json::json!({"started_at": "0193..."}),
1641 audit: Some(serde_json::json!({"run_id": "run_x"})),
1642 };
1643 let value = serde_json::to_value(&event).unwrap();
1644 assert_eq!(value["type"], "worker_update");
1645 assert_eq!(value["session_id"], "s");
1646 assert_eq!(value["worker_id"], "w");
1647 assert_eq!(value["status"], "progressed");
1648 assert_eq!(value["audit"]["run_id"], "run_x");
1649
1650 let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1654 match recovered {
1655 AgentEvent::WorkerUpdate {
1656 event: recovered_event,
1657 ..
1658 } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1659 other => panic!("expected WorkerUpdate, got {other:?}"),
1660 }
1661 }
1662
1663 #[test]
1664 fn tool_call_update_includes_executor_when_present() {
1665 let event = AgentEvent::ToolCallUpdate {
1666 session_id: "s".into(),
1667 tool_call_id: "tc-1".into(),
1668 tool_name: "read".into(),
1669 status: ToolCallStatus::Completed,
1670 raw_output: None,
1671 error: None,
1672 duration_ms: None,
1673 execution_duration_ms: None,
1674 error_category: None,
1675 executor: Some(ToolExecutor::McpServer {
1676 server_name: "github".into(),
1677 }),
1678 parsing: None,
1679
1680 raw_input: None,
1681 raw_input_partial: None,
1682 audit: None,
1683 };
1684 let json = serde_json::to_value(&event).unwrap();
1685 assert_eq!(json["executor"]["kind"], "mcp_server");
1686 assert_eq!(json["executor"]["server_name"], "github");
1687 }
1688
1689 #[test]
1690 fn tool_call_update_omits_audit_when_absent() {
1691 let event = AgentEvent::ToolCallUpdate {
1692 session_id: "s".into(),
1693 tool_call_id: "tc-1".into(),
1694 tool_name: "read".into(),
1695 status: ToolCallStatus::Completed,
1696 raw_output: None,
1697 error: None,
1698 duration_ms: None,
1699 execution_duration_ms: None,
1700 error_category: None,
1701 executor: None,
1702 parsing: None,
1703 raw_input: None,
1704 raw_input_partial: None,
1705 audit: None,
1706 };
1707 let json = serde_json::to_value(&event).unwrap();
1708 assert!(json.get("audit").is_none(), "got: {json}");
1709 }
1710
1711 #[test]
1712 fn tool_call_update_includes_audit_when_present() {
1713 let audit = MutationSessionRecord {
1714 session_id: "session_42".into(),
1715 run_id: Some("run_42".into()),
1716 mutation_scope: "apply_workspace".into(),
1717 execution_kind: Some("worker".into()),
1718 ..Default::default()
1719 };
1720 let event = AgentEvent::ToolCallUpdate {
1721 session_id: "s".into(),
1722 tool_call_id: "tc-1".into(),
1723 tool_name: "edit_file".into(),
1724 status: ToolCallStatus::Completed,
1725 raw_output: None,
1726 error: None,
1727 duration_ms: None,
1728 execution_duration_ms: None,
1729 error_category: None,
1730 executor: Some(ToolExecutor::HostBridge),
1731 parsing: None,
1732 raw_input: None,
1733 raw_input_partial: None,
1734 audit: Some(audit),
1735 };
1736 let json = serde_json::to_value(&event).unwrap();
1737 assert_eq!(json["audit"]["session_id"], "session_42");
1738 assert_eq!(json["audit"]["run_id"], "run_42");
1739 assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1740 assert_eq!(json["audit"]["execution_kind"], "worker");
1741 }
1742
1743 #[test]
1744 fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1745 let raw = serde_json::json!({
1746 "type": "tool_call_update",
1747 "session_id": "s",
1748 "tool_call_id": "tc-1",
1749 "tool_name": "read",
1750 "status": "completed",
1751 "raw_output": null,
1752 "error": null,
1753 });
1754 let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1755 match event {
1756 AgentEvent::ToolCallUpdate { audit, .. } => {
1757 assert!(audit.is_none());
1758 }
1759 other => panic!("expected ToolCallUpdate, got {other:?}"),
1760 }
1761 }
1762
1763 #[test]
1764 fn tool_call_audit_serializes_with_free_form_audit_payload() {
1765 let audit = serde_json::json!({
1770 "summary": "Searched codebase",
1771 "kind": "search",
1772 "consent": {"decision": "approved", "decided_by": "auto"},
1773 "layers": [{"name": "with_required_reason", "status": "ok"}],
1774 });
1775 let event = AgentEvent::ToolCallAudit {
1776 session_id: "s".into(),
1777 tool_call_id: "tc-1".into(),
1778 tool_name: "search_files".into(),
1779 audit: audit.clone(),
1780 };
1781 let json = serde_json::to_value(&event).unwrap();
1782 assert_eq!(json["type"], "tool_call_audit");
1783 assert_eq!(json["session_id"], "s");
1784 assert_eq!(json["tool_call_id"], "tc-1");
1785 assert_eq!(json["tool_name"], "search_files");
1786 assert_eq!(json["audit"], audit);
1787 }
1788
1789 #[test]
1790 fn tool_call_audit_session_id_routes_correctly() {
1791 let event = AgentEvent::ToolCallAudit {
1792 session_id: "abc".into(),
1793 tool_call_id: "tc".into(),
1794 tool_name: "read".into(),
1795 audit: serde_json::Value::Null,
1796 };
1797 assert_eq!(event.session_id(), "abc");
1798 }
1799}