1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54 WorkerSpawned,
55 WorkerProgressed,
56 WorkerWaitingForInput,
57 WorkerCompleted,
58 WorkerFailed,
59 WorkerCancelled,
60}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
103#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107 Pending,
109 InProgress,
111 Completed,
113 Failed,
115}
116
117#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub enum ToolCallErrorCategory {
126 SchemaValidation,
129 ToolError,
132 McpServerError,
134 HostBridgeError,
136 PermissionDenied,
139 RejectedLoop,
142 ParseAborted,
150 Timeout,
152 Network,
154 Cancelled,
156 Unknown,
158}
159
160impl ToolCallErrorCategory {
161 pub fn as_str(self) -> &'static str {
162 match self {
163 Self::SchemaValidation => "schema_validation",
164 Self::ToolError => "tool_error",
165 Self::McpServerError => "mcp_server_error",
166 Self::HostBridgeError => "host_bridge_error",
167 Self::PermissionDenied => "permission_denied",
168 Self::RejectedLoop => "rejected_loop",
169 Self::ParseAborted => "parse_aborted",
170 Self::Timeout => "timeout",
171 Self::Network => "network",
172 Self::Cancelled => "cancelled",
173 Self::Unknown => "unknown",
174 }
175 }
176
177 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184 use crate::value::ErrorCategory as Internal;
185 match category {
186 Internal::Timeout => Self::Timeout,
187 Internal::RateLimit
188 | Internal::Overloaded
189 | Internal::ServerError
190 | Internal::TransientNetwork => Self::Network,
191 Internal::SchemaValidation => Self::SchemaValidation,
192 Internal::ToolError => Self::ToolError,
193 Internal::ToolRejected => Self::PermissionDenied,
194 Internal::Cancelled => Self::Cancelled,
195 Internal::Auth
196 | Internal::EgressBlocked
197 | Internal::NotFound
198 | Internal::CircuitOpen
199 | Internal::BudgetExceeded
200 | Internal::Generic => Self::HostBridgeError,
201 }
202 }
203}
204
205#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
216#[serde(tag = "kind", rename_all = "snake_case")]
217pub enum ToolExecutor {
218 HarnBuiltin,
221 HostBridge,
224 McpServer { server_name: String },
228 ProviderNative,
233}
234
235#[derive(Clone, Debug, Serialize, Deserialize)]
238#[serde(tag = "type", rename_all = "snake_case")]
239pub enum AgentEvent {
240 AgentMessageChunk {
241 session_id: String,
242 content: String,
243 },
244 AgentThoughtChunk {
245 session_id: String,
246 content: String,
247 },
248 ToolCall {
249 session_id: String,
250 tool_call_id: String,
251 tool_name: String,
252 kind: Option<ToolKind>,
253 status: ToolCallStatus,
254 raw_input: serde_json::Value,
255 #[serde(default, skip_serializing_if = "Option::is_none")]
268 parsing: Option<bool>,
269 #[serde(default, skip_serializing_if = "Option::is_none")]
273 audit: Option<MutationSessionRecord>,
274 },
275 ToolCallUpdate {
276 session_id: String,
277 tool_call_id: String,
278 tool_name: String,
279 status: ToolCallStatus,
280 raw_output: Option<serde_json::Value>,
281 error: Option<String>,
282 #[serde(default, skip_serializing_if = "Option::is_none")]
290 duration_ms: Option<u64>,
291 #[serde(default, skip_serializing_if = "Option::is_none")]
295 execution_duration_ms: Option<u64>,
296 #[serde(default, skip_serializing_if = "Option::is_none")]
302 error_category: Option<ToolCallErrorCategory>,
303 #[serde(default, skip_serializing_if = "Option::is_none")]
308 executor: Option<ToolExecutor>,
309 #[serde(default, skip_serializing_if = "Option::is_none")]
318 parsing: Option<bool>,
319 #[serde(default, skip_serializing_if = "Option::is_none")]
327 raw_input: Option<serde_json::Value>,
328 #[serde(default, skip_serializing_if = "Option::is_none")]
332 raw_input_partial: Option<String>,
333 #[serde(default, skip_serializing_if = "Option::is_none")]
338 audit: Option<MutationSessionRecord>,
339 },
340 Plan {
341 session_id: String,
342 plan: serde_json::Value,
343 },
344 TurnStart {
345 session_id: String,
346 iteration: usize,
347 },
348 TurnEnd {
349 session_id: String,
350 iteration: usize,
351 turn_info: serde_json::Value,
352 },
353 JudgeDecision {
354 session_id: String,
355 iteration: usize,
356 verdict: String,
357 reasoning: String,
358 next_step: Option<String>,
359 judge_duration_ms: u64,
360 },
361 TypedCheckpoint {
362 session_id: String,
363 checkpoint: serde_json::Value,
364 },
365 FeedbackInjected {
366 session_id: String,
367 kind: String,
368 content: String,
369 },
370 BudgetExhausted {
374 session_id: String,
375 max_iterations: usize,
376 },
377 LoopStuck {
381 session_id: String,
382 max_nudges: usize,
383 last_iteration: usize,
384 tail_excerpt: String,
385 },
386 DaemonWatchdogTripped {
391 session_id: String,
392 attempts: usize,
393 elapsed_ms: u64,
394 },
395 SkillActivated {
399 session_id: String,
400 skill_name: String,
401 iteration: usize,
402 reason: String,
403 },
404 SkillDeactivated {
407 session_id: String,
408 skill_name: String,
409 iteration: usize,
410 },
411 SkillScopeTools {
414 session_id: String,
415 skill_name: String,
416 allowed_tools: Vec<String>,
417 },
418 ToolSearchQuery {
426 session_id: String,
427 tool_use_id: String,
428 name: String,
429 query: serde_json::Value,
430 strategy: String,
431 mode: String,
432 },
433 ToolSearchResult {
437 session_id: String,
438 tool_use_id: String,
439 promoted: Vec<String>,
440 strategy: String,
441 mode: String,
442 },
443 TranscriptCompacted {
444 session_id: String,
445 mode: String,
446 strategy: String,
447 archived_messages: usize,
448 estimated_tokens_before: usize,
449 estimated_tokens_after: usize,
450 snapshot_asset_id: Option<String>,
451 },
452 Handoff {
453 session_id: String,
454 artifact_id: String,
455 handoff: Box<HandoffArtifact>,
456 },
457 FsWatch {
458 session_id: String,
459 subscription_id: String,
460 events: Vec<FsWatchEvent>,
461 },
462 WorkerUpdate {
478 session_id: String,
479 worker_id: String,
480 worker_name: String,
481 worker_task: String,
482 worker_mode: String,
483 event: WorkerEvent,
484 status: String,
485 metadata: serde_json::Value,
486 audit: Option<serde_json::Value>,
487 },
488 HitlRequested {
496 session_id: String,
497 request_id: String,
498 kind: String,
499 payload: serde_json::Value,
500 },
501 HitlResolved {
507 session_id: String,
508 request_id: String,
509 kind: String,
510 outcome: String,
511 },
512 LoopControlDecision {
519 session_id: String,
520 iteration: usize,
521 action: String,
522 old_limit: usize,
523 new_limit: usize,
524 reason: String,
525 status: String,
526 },
527 ToolCallAudit {
540 session_id: String,
541 tool_call_id: String,
542 tool_name: String,
543 audit: serde_json::Value,
544 },
545}
546
547impl AgentEvent {
548 pub fn session_id(&self) -> &str {
549 match self {
550 Self::AgentMessageChunk { session_id, .. }
551 | Self::AgentThoughtChunk { session_id, .. }
552 | Self::ToolCall { session_id, .. }
553 | Self::ToolCallUpdate { session_id, .. }
554 | Self::Plan { session_id, .. }
555 | Self::TurnStart { session_id, .. }
556 | Self::TurnEnd { session_id, .. }
557 | Self::JudgeDecision { session_id, .. }
558 | Self::TypedCheckpoint { session_id, .. }
559 | Self::FeedbackInjected { session_id, .. }
560 | Self::BudgetExhausted { session_id, .. }
561 | Self::LoopStuck { session_id, .. }
562 | Self::DaemonWatchdogTripped { session_id, .. }
563 | Self::SkillActivated { session_id, .. }
564 | Self::SkillDeactivated { session_id, .. }
565 | Self::SkillScopeTools { session_id, .. }
566 | Self::ToolSearchQuery { session_id, .. }
567 | Self::ToolSearchResult { session_id, .. }
568 | Self::TranscriptCompacted { session_id, .. }
569 | Self::Handoff { session_id, .. }
570 | Self::FsWatch { session_id, .. }
571 | Self::WorkerUpdate { session_id, .. }
572 | Self::HitlRequested { session_id, .. }
573 | Self::HitlResolved { session_id, .. }
574 | Self::LoopControlDecision { session_id, .. }
575 | Self::ToolCallAudit { session_id, .. } => session_id,
576 }
577 }
578}
579
580pub trait AgentEventSink: Send + Sync {
583 fn handle_event(&self, event: &AgentEvent);
584}
585
586#[derive(Clone, Debug, Serialize, Deserialize)]
593pub struct PersistedAgentEvent {
594 pub index: u64,
598 pub emitted_at_ms: i64,
602 pub frame_depth: Option<u32>,
606 #[serde(flatten)]
608 pub event: AgentEvent,
609}
610
611pub struct JsonlEventSink {
616 state: Mutex<JsonlEventSinkState>,
617 base_path: std::path::PathBuf,
618}
619
620struct JsonlEventSinkState {
621 writer: std::io::BufWriter<std::fs::File>,
622 index: u64,
623 bytes_written: u64,
624 rotation: u32,
625}
626
627impl JsonlEventSink {
628 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
632
633 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
637 let base_path = base_path.into();
638 if let Some(parent) = base_path.parent() {
639 std::fs::create_dir_all(parent)?;
640 }
641 let file = std::fs::OpenOptions::new()
642 .create(true)
643 .truncate(true)
644 .write(true)
645 .open(&base_path)?;
646 Ok(Arc::new(Self {
647 state: Mutex::new(JsonlEventSinkState {
648 writer: std::io::BufWriter::new(file),
649 index: 0,
650 bytes_written: 0,
651 rotation: 0,
652 }),
653 base_path,
654 }))
655 }
656
657 pub fn flush(&self) -> std::io::Result<()> {
660 use std::io::Write as _;
661 self.state
662 .lock()
663 .expect("jsonl sink mutex poisoned")
664 .writer
665 .flush()
666 }
667
668 pub fn event_count(&self) -> u64 {
671 self.state.lock().expect("jsonl sink mutex poisoned").index
672 }
673
674 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
675 use std::io::Write as _;
676 if state.bytes_written < Self::ROTATE_BYTES {
677 return Ok(());
678 }
679 state.writer.flush()?;
680 state.rotation += 1;
681 let suffix = format!("-{:06}", state.rotation);
682 let rotated = self.base_path.with_file_name({
683 let stem = self
684 .base_path
685 .file_stem()
686 .and_then(|s| s.to_str())
687 .unwrap_or("event_log");
688 let ext = self
689 .base_path
690 .extension()
691 .and_then(|e| e.to_str())
692 .unwrap_or("jsonl");
693 format!("{stem}{suffix}.{ext}")
694 });
695 let file = std::fs::OpenOptions::new()
696 .create(true)
697 .truncate(true)
698 .write(true)
699 .open(&rotated)?;
700 state.writer = std::io::BufWriter::new(file);
701 state.bytes_written = 0;
702 Ok(())
703 }
704}
705
706pub struct EventLogSink {
711 log: Arc<AnyEventLog>,
712 topic: Topic,
713 session_id: String,
714}
715
716impl EventLogSink {
717 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
718 let session_id = session_id.into();
719 let topic = Topic::new(format!(
720 "observability.agent_events.{}",
721 crate::event_log::sanitize_topic_component(&session_id)
722 ))
723 .expect("session id should sanitize to a valid topic");
724 Arc::new(Self {
725 log,
726 topic,
727 session_id,
728 })
729 }
730}
731
732impl AgentEventSink for JsonlEventSink {
733 fn handle_event(&self, event: &AgentEvent) {
734 use std::io::Write as _;
735 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
736 let index = state.index;
737 state.index += 1;
738 let emitted_at_ms = std::time::SystemTime::now()
739 .duration_since(std::time::UNIX_EPOCH)
740 .map(|d| d.as_millis() as i64)
741 .unwrap_or(0);
742 let envelope = PersistedAgentEvent {
743 index,
744 emitted_at_ms,
745 frame_depth: None,
746 event: event.clone(),
747 };
748 if let Ok(line) = serde_json::to_string(&envelope) {
749 let _ = state.writer.write_all(line.as_bytes());
754 let _ = state.writer.write_all(b"\n");
755 state.bytes_written += line.len() as u64 + 1;
756 let _ = self.rotate_if_needed(&mut state);
757 }
758 }
759}
760
761impl AgentEventSink for EventLogSink {
762 fn handle_event(&self, event: &AgentEvent) {
763 let event_json = match serde_json::to_value(event) {
764 Ok(value) => value,
765 Err(_) => return,
766 };
767 let event_kind = event_json
768 .get("type")
769 .and_then(|value| value.as_str())
770 .unwrap_or("agent_event")
771 .to_string();
772 let payload = serde_json::json!({
773 "index_hint": now_ms(),
774 "session_id": self.session_id,
775 "event": event_json,
776 });
777 let mut headers = std::collections::BTreeMap::new();
778 headers.insert("session_id".to_string(), self.session_id.clone());
779 let log = self.log.clone();
780 let topic = self.topic.clone();
781 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
782 if let Ok(handle) = tokio::runtime::Handle::try_current() {
783 handle.spawn(async move {
784 let _ = log.append(&topic, record).await;
785 });
786 } else {
787 let _ = futures::executor::block_on(log.append(&topic, record));
788 }
789 }
790}
791
792impl Drop for JsonlEventSink {
793 fn drop(&mut self) {
794 if let Ok(mut state) = self.state.lock() {
795 use std::io::Write as _;
796 let _ = state.writer.flush();
797 }
798 }
799}
800
801pub struct MultiSink {
803 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
804}
805
806impl MultiSink {
807 pub fn new() -> Self {
808 Self {
809 sinks: Mutex::new(Vec::new()),
810 }
811 }
812 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
813 self.sinks.lock().expect("sink mutex poisoned").push(sink);
814 }
815 pub fn len(&self) -> usize {
816 self.sinks.lock().expect("sink mutex poisoned").len()
817 }
818 pub fn is_empty(&self) -> bool {
819 self.len() == 0
820 }
821}
822
823impl Default for MultiSink {
824 fn default() -> Self {
825 Self::new()
826 }
827}
828
829impl AgentEventSink for MultiSink {
830 fn handle_event(&self, event: &AgentEvent) {
831 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
837 for sink in sinks {
838 sink.handle_event(event);
839 }
840 }
841}
842
843#[cfg(test)]
844#[derive(Clone)]
845struct RegisteredSink {
846 owner: std::thread::ThreadId,
847 sink: Arc<dyn AgentEventSink>,
848}
849
850#[cfg(not(test))]
851type RegisteredSink = Arc<dyn AgentEventSink>;
852
853type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
854
855fn external_sinks() -> &'static ExternalSinkRegistry {
856 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
857 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
858}
859
860pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
861 let session_id = session_id.into();
862 let mut reg = external_sinks().write().expect("sink registry poisoned");
863 #[cfg(test)]
864 let sink = RegisteredSink {
865 owner: std::thread::current().id(),
866 sink,
867 };
868 reg.entry(session_id).or_default().push(sink);
869}
870
871pub fn clear_session_sinks(session_id: &str) {
875 #[cfg(test)]
876 {
877 let owner = std::thread::current().id();
878 let mut reg = external_sinks().write().expect("sink registry poisoned");
879 if let Some(sinks) = reg.get_mut(session_id) {
880 sinks.retain(|sink| sink.owner != owner);
881 if sinks.is_empty() {
882 reg.remove(session_id);
883 }
884 }
885 }
886 #[cfg(not(test))]
887 {
888 external_sinks()
889 .write()
890 .expect("sink registry poisoned")
891 .remove(session_id);
892 }
893}
894
895pub fn reset_all_sinks() {
896 #[cfg(test)]
897 {
898 let owner = std::thread::current().id();
899 let mut reg = external_sinks().write().expect("sink registry poisoned");
900 reg.retain(|_, sinks| {
901 sinks.retain(|sink| sink.owner != owner);
902 !sinks.is_empty()
903 });
904 crate::agent_sessions::reset_session_store();
905 }
906 #[cfg(not(test))]
907 {
908 external_sinks()
909 .write()
910 .expect("sink registry poisoned")
911 .clear();
912 crate::agent_sessions::reset_session_store();
913 }
914}
915
916pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
923 if source_session_id.is_empty() || target_session_id.is_empty() {
924 return;
925 }
926 if source_session_id == target_session_id {
927 return;
928 }
929 let mut reg = external_sinks().write().expect("sink registry poisoned");
930 let Some(source_sinks) = reg.get(source_session_id).cloned() else {
931 return;
932 };
933 let target = reg.entry(target_session_id.to_string()).or_default();
934 #[cfg(test)]
935 {
936 for source in source_sinks {
937 let already_present = target
938 .iter()
939 .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
940 if !already_present {
941 target.push(source);
942 }
943 }
944 }
945 #[cfg(not(test))]
946 {
947 for source in source_sinks {
948 let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
949 if !already_present {
950 target.push(source);
951 }
952 }
953 }
954}
955
956pub fn emit_event(event: &AgentEvent) {
960 let sinks: Vec<Arc<dyn AgentEventSink>> = {
961 let reg = external_sinks().read().expect("sink registry poisoned");
962 #[cfg(test)]
963 {
964 let owner = std::thread::current().id();
965 reg.get(event.session_id())
966 .map(|sinks| {
967 sinks
968 .iter()
969 .filter(|sink| sink.owner == owner)
970 .map(|sink| sink.sink.clone())
971 .collect()
972 })
973 .unwrap_or_default()
974 }
975 #[cfg(not(test))]
976 {
977 reg.get(event.session_id()).cloned().unwrap_or_default()
978 }
979 };
980 for sink in sinks {
981 sink.handle_event(event);
982 }
983}
984
985fn now_ms() -> i64 {
986 std::time::SystemTime::now()
987 .duration_since(std::time::UNIX_EPOCH)
988 .map(|duration| duration.as_millis() as i64)
989 .unwrap_or(0)
990}
991
992pub fn session_external_sink_count(session_id: &str) -> usize {
993 #[cfg(test)]
994 {
995 let owner = std::thread::current().id();
996 return external_sinks()
997 .read()
998 .expect("sink registry poisoned")
999 .get(session_id)
1000 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1001 .unwrap_or(0);
1002 }
1003 #[cfg(not(test))]
1004 {
1005 external_sinks()
1006 .read()
1007 .expect("sink registry poisoned")
1008 .get(session_id)
1009 .map(|v| v.len())
1010 .unwrap_or(0)
1011 }
1012}
1013
1014pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1015 crate::agent_sessions::subscriber_count(session_id)
1016}
1017
1018#[cfg(test)]
1019mod tests {
1020 use super::*;
1021 use std::sync::atomic::{AtomicUsize, Ordering};
1022
1023 struct CountingSink(Arc<AtomicUsize>);
1024 impl AgentEventSink for CountingSink {
1025 fn handle_event(&self, _event: &AgentEvent) {
1026 self.0.fetch_add(1, Ordering::SeqCst);
1027 }
1028 }
1029
1030 #[test]
1031 fn multi_sink_fans_out_in_order() {
1032 let multi = MultiSink::new();
1033 let a = Arc::new(AtomicUsize::new(0));
1034 let b = Arc::new(AtomicUsize::new(0));
1035 multi.push(Arc::new(CountingSink(a.clone())));
1036 multi.push(Arc::new(CountingSink(b.clone())));
1037 let event = AgentEvent::TurnStart {
1038 session_id: "s1".into(),
1039 iteration: 1,
1040 };
1041 multi.handle_event(&event);
1042 assert_eq!(a.load(Ordering::SeqCst), 1);
1043 assert_eq!(b.load(Ordering::SeqCst), 1);
1044 }
1045
1046 #[test]
1047 fn session_scoped_sink_routing() {
1048 reset_all_sinks();
1049 let a = Arc::new(AtomicUsize::new(0));
1050 let b = Arc::new(AtomicUsize::new(0));
1051 register_sink("session-a", Arc::new(CountingSink(a.clone())));
1052 register_sink("session-b", Arc::new(CountingSink(b.clone())));
1053 emit_event(&AgentEvent::TurnStart {
1054 session_id: "session-a".into(),
1055 iteration: 0,
1056 });
1057 assert_eq!(a.load(Ordering::SeqCst), 1);
1058 assert_eq!(b.load(Ordering::SeqCst), 0);
1059 emit_event(&AgentEvent::TurnEnd {
1060 session_id: "session-b".into(),
1061 iteration: 0,
1062 turn_info: serde_json::json!({}),
1063 });
1064 assert_eq!(a.load(Ordering::SeqCst), 1);
1065 assert_eq!(b.load(Ordering::SeqCst), 1);
1066 clear_session_sinks("session-a");
1067 assert_eq!(session_external_sink_count("session-a"), 0);
1068 assert_eq!(session_external_sink_count("session-b"), 1);
1069 reset_all_sinks();
1070 }
1071
1072 #[test]
1073 fn newly_opened_child_session_inherits_current_external_sinks() {
1074 reset_all_sinks();
1075 let delivered = Arc::new(AtomicUsize::new(0));
1076 register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1077 {
1078 let _guard = crate::agent_sessions::enter_current_session("outer-session");
1079 let inner = crate::agent_sessions::open_or_create(None);
1080 assert_ne!(inner, "outer-session");
1081 emit_event(&AgentEvent::TurnStart {
1082 session_id: inner,
1083 iteration: 0,
1084 });
1085 }
1086 assert_eq!(delivered.load(Ordering::SeqCst), 1);
1087 reset_all_sinks();
1088 }
1089
1090 #[test]
1091 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1092 use std::io::{BufRead, BufReader};
1093 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1094 std::fs::create_dir_all(&dir).unwrap();
1095 let path = dir.join("event_log.jsonl");
1096 let sink = JsonlEventSink::open(&path).unwrap();
1097 for i in 0..5 {
1098 sink.handle_event(&AgentEvent::TurnStart {
1099 session_id: "s".into(),
1100 iteration: i,
1101 });
1102 }
1103 assert_eq!(sink.event_count(), 5);
1104 sink.flush().unwrap();
1105
1106 let file = std::fs::File::open(&path).unwrap();
1108 let mut last_idx: i64 = -1;
1109 let mut last_ts: i64 = 0;
1110 for line in BufReader::new(file).lines() {
1111 let line = line.unwrap();
1112 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1113 let idx = val["index"].as_i64().unwrap();
1114 let ts = val["emitted_at_ms"].as_i64().unwrap();
1115 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1116 assert!(ts >= last_ts, "timestamps must be non-decreasing");
1117 last_idx = idx;
1118 last_ts = ts;
1119 assert_eq!(val["type"], "turn_start");
1121 }
1122 assert_eq!(last_idx, 4);
1123 let _ = std::fs::remove_file(&path);
1124 }
1125
1126 #[test]
1127 fn judge_decision_round_trips_through_jsonl_sink() {
1128 use std::io::{BufRead, BufReader};
1129 let dir =
1130 std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1131 std::fs::create_dir_all(&dir).unwrap();
1132 let path = dir.join("event_log.jsonl");
1133 let sink = JsonlEventSink::open(&path).unwrap();
1134 sink.handle_event(&AgentEvent::JudgeDecision {
1135 session_id: "s".into(),
1136 iteration: 2,
1137 verdict: "continue".into(),
1138 reasoning: "needs a concrete next step".into(),
1139 next_step: Some("run the verifier".into()),
1140 judge_duration_ms: 17,
1141 });
1142 sink.flush().unwrap();
1143
1144 let file = std::fs::File::open(&path).unwrap();
1145 let line = BufReader::new(file).lines().next().unwrap().unwrap();
1146 let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1147 match recovered.event {
1148 AgentEvent::JudgeDecision {
1149 session_id,
1150 iteration,
1151 verdict,
1152 reasoning,
1153 next_step,
1154 judge_duration_ms,
1155 } => {
1156 assert_eq!(session_id, "s");
1157 assert_eq!(iteration, 2);
1158 assert_eq!(verdict, "continue");
1159 assert_eq!(reasoning, "needs a concrete next step");
1160 assert_eq!(next_step.as_deref(), Some("run the verifier"));
1161 assert_eq!(judge_duration_ms, 17);
1162 }
1163 other => panic!("expected JudgeDecision, got {other:?}"),
1164 }
1165 let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1166 assert_eq!(value["type"], "judge_decision");
1167 let _ = std::fs::remove_file(&path);
1168 let _ = std::fs::remove_dir(&dir);
1169 }
1170
1171 #[test]
1172 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1173 let terminal = AgentEvent::ToolCallUpdate {
1178 session_id: "s".into(),
1179 tool_call_id: "tc-1".into(),
1180 tool_name: "read".into(),
1181 status: ToolCallStatus::Completed,
1182 raw_output: None,
1183 error: None,
1184 duration_ms: Some(42),
1185 execution_duration_ms: Some(7),
1186 error_category: None,
1187 executor: None,
1188 parsing: None,
1189
1190 raw_input: None,
1191 raw_input_partial: None,
1192 audit: None,
1193 };
1194 let value = serde_json::to_value(&terminal).unwrap();
1195 assert_eq!(value["duration_ms"], serde_json::json!(42));
1196 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1197
1198 let intermediate = AgentEvent::ToolCallUpdate {
1202 session_id: "s".into(),
1203 tool_call_id: "tc-1".into(),
1204 tool_name: "read".into(),
1205 status: ToolCallStatus::InProgress,
1206 raw_output: None,
1207 error: None,
1208 duration_ms: None,
1209 execution_duration_ms: None,
1210 error_category: None,
1211 executor: None,
1212 parsing: None,
1213
1214 raw_input: None,
1215 raw_input_partial: None,
1216 audit: None,
1217 };
1218 let value = serde_json::to_value(&intermediate).unwrap();
1219 let object = value.as_object().expect("update serializes as object");
1220 assert!(
1221 !object.contains_key("duration_ms"),
1222 "duration_ms must be omitted when None: {value}"
1223 );
1224 assert!(
1225 !object.contains_key("execution_duration_ms"),
1226 "execution_duration_ms must be omitted when None: {value}"
1227 );
1228 }
1229
1230 #[test]
1231 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1232 let raw = serde_json::json!({
1235 "type": "tool_call_update",
1236 "session_id": "s",
1237 "tool_call_id": "tc-1",
1238 "tool_name": "read",
1239 "status": "completed",
1240 "raw_output": null,
1241 "error": null,
1242 });
1243 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1244 match event {
1245 AgentEvent::ToolCallUpdate {
1246 duration_ms,
1247 execution_duration_ms,
1248 ..
1249 } => {
1250 assert!(duration_ms.is_none());
1251 assert!(execution_duration_ms.is_none());
1252 }
1253 other => panic!("expected ToolCallUpdate, got {other:?}"),
1254 }
1255 }
1256
1257 #[test]
1258 fn tool_call_status_serde() {
1259 assert_eq!(
1260 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1261 "\"pending\""
1262 );
1263 assert_eq!(
1264 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1265 "\"in_progress\""
1266 );
1267 assert_eq!(
1268 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1269 "\"completed\""
1270 );
1271 assert_eq!(
1272 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1273 "\"failed\""
1274 );
1275 }
1276
1277 #[test]
1278 fn tool_call_error_category_serializes_as_snake_case() {
1279 let pairs = [
1280 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1281 (ToolCallErrorCategory::ToolError, "tool_error"),
1282 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1283 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1284 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1285 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1286 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1287 (ToolCallErrorCategory::Timeout, "timeout"),
1288 (ToolCallErrorCategory::Network, "network"),
1289 (ToolCallErrorCategory::Cancelled, "cancelled"),
1290 (ToolCallErrorCategory::Unknown, "unknown"),
1291 ];
1292 for (variant, wire) in pairs {
1293 let encoded = serde_json::to_string(&variant).unwrap();
1294 assert_eq!(encoded, format!("\"{wire}\""));
1295 assert_eq!(variant.as_str(), wire);
1296 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1299 assert_eq!(decoded, variant);
1300 }
1301 }
1302
1303 #[test]
1304 fn tool_executor_round_trips_with_adjacent_tag() {
1305 for executor in [
1310 ToolExecutor::HarnBuiltin,
1311 ToolExecutor::HostBridge,
1312 ToolExecutor::McpServer {
1313 server_name: "linear".to_string(),
1314 },
1315 ToolExecutor::ProviderNative,
1316 ] {
1317 let json = serde_json::to_value(&executor).unwrap();
1318 let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1319 match &executor {
1320 ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1321 ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1322 ToolExecutor::McpServer { server_name } => {
1323 assert_eq!(kind, "mcp_server");
1324 assert_eq!(json["server_name"], *server_name);
1325 }
1326 ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1327 }
1328 let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1329 assert_eq!(recovered, executor);
1330 }
1331 }
1332
1333 #[test]
1334 fn tool_call_error_category_from_internal_collapses_transient_family() {
1335 use crate::value::ErrorCategory as Internal;
1336 assert_eq!(
1337 ToolCallErrorCategory::from_internal(&Internal::Timeout),
1338 ToolCallErrorCategory::Timeout
1339 );
1340 for net in [
1341 Internal::RateLimit,
1342 Internal::Overloaded,
1343 Internal::ServerError,
1344 Internal::TransientNetwork,
1345 ] {
1346 assert_eq!(
1347 ToolCallErrorCategory::from_internal(&net),
1348 ToolCallErrorCategory::Network,
1349 "{net:?} should map to Network",
1350 );
1351 }
1352 assert_eq!(
1353 ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1354 ToolCallErrorCategory::SchemaValidation
1355 );
1356 assert_eq!(
1357 ToolCallErrorCategory::from_internal(&Internal::ToolError),
1358 ToolCallErrorCategory::ToolError
1359 );
1360 assert_eq!(
1361 ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1362 ToolCallErrorCategory::PermissionDenied
1363 );
1364 assert_eq!(
1365 ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1366 ToolCallErrorCategory::Cancelled
1367 );
1368 for bridge in [
1369 Internal::Auth,
1370 Internal::EgressBlocked,
1371 Internal::NotFound,
1372 Internal::CircuitOpen,
1373 Internal::Generic,
1374 ] {
1375 assert_eq!(
1376 ToolCallErrorCategory::from_internal(&bridge),
1377 ToolCallErrorCategory::HostBridgeError,
1378 "{bridge:?} should map to HostBridgeError",
1379 );
1380 }
1381 }
1382
1383 #[test]
1384 fn tool_call_update_event_omits_error_category_when_none() {
1385 let event = AgentEvent::ToolCallUpdate {
1386 session_id: "s".into(),
1387 tool_call_id: "t".into(),
1388 tool_name: "read".into(),
1389 status: ToolCallStatus::Completed,
1390 raw_output: None,
1391 error: None,
1392 duration_ms: None,
1393 execution_duration_ms: None,
1394 error_category: None,
1395 executor: None,
1396 parsing: None,
1397
1398 raw_input: None,
1399 raw_input_partial: None,
1400 audit: None,
1401 };
1402 let v = serde_json::to_value(&event).unwrap();
1403 assert_eq!(v["type"], "tool_call_update");
1404 assert!(v.get("error_category").is_none());
1405 }
1406
1407 #[test]
1408 fn tool_call_update_event_serializes_error_category_when_set() {
1409 let event = AgentEvent::ToolCallUpdate {
1410 session_id: "s".into(),
1411 tool_call_id: "t".into(),
1412 tool_name: "read".into(),
1413 status: ToolCallStatus::Failed,
1414 raw_output: None,
1415 error: Some("missing required field".into()),
1416 duration_ms: None,
1417 execution_duration_ms: None,
1418 error_category: Some(ToolCallErrorCategory::SchemaValidation),
1419 executor: None,
1420 parsing: None,
1421
1422 raw_input: None,
1423 raw_input_partial: None,
1424 audit: None,
1425 };
1426 let v = serde_json::to_value(&event).unwrap();
1427 assert_eq!(v["error_category"], "schema_validation");
1428 assert_eq!(v["error"], "missing required field");
1429 }
1430
1431 #[test]
1432 fn tool_call_update_omits_executor_when_absent() {
1433 let event = AgentEvent::ToolCallUpdate {
1437 session_id: "s".into(),
1438 tool_call_id: "tc-1".into(),
1439 tool_name: "read".into(),
1440 status: ToolCallStatus::Completed,
1441 raw_output: None,
1442 error: None,
1443 duration_ms: None,
1444 execution_duration_ms: None,
1445 error_category: None,
1446 executor: None,
1447 parsing: None,
1448
1449 raw_input: None,
1450 raw_input_partial: None,
1451 audit: None,
1452 };
1453 let json = serde_json::to_value(&event).unwrap();
1454 assert!(json.get("executor").is_none(), "got: {json}");
1455 }
1456
1457 #[test]
1458 fn worker_event_status_strings_cover_all_variants() {
1459 assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1464 assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1465 assert_eq!(
1466 WorkerEvent::WorkerWaitingForInput.as_status(),
1467 "awaiting_input"
1468 );
1469 assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1470 assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1471 assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1472
1473 for terminal in [
1474 WorkerEvent::WorkerCompleted,
1475 WorkerEvent::WorkerFailed,
1476 WorkerEvent::WorkerCancelled,
1477 ] {
1478 assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1479 }
1480 for non_terminal in [
1481 WorkerEvent::WorkerSpawned,
1482 WorkerEvent::WorkerProgressed,
1483 WorkerEvent::WorkerWaitingForInput,
1484 ] {
1485 assert!(
1486 !non_terminal.is_terminal(),
1487 "{non_terminal:?} should not be terminal"
1488 );
1489 }
1490 }
1491
1492 #[test]
1493 fn worker_update_event_routes_through_session_keyed_sink() {
1494 reset_all_sinks();
1499 let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1500 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1501 impl AgentEventSink for CapturingSink {
1502 fn handle_event(&self, event: &AgentEvent) {
1503 self.0
1504 .lock()
1505 .expect("captured sink mutex poisoned")
1506 .push(event.clone());
1507 }
1508 }
1509 register_sink(
1510 "worker-session-1",
1511 Arc::new(CapturingSink(captured.clone())),
1512 );
1513 emit_event(&AgentEvent::WorkerUpdate {
1514 session_id: "worker-session-1".into(),
1515 worker_id: "worker_42".into(),
1516 worker_name: "review_captain".into(),
1517 worker_task: "review pr".into(),
1518 worker_mode: "delegated_stage".into(),
1519 event: WorkerEvent::WorkerWaitingForInput,
1520 status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1521 metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1522 audit: None,
1523 });
1524 emit_event(&AgentEvent::WorkerUpdate {
1526 session_id: "other-session".into(),
1527 worker_id: "w2".into(),
1528 worker_name: "n2".into(),
1529 worker_task: "t2".into(),
1530 worker_mode: "delegated_stage".into(),
1531 event: WorkerEvent::WorkerCompleted,
1532 status: "completed".into(),
1533 metadata: serde_json::json!({}),
1534 audit: None,
1535 });
1536 let received = captured.lock().unwrap().clone();
1537 assert_eq!(received.len(), 1, "got: {received:?}");
1538 match &received[0] {
1539 AgentEvent::WorkerUpdate {
1540 session_id,
1541 worker_id,
1542 event,
1543 status,
1544 ..
1545 } => {
1546 assert_eq!(session_id, "worker-session-1");
1547 assert_eq!(worker_id, "worker_42");
1548 assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1549 assert_eq!(status, "awaiting_input");
1550 }
1551 other => panic!("expected WorkerUpdate, got {other:?}"),
1552 }
1553 reset_all_sinks();
1554 }
1555
1556 #[test]
1557 fn worker_update_event_serializes_to_canonical_shape() {
1558 let event = AgentEvent::WorkerUpdate {
1564 session_id: "s".into(),
1565 worker_id: "w".into(),
1566 worker_name: "n".into(),
1567 worker_task: "t".into(),
1568 worker_mode: "delegated_stage".into(),
1569 event: WorkerEvent::WorkerProgressed,
1570 status: "progressed".into(),
1571 metadata: serde_json::json!({"started_at": "0193..."}),
1572 audit: Some(serde_json::json!({"run_id": "run_x"})),
1573 };
1574 let value = serde_json::to_value(&event).unwrap();
1575 assert_eq!(value["type"], "worker_update");
1576 assert_eq!(value["session_id"], "s");
1577 assert_eq!(value["worker_id"], "w");
1578 assert_eq!(value["status"], "progressed");
1579 assert_eq!(value["audit"]["run_id"], "run_x");
1580
1581 let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1585 match recovered {
1586 AgentEvent::WorkerUpdate {
1587 event: recovered_event,
1588 ..
1589 } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1590 other => panic!("expected WorkerUpdate, got {other:?}"),
1591 }
1592 }
1593
1594 #[test]
1595 fn tool_call_update_includes_executor_when_present() {
1596 let event = AgentEvent::ToolCallUpdate {
1597 session_id: "s".into(),
1598 tool_call_id: "tc-1".into(),
1599 tool_name: "read".into(),
1600 status: ToolCallStatus::Completed,
1601 raw_output: None,
1602 error: None,
1603 duration_ms: None,
1604 execution_duration_ms: None,
1605 error_category: None,
1606 executor: Some(ToolExecutor::McpServer {
1607 server_name: "github".into(),
1608 }),
1609 parsing: None,
1610
1611 raw_input: None,
1612 raw_input_partial: None,
1613 audit: None,
1614 };
1615 let json = serde_json::to_value(&event).unwrap();
1616 assert_eq!(json["executor"]["kind"], "mcp_server");
1617 assert_eq!(json["executor"]["server_name"], "github");
1618 }
1619
1620 #[test]
1621 fn tool_call_update_omits_audit_when_absent() {
1622 let event = AgentEvent::ToolCallUpdate {
1623 session_id: "s".into(),
1624 tool_call_id: "tc-1".into(),
1625 tool_name: "read".into(),
1626 status: ToolCallStatus::Completed,
1627 raw_output: None,
1628 error: None,
1629 duration_ms: None,
1630 execution_duration_ms: None,
1631 error_category: None,
1632 executor: None,
1633 parsing: None,
1634 raw_input: None,
1635 raw_input_partial: None,
1636 audit: None,
1637 };
1638 let json = serde_json::to_value(&event).unwrap();
1639 assert!(json.get("audit").is_none(), "got: {json}");
1640 }
1641
1642 #[test]
1643 fn tool_call_update_includes_audit_when_present() {
1644 let audit = MutationSessionRecord {
1645 session_id: "session_42".into(),
1646 run_id: Some("run_42".into()),
1647 mutation_scope: "apply_workspace".into(),
1648 execution_kind: Some("worker".into()),
1649 ..Default::default()
1650 };
1651 let event = AgentEvent::ToolCallUpdate {
1652 session_id: "s".into(),
1653 tool_call_id: "tc-1".into(),
1654 tool_name: "edit_file".into(),
1655 status: ToolCallStatus::Completed,
1656 raw_output: None,
1657 error: None,
1658 duration_ms: None,
1659 execution_duration_ms: None,
1660 error_category: None,
1661 executor: Some(ToolExecutor::HostBridge),
1662 parsing: None,
1663 raw_input: None,
1664 raw_input_partial: None,
1665 audit: Some(audit),
1666 };
1667 let json = serde_json::to_value(&event).unwrap();
1668 assert_eq!(json["audit"]["session_id"], "session_42");
1669 assert_eq!(json["audit"]["run_id"], "run_42");
1670 assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1671 assert_eq!(json["audit"]["execution_kind"], "worker");
1672 }
1673
1674 #[test]
1675 fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1676 let raw = serde_json::json!({
1677 "type": "tool_call_update",
1678 "session_id": "s",
1679 "tool_call_id": "tc-1",
1680 "tool_name": "read",
1681 "status": "completed",
1682 "raw_output": null,
1683 "error": null,
1684 });
1685 let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1686 match event {
1687 AgentEvent::ToolCallUpdate { audit, .. } => {
1688 assert!(audit.is_none());
1689 }
1690 other => panic!("expected ToolCallUpdate, got {other:?}"),
1691 }
1692 }
1693
1694 #[test]
1695 fn tool_call_audit_serializes_with_free_form_audit_payload() {
1696 let audit = serde_json::json!({
1701 "summary": "Searched codebase",
1702 "kind": "search",
1703 "consent": {"decision": "approved", "decided_by": "auto"},
1704 "layers": [{"name": "with_required_reason", "status": "ok"}],
1705 });
1706 let event = AgentEvent::ToolCallAudit {
1707 session_id: "s".into(),
1708 tool_call_id: "tc-1".into(),
1709 tool_name: "search_files".into(),
1710 audit: audit.clone(),
1711 };
1712 let json = serde_json::to_value(&event).unwrap();
1713 assert_eq!(json["type"], "tool_call_audit");
1714 assert_eq!(json["session_id"], "s");
1715 assert_eq!(json["tool_call_id"], "tc-1");
1716 assert_eq!(json["tool_name"], "search_files");
1717 assert_eq!(json["audit"], audit);
1718 }
1719
1720 #[test]
1721 fn tool_call_audit_session_id_routes_correctly() {
1722 let event = AgentEvent::ToolCallAudit {
1723 session_id: "abc".into(),
1724 tool_call_id: "tc".into(),
1725 tool_name: "read".into(),
1726 audit: serde_json::Value::Null,
1727 };
1728 assert_eq!(event.session_id(), "abc");
1729 }
1730}