1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54 WorkerSpawned,
55 WorkerProgressed,
56 WorkerWaitingForInput,
57 WorkerCompleted,
58 WorkerFailed,
59 WorkerCancelled,
60}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
103#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107 Pending,
109 InProgress,
111 Completed,
113 Failed,
115}
116
117impl ToolCallStatus {
118 pub const ALL: [Self; 4] = [
119 Self::Pending,
120 Self::InProgress,
121 Self::Completed,
122 Self::Failed,
123 ];
124
125 pub fn as_str(self) -> &'static str {
126 match self {
127 Self::Pending => "pending",
128 Self::InProgress => "in_progress",
129 Self::Completed => "completed",
130 Self::Failed => "failed",
131 }
132 }
133}
134
135#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
142#[serde(rename_all = "snake_case")]
143pub enum ToolCallErrorCategory {
144 SchemaValidation,
147 ToolError,
150 McpServerError,
152 HostBridgeError,
154 PermissionDenied,
157 RejectedLoop,
160 ParseAborted,
168 Timeout,
170 Network,
172 Cancelled,
174 Unknown,
176}
177
178impl ToolCallErrorCategory {
179 pub const ALL: [Self; 11] = [
180 Self::SchemaValidation,
181 Self::ToolError,
182 Self::McpServerError,
183 Self::HostBridgeError,
184 Self::PermissionDenied,
185 Self::RejectedLoop,
186 Self::ParseAborted,
187 Self::Timeout,
188 Self::Network,
189 Self::Cancelled,
190 Self::Unknown,
191 ];
192
193 pub fn as_str(self) -> &'static str {
194 match self {
195 Self::SchemaValidation => "schema_validation",
196 Self::ToolError => "tool_error",
197 Self::McpServerError => "mcp_server_error",
198 Self::HostBridgeError => "host_bridge_error",
199 Self::PermissionDenied => "permission_denied",
200 Self::RejectedLoop => "rejected_loop",
201 Self::ParseAborted => "parse_aborted",
202 Self::Timeout => "timeout",
203 Self::Network => "network",
204 Self::Cancelled => "cancelled",
205 Self::Unknown => "unknown",
206 }
207 }
208
209 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
216 use crate::value::ErrorCategory as Internal;
217 match category {
218 Internal::Timeout => Self::Timeout,
219 Internal::RateLimit
220 | Internal::Overloaded
221 | Internal::ServerError
222 | Internal::TransientNetwork => Self::Network,
223 Internal::SchemaValidation => Self::SchemaValidation,
224 Internal::ToolError => Self::ToolError,
225 Internal::ToolRejected => Self::PermissionDenied,
226 Internal::Cancelled => Self::Cancelled,
227 Internal::Auth
228 | Internal::EgressBlocked
229 | Internal::NotFound
230 | Internal::CircuitOpen
231 | Internal::BudgetExceeded
232 | Internal::Generic => Self::HostBridgeError,
233 }
234 }
235}
236
237#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
248#[serde(tag = "kind", rename_all = "snake_case")]
249pub enum ToolExecutor {
250 HarnBuiltin,
253 HostBridge,
256 McpServer { server_name: String },
260 ProviderNative,
265}
266
267#[derive(Clone, Debug, Serialize, Deserialize)]
271#[serde(tag = "type", rename_all = "snake_case")]
272pub enum AgentEvent {
273 AgentMessageChunk {
274 session_id: String,
275 content: String,
276 },
277 AgentThoughtChunk {
278 session_id: String,
279 content: String,
280 },
281 ToolCall {
282 session_id: String,
283 tool_call_id: String,
284 tool_name: String,
285 kind: Option<ToolKind>,
286 status: ToolCallStatus,
287 raw_input: serde_json::Value,
288 #[serde(default, skip_serializing_if = "Option::is_none")]
301 parsing: Option<bool>,
302 #[serde(default, skip_serializing_if = "Option::is_none")]
306 audit: Option<MutationSessionRecord>,
307 },
308 ToolCallUpdate {
309 session_id: String,
310 tool_call_id: String,
311 tool_name: String,
312 status: ToolCallStatus,
313 raw_output: Option<serde_json::Value>,
314 error: Option<String>,
315 #[serde(default, skip_serializing_if = "Option::is_none")]
323 duration_ms: Option<u64>,
324 #[serde(default, skip_serializing_if = "Option::is_none")]
328 execution_duration_ms: Option<u64>,
329 #[serde(default, skip_serializing_if = "Option::is_none")]
335 error_category: Option<ToolCallErrorCategory>,
336 #[serde(default, skip_serializing_if = "Option::is_none")]
341 executor: Option<ToolExecutor>,
342 #[serde(default, skip_serializing_if = "Option::is_none")]
351 parsing: Option<bool>,
352 #[serde(default, skip_serializing_if = "Option::is_none")]
360 raw_input: Option<serde_json::Value>,
361 #[serde(default, skip_serializing_if = "Option::is_none")]
365 raw_input_partial: Option<String>,
366 #[serde(default, skip_serializing_if = "Option::is_none")]
371 audit: Option<MutationSessionRecord>,
372 },
373 Plan {
374 session_id: String,
375 plan: serde_json::Value,
376 },
377 TurnStart {
378 session_id: String,
379 iteration: usize,
380 },
381 TurnEnd {
382 session_id: String,
383 iteration: usize,
384 turn_info: serde_json::Value,
385 },
386 SessionClosed {
390 session_id: String,
391 reason: String,
392 status: String,
393 metadata: serde_json::Value,
394 },
395 JudgeDecision {
396 session_id: String,
397 iteration: usize,
398 verdict: String,
399 reasoning: String,
400 next_step: Option<String>,
401 judge_duration_ms: u64,
402 },
403 TypedCheckpoint {
404 session_id: String,
405 checkpoint: serde_json::Value,
406 },
407 FeedbackInjected {
408 session_id: String,
409 kind: String,
410 content: String,
411 },
412 BudgetExhausted {
416 session_id: String,
417 max_iterations: usize,
418 },
419 LoopStuck {
423 session_id: String,
424 max_nudges: usize,
425 last_iteration: usize,
426 tail_excerpt: String,
427 },
428 DaemonWatchdogTripped {
433 session_id: String,
434 attempts: usize,
435 elapsed_ms: u64,
436 },
437 SkillActivated {
441 session_id: String,
442 skill_name: String,
443 iteration: usize,
444 reason: String,
445 },
446 SkillDeactivated {
449 session_id: String,
450 skill_name: String,
451 iteration: usize,
452 },
453 SkillScopeTools {
456 session_id: String,
457 skill_name: String,
458 allowed_tools: Vec<String>,
459 },
460 ToolSearchQuery {
468 session_id: String,
469 tool_use_id: String,
470 name: String,
471 query: serde_json::Value,
472 strategy: String,
473 mode: String,
474 },
475 ToolSearchResult {
479 session_id: String,
480 tool_use_id: String,
481 promoted: Vec<String>,
482 strategy: String,
483 mode: String,
484 },
485 TranscriptCompacted {
486 session_id: String,
487 mode: String,
488 strategy: String,
489 archived_messages: usize,
490 estimated_tokens_before: usize,
491 estimated_tokens_after: usize,
492 snapshot_asset_id: Option<String>,
493 },
494 Handoff {
495 session_id: String,
496 artifact_id: String,
497 handoff: Box<HandoffArtifact>,
498 },
499 FsWatch {
500 session_id: String,
501 subscription_id: String,
502 events: Vec<FsWatchEvent>,
503 },
504 WorkerUpdate {
520 session_id: String,
521 worker_id: String,
522 worker_name: String,
523 worker_task: String,
524 worker_mode: String,
525 event: WorkerEvent,
526 status: String,
527 metadata: serde_json::Value,
528 audit: Option<serde_json::Value>,
529 },
530 HitlRequested {
538 session_id: String,
539 request_id: String,
540 kind: String,
541 payload: serde_json::Value,
542 },
543 HitlResolved {
549 session_id: String,
550 request_id: String,
551 kind: String,
552 outcome: String,
553 },
554 LoopControlDecision {
561 session_id: String,
562 iteration: usize,
563 action: String,
564 old_limit: usize,
565 new_limit: usize,
566 reason: String,
567 status: String,
568 },
569 AgentLoopStallWarning {
574 session_id: String,
575 warning: serde_json::Value,
576 },
577 ToolCallAudit {
590 session_id: String,
591 tool_call_id: String,
592 tool_name: String,
593 audit: serde_json::Value,
594 },
595 CacheHit {
603 session_id: String,
604 key: String,
605 backend: String,
606 namespace: String,
607 payload: serde_json::Value,
608 },
609 CacheMiss {
615 session_id: String,
616 key: String,
617 backend: String,
618 namespace: String,
619 payload: serde_json::Value,
620 },
621}
622
623impl AgentEvent {
624 pub fn session_id(&self) -> &str {
625 match self {
626 Self::AgentMessageChunk { session_id, .. }
627 | Self::AgentThoughtChunk { session_id, .. }
628 | Self::ToolCall { session_id, .. }
629 | Self::ToolCallUpdate { session_id, .. }
630 | Self::Plan { session_id, .. }
631 | Self::TurnStart { session_id, .. }
632 | Self::TurnEnd { session_id, .. }
633 | Self::SessionClosed { session_id, .. }
634 | Self::JudgeDecision { session_id, .. }
635 | Self::TypedCheckpoint { session_id, .. }
636 | Self::FeedbackInjected { session_id, .. }
637 | Self::BudgetExhausted { session_id, .. }
638 | Self::LoopStuck { session_id, .. }
639 | Self::DaemonWatchdogTripped { session_id, .. }
640 | Self::SkillActivated { session_id, .. }
641 | Self::SkillDeactivated { session_id, .. }
642 | Self::SkillScopeTools { session_id, .. }
643 | Self::ToolSearchQuery { session_id, .. }
644 | Self::ToolSearchResult { session_id, .. }
645 | Self::TranscriptCompacted { session_id, .. }
646 | Self::Handoff { session_id, .. }
647 | Self::FsWatch { session_id, .. }
648 | Self::WorkerUpdate { session_id, .. }
649 | Self::HitlRequested { session_id, .. }
650 | Self::HitlResolved { session_id, .. }
651 | Self::LoopControlDecision { session_id, .. }
652 | Self::AgentLoopStallWarning { session_id, .. }
653 | Self::ToolCallAudit { session_id, .. }
654 | Self::CacheHit { session_id, .. }
655 | Self::CacheMiss { session_id, .. } => session_id,
656 }
657 }
658}
659
660pub trait AgentEventSink: Send + Sync {
663 fn handle_event(&self, event: &AgentEvent);
664}
665
666#[derive(Clone, Debug, Serialize, Deserialize)]
673pub struct PersistedAgentEvent {
674 pub index: u64,
678 pub emitted_at_ms: i64,
682 pub frame_depth: Option<u32>,
686 #[serde(flatten)]
688 pub event: AgentEvent,
689}
690
691pub struct JsonlEventSink {
696 state: Mutex<JsonlEventSinkState>,
697 base_path: std::path::PathBuf,
698}
699
700struct JsonlEventSinkState {
701 writer: std::io::BufWriter<std::fs::File>,
702 index: u64,
703 bytes_written: u64,
704 rotation: u32,
705}
706
707impl JsonlEventSink {
708 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
712
713 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
717 let base_path = base_path.into();
718 if let Some(parent) = base_path.parent() {
719 std::fs::create_dir_all(parent)?;
720 }
721 let file = std::fs::OpenOptions::new()
722 .create(true)
723 .truncate(true)
724 .write(true)
725 .open(&base_path)?;
726 Ok(Arc::new(Self {
727 state: Mutex::new(JsonlEventSinkState {
728 writer: std::io::BufWriter::new(file),
729 index: 0,
730 bytes_written: 0,
731 rotation: 0,
732 }),
733 base_path,
734 }))
735 }
736
737 pub fn flush(&self) -> std::io::Result<()> {
740 use std::io::Write as _;
741 self.state
742 .lock()
743 .expect("jsonl sink mutex poisoned")
744 .writer
745 .flush()
746 }
747
748 pub fn event_count(&self) -> u64 {
751 self.state.lock().expect("jsonl sink mutex poisoned").index
752 }
753
754 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
755 use std::io::Write as _;
756 if state.bytes_written < Self::ROTATE_BYTES {
757 return Ok(());
758 }
759 state.writer.flush()?;
760 state.rotation += 1;
761 let suffix = format!("-{:06}", state.rotation);
762 let rotated = self.base_path.with_file_name({
763 let stem = self
764 .base_path
765 .file_stem()
766 .and_then(|s| s.to_str())
767 .unwrap_or("event_log");
768 let ext = self
769 .base_path
770 .extension()
771 .and_then(|e| e.to_str())
772 .unwrap_or("jsonl");
773 format!("{stem}{suffix}.{ext}")
774 });
775 let file = std::fs::OpenOptions::new()
776 .create(true)
777 .truncate(true)
778 .write(true)
779 .open(&rotated)?;
780 state.writer = std::io::BufWriter::new(file);
781 state.bytes_written = 0;
782 Ok(())
783 }
784}
785
786pub struct EventLogSink {
791 log: Arc<AnyEventLog>,
792 topic: Topic,
793 session_id: String,
794}
795
796impl EventLogSink {
797 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
798 let session_id = session_id.into();
799 let topic = Topic::new(format!(
800 "observability.agent_events.{}",
801 crate::event_log::sanitize_topic_component(&session_id)
802 ))
803 .expect("session id should sanitize to a valid topic");
804 Arc::new(Self {
805 log,
806 topic,
807 session_id,
808 })
809 }
810}
811
812impl AgentEventSink for JsonlEventSink {
813 fn handle_event(&self, event: &AgentEvent) {
814 use std::io::Write as _;
815 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
816 let index = state.index;
817 state.index += 1;
818 let emitted_at_ms = std::time::SystemTime::now()
819 .duration_since(std::time::UNIX_EPOCH)
820 .map(|d| d.as_millis() as i64)
821 .unwrap_or(0);
822 let envelope = PersistedAgentEvent {
823 index,
824 emitted_at_ms,
825 frame_depth: None,
826 event: event.clone(),
827 };
828 if let Ok(line) = serde_json::to_string(&envelope) {
829 let _ = state.writer.write_all(line.as_bytes());
834 let _ = state.writer.write_all(b"\n");
835 state.bytes_written += line.len() as u64 + 1;
836 let _ = self.rotate_if_needed(&mut state);
837 }
838 }
839}
840
841impl AgentEventSink for EventLogSink {
842 fn handle_event(&self, event: &AgentEvent) {
843 let event_json = match serde_json::to_value(event) {
844 Ok(value) => value,
845 Err(_) => return,
846 };
847 let event_kind = event_json
848 .get("type")
849 .and_then(|value| value.as_str())
850 .unwrap_or("agent_event")
851 .to_string();
852 let payload = serde_json::json!({
853 "index_hint": now_ms(),
854 "session_id": self.session_id,
855 "event": event_json,
856 });
857 let mut headers = std::collections::BTreeMap::new();
858 headers.insert("session_id".to_string(), self.session_id.clone());
859 let log = self.log.clone();
860 let topic = self.topic.clone();
861 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
862 if let Ok(handle) = tokio::runtime::Handle::try_current() {
863 handle.spawn(async move {
864 let _ = log.append(&topic, record).await;
865 });
866 } else {
867 let _ = futures::executor::block_on(log.append(&topic, record));
868 }
869 }
870}
871
872impl Drop for JsonlEventSink {
873 fn drop(&mut self) {
874 if let Ok(mut state) = self.state.lock() {
875 use std::io::Write as _;
876 let _ = state.writer.flush();
877 }
878 }
879}
880
881pub struct MultiSink {
883 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
884}
885
886impl MultiSink {
887 pub fn new() -> Self {
888 Self {
889 sinks: Mutex::new(Vec::new()),
890 }
891 }
892 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
893 self.sinks.lock().expect("sink mutex poisoned").push(sink);
894 }
895 pub fn len(&self) -> usize {
896 self.sinks.lock().expect("sink mutex poisoned").len()
897 }
898 pub fn is_empty(&self) -> bool {
899 self.len() == 0
900 }
901}
902
903impl Default for MultiSink {
904 fn default() -> Self {
905 Self::new()
906 }
907}
908
909impl AgentEventSink for MultiSink {
910 fn handle_event(&self, event: &AgentEvent) {
911 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
917 for sink in sinks {
918 sink.handle_event(event);
919 }
920 }
921}
922
923#[cfg(test)]
924#[derive(Clone)]
925struct RegisteredSink {
926 owner: std::thread::ThreadId,
927 sink: Arc<dyn AgentEventSink>,
928}
929
930#[cfg(not(test))]
931type RegisteredSink = Arc<dyn AgentEventSink>;
932
933type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
934
935fn external_sinks() -> &'static ExternalSinkRegistry {
936 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
937 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
938}
939
940pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
941 let session_id = session_id.into();
942 let mut reg = external_sinks().write().expect("sink registry poisoned");
943 #[cfg(test)]
944 let sink = RegisteredSink {
945 owner: std::thread::current().id(),
946 sink,
947 };
948 reg.entry(session_id).or_default().push(sink);
949}
950
951pub fn clear_session_sinks(session_id: &str) {
955 #[cfg(test)]
956 {
957 let owner = std::thread::current().id();
958 let mut reg = external_sinks().write().expect("sink registry poisoned");
959 if let Some(sinks) = reg.get_mut(session_id) {
960 sinks.retain(|sink| sink.owner != owner);
961 if sinks.is_empty() {
962 reg.remove(session_id);
963 }
964 }
965 }
966 #[cfg(not(test))]
967 {
968 external_sinks()
969 .write()
970 .expect("sink registry poisoned")
971 .remove(session_id);
972 }
973}
974
975pub fn reset_all_sinks() {
976 #[cfg(test)]
977 {
978 let owner = std::thread::current().id();
979 let mut reg = external_sinks().write().expect("sink registry poisoned");
980 reg.retain(|_, sinks| {
981 sinks.retain(|sink| sink.owner != owner);
982 !sinks.is_empty()
983 });
984 crate::agent_sessions::reset_session_store();
985 }
986 #[cfg(not(test))]
987 {
988 external_sinks()
989 .write()
990 .expect("sink registry poisoned")
991 .clear();
992 crate::agent_sessions::reset_session_store();
993 }
994}
995
996pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
1003 if source_session_id.is_empty() || target_session_id.is_empty() {
1004 return;
1005 }
1006 if source_session_id == target_session_id {
1007 return;
1008 }
1009 let mut reg = external_sinks().write().expect("sink registry poisoned");
1010 let Some(source_sinks) = reg.get(source_session_id).cloned() else {
1011 return;
1012 };
1013 let target = reg.entry(target_session_id.to_string()).or_default();
1014 #[cfg(test)]
1015 {
1016 for source in source_sinks {
1017 let already_present = target
1018 .iter()
1019 .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
1020 if !already_present {
1021 target.push(source);
1022 }
1023 }
1024 }
1025 #[cfg(not(test))]
1026 {
1027 for source in source_sinks {
1028 let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
1029 if !already_present {
1030 target.push(source);
1031 }
1032 }
1033 }
1034}
1035
1036pub fn emit_event(event: &AgentEvent) {
1040 let sinks: Vec<Arc<dyn AgentEventSink>> = {
1041 let reg = external_sinks().read().expect("sink registry poisoned");
1042 #[cfg(test)]
1043 {
1044 let owner = std::thread::current().id();
1045 reg.get(event.session_id())
1046 .map(|sinks| {
1047 sinks
1048 .iter()
1049 .filter(|sink| sink.owner == owner)
1050 .map(|sink| sink.sink.clone())
1051 .collect()
1052 })
1053 .unwrap_or_default()
1054 }
1055 #[cfg(not(test))]
1056 {
1057 reg.get(event.session_id()).cloned().unwrap_or_default()
1058 }
1059 };
1060 for sink in sinks {
1061 sink.handle_event(event);
1062 }
1063}
1064
1065fn now_ms() -> i64 {
1066 std::time::SystemTime::now()
1067 .duration_since(std::time::UNIX_EPOCH)
1068 .map(|duration| duration.as_millis() as i64)
1069 .unwrap_or(0)
1070}
1071
1072pub fn session_external_sink_count(session_id: &str) -> usize {
1073 #[cfg(test)]
1074 {
1075 let owner = std::thread::current().id();
1076 return external_sinks()
1077 .read()
1078 .expect("sink registry poisoned")
1079 .get(session_id)
1080 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1081 .unwrap_or(0);
1082 }
1083 #[cfg(not(test))]
1084 {
1085 external_sinks()
1086 .read()
1087 .expect("sink registry poisoned")
1088 .get(session_id)
1089 .map(|v| v.len())
1090 .unwrap_or(0)
1091 }
1092}
1093
1094pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1095 crate::agent_sessions::subscriber_count(session_id)
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100 use super::*;
1101 use std::sync::atomic::{AtomicUsize, Ordering};
1102
1103 struct CountingSink(Arc<AtomicUsize>);
1104 impl AgentEventSink for CountingSink {
1105 fn handle_event(&self, _event: &AgentEvent) {
1106 self.0.fetch_add(1, Ordering::SeqCst);
1107 }
1108 }
1109
1110 #[test]
1111 fn multi_sink_fans_out_in_order() {
1112 let multi = MultiSink::new();
1113 let a = Arc::new(AtomicUsize::new(0));
1114 let b = Arc::new(AtomicUsize::new(0));
1115 multi.push(Arc::new(CountingSink(a.clone())));
1116 multi.push(Arc::new(CountingSink(b.clone())));
1117 let event = AgentEvent::TurnStart {
1118 session_id: "s1".into(),
1119 iteration: 1,
1120 };
1121 multi.handle_event(&event);
1122 assert_eq!(a.load(Ordering::SeqCst), 1);
1123 assert_eq!(b.load(Ordering::SeqCst), 1);
1124 }
1125
1126 #[test]
1127 fn session_scoped_sink_routing() {
1128 reset_all_sinks();
1129 let a = Arc::new(AtomicUsize::new(0));
1130 let b = Arc::new(AtomicUsize::new(0));
1131 register_sink("session-a", Arc::new(CountingSink(a.clone())));
1132 register_sink("session-b", Arc::new(CountingSink(b.clone())));
1133 emit_event(&AgentEvent::TurnStart {
1134 session_id: "session-a".into(),
1135 iteration: 0,
1136 });
1137 assert_eq!(a.load(Ordering::SeqCst), 1);
1138 assert_eq!(b.load(Ordering::SeqCst), 0);
1139 emit_event(&AgentEvent::TurnEnd {
1140 session_id: "session-b".into(),
1141 iteration: 0,
1142 turn_info: serde_json::json!({}),
1143 });
1144 assert_eq!(a.load(Ordering::SeqCst), 1);
1145 assert_eq!(b.load(Ordering::SeqCst), 1);
1146 clear_session_sinks("session-a");
1147 assert_eq!(session_external_sink_count("session-a"), 0);
1148 assert_eq!(session_external_sink_count("session-b"), 1);
1149 reset_all_sinks();
1150 }
1151
1152 #[test]
1153 fn newly_opened_child_session_inherits_current_external_sinks() {
1154 reset_all_sinks();
1155 let delivered = Arc::new(AtomicUsize::new(0));
1156 register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1157 {
1158 let _guard = crate::agent_sessions::enter_current_session("outer-session");
1159 let inner = crate::agent_sessions::open_or_create(None);
1160 assert_ne!(inner, "outer-session");
1161 emit_event(&AgentEvent::TurnStart {
1162 session_id: inner,
1163 iteration: 0,
1164 });
1165 }
1166 assert_eq!(delivered.load(Ordering::SeqCst), 1);
1167 reset_all_sinks();
1168 }
1169
1170 #[test]
1171 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1172 use std::io::{BufRead, BufReader};
1173 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1174 std::fs::create_dir_all(&dir).unwrap();
1175 let path = dir.join("event_log.jsonl");
1176 let sink = JsonlEventSink::open(&path).unwrap();
1177 for i in 0..5 {
1178 sink.handle_event(&AgentEvent::TurnStart {
1179 session_id: "s".into(),
1180 iteration: i,
1181 });
1182 }
1183 assert_eq!(sink.event_count(), 5);
1184 sink.flush().unwrap();
1185
1186 let file = std::fs::File::open(&path).unwrap();
1188 let mut last_idx: i64 = -1;
1189 let mut last_ts: i64 = 0;
1190 for line in BufReader::new(file).lines() {
1191 let line = line.unwrap();
1192 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1193 let idx = val["index"].as_i64().unwrap();
1194 let ts = val["emitted_at_ms"].as_i64().unwrap();
1195 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1196 assert!(ts >= last_ts, "timestamps must be non-decreasing");
1197 last_idx = idx;
1198 last_ts = ts;
1199 assert_eq!(val["type"], "turn_start");
1201 }
1202 assert_eq!(last_idx, 4);
1203 let _ = std::fs::remove_file(&path);
1204 }
1205
1206 #[test]
1207 fn judge_decision_round_trips_through_jsonl_sink() {
1208 use std::io::{BufRead, BufReader};
1209 let dir =
1210 std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1211 std::fs::create_dir_all(&dir).unwrap();
1212 let path = dir.join("event_log.jsonl");
1213 let sink = JsonlEventSink::open(&path).unwrap();
1214 sink.handle_event(&AgentEvent::JudgeDecision {
1215 session_id: "s".into(),
1216 iteration: 2,
1217 verdict: "continue".into(),
1218 reasoning: "needs a concrete next step".into(),
1219 next_step: Some("run the verifier".into()),
1220 judge_duration_ms: 17,
1221 });
1222 sink.flush().unwrap();
1223
1224 let file = std::fs::File::open(&path).unwrap();
1225 let line = BufReader::new(file).lines().next().unwrap().unwrap();
1226 let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1227 match recovered.event {
1228 AgentEvent::JudgeDecision {
1229 session_id,
1230 iteration,
1231 verdict,
1232 reasoning,
1233 next_step,
1234 judge_duration_ms,
1235 } => {
1236 assert_eq!(session_id, "s");
1237 assert_eq!(iteration, 2);
1238 assert_eq!(verdict, "continue");
1239 assert_eq!(reasoning, "needs a concrete next step");
1240 assert_eq!(next_step.as_deref(), Some("run the verifier"));
1241 assert_eq!(judge_duration_ms, 17);
1242 }
1243 other => panic!("expected JudgeDecision, got {other:?}"),
1244 }
1245 let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1246 assert_eq!(value["type"], "judge_decision");
1247 let _ = std::fs::remove_file(&path);
1248 let _ = std::fs::remove_dir(&dir);
1249 }
1250
1251 #[test]
1252 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1253 let terminal = AgentEvent::ToolCallUpdate {
1258 session_id: "s".into(),
1259 tool_call_id: "tc-1".into(),
1260 tool_name: "read".into(),
1261 status: ToolCallStatus::Completed,
1262 raw_output: None,
1263 error: None,
1264 duration_ms: Some(42),
1265 execution_duration_ms: Some(7),
1266 error_category: None,
1267 executor: None,
1268 parsing: None,
1269
1270 raw_input: None,
1271 raw_input_partial: None,
1272 audit: None,
1273 };
1274 let value = serde_json::to_value(&terminal).unwrap();
1275 assert_eq!(value["duration_ms"], serde_json::json!(42));
1276 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1277
1278 let intermediate = AgentEvent::ToolCallUpdate {
1282 session_id: "s".into(),
1283 tool_call_id: "tc-1".into(),
1284 tool_name: "read".into(),
1285 status: ToolCallStatus::InProgress,
1286 raw_output: None,
1287 error: None,
1288 duration_ms: None,
1289 execution_duration_ms: None,
1290 error_category: None,
1291 executor: None,
1292 parsing: None,
1293
1294 raw_input: None,
1295 raw_input_partial: None,
1296 audit: None,
1297 };
1298 let value = serde_json::to_value(&intermediate).unwrap();
1299 let object = value.as_object().expect("update serializes as object");
1300 assert!(
1301 !object.contains_key("duration_ms"),
1302 "duration_ms must be omitted when None: {value}"
1303 );
1304 assert!(
1305 !object.contains_key("execution_duration_ms"),
1306 "execution_duration_ms must be omitted when None: {value}"
1307 );
1308 }
1309
1310 #[test]
1311 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1312 let raw = serde_json::json!({
1315 "type": "tool_call_update",
1316 "session_id": "s",
1317 "tool_call_id": "tc-1",
1318 "tool_name": "read",
1319 "status": "completed",
1320 "raw_output": null,
1321 "error": null,
1322 });
1323 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1324 match event {
1325 AgentEvent::ToolCallUpdate {
1326 duration_ms,
1327 execution_duration_ms,
1328 ..
1329 } => {
1330 assert!(duration_ms.is_none());
1331 assert!(execution_duration_ms.is_none());
1332 }
1333 other => panic!("expected ToolCallUpdate, got {other:?}"),
1334 }
1335 }
1336
1337 #[test]
1338 fn tool_call_status_serde() {
1339 assert_eq!(
1340 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1341 "\"pending\""
1342 );
1343 assert_eq!(
1344 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1345 "\"in_progress\""
1346 );
1347 assert_eq!(
1348 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1349 "\"completed\""
1350 );
1351 assert_eq!(
1352 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1353 "\"failed\""
1354 );
1355 }
1356
1357 #[test]
1358 fn tool_call_error_category_serializes_as_snake_case() {
1359 let pairs = [
1360 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1361 (ToolCallErrorCategory::ToolError, "tool_error"),
1362 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1363 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1364 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1365 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1366 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1367 (ToolCallErrorCategory::Timeout, "timeout"),
1368 (ToolCallErrorCategory::Network, "network"),
1369 (ToolCallErrorCategory::Cancelled, "cancelled"),
1370 (ToolCallErrorCategory::Unknown, "unknown"),
1371 ];
1372 for (variant, wire) in pairs {
1373 let encoded = serde_json::to_string(&variant).unwrap();
1374 assert_eq!(encoded, format!("\"{wire}\""));
1375 assert_eq!(variant.as_str(), wire);
1376 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1379 assert_eq!(decoded, variant);
1380 }
1381 }
1382
1383 #[test]
1384 fn tool_executor_round_trips_with_adjacent_tag() {
1385 for executor in [
1390 ToolExecutor::HarnBuiltin,
1391 ToolExecutor::HostBridge,
1392 ToolExecutor::McpServer {
1393 server_name: "linear".to_string(),
1394 },
1395 ToolExecutor::ProviderNative,
1396 ] {
1397 let json = serde_json::to_value(&executor).unwrap();
1398 let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1399 match &executor {
1400 ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1401 ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1402 ToolExecutor::McpServer { server_name } => {
1403 assert_eq!(kind, "mcp_server");
1404 assert_eq!(json["server_name"], *server_name);
1405 }
1406 ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1407 }
1408 let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1409 assert_eq!(recovered, executor);
1410 }
1411 }
1412
1413 #[test]
1414 fn tool_call_error_category_from_internal_collapses_transient_family() {
1415 use crate::value::ErrorCategory as Internal;
1416 assert_eq!(
1417 ToolCallErrorCategory::from_internal(&Internal::Timeout),
1418 ToolCallErrorCategory::Timeout
1419 );
1420 for net in [
1421 Internal::RateLimit,
1422 Internal::Overloaded,
1423 Internal::ServerError,
1424 Internal::TransientNetwork,
1425 ] {
1426 assert_eq!(
1427 ToolCallErrorCategory::from_internal(&net),
1428 ToolCallErrorCategory::Network,
1429 "{net:?} should map to Network",
1430 );
1431 }
1432 assert_eq!(
1433 ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1434 ToolCallErrorCategory::SchemaValidation
1435 );
1436 assert_eq!(
1437 ToolCallErrorCategory::from_internal(&Internal::ToolError),
1438 ToolCallErrorCategory::ToolError
1439 );
1440 assert_eq!(
1441 ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1442 ToolCallErrorCategory::PermissionDenied
1443 );
1444 assert_eq!(
1445 ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1446 ToolCallErrorCategory::Cancelled
1447 );
1448 for bridge in [
1449 Internal::Auth,
1450 Internal::EgressBlocked,
1451 Internal::NotFound,
1452 Internal::CircuitOpen,
1453 Internal::Generic,
1454 ] {
1455 assert_eq!(
1456 ToolCallErrorCategory::from_internal(&bridge),
1457 ToolCallErrorCategory::HostBridgeError,
1458 "{bridge:?} should map to HostBridgeError",
1459 );
1460 }
1461 }
1462
1463 #[test]
1464 fn tool_call_update_event_omits_error_category_when_none() {
1465 let event = AgentEvent::ToolCallUpdate {
1466 session_id: "s".into(),
1467 tool_call_id: "t".into(),
1468 tool_name: "read".into(),
1469 status: ToolCallStatus::Completed,
1470 raw_output: None,
1471 error: None,
1472 duration_ms: None,
1473 execution_duration_ms: None,
1474 error_category: None,
1475 executor: None,
1476 parsing: None,
1477
1478 raw_input: None,
1479 raw_input_partial: None,
1480 audit: None,
1481 };
1482 let v = serde_json::to_value(&event).unwrap();
1483 assert_eq!(v["type"], "tool_call_update");
1484 assert!(v.get("error_category").is_none());
1485 }
1486
1487 #[test]
1488 fn tool_call_update_event_serializes_error_category_when_set() {
1489 let event = AgentEvent::ToolCallUpdate {
1490 session_id: "s".into(),
1491 tool_call_id: "t".into(),
1492 tool_name: "read".into(),
1493 status: ToolCallStatus::Failed,
1494 raw_output: None,
1495 error: Some("missing required field".into()),
1496 duration_ms: None,
1497 execution_duration_ms: None,
1498 error_category: Some(ToolCallErrorCategory::SchemaValidation),
1499 executor: None,
1500 parsing: None,
1501
1502 raw_input: None,
1503 raw_input_partial: None,
1504 audit: None,
1505 };
1506 let v = serde_json::to_value(&event).unwrap();
1507 assert_eq!(v["error_category"], "schema_validation");
1508 assert_eq!(v["error"], "missing required field");
1509 }
1510
1511 #[test]
1512 fn tool_call_update_omits_executor_when_absent() {
1513 let event = AgentEvent::ToolCallUpdate {
1517 session_id: "s".into(),
1518 tool_call_id: "tc-1".into(),
1519 tool_name: "read".into(),
1520 status: ToolCallStatus::Completed,
1521 raw_output: None,
1522 error: None,
1523 duration_ms: None,
1524 execution_duration_ms: None,
1525 error_category: None,
1526 executor: None,
1527 parsing: None,
1528
1529 raw_input: None,
1530 raw_input_partial: None,
1531 audit: None,
1532 };
1533 let json = serde_json::to_value(&event).unwrap();
1534 assert!(json.get("executor").is_none(), "got: {json}");
1535 }
1536
1537 #[test]
1538 fn worker_event_status_strings_cover_all_variants() {
1539 assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1544 assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1545 assert_eq!(
1546 WorkerEvent::WorkerWaitingForInput.as_status(),
1547 "awaiting_input"
1548 );
1549 assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1550 assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1551 assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1552
1553 for terminal in [
1554 WorkerEvent::WorkerCompleted,
1555 WorkerEvent::WorkerFailed,
1556 WorkerEvent::WorkerCancelled,
1557 ] {
1558 assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1559 }
1560 for non_terminal in [
1561 WorkerEvent::WorkerSpawned,
1562 WorkerEvent::WorkerProgressed,
1563 WorkerEvent::WorkerWaitingForInput,
1564 ] {
1565 assert!(
1566 !non_terminal.is_terminal(),
1567 "{non_terminal:?} should not be terminal"
1568 );
1569 }
1570 }
1571
1572 #[test]
1573 fn worker_update_event_routes_through_session_keyed_sink() {
1574 reset_all_sinks();
1579 let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1580 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1581 impl AgentEventSink for CapturingSink {
1582 fn handle_event(&self, event: &AgentEvent) {
1583 self.0
1584 .lock()
1585 .expect("captured sink mutex poisoned")
1586 .push(event.clone());
1587 }
1588 }
1589 register_sink(
1590 "worker-session-1",
1591 Arc::new(CapturingSink(captured.clone())),
1592 );
1593 emit_event(&AgentEvent::WorkerUpdate {
1594 session_id: "worker-session-1".into(),
1595 worker_id: "worker_42".into(),
1596 worker_name: "review_captain".into(),
1597 worker_task: "review pr".into(),
1598 worker_mode: "delegated_stage".into(),
1599 event: WorkerEvent::WorkerWaitingForInput,
1600 status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1601 metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1602 audit: None,
1603 });
1604 emit_event(&AgentEvent::WorkerUpdate {
1606 session_id: "other-session".into(),
1607 worker_id: "w2".into(),
1608 worker_name: "n2".into(),
1609 worker_task: "t2".into(),
1610 worker_mode: "delegated_stage".into(),
1611 event: WorkerEvent::WorkerCompleted,
1612 status: "completed".into(),
1613 metadata: serde_json::json!({}),
1614 audit: None,
1615 });
1616 let received = captured.lock().unwrap().clone();
1617 assert_eq!(received.len(), 1, "got: {received:?}");
1618 match &received[0] {
1619 AgentEvent::WorkerUpdate {
1620 session_id,
1621 worker_id,
1622 event,
1623 status,
1624 ..
1625 } => {
1626 assert_eq!(session_id, "worker-session-1");
1627 assert_eq!(worker_id, "worker_42");
1628 assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1629 assert_eq!(status, "awaiting_input");
1630 }
1631 other => panic!("expected WorkerUpdate, got {other:?}"),
1632 }
1633 reset_all_sinks();
1634 }
1635
1636 #[test]
1637 fn worker_update_event_serializes_to_canonical_shape() {
1638 let event = AgentEvent::WorkerUpdate {
1644 session_id: "s".into(),
1645 worker_id: "w".into(),
1646 worker_name: "n".into(),
1647 worker_task: "t".into(),
1648 worker_mode: "delegated_stage".into(),
1649 event: WorkerEvent::WorkerProgressed,
1650 status: "progressed".into(),
1651 metadata: serde_json::json!({"started_at": "0193..."}),
1652 audit: Some(serde_json::json!({"run_id": "run_x"})),
1653 };
1654 let value = serde_json::to_value(&event).unwrap();
1655 assert_eq!(value["type"], "worker_update");
1656 assert_eq!(value["session_id"], "s");
1657 assert_eq!(value["worker_id"], "w");
1658 assert_eq!(value["status"], "progressed");
1659 assert_eq!(value["audit"]["run_id"], "run_x");
1660
1661 let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1665 match recovered {
1666 AgentEvent::WorkerUpdate {
1667 event: recovered_event,
1668 ..
1669 } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1670 other => panic!("expected WorkerUpdate, got {other:?}"),
1671 }
1672 }
1673
1674 #[test]
1675 fn tool_call_update_includes_executor_when_present() {
1676 let event = AgentEvent::ToolCallUpdate {
1677 session_id: "s".into(),
1678 tool_call_id: "tc-1".into(),
1679 tool_name: "read".into(),
1680 status: ToolCallStatus::Completed,
1681 raw_output: None,
1682 error: None,
1683 duration_ms: None,
1684 execution_duration_ms: None,
1685 error_category: None,
1686 executor: Some(ToolExecutor::McpServer {
1687 server_name: "github".into(),
1688 }),
1689 parsing: None,
1690
1691 raw_input: None,
1692 raw_input_partial: None,
1693 audit: None,
1694 };
1695 let json = serde_json::to_value(&event).unwrap();
1696 assert_eq!(json["executor"]["kind"], "mcp_server");
1697 assert_eq!(json["executor"]["server_name"], "github");
1698 }
1699
1700 #[test]
1701 fn tool_call_update_omits_audit_when_absent() {
1702 let event = AgentEvent::ToolCallUpdate {
1703 session_id: "s".into(),
1704 tool_call_id: "tc-1".into(),
1705 tool_name: "read".into(),
1706 status: ToolCallStatus::Completed,
1707 raw_output: None,
1708 error: None,
1709 duration_ms: None,
1710 execution_duration_ms: None,
1711 error_category: None,
1712 executor: None,
1713 parsing: None,
1714 raw_input: None,
1715 raw_input_partial: None,
1716 audit: None,
1717 };
1718 let json = serde_json::to_value(&event).unwrap();
1719 assert!(json.get("audit").is_none(), "got: {json}");
1720 }
1721
1722 #[test]
1723 fn tool_call_update_includes_audit_when_present() {
1724 let audit = MutationSessionRecord {
1725 session_id: "session_42".into(),
1726 run_id: Some("run_42".into()),
1727 mutation_scope: "apply_workspace".into(),
1728 execution_kind: Some("worker".into()),
1729 ..Default::default()
1730 };
1731 let event = AgentEvent::ToolCallUpdate {
1732 session_id: "s".into(),
1733 tool_call_id: "tc-1".into(),
1734 tool_name: "edit_file".into(),
1735 status: ToolCallStatus::Completed,
1736 raw_output: None,
1737 error: None,
1738 duration_ms: None,
1739 execution_duration_ms: None,
1740 error_category: None,
1741 executor: Some(ToolExecutor::HostBridge),
1742 parsing: None,
1743 raw_input: None,
1744 raw_input_partial: None,
1745 audit: Some(audit),
1746 };
1747 let json = serde_json::to_value(&event).unwrap();
1748 assert_eq!(json["audit"]["session_id"], "session_42");
1749 assert_eq!(json["audit"]["run_id"], "run_42");
1750 assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1751 assert_eq!(json["audit"]["execution_kind"], "worker");
1752 }
1753
1754 #[test]
1755 fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1756 let raw = serde_json::json!({
1757 "type": "tool_call_update",
1758 "session_id": "s",
1759 "tool_call_id": "tc-1",
1760 "tool_name": "read",
1761 "status": "completed",
1762 "raw_output": null,
1763 "error": null,
1764 });
1765 let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1766 match event {
1767 AgentEvent::ToolCallUpdate { audit, .. } => {
1768 assert!(audit.is_none());
1769 }
1770 other => panic!("expected ToolCallUpdate, got {other:?}"),
1771 }
1772 }
1773
1774 #[test]
1775 fn tool_call_audit_serializes_with_free_form_audit_payload() {
1776 let audit = serde_json::json!({
1781 "summary": "Searched codebase",
1782 "kind": "search",
1783 "consent": {"decision": "approved", "decided_by": "auto"},
1784 "layers": [{"name": "with_required_reason", "status": "ok"}],
1785 });
1786 let event = AgentEvent::ToolCallAudit {
1787 session_id: "s".into(),
1788 tool_call_id: "tc-1".into(),
1789 tool_name: "search_files".into(),
1790 audit: audit.clone(),
1791 };
1792 let json = serde_json::to_value(&event).unwrap();
1793 assert_eq!(json["type"], "tool_call_audit");
1794 assert_eq!(json["session_id"], "s");
1795 assert_eq!(json["tool_call_id"], "tc-1");
1796 assert_eq!(json["tool_name"], "search_files");
1797 assert_eq!(json["audit"], audit);
1798 }
1799
1800 #[test]
1801 fn tool_call_audit_session_id_routes_correctly() {
1802 let event = AgentEvent::ToolCallAudit {
1803 session_id: "abc".into(),
1804 tool_call_id: "tc".into(),
1805 tool_name: "read".into(),
1806 audit: serde_json::Value::Null,
1807 };
1808 assert_eq!(event.session_id(), "abc");
1809 }
1810}