1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54 WorkerSpawned,
55 WorkerProgressed,
56 WorkerWaitingForInput,
57 WorkerCompleted,
58 WorkerFailed,
59 WorkerCancelled,
60}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
103#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107 Pending,
109 InProgress,
111 Completed,
113 Failed,
115}
116
117#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub enum ToolCallErrorCategory {
126 SchemaValidation,
129 ToolError,
132 McpServerError,
134 HostBridgeError,
136 PermissionDenied,
139 RejectedLoop,
142 ParseAborted,
150 Timeout,
152 Network,
154 Cancelled,
156 Unknown,
158}
159
160impl ToolCallErrorCategory {
161 pub fn as_str(self) -> &'static str {
162 match self {
163 Self::SchemaValidation => "schema_validation",
164 Self::ToolError => "tool_error",
165 Self::McpServerError => "mcp_server_error",
166 Self::HostBridgeError => "host_bridge_error",
167 Self::PermissionDenied => "permission_denied",
168 Self::RejectedLoop => "rejected_loop",
169 Self::ParseAborted => "parse_aborted",
170 Self::Timeout => "timeout",
171 Self::Network => "network",
172 Self::Cancelled => "cancelled",
173 Self::Unknown => "unknown",
174 }
175 }
176
177 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184 use crate::value::ErrorCategory as Internal;
185 match category {
186 Internal::Timeout => Self::Timeout,
187 Internal::RateLimit
188 | Internal::Overloaded
189 | Internal::ServerError
190 | Internal::TransientNetwork => Self::Network,
191 Internal::SchemaValidation => Self::SchemaValidation,
192 Internal::ToolError => Self::ToolError,
193 Internal::ToolRejected => Self::PermissionDenied,
194 Internal::Cancelled => Self::Cancelled,
195 Internal::Auth
196 | Internal::EgressBlocked
197 | Internal::NotFound
198 | Internal::CircuitOpen
199 | Internal::Generic => Self::HostBridgeError,
200 }
201 }
202}
203
204#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
215#[serde(tag = "kind", rename_all = "snake_case")]
216pub enum ToolExecutor {
217 HarnBuiltin,
220 HostBridge,
223 McpServer { server_name: String },
227 ProviderNative,
232}
233
234#[derive(Clone, Debug, Serialize, Deserialize)]
237#[serde(tag = "type", rename_all = "snake_case")]
238pub enum AgentEvent {
239 AgentMessageChunk {
240 session_id: String,
241 content: String,
242 },
243 AgentThoughtChunk {
244 session_id: String,
245 content: String,
246 },
247 ToolCall {
248 session_id: String,
249 tool_call_id: String,
250 tool_name: String,
251 kind: Option<ToolKind>,
252 status: ToolCallStatus,
253 raw_input: serde_json::Value,
254 #[serde(default, skip_serializing_if = "Option::is_none")]
267 parsing: Option<bool>,
268 #[serde(default, skip_serializing_if = "Option::is_none")]
272 audit: Option<MutationSessionRecord>,
273 },
274 ToolCallUpdate {
275 session_id: String,
276 tool_call_id: String,
277 tool_name: String,
278 status: ToolCallStatus,
279 raw_output: Option<serde_json::Value>,
280 error: Option<String>,
281 #[serde(default, skip_serializing_if = "Option::is_none")]
289 duration_ms: Option<u64>,
290 #[serde(default, skip_serializing_if = "Option::is_none")]
294 execution_duration_ms: Option<u64>,
295 #[serde(default, skip_serializing_if = "Option::is_none")]
301 error_category: Option<ToolCallErrorCategory>,
302 #[serde(default, skip_serializing_if = "Option::is_none")]
307 executor: Option<ToolExecutor>,
308 #[serde(default, skip_serializing_if = "Option::is_none")]
317 parsing: Option<bool>,
318 #[serde(default, skip_serializing_if = "Option::is_none")]
326 raw_input: Option<serde_json::Value>,
327 #[serde(default, skip_serializing_if = "Option::is_none")]
331 raw_input_partial: Option<String>,
332 #[serde(default, skip_serializing_if = "Option::is_none")]
337 audit: Option<MutationSessionRecord>,
338 },
339 Plan {
340 session_id: String,
341 plan: serde_json::Value,
342 },
343 TurnStart {
344 session_id: String,
345 iteration: usize,
346 },
347 TurnEnd {
348 session_id: String,
349 iteration: usize,
350 turn_info: serde_json::Value,
351 },
352 FeedbackInjected {
353 session_id: String,
354 kind: String,
355 content: String,
356 },
357 BudgetExhausted {
361 session_id: String,
362 max_iterations: usize,
363 },
364 LoopStuck {
368 session_id: String,
369 max_nudges: usize,
370 last_iteration: usize,
371 tail_excerpt: String,
372 },
373 DaemonWatchdogTripped {
378 session_id: String,
379 attempts: usize,
380 elapsed_ms: u64,
381 },
382 SkillActivated {
386 session_id: String,
387 skill_name: String,
388 iteration: usize,
389 reason: String,
390 },
391 SkillDeactivated {
394 session_id: String,
395 skill_name: String,
396 iteration: usize,
397 },
398 SkillScopeTools {
401 session_id: String,
402 skill_name: String,
403 allowed_tools: Vec<String>,
404 },
405 ToolSearchQuery {
413 session_id: String,
414 tool_use_id: String,
415 name: String,
416 query: serde_json::Value,
417 strategy: String,
418 mode: String,
419 },
420 ToolSearchResult {
424 session_id: String,
425 tool_use_id: String,
426 promoted: Vec<String>,
427 strategy: String,
428 mode: String,
429 },
430 TranscriptCompacted {
431 session_id: String,
432 mode: String,
433 strategy: String,
434 archived_messages: usize,
435 estimated_tokens_before: usize,
436 estimated_tokens_after: usize,
437 snapshot_asset_id: Option<String>,
438 },
439 Handoff {
440 session_id: String,
441 artifact_id: String,
442 handoff: Box<HandoffArtifact>,
443 },
444 FsWatch {
445 session_id: String,
446 subscription_id: String,
447 events: Vec<FsWatchEvent>,
448 },
449 WorkerUpdate {
465 session_id: String,
466 worker_id: String,
467 worker_name: String,
468 worker_task: String,
469 worker_mode: String,
470 event: WorkerEvent,
471 status: String,
472 metadata: serde_json::Value,
473 audit: Option<serde_json::Value>,
474 },
475}
476
477impl AgentEvent {
478 pub fn session_id(&self) -> &str {
479 match self {
480 Self::AgentMessageChunk { session_id, .. }
481 | Self::AgentThoughtChunk { session_id, .. }
482 | Self::ToolCall { session_id, .. }
483 | Self::ToolCallUpdate { session_id, .. }
484 | Self::Plan { session_id, .. }
485 | Self::TurnStart { session_id, .. }
486 | Self::TurnEnd { session_id, .. }
487 | Self::FeedbackInjected { session_id, .. }
488 | Self::BudgetExhausted { session_id, .. }
489 | Self::LoopStuck { session_id, .. }
490 | Self::DaemonWatchdogTripped { session_id, .. }
491 | Self::SkillActivated { session_id, .. }
492 | Self::SkillDeactivated { session_id, .. }
493 | Self::SkillScopeTools { session_id, .. }
494 | Self::ToolSearchQuery { session_id, .. }
495 | Self::ToolSearchResult { session_id, .. }
496 | Self::TranscriptCompacted { session_id, .. }
497 | Self::Handoff { session_id, .. }
498 | Self::FsWatch { session_id, .. }
499 | Self::WorkerUpdate { session_id, .. } => session_id,
500 }
501 }
502}
503
504pub trait AgentEventSink: Send + Sync {
507 fn handle_event(&self, event: &AgentEvent);
508}
509
510#[derive(Clone, Debug, Serialize, Deserialize)]
517pub struct PersistedAgentEvent {
518 pub index: u64,
522 pub emitted_at_ms: i64,
526 pub frame_depth: Option<u32>,
530 #[serde(flatten)]
532 pub event: AgentEvent,
533}
534
/// Event sink that appends one JSON document per line to a file and moves on
/// to a numbered sibling file once [`JsonlEventSink::ROTATE_BYTES`] have been
/// written to the current one.
pub struct JsonlEventSink {
    /// Writer plus counters, guarded together so that an index, the bytes
    /// written for it and a possible rotation are one atomic step.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first file; rotated files are named after it.
    base_path: std::path::PathBuf,
}

/// Mutable part of a [`JsonlEventSink`].
struct JsonlEventSinkState {
    /// Buffered writer over the file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index the next event will receive.
    index: u64,
    /// Bytes accounted to the current file since it was opened.
    bytes_written: u64,
    /// Number of successful rotations; 0 means still on `base_path`.
    rotation: u32,
}

impl JsonlEventSink {
    /// Size threshold (100 MiB) at which the sink switches to a new file.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Creates the sink, creating missing parent directories first.
    ///
    /// An existing file at `base_path` is truncated.
    ///
    /// # Errors
    /// Returns the I/O error from creating the directories or opening the file.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = Self::create_truncated(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer: std::io::BufWriter::new(file),
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }

    /// Flushes buffered lines to the current file.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        self.state
            .lock()
            .expect("jsonl sink mutex poisoned")
            .writer
            .flush()
    }

    /// Number of indices handed out so far.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// Opens `path` for writing, creating it or truncating what is there.
    fn create_truncated(path: &std::path::Path) -> std::io::Result<std::fs::File> {
        std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(path)
    }

    /// Path of the `rotation`-th rotated file: `<stem>-NNNNNN.<ext>` next to
    /// `base_path`. Falls back to `event_log` / `jsonl` when the stem or the
    /// extension is missing or not valid UTF-8.
    fn rotated_path(&self, rotation: u32) -> std::path::PathBuf {
        let stem = self
            .base_path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("event_log");
        let ext = self
            .base_path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("jsonl");
        let suffix = format!("-{:06}", rotation);
        self.base_path
            .with_file_name(format!("{stem}{suffix}.{ext}"))
    }

    /// Switches to the next rotated file once the current one has reached
    /// [`Self::ROTATE_BYTES`]; does nothing below the threshold.
    ///
    /// The rotation counter and byte count are updated only after the new
    /// file has been opened. If the flush or the open fails the state is left
    /// untouched, so the next call retries the *same* suffix instead of
    /// skipping a number.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        // Push everything out to the old file before letting go of it.
        state.writer.flush()?;
        let next_rotation = state.rotation + 1;
        let file = Self::create_truncated(&self.rotated_path(next_rotation))?;
        state.writer = std::io::BufWriter::new(file);
        state.rotation = next_rotation;
        state.bytes_written = 0;
        Ok(())
    }
}
629
630pub struct EventLogSink {
635 log: Arc<AnyEventLog>,
636 topic: Topic,
637 session_id: String,
638}
639
640impl EventLogSink {
641 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
642 let session_id = session_id.into();
643 let topic = Topic::new(format!(
644 "observability.agent_events.{}",
645 crate::event_log::sanitize_topic_component(&session_id)
646 ))
647 .expect("session id should sanitize to a valid topic");
648 Arc::new(Self {
649 log,
650 topic,
651 session_id,
652 })
653 }
654}
655
656impl AgentEventSink for JsonlEventSink {
657 fn handle_event(&self, event: &AgentEvent) {
658 use std::io::Write as _;
659 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
660 let index = state.index;
661 state.index += 1;
662 let emitted_at_ms = std::time::SystemTime::now()
663 .duration_since(std::time::UNIX_EPOCH)
664 .map(|d| d.as_millis() as i64)
665 .unwrap_or(0);
666 let envelope = PersistedAgentEvent {
667 index,
668 emitted_at_ms,
669 frame_depth: None,
670 event: event.clone(),
671 };
672 if let Ok(line) = serde_json::to_string(&envelope) {
673 let _ = state.writer.write_all(line.as_bytes());
678 let _ = state.writer.write_all(b"\n");
679 state.bytes_written += line.len() as u64 + 1;
680 let _ = self.rotate_if_needed(&mut state);
681 }
682 }
683}
684
685impl AgentEventSink for EventLogSink {
686 fn handle_event(&self, event: &AgentEvent) {
687 let event_json = match serde_json::to_value(event) {
688 Ok(value) => value,
689 Err(_) => return,
690 };
691 let event_kind = event_json
692 .get("type")
693 .and_then(|value| value.as_str())
694 .unwrap_or("agent_event")
695 .to_string();
696 let payload = serde_json::json!({
697 "index_hint": now_ms(),
698 "session_id": self.session_id,
699 "event": event_json,
700 });
701 let mut headers = std::collections::BTreeMap::new();
702 headers.insert("session_id".to_string(), self.session_id.clone());
703 let log = self.log.clone();
704 let topic = self.topic.clone();
705 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
706 if let Ok(handle) = tokio::runtime::Handle::try_current() {
707 handle.spawn(async move {
708 let _ = log.append(&topic, record).await;
709 });
710 } else {
711 let _ = futures::executor::block_on(log.append(&topic, record));
712 }
713 }
714}
715
716impl Drop for JsonlEventSink {
717 fn drop(&mut self) {
718 if let Ok(mut state) = self.state.lock() {
719 use std::io::Write as _;
720 let _ = state.writer.flush();
721 }
722 }
723}
724
725pub struct MultiSink {
727 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
728}
729
730impl MultiSink {
731 pub fn new() -> Self {
732 Self {
733 sinks: Mutex::new(Vec::new()),
734 }
735 }
736 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
737 self.sinks.lock().expect("sink mutex poisoned").push(sink);
738 }
739 pub fn len(&self) -> usize {
740 self.sinks.lock().expect("sink mutex poisoned").len()
741 }
742 pub fn is_empty(&self) -> bool {
743 self.len() == 0
744 }
745}
746
747impl Default for MultiSink {
748 fn default() -> Self {
749 Self::new()
750 }
751}
752
753impl AgentEventSink for MultiSink {
754 fn handle_event(&self, event: &AgentEvent) {
755 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
761 for sink in sinks {
762 sink.handle_event(event);
763 }
764 }
765}
766
767type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
768
769fn external_sinks() -> &'static ExternalSinkRegistry {
770 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
771 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
772}
773
774pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
775 let session_id = session_id.into();
776 let mut reg = external_sinks().write().expect("sink registry poisoned");
777 reg.entry(session_id).or_default().push(sink);
778}
779
780pub fn clear_session_sinks(session_id: &str) {
784 external_sinks()
785 .write()
786 .expect("sink registry poisoned")
787 .remove(session_id);
788}
789
790pub fn reset_all_sinks() {
791 external_sinks()
792 .write()
793 .expect("sink registry poisoned")
794 .clear();
795 crate::agent_sessions::reset_session_store();
796}
797
798pub fn emit_event(event: &AgentEvent) {
802 let sinks: Vec<Arc<dyn AgentEventSink>> = {
803 let reg = external_sinks().read().expect("sink registry poisoned");
804 reg.get(event.session_id()).cloned().unwrap_or_default()
805 };
806 for sink in sinks {
807 sink.handle_event(event);
808 }
809}
810
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The `u128`
/// millisecond count is narrowed with a checked conversion that saturates at
/// `i64::MAX`, so an out-of-range value can never wrap to a negative time.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
817
818pub fn session_external_sink_count(session_id: &str) -> usize {
819 external_sinks()
820 .read()
821 .expect("sink registry poisoned")
822 .get(session_id)
823 .map(|v| v.len())
824 .unwrap_or(0)
825}
826
827pub fn session_closure_subscriber_count(session_id: &str) -> usize {
828 crate::agent_sessions::subscriber_count(session_id)
829}
830
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that only counts the events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// `ToolCallUpdate` with every optional field unset.
    ///
    /// Tests overwrite just the fields they care about, so adding a field to
    /// the variant means touching this one literal instead of every test.
    fn bare_tool_call_update(tool_name: &str, status: ToolCallStatus) -> AgentEvent {
        AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: tool_name.into(),
            status,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: None,
            parsing: None,
            raw_input: None,
            raw_input_partial: None,
            audit: None,
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn session_scoped_sink_routing() {
        // The registry is process-global and the default harness runs tests
        // in parallel, so this test uses ids nobody else uses and clears only
        // those, instead of wiping the whole registry under other tests.
        const SESSION_A: &str = "agent-events-test-routing-a";
        const SESSION_B: &str = "agent-events-test-routing-b";
        clear_session_sinks(SESSION_A);
        clear_session_sinks(SESSION_B);

        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink(SESSION_A, Arc::new(CountingSink(a.clone())));
        register_sink(SESSION_B, Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: SESSION_A.into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: SESSION_B.into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks(SESSION_A);
        assert_eq!(session_external_sink_count(SESSION_A), 0);
        assert_eq!(session_external_sink_count(SESSION_B), 1);
        clear_session_sinks(SESSION_B);
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // The envelope is flattened, so the tag sits at the top level.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);

        // Close the file before cleaning up, and remove the directory too so
        // repeated runs do not litter the temp dir.
        drop(sink);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
        let mut terminal = bare_tool_call_update("read", ToolCallStatus::Completed);
        if let AgentEvent::ToolCallUpdate {
            duration_ms,
            execution_duration_ms,
            ..
        } = &mut terminal
        {
            *duration_ms = Some(42);
            *execution_duration_ms = Some(7);
        }
        let value = serde_json::to_value(&terminal).unwrap();
        assert_eq!(value["duration_ms"], serde_json::json!(42));
        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));

        let intermediate = bare_tool_call_update("read", ToolCallStatus::InProgress);
        let value = serde_json::to_value(&intermediate).unwrap();
        let object = value.as_object().expect("update serializes as object");
        assert!(
            !object.contains_key("duration_ms"),
            "duration_ms must be omitted when None: {value}"
        );
        assert!(
            !object.contains_key("execution_duration_ms"),
            "execution_duration_ms must be omitted when None: {value}"
        );
    }

    #[test]
    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
        let raw = serde_json::json!({
            "type": "tool_call_update",
            "session_id": "s",
            "tool_call_id": "tc-1",
            "tool_name": "read",
            "status": "completed",
            "raw_output": null,
            "error": null,
        });
        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
        match event {
            AgentEvent::ToolCallUpdate {
                duration_ms,
                execution_duration_ms,
                ..
            } => {
                assert!(duration_ms.is_none());
                assert!(execution_duration_ms.is_none());
            }
            other => panic!("expected ToolCallUpdate, got {other:?}"),
        }
    }

    #[test]
    fn tool_call_status_serde() {
        let expected = [
            (ToolCallStatus::Pending, "\"pending\""),
            (ToolCallStatus::InProgress, "\"in_progress\""),
            (ToolCallStatus::Completed, "\"completed\""),
            (ToolCallStatus::Failed, "\"failed\""),
        ];
        for (status, wire) in expected {
            assert_eq!(serde_json::to_string(&status).unwrap(), wire);
        }
    }

    #[test]
    fn tool_call_error_category_serializes_as_snake_case() {
        let pairs = [
            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
            (ToolCallErrorCategory::ToolError, "tool_error"),
            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
            (ToolCallErrorCategory::Timeout, "timeout"),
            (ToolCallErrorCategory::Network, "network"),
            (ToolCallErrorCategory::Cancelled, "cancelled"),
            (ToolCallErrorCategory::Unknown, "unknown"),
        ];
        for (variant, wire) in pairs {
            let encoded = serde_json::to_string(&variant).unwrap();
            assert_eq!(encoded, format!("\"{wire}\""));
            assert_eq!(variant.as_str(), wire);
            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, variant);
        }
    }

    // Name kept for CI filters. `#[serde(tag = "kind")]` is serde's
    // *internally* tagged representation: the tag sits in the same object as
    // the variant's fields, which is what the assertions below check.
    #[test]
    fn tool_executor_round_trips_with_adjacent_tag() {
        for executor in [
            ToolExecutor::HarnBuiltin,
            ToolExecutor::HostBridge,
            ToolExecutor::McpServer {
                server_name: "linear".to_string(),
            },
            ToolExecutor::ProviderNative,
        ] {
            let json = serde_json::to_value(&executor).unwrap();
            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
            match &executor {
                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
                ToolExecutor::McpServer { server_name } => {
                    assert_eq!(kind, "mcp_server");
                    assert_eq!(json["server_name"], *server_name);
                }
                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
            }
            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
            assert_eq!(recovered, executor);
        }
    }

    #[test]
    fn tool_call_error_category_from_internal_collapses_transient_family() {
        use crate::value::ErrorCategory as Internal;
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::Timeout),
            ToolCallErrorCategory::Timeout
        );
        for net in [
            Internal::RateLimit,
            Internal::Overloaded,
            Internal::ServerError,
            Internal::TransientNetwork,
        ] {
            assert_eq!(
                ToolCallErrorCategory::from_internal(&net),
                ToolCallErrorCategory::Network,
                "{net:?} should map to Network",
            );
        }
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
            ToolCallErrorCategory::SchemaValidation
        );
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::ToolError),
            ToolCallErrorCategory::ToolError
        );
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
            ToolCallErrorCategory::PermissionDenied
        );
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
            ToolCallErrorCategory::Cancelled
        );
        for bridge in [
            Internal::Auth,
            Internal::EgressBlocked,
            Internal::NotFound,
            Internal::CircuitOpen,
            Internal::Generic,
        ] {
            assert_eq!(
                ToolCallErrorCategory::from_internal(&bridge),
                ToolCallErrorCategory::HostBridgeError,
                "{bridge:?} should map to HostBridgeError",
            );
        }
    }

    #[test]
    fn tool_call_update_event_omits_error_category_when_none() {
        let event = bare_tool_call_update("read", ToolCallStatus::Completed);
        let v = serde_json::to_value(&event).unwrap();
        assert_eq!(v["type"], "tool_call_update");
        assert!(v.get("error_category").is_none());
    }

    #[test]
    fn tool_call_update_event_serializes_error_category_when_set() {
        let mut event = bare_tool_call_update("read", ToolCallStatus::Failed);
        if let AgentEvent::ToolCallUpdate {
            error,
            error_category,
            ..
        } = &mut event
        {
            *error = Some("missing required field".into());
            *error_category = Some(ToolCallErrorCategory::SchemaValidation);
        }
        let v = serde_json::to_value(&event).unwrap();
        assert_eq!(v["error_category"], "schema_validation");
        assert_eq!(v["error"], "missing required field");
    }

    #[test]
    fn tool_call_update_omits_executor_when_absent() {
        let event = bare_tool_call_update("read", ToolCallStatus::Completed);
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("executor").is_none(), "got: {json}");
    }

    #[test]
    fn worker_event_status_strings_cover_all_variants() {
        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
        assert_eq!(
            WorkerEvent::WorkerWaitingForInput.as_status(),
            "awaiting_input"
        );
        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");

        for terminal in [
            WorkerEvent::WorkerCompleted,
            WorkerEvent::WorkerFailed,
            WorkerEvent::WorkerCancelled,
        ] {
            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
        }
        for non_terminal in [
            WorkerEvent::WorkerSpawned,
            WorkerEvent::WorkerProgressed,
            WorkerEvent::WorkerWaitingForInput,
        ] {
            assert!(
                !non_terminal.is_terminal(),
                "{non_terminal:?} should not be terminal"
            );
        }
    }

    #[test]
    fn worker_update_event_routes_through_session_keyed_sink() {
        // Unique ids plus targeted cleanup keep this test independent of
        // others that use the process-global registry concurrently.
        const WORKER_SESSION: &str = "agent-events-test-worker-session";
        const OTHER_SESSION: &str = "agent-events-test-worker-other";
        clear_session_sinks(WORKER_SESSION);
        clear_session_sinks(OTHER_SESSION);

        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
        impl AgentEventSink for CapturingSink {
            fn handle_event(&self, event: &AgentEvent) {
                self.0
                    .lock()
                    .expect("captured sink mutex poisoned")
                    .push(event.clone());
            }
        }
        register_sink(WORKER_SESSION, Arc::new(CapturingSink(captured.clone())));
        emit_event(&AgentEvent::WorkerUpdate {
            session_id: WORKER_SESSION.into(),
            worker_id: "worker_42".into(),
            worker_name: "review_captain".into(),
            worker_task: "review pr".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerWaitingForInput,
            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
            audit: None,
        });
        // An event for a different session must not reach the sink.
        emit_event(&AgentEvent::WorkerUpdate {
            session_id: OTHER_SESSION.into(),
            worker_id: "w2".into(),
            worker_name: "n2".into(),
            worker_task: "t2".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerCompleted,
            status: "completed".into(),
            metadata: serde_json::json!({}),
            audit: None,
        });
        let received = captured.lock().unwrap().clone();
        assert_eq!(received.len(), 1, "got: {received:?}");
        match &received[0] {
            AgentEvent::WorkerUpdate {
                session_id,
                worker_id,
                event,
                status,
                ..
            } => {
                assert_eq!(session_id, WORKER_SESSION);
                assert_eq!(worker_id, "worker_42");
                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
                assert_eq!(status, "awaiting_input");
            }
            other => panic!("expected WorkerUpdate, got {other:?}"),
        }
        clear_session_sinks(WORKER_SESSION);
        clear_session_sinks(OTHER_SESSION);
    }

    #[test]
    fn worker_update_event_serializes_to_canonical_shape() {
        let event = AgentEvent::WorkerUpdate {
            session_id: "s".into(),
            worker_id: "w".into(),
            worker_name: "n".into(),
            worker_task: "t".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerProgressed,
            status: "progressed".into(),
            metadata: serde_json::json!({"started_at": "0193..."}),
            audit: Some(serde_json::json!({"run_id": "run_x"})),
        };
        let value = serde_json::to_value(&event).unwrap();
        assert_eq!(value["type"], "worker_update");
        assert_eq!(value["session_id"], "s");
        assert_eq!(value["worker_id"], "w");
        assert_eq!(value["status"], "progressed");
        assert_eq!(value["audit"]["run_id"], "run_x");

        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
        match recovered {
            AgentEvent::WorkerUpdate {
                event: recovered_event,
                ..
            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
            other => panic!("expected WorkerUpdate, got {other:?}"),
        }
    }

    #[test]
    fn tool_call_update_includes_executor_when_present() {
        let mut event = bare_tool_call_update("read", ToolCallStatus::Completed);
        if let AgentEvent::ToolCallUpdate { executor, .. } = &mut event {
            *executor = Some(ToolExecutor::McpServer {
                server_name: "github".into(),
            });
        }
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["executor"]["kind"], "mcp_server");
        assert_eq!(json["executor"]["server_name"], "github");
    }

    #[test]
    fn tool_call_update_omits_audit_when_absent() {
        let event = bare_tool_call_update("read", ToolCallStatus::Completed);
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("audit").is_none(), "got: {json}");
    }

    #[test]
    fn tool_call_update_includes_audit_when_present() {
        let record = MutationSessionRecord {
            session_id: "session_42".into(),
            run_id: Some("run_42".into()),
            mutation_scope: "apply_workspace".into(),
            execution_kind: Some("worker".into()),
            ..Default::default()
        };
        let mut event = bare_tool_call_update("edit_file", ToolCallStatus::Completed);
        if let AgentEvent::ToolCallUpdate {
            executor, audit, ..
        } = &mut event
        {
            *executor = Some(ToolExecutor::HostBridge);
            *audit = Some(record);
        }
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["audit"]["session_id"], "session_42");
        assert_eq!(json["audit"]["run_id"], "run_42");
        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
        assert_eq!(json["audit"]["execution_kind"], "worker");
    }

    #[test]
    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
        let raw = serde_json::json!({
            "type": "tool_call_update",
            "session_id": "s",
            "tool_call_id": "tc-1",
            "tool_name": "read",
            "status": "completed",
            "raw_output": null,
            "error": null,
        });
        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
        match event {
            AgentEvent::ToolCallUpdate { audit, .. } => {
                assert!(audit.is_none());
            }
            other => panic!("expected ToolCallUpdate, got {other:?}"),
        }
    }
}