1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::composition::{CompositionChildCall, CompositionChildResult, CompositionRunEnvelope};
27use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
28use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
29use crate::tool_annotations::ToolKind;
30
/// One filesystem-watch notification, carried in batches by
/// [`AgentEvent::FsWatch`].
///
/// All fields serialize under their own names. The vocabulary of the string
/// fields is defined by whatever produces these events, not by this type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Event kind label.
    /// NOTE(review): presumably a normalized form of `raw_kind` — confirm with the producer.
    pub kind: String,
    /// Paths the event refers to.
    pub paths: Vec<String>,
    /// NOTE(review): presumably `paths` expressed relative to the watch root — confirm.
    pub relative_paths: Vec<String>,
    /// Kind label as reported by the underlying source (see the note on `kind`).
    pub raw_kind: String,
    /// Error text, when the event carries one.
    pub error: Option<String>,
}
41
/// Lifecycle transitions reported for a worker.
///
/// No serde rename is applied, so each variant serializes under its Rust
/// name — the same strings [`WorkerEvent::as_str`] returns. The last three
/// variants are the ones [`WorkerEvent::is_terminal`] reports as terminal.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    WorkerSpawned,
    WorkerProgressed,
    WorkerWaitingForInput,
    WorkerCompleted,
    WorkerFailed,
    WorkerCancelled,
}
62
63impl WorkerEvent {
64 pub const ALL: [Self; 6] = [
69 Self::WorkerSpawned,
70 Self::WorkerProgressed,
71 Self::WorkerWaitingForInput,
72 Self::WorkerCompleted,
73 Self::WorkerFailed,
74 Self::WorkerCancelled,
75 ];
76
77 pub fn as_status(self) -> &'static str {
84 match self {
85 Self::WorkerSpawned => "running",
86 Self::WorkerProgressed => "progressed",
87 Self::WorkerWaitingForInput => "awaiting_input",
88 Self::WorkerCompleted => "completed",
89 Self::WorkerFailed => "failed",
90 Self::WorkerCancelled => "cancelled",
91 }
92 }
93
94 pub fn as_str(self) -> &'static str {
95 match self {
96 Self::WorkerSpawned => "WorkerSpawned",
97 Self::WorkerProgressed => "WorkerProgressed",
98 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
99 Self::WorkerCompleted => "WorkerCompleted",
100 Self::WorkerFailed => "WorkerFailed",
101 Self::WorkerCancelled => "WorkerCancelled",
102 }
103 }
104
105 pub fn is_terminal(self) -> bool {
110 matches!(
111 self,
112 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
113 )
114 }
115}
116
/// Status of a tool call as carried by [`AgentEvent::ToolCall`] and
/// [`AgentEvent::ToolCallUpdate`].
///
/// Serialized in snake_case (`"pending"`, `"in_progress"`, `"completed"`,
/// `"failed"`), matching [`ToolCallStatus::as_str`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    Pending,
    InProgress,
    Completed,
    Failed,
}
130
131impl ToolCallStatus {
132 pub const ALL: [Self; 4] = [
133 Self::Pending,
134 Self::InProgress,
135 Self::Completed,
136 Self::Failed,
137 ];
138
139 pub fn as_str(self) -> &'static str {
140 match self {
141 Self::Pending => "pending",
142 Self::InProgress => "in_progress",
143 Self::Completed => "completed",
144 Self::Failed => "failed",
145 }
146 }
147}
148
/// Coarse classification of a tool-call failure, carried in
/// [`AgentEvent::ToolCallUpdate::error_category`].
///
/// Serialized in snake_case, matching [`ToolCallErrorCategory::as_str`].
/// [`ToolCallErrorCategory::from_internal`] only ever produces a subset of
/// these: `McpServerError`, `RejectedLoop`, `ParseAborted` and `Unknown` are
/// never returned by it, so they must be assigned by other code.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    SchemaValidation,
    ToolError,
    McpServerError,
    HostBridgeError,
    PermissionDenied,
    RejectedLoop,
    ParseAborted,
    Timeout,
    Network,
    Cancelled,
    Unknown,
}
191
192impl ToolCallErrorCategory {
193 pub const ALL: [Self; 11] = [
194 Self::SchemaValidation,
195 Self::ToolError,
196 Self::McpServerError,
197 Self::HostBridgeError,
198 Self::PermissionDenied,
199 Self::RejectedLoop,
200 Self::ParseAborted,
201 Self::Timeout,
202 Self::Network,
203 Self::Cancelled,
204 Self::Unknown,
205 ];
206
207 pub fn as_str(self) -> &'static str {
208 match self {
209 Self::SchemaValidation => "schema_validation",
210 Self::ToolError => "tool_error",
211 Self::McpServerError => "mcp_server_error",
212 Self::HostBridgeError => "host_bridge_error",
213 Self::PermissionDenied => "permission_denied",
214 Self::RejectedLoop => "rejected_loop",
215 Self::ParseAborted => "parse_aborted",
216 Self::Timeout => "timeout",
217 Self::Network => "network",
218 Self::Cancelled => "cancelled",
219 Self::Unknown => "unknown",
220 }
221 }
222
223 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
230 use crate::value::ErrorCategory as Internal;
231 match category {
232 Internal::Timeout => Self::Timeout,
233 Internal::RateLimit
234 | Internal::Overloaded
235 | Internal::ServerError
236 | Internal::TransientNetwork => Self::Network,
237 Internal::SchemaValidation => Self::SchemaValidation,
238 Internal::ToolError => Self::ToolError,
239 Internal::ToolRejected => Self::PermissionDenied,
240 Internal::Cancelled => Self::Cancelled,
241 Internal::Auth
242 | Internal::EgressBlocked
243 | Internal::NotFound
244 | Internal::CircuitOpen
245 | Internal::BudgetExceeded
246 | Internal::Generic => Self::HostBridgeError,
247 }
248 }
249}
250
/// Identifies what executed a tool call; carried in
/// [`AgentEvent::ToolCallUpdate::executor`].
///
/// Serialized as an internally tagged object: the snake_case variant name
/// is stored under `"kind"`, and `McpServer` adds its `server_name` field
/// alongside it (e.g. `{"kind": "mcp_server", "server_name": "..."}`).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    HarnBuiltin,
    HostBridge,
    /// Executed by an MCP server, identified by name.
    McpServer { server_name: String },
    ProviderNative,
}
280
/// Events emitted for an agent session and delivered to
/// [`AgentEventSink`]s.
///
/// Serialized as an internally tagged object: the variant name, converted
/// to snake_case, is stored under the `"type"` key next to the variant's own
/// fields (for example `TurnStart` is written with `"type": "turn_start"`).
/// Every variant carries a `session_id`, exposed uniformly through
/// [`AgentEvent::session_id`].
///
/// Fields marked `#[serde(default, skip_serializing_if = "Option::is_none")]`
/// are left out of the output when `None` and may be absent on input.
///
/// The per-variant notes below restate names and types only. The precise
/// meaning of each event is defined by the code that emits it.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A chunk of agent message content.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A chunk of agent thought content.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call, identified by `tool_call_id`, with its input and status.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        // NOTE(review): presumably set while the call's arguments are still
        // being parsed — confirm with the emitter.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// An update to a previously announced tool call (same `tool_call_id`).
    ///
    /// Everything after `error` is optional on the wire, so payloads that
    /// lack those keys still deserialize.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        // Two separate durations are carried; how they differ is not visible
        // in this file. TODO confirm against the emitter.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// A plan, as free-form JSON.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// A progress report with free-form `entries` and `metadata`.
    ProgressReported {
        session_id: String,
        message: Option<String>,
        entries: serde_json::Value,
        // NOTE(review): presumably "replace earlier entries rather than
        // append" — confirm with the consumer.
        replace: bool,
        metadata: serde_json::Value,
    },
    /// Start of the turn numbered `iteration`.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// End of the turn numbered `iteration`, with free-form `turn_info`.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// The session was closed.
    SessionClosed {
        session_id: String,
        reason: String,
        status: String,
        metadata: serde_json::Value,
    },
    /// A judge's verdict for an iteration.
    JudgeDecision {
        session_id: String,
        iteration: usize,
        verdict: String,
        reasoning: String,
        next_step: Option<String>,
        judge_duration_ms: u64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        trigger: Option<String>,
    },
    /// A checkpoint, as free-form JSON.
    TypedCheckpoint {
        session_id: String,
        checkpoint: serde_json::Value,
    },
    /// Feedback of the given `kind` was injected.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Reports `max_iterations` for an exhausted budget.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// The loop was reported stuck.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// A daemon watchdog tripped.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// The named skill was activated.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// The named skill was deactivated.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Tool list associated with the named skill.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// A tool-search query, identified by `tool_use_id`.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Result of a tool search (same `tool_use_id` as the query).
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// The transcript was compacted; token counts are estimates, as the
    /// field names say.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// A handoff artifact.
    Handoff {
        session_id: String,
        artifact_id: String,
        // Boxed, unlike the other payloads.
        // NOTE(review): presumably to keep the enum itself small — confirm.
        handoff: Box<HandoffArtifact>,
    },
    /// A batch of filesystem-watch events for one subscription.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// A worker lifecycle update.
    ///
    /// Carries both the typed `event` and a free-form `status` string.
    /// NOTE(review): `status` presumably mirrors `WorkerEvent::as_status` —
    /// confirm with the emitter.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        // Has no serde attributes, so `None` is written as `null` rather
        // than omitted.
        audit: Option<serde_json::Value>,
    },
    /// A human-in-the-loop request, identified by `request_id`.
    HitlRequested {
        session_id: String,
        request_id: String,
        kind: String,
        payload: serde_json::Value,
    },
    /// Resolution of a human-in-the-loop request (same `request_id`).
    HitlResolved {
        session_id: String,
        request_id: String,
        kind: String,
        outcome: String,
    },
    /// A loop-control decision, with the limit before and after.
    LoopControlDecision {
        session_id: String,
        iteration: usize,
        action: String,
        old_limit: usize,
        new_limit: usize,
        reason: String,
        status: String,
    },
    /// A stall warning, as free-form JSON.
    AgentLoopStallWarning {
        session_id: String,
        warning: serde_json::Value,
    },
    /// Audit data for a tool call, as free-form JSON.
    ToolCallAudit {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        audit: serde_json::Value,
    },
    /// A cache hit for `key`.
    CacheHit {
        session_id: String,
        key: String,
        backend: String,
        namespace: String,
        payload: serde_json::Value,
    },
    /// A cache miss for `key`; same shape as `CacheHit`.
    CacheMiss {
        session_id: String,
        key: String,
        backend: String,
        namespace: String,
        payload: serde_json::Value,
    },
    /// Start of a composition run.
    CompositionStart {
        session_id: String,
        run: CompositionRunEnvelope,
    },
    /// A child call within a composition.
    CompositionChildCall {
        session_id: String,
        call: CompositionChildCall,
    },
    /// Result of a child call within a composition.
    CompositionChildResult {
        session_id: String,
        result: CompositionChildResult,
    },
    /// End of a composition run.
    CompositionFinish {
        session_id: String,
        run: CompositionRunEnvelope,
    },
    /// A composition run reported as an error; same payload type as
    /// `CompositionStart` and `CompositionFinish`.
    CompositionError {
        session_id: String,
        run: CompositionRunEnvelope,
    },
}
676
677impl AgentEvent {
678 pub fn session_id(&self) -> &str {
679 match self {
680 Self::AgentMessageChunk { session_id, .. }
681 | Self::AgentThoughtChunk { session_id, .. }
682 | Self::ToolCall { session_id, .. }
683 | Self::ToolCallUpdate { session_id, .. }
684 | Self::Plan { session_id, .. }
685 | Self::ProgressReported { session_id, .. }
686 | Self::TurnStart { session_id, .. }
687 | Self::TurnEnd { session_id, .. }
688 | Self::SessionClosed { session_id, .. }
689 | Self::JudgeDecision { session_id, .. }
690 | Self::TypedCheckpoint { session_id, .. }
691 | Self::FeedbackInjected { session_id, .. }
692 | Self::BudgetExhausted { session_id, .. }
693 | Self::LoopStuck { session_id, .. }
694 | Self::DaemonWatchdogTripped { session_id, .. }
695 | Self::SkillActivated { session_id, .. }
696 | Self::SkillDeactivated { session_id, .. }
697 | Self::SkillScopeTools { session_id, .. }
698 | Self::ToolSearchQuery { session_id, .. }
699 | Self::ToolSearchResult { session_id, .. }
700 | Self::TranscriptCompacted { session_id, .. }
701 | Self::Handoff { session_id, .. }
702 | Self::FsWatch { session_id, .. }
703 | Self::WorkerUpdate { session_id, .. }
704 | Self::HitlRequested { session_id, .. }
705 | Self::HitlResolved { session_id, .. }
706 | Self::LoopControlDecision { session_id, .. }
707 | Self::AgentLoopStallWarning { session_id, .. }
708 | Self::ToolCallAudit { session_id, .. }
709 | Self::CacheHit { session_id, .. }
710 | Self::CacheMiss { session_id, .. }
711 | Self::CompositionStart { session_id, .. }
712 | Self::CompositionChildCall { session_id, .. }
713 | Self::CompositionChildResult { session_id, .. }
714 | Self::CompositionFinish { session_id, .. }
715 | Self::CompositionError { session_id, .. } => session_id,
716 }
717 }
718}
719
/// Receiver for [`AgentEvent`]s.
///
/// Implementations must be `Send + Sync`; the sinks in this file are shared
/// as `Arc<dyn AgentEventSink>`.
pub trait AgentEventSink: Send + Sync {
    /// Handles one event. The method returns nothing, so an implementation
    /// has no way to report a failure to the caller.
    fn handle_event(&self, event: &AgentEvent);
}
725
/// Envelope written by [`JsonlEventSink`]: an [`AgentEvent`] plus
/// bookkeeping fields.
///
/// `event` is flattened, so its `"type"` tag and fields appear at the same
/// JSON level as `index` and `emitted_at_ms`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Per-sink sequence number; `JsonlEventSink` starts at 0 and advances
    /// it by one per handled event.
    pub index: u64,
    /// Milliseconds since the Unix epoch at the time the event was handled.
    pub emitted_at_ms: i64,
    /// `JsonlEventSink` always writes `None` here.
    /// NOTE(review): the intended producer of a real depth is not visible in this file.
    pub frame_depth: Option<u32>,
    #[serde(flatten)]
    pub event: AgentEvent,
}
750
/// Sink that appends agent events to a JSON Lines file and starts a new
/// file once the current one has grown past [`JsonlEventSink::ROTATE_BYTES`].
///
/// The event-writing side lives in this type's `AgentEventSink` impl; this
/// block holds construction, flushing and rotation.
pub struct JsonlEventSink {
    /// Writer and counters, kept under one mutex so they change together.
    state: Mutex<JsonlEventSinkState>,
    /// Path passed to [`JsonlEventSink::open`]; rotated file names are
    /// derived from it.
    base_path: std::path::PathBuf,
}

/// Mutable part of [`JsonlEventSink`], only touched while holding its mutex.
struct JsonlEventSinkState {
    /// Buffered writer for the file currently being written.
    writer: std::io::BufWriter<std::fs::File>,
    /// Next event index; also the value reported by `event_count`.
    index: u64,
    /// Bytes accounted to the current file; reset to 0 on rotation.
    bytes_written: u64,
    /// Number of successful rotations so far; used as the file-name suffix.
    rotation: u32,
}

impl JsonlEventSink {
    /// Size threshold (100 MiB) at which the sink switches to a new file.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Creates (or truncates) the file at `base_path` and returns a sink
    /// writing to it.
    ///
    /// Missing parent directories are created first. An existing file at
    /// `base_path` is truncated, not appended to.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating the parent directory or opening
    /// the file.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let writer = Self::create_truncated(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer,
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }

    /// Flushes buffered output to the current file.
    ///
    /// # Panics
    ///
    /// Panics if the sink's mutex is poisoned.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        self.state
            .lock()
            .expect("jsonl sink mutex poisoned")
            .writer
            .flush()
    }

    /// Returns the sink's running event index: 0 for a fresh sink, advanced
    /// by one for every event passed to `handle_event`.
    ///
    /// # Panics
    ///
    /// Panics if the sink's mutex is poisoned.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// Opens `path` for writing, creating it if needed and truncating any
    /// existing content, and wraps it in a buffered writer.
    fn create_truncated(
        path: &std::path::Path,
    ) -> std::io::Result<std::io::BufWriter<std::fs::File>> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(path)?;
        Ok(std::io::BufWriter::new(file))
    }

    /// Switches to a fresh file once `bytes_written` has reached
    /// [`Self::ROTATE_BYTES`]; does nothing below that threshold.
    ///
    /// The new file sits next to `base_path` and is named
    /// `{stem}-{rotation:06}.{ext}`, falling back to `event_log` / `jsonl`
    /// when the base path has no UTF-8 stem / extension.
    ///
    /// The rotation counter and byte count are only updated after the new
    /// file has been opened. If the flush or the open fails, the state is
    /// left untouched, so a later retry reuses the same suffix instead of
    /// skipping a number.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        // Push out anything still buffered for the file being retired.
        state.writer.flush()?;

        let next_rotation = state.rotation + 1;
        let stem = self
            .base_path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("event_log");
        let ext = self
            .base_path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("jsonl");
        let rotated = self
            .base_path
            .with_file_name(format!("{stem}-{next_rotation:06}.{ext}"));

        // Fallible step first; commit the new state only once it succeeded.
        let writer = Self::create_truncated(&rotated)?;
        state.writer = writer;
        state.rotation = next_rotation;
        state.bytes_written = 0;
        Ok(())
    }
}
845
846pub struct EventLogSink {
851 log: Arc<AnyEventLog>,
852 topic: Topic,
853 session_id: String,
854}
855
856impl EventLogSink {
857 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
858 let session_id = session_id.into();
859 let topic = Topic::new(format!(
860 "observability.agent_events.{}",
861 crate::event_log::sanitize_topic_component(&session_id)
862 ))
863 .expect("session id should sanitize to a valid topic");
864 Arc::new(Self {
865 log,
866 topic,
867 session_id,
868 })
869 }
870}
871
872impl AgentEventSink for JsonlEventSink {
873 fn handle_event(&self, event: &AgentEvent) {
874 use std::io::Write as _;
875 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
876 let index = state.index;
877 state.index += 1;
878 let emitted_at_ms = std::time::SystemTime::now()
879 .duration_since(std::time::UNIX_EPOCH)
880 .map(|d| d.as_millis() as i64)
881 .unwrap_or(0);
882 let envelope = PersistedAgentEvent {
883 index,
884 emitted_at_ms,
885 frame_depth: None,
886 event: event.clone(),
887 };
888 if let Ok(line) = serde_json::to_string(&envelope) {
889 let _ = state.writer.write_all(line.as_bytes());
894 let _ = state.writer.write_all(b"\n");
895 state.bytes_written += line.len() as u64 + 1;
896 let _ = self.rotate_if_needed(&mut state);
897 }
898 }
899}
900
901impl AgentEventSink for EventLogSink {
902 fn handle_event(&self, event: &AgentEvent) {
903 let event_json = match serde_json::to_value(event) {
904 Ok(value) => value,
905 Err(_) => return,
906 };
907 let event_kind = event_json
908 .get("type")
909 .and_then(|value| value.as_str())
910 .unwrap_or("agent_event")
911 .to_string();
912 let payload = serde_json::json!({
913 "index_hint": now_ms(),
914 "session_id": self.session_id,
915 "event": event_json,
916 });
917 let mut headers = std::collections::BTreeMap::new();
918 headers.insert("session_id".to_string(), self.session_id.clone());
919 let log = self.log.clone();
920 let topic = self.topic.clone();
921 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
922 if let Ok(handle) = tokio::runtime::Handle::try_current() {
923 handle.spawn(async move {
924 let _ = log.append(&topic, record).await;
925 });
926 } else {
927 let _ = futures::executor::block_on(log.append(&topic, record));
928 }
929 }
930}
931
932impl Drop for JsonlEventSink {
933 fn drop(&mut self) {
934 if let Ok(mut state) = self.state.lock() {
935 use std::io::Write as _;
936 let _ = state.writer.flush();
937 }
938 }
939}
940
941pub struct MultiSink {
943 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
944}
945
946impl MultiSink {
947 pub fn new() -> Self {
948 Self {
949 sinks: Mutex::new(Vec::new()),
950 }
951 }
952 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
953 self.sinks.lock().expect("sink mutex poisoned").push(sink);
954 }
955 pub fn len(&self) -> usize {
956 self.sinks.lock().expect("sink mutex poisoned").len()
957 }
958 pub fn is_empty(&self) -> bool {
959 self.len() == 0
960 }
961}
962
963impl Default for MultiSink {
964 fn default() -> Self {
965 Self::new()
966 }
967}
968
969impl AgentEventSink for MultiSink {
970 fn handle_event(&self, event: &AgentEvent) {
971 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
977 for sink in sinks {
978 sink.handle_event(event);
979 }
980 }
981}
982
/// Registry entry in test builds: a sink together with the id of the thread
/// that registered it.
///
/// The registry functions in this file compare `owner` against the current
/// thread when built with `cfg(test)`.
/// NOTE(review): presumably this keeps tests that run in parallel from
/// seeing each other's sinks in the shared registry — confirm.
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    owner: std::thread::ThreadId,
    sink: Arc<dyn AgentEventSink>,
}

/// Registry entry in non-test builds: just the sink.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;

/// Process-wide map from session id to the sinks registered for it.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
994
995fn external_sinks() -> &'static ExternalSinkRegistry {
996 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
997 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
998}
999
1000pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
1001 let session_id = session_id.into();
1002 let mut reg = external_sinks().write().expect("sink registry poisoned");
1003 #[cfg(test)]
1004 let sink = RegisteredSink {
1005 owner: std::thread::current().id(),
1006 sink,
1007 };
1008 reg.entry(session_id).or_default().push(sink);
1009}
1010
1011pub fn clear_session_sinks(session_id: &str) {
1015 #[cfg(test)]
1016 {
1017 let owner = std::thread::current().id();
1018 let mut reg = external_sinks().write().expect("sink registry poisoned");
1019 if let Some(sinks) = reg.get_mut(session_id) {
1020 sinks.retain(|sink| sink.owner != owner);
1021 if sinks.is_empty() {
1022 reg.remove(session_id);
1023 }
1024 }
1025 }
1026 #[cfg(not(test))]
1027 {
1028 external_sinks()
1029 .write()
1030 .expect("sink registry poisoned")
1031 .remove(session_id);
1032 }
1033}
1034
1035pub fn reset_all_sinks() {
1036 #[cfg(test)]
1037 {
1038 let owner = std::thread::current().id();
1039 let mut reg = external_sinks().write().expect("sink registry poisoned");
1040 reg.retain(|_, sinks| {
1041 sinks.retain(|sink| sink.owner != owner);
1042 !sinks.is_empty()
1043 });
1044 crate::agent_sessions::reset_session_store();
1045 }
1046 #[cfg(not(test))]
1047 {
1048 external_sinks()
1049 .write()
1050 .expect("sink registry poisoned")
1051 .clear();
1052 crate::agent_sessions::reset_session_store();
1053 }
1054}
1055
1056pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
1063 if source_session_id.is_empty() || target_session_id.is_empty() {
1064 return;
1065 }
1066 if source_session_id == target_session_id {
1067 return;
1068 }
1069 let mut reg = external_sinks().write().expect("sink registry poisoned");
1070 let Some(source_sinks) = reg.get(source_session_id).cloned() else {
1071 return;
1072 };
1073 let target = reg.entry(target_session_id.to_string()).or_default();
1074 #[cfg(test)]
1075 {
1076 for source in source_sinks {
1077 let already_present = target
1078 .iter()
1079 .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
1080 if !already_present {
1081 target.push(source);
1082 }
1083 }
1084 }
1085 #[cfg(not(test))]
1086 {
1087 for source in source_sinks {
1088 let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
1089 if !already_present {
1090 target.push(source);
1091 }
1092 }
1093 }
1094}
1095
1096pub fn emit_event(event: &AgentEvent) {
1100 let sinks: Vec<Arc<dyn AgentEventSink>> = {
1101 let reg = external_sinks().read().expect("sink registry poisoned");
1102 #[cfg(test)]
1103 {
1104 let owner = std::thread::current().id();
1105 reg.get(event.session_id())
1106 .map(|sinks| {
1107 sinks
1108 .iter()
1109 .filter(|sink| sink.owner == owner)
1110 .map(|sink| sink.sink.clone())
1111 .collect()
1112 })
1113 .unwrap_or_default()
1114 }
1115 #[cfg(not(test))]
1116 {
1117 reg.get(event.session_id()).cloned().unwrap_or_default()
1118 }
1119 };
1120 for sink in sinks {
1121 sink.handle_event(event);
1122 }
1123}
1124
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1131
1132pub fn session_external_sink_count(session_id: &str) -> usize {
1133 #[cfg(test)]
1134 {
1135 let owner = std::thread::current().id();
1136 return external_sinks()
1137 .read()
1138 .expect("sink registry poisoned")
1139 .get(session_id)
1140 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1141 .unwrap_or(0);
1142 }
1143 #[cfg(not(test))]
1144 {
1145 external_sinks()
1146 .read()
1147 .expect("sink registry poisoned")
1148 .get(session_id)
1149 .map(|v| v.len())
1150 .unwrap_or(0)
1151 }
1152}
1153
/// Returns `crate::agent_sessions::subscriber_count(session_id)` unchanged.
///
/// The sink registry in this file is not consulted; what counts as a
/// subscriber is defined in `agent_sessions`.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
1157
1158#[cfg(test)]
1159mod tests {
1160 use super::*;
1161 use std::sync::atomic::{AtomicUsize, Ordering};
1162
1163 struct CountingSink(Arc<AtomicUsize>);
1164 impl AgentEventSink for CountingSink {
1165 fn handle_event(&self, _event: &AgentEvent) {
1166 self.0.fetch_add(1, Ordering::SeqCst);
1167 }
1168 }
1169
1170 #[test]
1171 fn multi_sink_fans_out_in_order() {
1172 let multi = MultiSink::new();
1173 let a = Arc::new(AtomicUsize::new(0));
1174 let b = Arc::new(AtomicUsize::new(0));
1175 multi.push(Arc::new(CountingSink(a.clone())));
1176 multi.push(Arc::new(CountingSink(b.clone())));
1177 let event = AgentEvent::TurnStart {
1178 session_id: "s1".into(),
1179 iteration: 1,
1180 };
1181 multi.handle_event(&event);
1182 assert_eq!(a.load(Ordering::SeqCst), 1);
1183 assert_eq!(b.load(Ordering::SeqCst), 1);
1184 }
1185
1186 #[test]
1187 fn session_scoped_sink_routing() {
1188 reset_all_sinks();
1189 let a = Arc::new(AtomicUsize::new(0));
1190 let b = Arc::new(AtomicUsize::new(0));
1191 register_sink("session-a", Arc::new(CountingSink(a.clone())));
1192 register_sink("session-b", Arc::new(CountingSink(b.clone())));
1193 emit_event(&AgentEvent::TurnStart {
1194 session_id: "session-a".into(),
1195 iteration: 0,
1196 });
1197 assert_eq!(a.load(Ordering::SeqCst), 1);
1198 assert_eq!(b.load(Ordering::SeqCst), 0);
1199 emit_event(&AgentEvent::TurnEnd {
1200 session_id: "session-b".into(),
1201 iteration: 0,
1202 turn_info: serde_json::json!({}),
1203 });
1204 assert_eq!(a.load(Ordering::SeqCst), 1);
1205 assert_eq!(b.load(Ordering::SeqCst), 1);
1206 clear_session_sinks("session-a");
1207 assert_eq!(session_external_sink_count("session-a"), 0);
1208 assert_eq!(session_external_sink_count("session-b"), 1);
1209 reset_all_sinks();
1210 }
1211
1212 #[test]
1213 fn newly_opened_child_session_inherits_current_external_sinks() {
1214 reset_all_sinks();
1215 let delivered = Arc::new(AtomicUsize::new(0));
1216 register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1217 {
1218 let _guard = crate::agent_sessions::enter_current_session("outer-session");
1219 let inner = crate::agent_sessions::open_or_create(None);
1220 assert_ne!(inner, "outer-session");
1221 emit_event(&AgentEvent::TurnStart {
1222 session_id: inner,
1223 iteration: 0,
1224 });
1225 }
1226 assert_eq!(delivered.load(Ordering::SeqCst), 1);
1227 reset_all_sinks();
1228 }
1229
1230 #[test]
1231 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1232 use std::io::{BufRead, BufReader};
1233 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1234 std::fs::create_dir_all(&dir).unwrap();
1235 let path = dir.join("event_log.jsonl");
1236 let sink = JsonlEventSink::open(&path).unwrap();
1237 for i in 0..5 {
1238 sink.handle_event(&AgentEvent::TurnStart {
1239 session_id: "s".into(),
1240 iteration: i,
1241 });
1242 }
1243 assert_eq!(sink.event_count(), 5);
1244 sink.flush().unwrap();
1245
1246 let file = std::fs::File::open(&path).unwrap();
1248 let mut last_idx: i64 = -1;
1249 let mut last_ts: i64 = 0;
1250 for line in BufReader::new(file).lines() {
1251 let line = line.unwrap();
1252 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1253 let idx = val["index"].as_i64().unwrap();
1254 let ts = val["emitted_at_ms"].as_i64().unwrap();
1255 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1256 assert!(ts >= last_ts, "timestamps must be non-decreasing");
1257 last_idx = idx;
1258 last_ts = ts;
1259 assert_eq!(val["type"], "turn_start");
1261 }
1262 assert_eq!(last_idx, 4);
1263 let _ = std::fs::remove_file(&path);
1264 }
1265
1266 #[test]
1267 fn judge_decision_round_trips_through_jsonl_sink() {
1268 use std::io::{BufRead, BufReader};
1269 let dir =
1270 std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1271 std::fs::create_dir_all(&dir).unwrap();
1272 let path = dir.join("event_log.jsonl");
1273 let sink = JsonlEventSink::open(&path).unwrap();
1274 sink.handle_event(&AgentEvent::JudgeDecision {
1275 session_id: "s".into(),
1276 iteration: 2,
1277 verdict: "continue".into(),
1278 reasoning: "needs a concrete next step".into(),
1279 next_step: Some("run the verifier".into()),
1280 judge_duration_ms: 17,
1281 trigger: Some("stalled".into()),
1282 });
1283 sink.flush().unwrap();
1284
1285 let file = std::fs::File::open(&path).unwrap();
1286 let line = BufReader::new(file).lines().next().unwrap().unwrap();
1287 let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1288 match recovered.event {
1289 AgentEvent::JudgeDecision {
1290 session_id,
1291 iteration,
1292 verdict,
1293 reasoning,
1294 next_step,
1295 judge_duration_ms,
1296 trigger,
1297 } => {
1298 assert_eq!(session_id, "s");
1299 assert_eq!(iteration, 2);
1300 assert_eq!(verdict, "continue");
1301 assert_eq!(reasoning, "needs a concrete next step");
1302 assert_eq!(next_step.as_deref(), Some("run the verifier"));
1303 assert_eq!(judge_duration_ms, 17);
1304 assert_eq!(trigger.as_deref(), Some("stalled"));
1305 }
1306 other => panic!("expected JudgeDecision, got {other:?}"),
1307 }
1308 let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1309 assert_eq!(value["type"], "judge_decision");
1310 let _ = std::fs::remove_file(&path);
1311 let _ = std::fs::remove_dir(&dir);
1312 }
1313
1314 #[test]
1315 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1316 let terminal = AgentEvent::ToolCallUpdate {
1321 session_id: "s".into(),
1322 tool_call_id: "tc-1".into(),
1323 tool_name: "read".into(),
1324 status: ToolCallStatus::Completed,
1325 raw_output: None,
1326 error: None,
1327 duration_ms: Some(42),
1328 execution_duration_ms: Some(7),
1329 error_category: None,
1330 executor: None,
1331 parsing: None,
1332
1333 raw_input: None,
1334 raw_input_partial: None,
1335 audit: None,
1336 };
1337 let value = serde_json::to_value(&terminal).unwrap();
1338 assert_eq!(value["duration_ms"], serde_json::json!(42));
1339 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1340
1341 let intermediate = AgentEvent::ToolCallUpdate {
1345 session_id: "s".into(),
1346 tool_call_id: "tc-1".into(),
1347 tool_name: "read".into(),
1348 status: ToolCallStatus::InProgress,
1349 raw_output: None,
1350 error: None,
1351 duration_ms: None,
1352 execution_duration_ms: None,
1353 error_category: None,
1354 executor: None,
1355 parsing: None,
1356
1357 raw_input: None,
1358 raw_input_partial: None,
1359 audit: None,
1360 };
1361 let value = serde_json::to_value(&intermediate).unwrap();
1362 let object = value.as_object().expect("update serializes as object");
1363 assert!(
1364 !object.contains_key("duration_ms"),
1365 "duration_ms must be omitted when None: {value}"
1366 );
1367 assert!(
1368 !object.contains_key("execution_duration_ms"),
1369 "execution_duration_ms must be omitted when None: {value}"
1370 );
1371 }
1372
1373 #[test]
1374 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1375 let raw = serde_json::json!({
1378 "type": "tool_call_update",
1379 "session_id": "s",
1380 "tool_call_id": "tc-1",
1381 "tool_name": "read",
1382 "status": "completed",
1383 "raw_output": null,
1384 "error": null,
1385 });
1386 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1387 match event {
1388 AgentEvent::ToolCallUpdate {
1389 duration_ms,
1390 execution_duration_ms,
1391 ..
1392 } => {
1393 assert!(duration_ms.is_none());
1394 assert!(execution_duration_ms.is_none());
1395 }
1396 other => panic!("expected ToolCallUpdate, got {other:?}"),
1397 }
1398 }
1399
1400 #[test]
1401 fn tool_call_status_serde() {
1402 assert_eq!(
1403 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1404 "\"pending\""
1405 );
1406 assert_eq!(
1407 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1408 "\"in_progress\""
1409 );
1410 assert_eq!(
1411 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1412 "\"completed\""
1413 );
1414 assert_eq!(
1415 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1416 "\"failed\""
1417 );
1418 }
1419
1420 #[test]
1421 fn tool_call_error_category_serializes_as_snake_case() {
1422 let pairs = [
1423 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1424 (ToolCallErrorCategory::ToolError, "tool_error"),
1425 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1426 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1427 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1428 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1429 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1430 (ToolCallErrorCategory::Timeout, "timeout"),
1431 (ToolCallErrorCategory::Network, "network"),
1432 (ToolCallErrorCategory::Cancelled, "cancelled"),
1433 (ToolCallErrorCategory::Unknown, "unknown"),
1434 ];
1435 for (variant, wire) in pairs {
1436 let encoded = serde_json::to_string(&variant).unwrap();
1437 assert_eq!(encoded, format!("\"{wire}\""));
1438 assert_eq!(variant.as_str(), wire);
1439 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1442 assert_eq!(decoded, variant);
1443 }
1444 }
1445
/// Every `ToolExecutor` variant must serialize with the expected `kind`
/// string (plus a top-level `server_name` for `McpServer`) and deserialize
/// back to an equal value.
#[test]
fn tool_executor_round_trips_with_adjacent_tag() {
    let samples = [
        ToolExecutor::HarnBuiltin,
        ToolExecutor::HostBridge,
        ToolExecutor::McpServer {
            server_name: "linear".to_string(),
        },
        ToolExecutor::ProviderNative,
    ];
    for sample in samples {
        let encoded = serde_json::to_value(&sample).unwrap();
        let tag = encoded.get("kind").and_then(|v| v.as_str()).unwrap();

        // Exhaustive on purpose: a new variant must be given an expected tag here.
        let expected_tag = match &sample {
            ToolExecutor::HarnBuiltin => "harn_builtin",
            ToolExecutor::HostBridge => "host_bridge",
            ToolExecutor::McpServer { .. } => "mcp_server",
            ToolExecutor::ProviderNative => "provider_native",
        };
        assert_eq!(tag, expected_tag);

        // Only the MCP variant carries a payload field to verify.
        if let ToolExecutor::McpServer { server_name } = &sample {
            assert_eq!(encoded["server_name"], *server_name);
        }

        let decoded: ToolExecutor = serde_json::from_value(encoded).unwrap();
        assert_eq!(decoded, sample);
    }
}
1475
/// Pins the mapping performed by `ToolCallErrorCategory::from_internal`:
/// `Timeout` stays `Timeout`, four transient internal categories map to
/// `Network`, four categories map directly, and five others map to
/// `HostBridgeError`.
#[test]
fn tool_call_error_category_from_internal_collapses_transient_family() {
    use crate::value::ErrorCategory as Internal;

    let timeout = ToolCallErrorCategory::from_internal(&Internal::Timeout);
    assert_eq!(timeout, ToolCallErrorCategory::Timeout);

    // Internal categories that must all surface as `Network`.
    let network_family = [
        Internal::RateLimit,
        Internal::Overloaded,
        Internal::ServerError,
        Internal::TransientNetwork,
    ];
    for net in network_family {
        let mapped = ToolCallErrorCategory::from_internal(&net);
        assert_eq!(
            mapped,
            ToolCallErrorCategory::Network,
            "{net:?} should map to Network",
        );
    }

    // Internal categories with a dedicated wire category of their own.
    let direct = [
        (
            Internal::SchemaValidation,
            ToolCallErrorCategory::SchemaValidation,
        ),
        (Internal::ToolError, ToolCallErrorCategory::ToolError),
        (
            Internal::ToolRejected,
            ToolCallErrorCategory::PermissionDenied,
        ),
        (Internal::Cancelled, ToolCallErrorCategory::Cancelled),
    ];
    for (internal, expected) in direct {
        assert_eq!(ToolCallErrorCategory::from_internal(&internal), expected);
    }

    // Internal categories that must all surface as `HostBridgeError`.
    let bridge_family = [
        Internal::Auth,
        Internal::EgressBlocked,
        Internal::NotFound,
        Internal::CircuitOpen,
        Internal::Generic,
    ];
    for bridge in bridge_family {
        let mapped = ToolCallErrorCategory::from_internal(&bridge);
        assert_eq!(
            mapped,
            ToolCallErrorCategory::HostBridgeError,
            "{bridge:?} should map to HostBridgeError",
        );
    }
}
1525
/// A `ToolCallUpdate` whose `error_category` is `None` must serialize with
/// the `tool_call_update` type tag and without an `error_category` key.
#[test]
fn tool_call_update_event_omits_error_category_when_none() {
    // Completed update with every optional field left unset.
    let update = AgentEvent::ToolCallUpdate {
        status: ToolCallStatus::Completed,
        session_id: "s".into(),
        tool_call_id: "t".into(),
        tool_name: "read".into(),
        error_category: None,
        error: None,
        raw_output: None,
        raw_input: None,
        raw_input_partial: None,
        duration_ms: None,
        execution_duration_ms: None,
        executor: None,
        parsing: None,
        audit: None,
    };
    let encoded = serde_json::to_value(&update).unwrap();

    let type_tag = encoded.get("type").and_then(|tag| tag.as_str());
    assert_eq!(type_tag, Some("tool_call_update"));
    assert_eq!(encoded.get("error_category"), None);
}
1549
/// A failed `ToolCallUpdate` carrying an error category must serialize the
/// category as its snake_case string alongside the error text.
#[test]
fn tool_call_update_event_serializes_error_category_when_set() {
    let failed_update = AgentEvent::ToolCallUpdate {
        status: ToolCallStatus::Failed,
        error: Some("missing required field".into()),
        error_category: Some(ToolCallErrorCategory::SchemaValidation),
        session_id: "s".into(),
        tool_call_id: "t".into(),
        tool_name: "read".into(),
        raw_output: None,
        raw_input: None,
        raw_input_partial: None,
        duration_ms: None,
        execution_duration_ms: None,
        executor: None,
        parsing: None,
        audit: None,
    };
    let encoded = serde_json::to_value(&failed_update).unwrap();

    // Both keys must be present as plain JSON strings with these exact values.
    let category = encoded.get("error_category").and_then(|c| c.as_str());
    assert_eq!(category, Some("schema_validation"));
    let message = encoded.get("error").and_then(|m| m.as_str());
    assert_eq!(message, Some("missing required field"));
}
1573
/// A `ToolCallUpdate` with `executor: None` must serialize without an
/// `executor` key.
#[test]
fn tool_call_update_omits_executor_when_absent() {
    let update = AgentEvent::ToolCallUpdate {
        executor: None,
        status: ToolCallStatus::Completed,
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        raw_output: None,
        raw_input: None,
        raw_input_partial: None,
        error: None,
        error_category: None,
        duration_ms: None,
        execution_duration_ms: None,
        parsing: None,
        audit: None,
    };
    let json = serde_json::to_value(&update).unwrap();
    let executor_entry = json.get("executor");
    assert!(executor_entry.is_none(), "got: {json}");
}
1599
/// Pins the `as_status` string of every `WorkerEvent` variant, which
/// variants report `is_terminal`, and the order of statuses produced by
/// walking `WorkerEvent::ALL`.
#[test]
fn worker_event_status_strings_cover_all_variants() {
    // Variant paired with the status string it must report.
    let expected_statuses = [
        (WorkerEvent::WorkerSpawned, "running"),
        (WorkerEvent::WorkerProgressed, "progressed"),
        (WorkerEvent::WorkerWaitingForInput, "awaiting_input"),
        (WorkerEvent::WorkerCompleted, "completed"),
        (WorkerEvent::WorkerFailed, "failed"),
        (WorkerEvent::WorkerCancelled, "cancelled"),
    ];
    for (event, status) in expected_statuses {
        assert_eq!(event.as_status(), status);
    }

    let terminal_events = [
        WorkerEvent::WorkerCompleted,
        WorkerEvent::WorkerFailed,
        WorkerEvent::WorkerCancelled,
    ];
    for terminal in terminal_events {
        assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
    }

    let non_terminal_events = [
        WorkerEvent::WorkerSpawned,
        WorkerEvent::WorkerProgressed,
        WorkerEvent::WorkerWaitingForInput,
    ];
    for non_terminal in non_terminal_events {
        let is_terminal = non_terminal.is_terminal();
        assert!(!is_terminal, "{non_terminal:?} should not be terminal");
    }

    // Walking `ALL` must yield the statuses in exactly this order.
    let collected: Vec<&'static str> = WorkerEvent::ALL
        .iter()
        .copied()
        .map(WorkerEvent::as_status)
        .collect();
    let expected_order = vec![
        "running",
        "progressed",
        "awaiting_input",
        "completed",
        "failed",
        "cancelled",
    ];
    assert_eq!(collected, expected_order);
}
1653
/// A `WorkerUpdate` must reach the sink registered under its own session id,
/// and an update emitted for a different session must not be captured by it.
///
/// The sink reset performed at the end of the test is tied to a drop guard so
/// it also runs when one of the assertions panics.
#[test]
fn worker_update_event_routes_through_session_keyed_sink() {
    // Sink that stores a clone of every event handed to it.
    struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
    impl AgentEventSink for CapturingSink {
        fn handle_event(&self, event: &AgentEvent) {
            self.0
                .lock()
                .expect("captured sink mutex poisoned")
                .push(event.clone());
        }
    }

    // Calls `reset_all_sinks` when dropped. A plain trailing call would be
    // skipped on assertion failure and leave this test's sink registered.
    struct ResetSinksOnDrop;
    impl Drop for ResetSinksOnDrop {
        fn drop(&mut self) {
            reset_all_sinks();
        }
    }

    reset_all_sinks();
    let _reset_on_exit = ResetSinksOnDrop;

    let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
    register_sink(
        "worker-session-1",
        Arc::new(CapturingSink(captured.clone())),
    );

    // Event for the registered session: expected to be captured.
    emit_event(&AgentEvent::WorkerUpdate {
        session_id: "worker-session-1".into(),
        worker_id: "worker_42".into(),
        worker_name: "review_captain".into(),
        worker_task: "review pr".into(),
        worker_mode: "delegated_stage".into(),
        event: WorkerEvent::WorkerWaitingForInput,
        status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
        metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
        audit: None,
    });
    // Event for a session with no sink registered here: must not be captured.
    emit_event(&AgentEvent::WorkerUpdate {
        session_id: "other-session".into(),
        worker_id: "w2".into(),
        worker_name: "n2".into(),
        worker_task: "t2".into(),
        worker_mode: "delegated_stage".into(),
        event: WorkerEvent::WorkerCompleted,
        status: WorkerEvent::WorkerCompleted.as_status().to_string(),
        metadata: serde_json::json!({}),
        audit: None,
    });

    let received = captured
        .lock()
        .expect("captured sink mutex poisoned")
        .clone();
    assert_eq!(received.len(), 1, "got: {received:?}");
    match &received[0] {
        AgentEvent::WorkerUpdate {
            session_id,
            worker_id,
            event,
            status,
            ..
        } => {
            assert_eq!(session_id, "worker-session-1");
            assert_eq!(worker_id, "worker_42");
            assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
            assert_eq!(status, "awaiting_input");
        }
        other => panic!("expected WorkerUpdate, got {other:?}"),
    }
}
1717
/// A `WorkerUpdate` must serialize with the `worker_update` type tag and its
/// identifying fields at the top level, and the resulting JSON must
/// deserialize back into a `WorkerUpdate` with the same `event`.
#[test]
fn worker_update_event_serializes_to_canonical_shape() {
    let update = AgentEvent::WorkerUpdate {
        event: WorkerEvent::WorkerProgressed,
        status: "progressed".into(),
        session_id: "s".into(),
        worker_id: "w".into(),
        worker_name: "n".into(),
        worker_task: "t".into(),
        worker_mode: "delegated_stage".into(),
        metadata: serde_json::json!({"started_at": "0193..."}),
        audit: Some(serde_json::json!({"run_id": "run_x"})),
    };
    let encoded = serde_json::to_value(&update).unwrap();

    // Top-level keys and the string each must hold.
    for (key, expected) in [
        ("type", "worker_update"),
        ("session_id", "s"),
        ("worker_id", "w"),
        ("status", "progressed"),
    ] {
        assert_eq!(encoded[key], expected);
    }
    assert_eq!(encoded["audit"]["run_id"], "run_x");

    let recovered: AgentEvent = serde_json::from_value(encoded).unwrap();
    match &recovered {
        AgentEvent::WorkerUpdate { event, .. } => {
            assert_eq!(*event, WorkerEvent::WorkerProgressed);
        }
        other => panic!("expected WorkerUpdate, got {other:?}"),
    }
}
1755
/// A `ToolCallUpdate` with an `McpServer` executor must serialize an
/// `executor` object holding the `mcp_server` kind and the server name.
#[test]
fn tool_call_update_includes_executor_when_present() {
    let mcp_executor = ToolExecutor::McpServer {
        server_name: "github".into(),
    };
    let update = AgentEvent::ToolCallUpdate {
        executor: Some(mcp_executor),
        status: ToolCallStatus::Completed,
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        raw_output: None,
        raw_input: None,
        raw_input_partial: None,
        error: None,
        error_category: None,
        duration_ms: None,
        execution_duration_ms: None,
        parsing: None,
        audit: None,
    };
    let encoded = serde_json::to_value(&update).unwrap();

    let executor_json = &encoded["executor"];
    assert_eq!(executor_json["kind"], "mcp_server");
    assert_eq!(executor_json["server_name"], "github");
}
1781
/// A `ToolCallUpdate` with `audit: None` must serialize without an `audit`
/// key.
#[test]
fn tool_call_update_omits_audit_when_absent() {
    let update = AgentEvent::ToolCallUpdate {
        audit: None,
        status: ToolCallStatus::Completed,
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        raw_output: None,
        raw_input: None,
        raw_input_partial: None,
        error: None,
        error_category: None,
        duration_ms: None,
        execution_duration_ms: None,
        executor: None,
        parsing: None,
    };
    let json = serde_json::to_value(&update).unwrap();
    let audit_entry = json.get("audit");
    assert!(audit_entry.is_none(), "got: {json}");
}
1803
/// A `ToolCallUpdate` carrying a `MutationSessionRecord` must serialize the
/// record's populated fields under the `audit` key.
#[test]
fn tool_call_update_includes_audit_when_present() {
    // Start from the default record and fill in only the fields under test.
    let mut record = MutationSessionRecord::default();
    record.session_id = "session_42".into();
    record.run_id = Some("run_42".into());
    record.mutation_scope = "apply_workspace".into();
    record.execution_kind = Some("worker".into());

    let update = AgentEvent::ToolCallUpdate {
        audit: Some(record),
        executor: Some(ToolExecutor::HostBridge),
        status: ToolCallStatus::Completed,
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "edit_file".into(),
        raw_output: None,
        raw_input: None,
        raw_input_partial: None,
        error: None,
        error_category: None,
        duration_ms: None,
        execution_duration_ms: None,
        parsing: None,
    };
    let encoded = serde_json::to_value(&update).unwrap();

    let audit_json = &encoded["audit"];
    for (key, expected) in [
        ("session_id", "session_42"),
        ("run_id", "run_42"),
        ("mutation_scope", "apply_workspace"),
        ("execution_kind", "worker"),
    ] {
        assert_eq!(audit_json[key], expected);
    }
}
1835
/// A `tool_call_update` payload that has no `audit` key must still
/// deserialize, yielding `audit: None`.
#[test]
fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
    // Payload deliberately limited to these keys; `audit` is not among them.
    let payload = serde_json::json!({
        "type": "tool_call_update",
        "session_id": "s",
        "tool_call_id": "tc-1",
        "tool_name": "read",
        "status": "completed",
        "raw_output": null,
        "error": null,
    });
    let parsed: AgentEvent =
        serde_json::from_value(payload).expect("parses without audit key");
    match &parsed {
        AgentEvent::ToolCallUpdate { audit, .. } => assert!(audit.is_none()),
        other => panic!("expected ToolCallUpdate, got {other:?}"),
    }
}
1855
/// A `ToolCallAudit` event must serialize with the `tool_call_audit` type
/// tag, its identifying fields, and the supplied audit JSON unchanged.
#[test]
fn tool_call_audit_serializes_with_free_form_audit_payload() {
    // Arbitrary nested JSON standing in for an audit payload.
    let payload = serde_json::json!({
        "summary": "Searched codebase",
        "kind": "search",
        "consent": {"decision": "approved", "decided_by": "auto"},
        "layers": [{"name": "with_required_reason", "status": "ok"}],
    });
    let audit_event = AgentEvent::ToolCallAudit {
        audit: payload.clone(),
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "search_files".into(),
    };
    let encoded = serde_json::to_value(&audit_event).unwrap();

    for (key, expected) in [
        ("type", "tool_call_audit"),
        ("session_id", "s"),
        ("tool_call_id", "tc-1"),
        ("tool_name", "search_files"),
    ] {
        assert_eq!(encoded[key], expected);
    }
    assert_eq!(encoded["audit"], payload);
}
1881
/// `session_id()` on a `ToolCallAudit` event must return the session id the
/// event was built with.
#[test]
fn tool_call_audit_session_id_routes_correctly() {
    let audit_event = AgentEvent::ToolCallAudit {
        audit: serde_json::Value::Null,
        tool_name: "read".into(),
        tool_call_id: "tc".into(),
        session_id: "abc".into(),
    };
    let reported = audit_event.session_id();
    assert_eq!(reported, "abc");
}
1892}