1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
/// A single filesystem-watch notification, carried in the `events` list of
/// [`AgentEvent::FsWatch`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Kind of change, as a string.
    /// NOTE(review): presumably a normalized label derived from `raw_kind` — confirm against the producer.
    pub kind: String,
    /// Paths the notification refers to.
    pub paths: Vec<String>,
    /// NOTE(review): presumably `paths` expressed relative to the watched root — confirm against the producer.
    pub relative_paths: Vec<String>,
    /// NOTE(review): presumably the watcher backend's own description of the change — confirm against the producer.
    pub raw_kind: String,
    /// Error text attached to the notification, if any.
    pub error: Option<String>,
}
40
/// Lifecycle events reported for a worker in [`AgentEvent::WorkerUpdate`].
///
/// [`WorkerEvent::as_status`] gives the status label for each variant and
/// [`WorkerEvent::is_terminal`] tells whether the variant ends the lifecycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Status label `"running"`; not terminal.
    WorkerSpawned,
    /// Status label `"progressed"`; not terminal.
    WorkerProgressed,
    /// Status label `"awaiting_input"`; not terminal.
    WorkerWaitingForInput,
    /// Status label `"completed"`; terminal.
    WorkerCompleted,
    /// Status label `"failed"`; terminal.
    WorkerFailed,
    /// Status label `"cancelled"`; terminal.
    WorkerCancelled,
}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
/// Progress state of a tool call, used by [`AgentEvent::ToolCall`] and
/// [`AgentEvent::ToolCallUpdate`].
///
/// Serialized as a snake_case string (`"pending"`, `"in_progress"`,
/// `"completed"`, `"failed"`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Wire value `"pending"`.
    Pending,
    /// Wire value `"in_progress"`.
    InProgress,
    /// Wire value `"completed"`.
    Completed,
    /// Wire value `"failed"`.
    Failed,
}
116
/// Coarse classification of a tool-call failure, attached to
/// [`AgentEvent::ToolCallUpdate`] as `error_category`.
///
/// Serialized as a snake_case string; [`ToolCallErrorCategory::as_str`]
/// returns the same label without going through serde.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    /// Wire value `"schema_validation"`.
    SchemaValidation,
    /// Wire value `"tool_error"`.
    ToolError,
    /// Wire value `"mcp_server_error"`.
    McpServerError,
    /// Wire value `"host_bridge_error"`.
    HostBridgeError,
    /// Wire value `"permission_denied"`.
    PermissionDenied,
    /// Wire value `"rejected_loop"`.
    RejectedLoop,
    /// Wire value `"parse_aborted"`.
    ParseAborted,
    /// Wire value `"timeout"`.
    Timeout,
    /// Wire value `"network"`.
    Network,
    /// Wire value `"cancelled"`.
    Cancelled,
    /// Wire value `"unknown"`.
    Unknown,
}
159
160impl ToolCallErrorCategory {
161 pub fn as_str(self) -> &'static str {
162 match self {
163 Self::SchemaValidation => "schema_validation",
164 Self::ToolError => "tool_error",
165 Self::McpServerError => "mcp_server_error",
166 Self::HostBridgeError => "host_bridge_error",
167 Self::PermissionDenied => "permission_denied",
168 Self::RejectedLoop => "rejected_loop",
169 Self::ParseAborted => "parse_aborted",
170 Self::Timeout => "timeout",
171 Self::Network => "network",
172 Self::Cancelled => "cancelled",
173 Self::Unknown => "unknown",
174 }
175 }
176
177 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184 use crate::value::ErrorCategory as Internal;
185 match category {
186 Internal::Timeout => Self::Timeout,
187 Internal::RateLimit
188 | Internal::Overloaded
189 | Internal::ServerError
190 | Internal::TransientNetwork => Self::Network,
191 Internal::SchemaValidation => Self::SchemaValidation,
192 Internal::ToolError => Self::ToolError,
193 Internal::ToolRejected => Self::PermissionDenied,
194 Internal::Cancelled => Self::Cancelled,
195 Internal::Auth
196 | Internal::EgressBlocked
197 | Internal::NotFound
198 | Internal::CircuitOpen
199 | Internal::BudgetExceeded
200 | Internal::Generic => Self::HostBridgeError,
201 }
202 }
203}
204
/// Identifies what executed a tool call; attached to
/// [`AgentEvent::ToolCallUpdate`] as `executor`.
///
/// Serialized as an internally tagged object: a `kind` field holds the
/// snake_case variant name, and `McpServer` additionally carries
/// `server_name` in the same object.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// Wire form `{"kind": "harn_builtin"}`.
    HarnBuiltin,
    /// Wire form `{"kind": "host_bridge"}`.
    HostBridge,
    /// Wire form `{"kind": "mcp_server", "server_name": ...}`.
    McpServer { server_name: String },
    /// Wire form `{"kind": "provider_native"}`.
    ProviderNative,
}
234
/// Events emitted about an agent session.
///
/// Serialized as an internally tagged object: a `type` field holds the
/// snake_case variant name (for example `"tool_call_update"`) alongside the
/// variant's own fields. Every variant carries a `session_id`, which
/// [`AgentEvent::session_id`] exposes uniformly.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A chunk of agent message content.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A chunk of agent thought content.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// Announces a tool call together with its initial status and raw input.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        /// Omitted from the serialized form when `None`; defaults to `None`
        /// when absent on input.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Omitted from the serialized form when `None`; defaults to `None`
        /// when absent on input.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Reports a status change for a previously announced tool call.
    ///
    /// `raw_output` and `error` are always serialized (as `null` when
    /// `None`); every field marked `skip_serializing_if` below is omitted
    /// when `None` and defaults to `None` when absent on input, so payloads
    /// written without those keys still deserialize.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        /// Duration in milliseconds.
        /// NOTE(review): how this differs from `execution_duration_ms` is not visible here — confirm with the emitter.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        /// Execution duration in milliseconds.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        /// Classification of the failure, when one is reported.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        /// What executed the tool call, when known.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        /// NOTE(review): presumably the not-yet-complete input text while
        /// arguments are still streaming — confirm with the emitter.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Carries a plan as free-form JSON.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Marks the start of the turn numbered `iteration`.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Marks the end of the turn numbered `iteration`, with free-form
    /// `turn_info`.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Reports feedback of the given `kind` injected into the session.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// NOTE(review): presumably emitted when the session reaches
    /// `max_iterations` — confirm with the emitter.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// NOTE(review): presumably emitted when the loop stops making progress
    /// after `max_nudges` nudges — confirm with the emitter.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Reports that a daemon watchdog tripped, with the attempt count and
    /// elapsed time in milliseconds.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Reports that the skill `skill_name` was activated at `iteration`.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Reports that the skill `skill_name` was deactivated at `iteration`.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Lists the tools associated with a skill's scope.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Reports a tool-search query.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Reports the outcome of a tool search; `promoted` lists tool names.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Reports a transcript compaction with message and token counts.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// Carries a handoff artifact (boxed to keep the enum small).
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// Delivers a batch of filesystem-watch notifications for one
    /// subscription.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Reports a worker lifecycle event.
    ///
    /// `status` is a plain string; the tests in this file fill it from
    /// [`WorkerEvent::as_status`]. Unlike the tool-call variants, `audit`
    /// here has no `skip_serializing_if`, so it is serialized as `null`
    /// when `None`.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
}
477
impl AgentEvent {
    /// Returns the `session_id` carried by this event, whichever variant it is.
    ///
    /// The match lists every variant explicitly (no wildcard), so adding a
    /// variant without a `session_id` binding here is a compile error.
    pub fn session_id(&self) -> &str {
        match self {
            Self::AgentMessageChunk { session_id, .. }
            | Self::AgentThoughtChunk { session_id, .. }
            | Self::ToolCall { session_id, .. }
            | Self::ToolCallUpdate { session_id, .. }
            | Self::Plan { session_id, .. }
            | Self::TurnStart { session_id, .. }
            | Self::TurnEnd { session_id, .. }
            | Self::FeedbackInjected { session_id, .. }
            | Self::BudgetExhausted { session_id, .. }
            | Self::LoopStuck { session_id, .. }
            | Self::DaemonWatchdogTripped { session_id, .. }
            | Self::SkillActivated { session_id, .. }
            | Self::SkillDeactivated { session_id, .. }
            | Self::SkillScopeTools { session_id, .. }
            | Self::ToolSearchQuery { session_id, .. }
            | Self::ToolSearchResult { session_id, .. }
            | Self::TranscriptCompacted { session_id, .. }
            | Self::Handoff { session_id, .. }
            | Self::FsWatch { session_id, .. }
            | Self::WorkerUpdate { session_id, .. } => session_id,
        }
    }
}
504
/// Receiver of [`AgentEvent`]s.
///
/// The `Send + Sync` bound lets sinks be held as `Arc<dyn AgentEventSink>`,
/// which is how this file stores and dispatches to them.
pub trait AgentEventSink: Send + Sync {
    /// Handles one event. The method returns nothing, so implementations
    /// have no channel for reporting failures to the emitter.
    fn handle_event(&self, event: &AgentEvent);
}
510
/// Envelope written by [`JsonlEventSink`]: an [`AgentEvent`] plus ordering
/// metadata.
///
/// `event` is flattened, so the event's `type` tag and fields appear at the
/// top level of the JSON object next to `index`, `emitted_at_ms` and
/// `frame_depth`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Position of the event in the sink's output, starting at 0.
    pub index: u64,
    /// Wall-clock emission time in milliseconds since the Unix epoch.
    pub emitted_at_ms: i64,
    /// Frame depth, if known. `JsonlEventSink` always writes `None`.
    pub frame_depth: Option<u32>,
    /// The event itself, flattened into the envelope.
    #[serde(flatten)]
    pub event: AgentEvent,
}
535
/// Sink that appends each event to a file as one JSON object per line,
/// switching to a new numbered file once the current one reaches
/// `ROTATE_BYTES`.
pub struct JsonlEventSink {
    /// Writer and counters, guarded together so index assignment and the
    /// write happen under one lock.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first file; rotated file names are derived from it.
    base_path: std::path::PathBuf,
}
544
/// Mutable state of a [`JsonlEventSink`], kept behind its mutex.
struct JsonlEventSinkState {
    /// Buffered writer for the file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index to assign to the next event.
    index: u64,
    /// Bytes accounted to the current file since it was opened.
    bytes_written: u64,
    /// Number of rotations performed; used as the rotated-file suffix.
    rotation: u32,
}
551
552impl JsonlEventSink {
553 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
557
558 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
562 let base_path = base_path.into();
563 if let Some(parent) = base_path.parent() {
564 std::fs::create_dir_all(parent)?;
565 }
566 let file = std::fs::OpenOptions::new()
567 .create(true)
568 .truncate(true)
569 .write(true)
570 .open(&base_path)?;
571 Ok(Arc::new(Self {
572 state: Mutex::new(JsonlEventSinkState {
573 writer: std::io::BufWriter::new(file),
574 index: 0,
575 bytes_written: 0,
576 rotation: 0,
577 }),
578 base_path,
579 }))
580 }
581
582 pub fn flush(&self) -> std::io::Result<()> {
585 use std::io::Write as _;
586 self.state
587 .lock()
588 .expect("jsonl sink mutex poisoned")
589 .writer
590 .flush()
591 }
592
593 pub fn event_count(&self) -> u64 {
596 self.state.lock().expect("jsonl sink mutex poisoned").index
597 }
598
599 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
600 use std::io::Write as _;
601 if state.bytes_written < Self::ROTATE_BYTES {
602 return Ok(());
603 }
604 state.writer.flush()?;
605 state.rotation += 1;
606 let suffix = format!("-{:06}", state.rotation);
607 let rotated = self.base_path.with_file_name({
608 let stem = self
609 .base_path
610 .file_stem()
611 .and_then(|s| s.to_str())
612 .unwrap_or("event_log");
613 let ext = self
614 .base_path
615 .extension()
616 .and_then(|e| e.to_str())
617 .unwrap_or("jsonl");
618 format!("{stem}{suffix}.{ext}")
619 });
620 let file = std::fs::OpenOptions::new()
621 .create(true)
622 .truncate(true)
623 .write(true)
624 .open(&rotated)?;
625 state.writer = std::io::BufWriter::new(file);
626 state.bytes_written = 0;
627 Ok(())
628 }
629}
630
/// Sink that appends each event to an event log under a per-session topic.
pub struct EventLogSink {
    /// Event log that records are appended to.
    log: Arc<AnyEventLog>,
    /// Topic of the form `observability.agent_events.<sanitized session id>`.
    topic: Topic,
    /// Session id copied into every record's payload and headers.
    session_id: String,
}
640
641impl EventLogSink {
642 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
643 let session_id = session_id.into();
644 let topic = Topic::new(format!(
645 "observability.agent_events.{}",
646 crate::event_log::sanitize_topic_component(&session_id)
647 ))
648 .expect("session id should sanitize to a valid topic");
649 Arc::new(Self {
650 log,
651 topic,
652 session_id,
653 })
654 }
655}
656
657impl AgentEventSink for JsonlEventSink {
658 fn handle_event(&self, event: &AgentEvent) {
659 use std::io::Write as _;
660 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
661 let index = state.index;
662 state.index += 1;
663 let emitted_at_ms = std::time::SystemTime::now()
664 .duration_since(std::time::UNIX_EPOCH)
665 .map(|d| d.as_millis() as i64)
666 .unwrap_or(0);
667 let envelope = PersistedAgentEvent {
668 index,
669 emitted_at_ms,
670 frame_depth: None,
671 event: event.clone(),
672 };
673 if let Ok(line) = serde_json::to_string(&envelope) {
674 let _ = state.writer.write_all(line.as_bytes());
679 let _ = state.writer.write_all(b"\n");
680 state.bytes_written += line.len() as u64 + 1;
681 let _ = self.rotate_if_needed(&mut state);
682 }
683 }
684}
685
686impl AgentEventSink for EventLogSink {
687 fn handle_event(&self, event: &AgentEvent) {
688 let event_json = match serde_json::to_value(event) {
689 Ok(value) => value,
690 Err(_) => return,
691 };
692 let event_kind = event_json
693 .get("type")
694 .and_then(|value| value.as_str())
695 .unwrap_or("agent_event")
696 .to_string();
697 let payload = serde_json::json!({
698 "index_hint": now_ms(),
699 "session_id": self.session_id,
700 "event": event_json,
701 });
702 let mut headers = std::collections::BTreeMap::new();
703 headers.insert("session_id".to_string(), self.session_id.clone());
704 let log = self.log.clone();
705 let topic = self.topic.clone();
706 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
707 if let Ok(handle) = tokio::runtime::Handle::try_current() {
708 handle.spawn(async move {
709 let _ = log.append(&topic, record).await;
710 });
711 } else {
712 let _ = futures::executor::block_on(log.append(&topic, record));
713 }
714 }
715}
716
717impl Drop for JsonlEventSink {
718 fn drop(&mut self) {
719 if let Ok(mut state) = self.state.lock() {
720 use std::io::Write as _;
721 let _ = state.writer.flush();
722 }
723 }
724}
725
/// Sink that forwards every event to each of its child sinks.
pub struct MultiSink {
    /// Child sinks in the order they were pushed.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
730
731impl MultiSink {
732 pub fn new() -> Self {
733 Self {
734 sinks: Mutex::new(Vec::new()),
735 }
736 }
737 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
738 self.sinks.lock().expect("sink mutex poisoned").push(sink);
739 }
740 pub fn len(&self) -> usize {
741 self.sinks.lock().expect("sink mutex poisoned").len()
742 }
743 pub fn is_empty(&self) -> bool {
744 self.len() == 0
745 }
746}
747
748impl Default for MultiSink {
749 fn default() -> Self {
750 Self::new()
751 }
752}
753
754impl AgentEventSink for MultiSink {
755 fn handle_event(&self, event: &AgentEvent) {
756 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
762 for sink in sinks {
763 sink.handle_event(event);
764 }
765 }
766}
767
/// Registry entry used in test builds: the sink plus the id of the thread
/// that registered it.
///
/// The registry functions in this file compare `owner` with the current
/// thread, so in test builds a thread only sees, emits to, and clears the
/// sinks it registered itself.
/// NOTE(review): presumably this keeps tests running in parallel from
/// interfering through the process-wide registry — confirm.
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    /// Thread that called `register_sink`.
    owner: std::thread::ThreadId,
    /// The registered sink.
    sink: Arc<dyn AgentEventSink>,
}

/// Registry entry used in non-test builds: just the sink.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;

/// Process-wide registry mapping a session id to the sinks registered for it.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
779
780fn external_sinks() -> &'static ExternalSinkRegistry {
781 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
782 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
783}
784
785pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
786 let session_id = session_id.into();
787 let mut reg = external_sinks().write().expect("sink registry poisoned");
788 #[cfg(test)]
789 let sink = RegisteredSink {
790 owner: std::thread::current().id(),
791 sink,
792 };
793 reg.entry(session_id).or_default().push(sink);
794}
795
796pub fn clear_session_sinks(session_id: &str) {
800 #[cfg(test)]
801 {
802 let owner = std::thread::current().id();
803 let mut reg = external_sinks().write().expect("sink registry poisoned");
804 if let Some(sinks) = reg.get_mut(session_id) {
805 sinks.retain(|sink| sink.owner != owner);
806 if sinks.is_empty() {
807 reg.remove(session_id);
808 }
809 }
810 }
811 #[cfg(not(test))]
812 {
813 external_sinks()
814 .write()
815 .expect("sink registry poisoned")
816 .remove(session_id);
817 }
818}
819
820pub fn reset_all_sinks() {
821 #[cfg(test)]
822 {
823 let owner = std::thread::current().id();
824 let mut reg = external_sinks().write().expect("sink registry poisoned");
825 reg.retain(|_, sinks| {
826 sinks.retain(|sink| sink.owner != owner);
827 !sinks.is_empty()
828 });
829 crate::agent_sessions::reset_session_store();
830 }
831 #[cfg(not(test))]
832 {
833 external_sinks()
834 .write()
835 .expect("sink registry poisoned")
836 .clear();
837 crate::agent_sessions::reset_session_store();
838 }
839}
840
841pub fn emit_event(event: &AgentEvent) {
845 let sinks: Vec<Arc<dyn AgentEventSink>> = {
846 let reg = external_sinks().read().expect("sink registry poisoned");
847 #[cfg(test)]
848 {
849 let owner = std::thread::current().id();
850 reg.get(event.session_id())
851 .map(|sinks| {
852 sinks
853 .iter()
854 .filter(|sink| sink.owner == owner)
855 .map(|sink| sink.sink.clone())
856 .collect()
857 })
858 .unwrap_or_default()
859 }
860 #[cfg(not(test))]
861 {
862 reg.get(event.session_id()).cloned().unwrap_or_default()
863 }
864 };
865 for sink in sinks {
866 sink.handle_event(event);
867 }
868}
869
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or 0 if the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
876
877pub fn session_external_sink_count(session_id: &str) -> usize {
878 #[cfg(test)]
879 {
880 let owner = std::thread::current().id();
881 return external_sinks()
882 .read()
883 .expect("sink registry poisoned")
884 .get(session_id)
885 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
886 .unwrap_or(0);
887 }
888 #[cfg(not(test))]
889 {
890 external_sinks()
891 .read()
892 .expect("sink registry poisoned")
893 .get(session_id)
894 .map(|v| v.len())
895 .unwrap_or(0)
896 }
897}
898
/// Returns the subscriber count for `session_id` as reported by
/// `crate::agent_sessions::subscriber_count`; this function only delegates.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906 use std::sync::atomic::{AtomicUsize, Ordering};
907
    /// Test sink that counts how many events it has been handed.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            // The event itself is ignored; only the call is counted.
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }
914
915 #[test]
916 fn multi_sink_fans_out_in_order() {
917 let multi = MultiSink::new();
918 let a = Arc::new(AtomicUsize::new(0));
919 let b = Arc::new(AtomicUsize::new(0));
920 multi.push(Arc::new(CountingSink(a.clone())));
921 multi.push(Arc::new(CountingSink(b.clone())));
922 let event = AgentEvent::TurnStart {
923 session_id: "s1".into(),
924 iteration: 1,
925 };
926 multi.handle_event(&event);
927 assert_eq!(a.load(Ordering::SeqCst), 1);
928 assert_eq!(b.load(Ordering::SeqCst), 1);
929 }
930
931 #[test]
932 fn session_scoped_sink_routing() {
933 reset_all_sinks();
934 let a = Arc::new(AtomicUsize::new(0));
935 let b = Arc::new(AtomicUsize::new(0));
936 register_sink("session-a", Arc::new(CountingSink(a.clone())));
937 register_sink("session-b", Arc::new(CountingSink(b.clone())));
938 emit_event(&AgentEvent::TurnStart {
939 session_id: "session-a".into(),
940 iteration: 0,
941 });
942 assert_eq!(a.load(Ordering::SeqCst), 1);
943 assert_eq!(b.load(Ordering::SeqCst), 0);
944 emit_event(&AgentEvent::TurnEnd {
945 session_id: "session-b".into(),
946 iteration: 0,
947 turn_info: serde_json::json!({}),
948 });
949 assert_eq!(a.load(Ordering::SeqCst), 1);
950 assert_eq!(b.load(Ordering::SeqCst), 1);
951 clear_session_sinks("session-a");
952 assert_eq!(session_external_sink_count("session-a"), 0);
953 assert_eq!(session_external_sink_count("session-b"), 1);
954 reset_all_sinks();
955 }
956
957 #[test]
958 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
959 use std::io::{BufRead, BufReader};
960 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
961 std::fs::create_dir_all(&dir).unwrap();
962 let path = dir.join("event_log.jsonl");
963 let sink = JsonlEventSink::open(&path).unwrap();
964 for i in 0..5 {
965 sink.handle_event(&AgentEvent::TurnStart {
966 session_id: "s".into(),
967 iteration: i,
968 });
969 }
970 assert_eq!(sink.event_count(), 5);
971 sink.flush().unwrap();
972
973 let file = std::fs::File::open(&path).unwrap();
975 let mut last_idx: i64 = -1;
976 let mut last_ts: i64 = 0;
977 for line in BufReader::new(file).lines() {
978 let line = line.unwrap();
979 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
980 let idx = val["index"].as_i64().unwrap();
981 let ts = val["emitted_at_ms"].as_i64().unwrap();
982 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
983 assert!(ts >= last_ts, "timestamps must be non-decreasing");
984 last_idx = idx;
985 last_ts = ts;
986 assert_eq!(val["type"], "turn_start");
988 }
989 assert_eq!(last_idx, 4);
990 let _ = std::fs::remove_file(&path);
991 }
992
993 #[test]
994 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
995 let terminal = AgentEvent::ToolCallUpdate {
1000 session_id: "s".into(),
1001 tool_call_id: "tc-1".into(),
1002 tool_name: "read".into(),
1003 status: ToolCallStatus::Completed,
1004 raw_output: None,
1005 error: None,
1006 duration_ms: Some(42),
1007 execution_duration_ms: Some(7),
1008 error_category: None,
1009 executor: None,
1010 parsing: None,
1011
1012 raw_input: None,
1013 raw_input_partial: None,
1014 audit: None,
1015 };
1016 let value = serde_json::to_value(&terminal).unwrap();
1017 assert_eq!(value["duration_ms"], serde_json::json!(42));
1018 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1019
1020 let intermediate = AgentEvent::ToolCallUpdate {
1024 session_id: "s".into(),
1025 tool_call_id: "tc-1".into(),
1026 tool_name: "read".into(),
1027 status: ToolCallStatus::InProgress,
1028 raw_output: None,
1029 error: None,
1030 duration_ms: None,
1031 execution_duration_ms: None,
1032 error_category: None,
1033 executor: None,
1034 parsing: None,
1035
1036 raw_input: None,
1037 raw_input_partial: None,
1038 audit: None,
1039 };
1040 let value = serde_json::to_value(&intermediate).unwrap();
1041 let object = value.as_object().expect("update serializes as object");
1042 assert!(
1043 !object.contains_key("duration_ms"),
1044 "duration_ms must be omitted when None: {value}"
1045 );
1046 assert!(
1047 !object.contains_key("execution_duration_ms"),
1048 "execution_duration_ms must be omitted when None: {value}"
1049 );
1050 }
1051
1052 #[test]
1053 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1054 let raw = serde_json::json!({
1057 "type": "tool_call_update",
1058 "session_id": "s",
1059 "tool_call_id": "tc-1",
1060 "tool_name": "read",
1061 "status": "completed",
1062 "raw_output": null,
1063 "error": null,
1064 });
1065 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1066 match event {
1067 AgentEvent::ToolCallUpdate {
1068 duration_ms,
1069 execution_duration_ms,
1070 ..
1071 } => {
1072 assert!(duration_ms.is_none());
1073 assert!(execution_duration_ms.is_none());
1074 }
1075 other => panic!("expected ToolCallUpdate, got {other:?}"),
1076 }
1077 }
1078
1079 #[test]
1080 fn tool_call_status_serde() {
1081 assert_eq!(
1082 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1083 "\"pending\""
1084 );
1085 assert_eq!(
1086 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1087 "\"in_progress\""
1088 );
1089 assert_eq!(
1090 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1091 "\"completed\""
1092 );
1093 assert_eq!(
1094 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1095 "\"failed\""
1096 );
1097 }
1098
1099 #[test]
1100 fn tool_call_error_category_serializes_as_snake_case() {
1101 let pairs = [
1102 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1103 (ToolCallErrorCategory::ToolError, "tool_error"),
1104 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1105 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1106 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1107 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1108 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1109 (ToolCallErrorCategory::Timeout, "timeout"),
1110 (ToolCallErrorCategory::Network, "network"),
1111 (ToolCallErrorCategory::Cancelled, "cancelled"),
1112 (ToolCallErrorCategory::Unknown, "unknown"),
1113 ];
1114 for (variant, wire) in pairs {
1115 let encoded = serde_json::to_string(&variant).unwrap();
1116 assert_eq!(encoded, format!("\"{wire}\""));
1117 assert_eq!(variant.as_str(), wire);
1118 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1121 assert_eq!(decoded, variant);
1122 }
1123 }
1124
1125 #[test]
1126 fn tool_executor_round_trips_with_adjacent_tag() {
1127 for executor in [
1132 ToolExecutor::HarnBuiltin,
1133 ToolExecutor::HostBridge,
1134 ToolExecutor::McpServer {
1135 server_name: "linear".to_string(),
1136 },
1137 ToolExecutor::ProviderNative,
1138 ] {
1139 let json = serde_json::to_value(&executor).unwrap();
1140 let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1141 match &executor {
1142 ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1143 ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1144 ToolExecutor::McpServer { server_name } => {
1145 assert_eq!(kind, "mcp_server");
1146 assert_eq!(json["server_name"], *server_name);
1147 }
1148 ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1149 }
1150 let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1151 assert_eq!(recovered, executor);
1152 }
1153 }
1154
1155 #[test]
1156 fn tool_call_error_category_from_internal_collapses_transient_family() {
1157 use crate::value::ErrorCategory as Internal;
1158 assert_eq!(
1159 ToolCallErrorCategory::from_internal(&Internal::Timeout),
1160 ToolCallErrorCategory::Timeout
1161 );
1162 for net in [
1163 Internal::RateLimit,
1164 Internal::Overloaded,
1165 Internal::ServerError,
1166 Internal::TransientNetwork,
1167 ] {
1168 assert_eq!(
1169 ToolCallErrorCategory::from_internal(&net),
1170 ToolCallErrorCategory::Network,
1171 "{net:?} should map to Network",
1172 );
1173 }
1174 assert_eq!(
1175 ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1176 ToolCallErrorCategory::SchemaValidation
1177 );
1178 assert_eq!(
1179 ToolCallErrorCategory::from_internal(&Internal::ToolError),
1180 ToolCallErrorCategory::ToolError
1181 );
1182 assert_eq!(
1183 ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1184 ToolCallErrorCategory::PermissionDenied
1185 );
1186 assert_eq!(
1187 ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1188 ToolCallErrorCategory::Cancelled
1189 );
1190 for bridge in [
1191 Internal::Auth,
1192 Internal::EgressBlocked,
1193 Internal::NotFound,
1194 Internal::CircuitOpen,
1195 Internal::Generic,
1196 ] {
1197 assert_eq!(
1198 ToolCallErrorCategory::from_internal(&bridge),
1199 ToolCallErrorCategory::HostBridgeError,
1200 "{bridge:?} should map to HostBridgeError",
1201 );
1202 }
1203 }
1204
1205 #[test]
1206 fn tool_call_update_event_omits_error_category_when_none() {
1207 let event = AgentEvent::ToolCallUpdate {
1208 session_id: "s".into(),
1209 tool_call_id: "t".into(),
1210 tool_name: "read".into(),
1211 status: ToolCallStatus::Completed,
1212 raw_output: None,
1213 error: None,
1214 duration_ms: None,
1215 execution_duration_ms: None,
1216 error_category: None,
1217 executor: None,
1218 parsing: None,
1219
1220 raw_input: None,
1221 raw_input_partial: None,
1222 audit: None,
1223 };
1224 let v = serde_json::to_value(&event).unwrap();
1225 assert_eq!(v["type"], "tool_call_update");
1226 assert!(v.get("error_category").is_none());
1227 }
1228
1229 #[test]
1230 fn tool_call_update_event_serializes_error_category_when_set() {
1231 let event = AgentEvent::ToolCallUpdate {
1232 session_id: "s".into(),
1233 tool_call_id: "t".into(),
1234 tool_name: "read".into(),
1235 status: ToolCallStatus::Failed,
1236 raw_output: None,
1237 error: Some("missing required field".into()),
1238 duration_ms: None,
1239 execution_duration_ms: None,
1240 error_category: Some(ToolCallErrorCategory::SchemaValidation),
1241 executor: None,
1242 parsing: None,
1243
1244 raw_input: None,
1245 raw_input_partial: None,
1246 audit: None,
1247 };
1248 let v = serde_json::to_value(&event).unwrap();
1249 assert_eq!(v["error_category"], "schema_validation");
1250 assert_eq!(v["error"], "missing required field");
1251 }
1252
1253 #[test]
1254 fn tool_call_update_omits_executor_when_absent() {
1255 let event = AgentEvent::ToolCallUpdate {
1259 session_id: "s".into(),
1260 tool_call_id: "tc-1".into(),
1261 tool_name: "read".into(),
1262 status: ToolCallStatus::Completed,
1263 raw_output: None,
1264 error: None,
1265 duration_ms: None,
1266 execution_duration_ms: None,
1267 error_category: None,
1268 executor: None,
1269 parsing: None,
1270
1271 raw_input: None,
1272 raw_input_partial: None,
1273 audit: None,
1274 };
1275 let json = serde_json::to_value(&event).unwrap();
1276 assert!(json.get("executor").is_none(), "got: {json}");
1277 }
1278
1279 #[test]
1280 fn worker_event_status_strings_cover_all_variants() {
1281 assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1286 assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1287 assert_eq!(
1288 WorkerEvent::WorkerWaitingForInput.as_status(),
1289 "awaiting_input"
1290 );
1291 assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1292 assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1293 assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1294
1295 for terminal in [
1296 WorkerEvent::WorkerCompleted,
1297 WorkerEvent::WorkerFailed,
1298 WorkerEvent::WorkerCancelled,
1299 ] {
1300 assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1301 }
1302 for non_terminal in [
1303 WorkerEvent::WorkerSpawned,
1304 WorkerEvent::WorkerProgressed,
1305 WorkerEvent::WorkerWaitingForInput,
1306 ] {
1307 assert!(
1308 !non_terminal.is_terminal(),
1309 "{non_terminal:?} should not be terminal"
1310 );
1311 }
1312 }
1313
1314 #[test]
1315 fn worker_update_event_routes_through_session_keyed_sink() {
1316 reset_all_sinks();
1321 let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1322 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1323 impl AgentEventSink for CapturingSink {
1324 fn handle_event(&self, event: &AgentEvent) {
1325 self.0
1326 .lock()
1327 .expect("captured sink mutex poisoned")
1328 .push(event.clone());
1329 }
1330 }
1331 register_sink(
1332 "worker-session-1",
1333 Arc::new(CapturingSink(captured.clone())),
1334 );
1335 emit_event(&AgentEvent::WorkerUpdate {
1336 session_id: "worker-session-1".into(),
1337 worker_id: "worker_42".into(),
1338 worker_name: "review_captain".into(),
1339 worker_task: "review pr".into(),
1340 worker_mode: "delegated_stage".into(),
1341 event: WorkerEvent::WorkerWaitingForInput,
1342 status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1343 metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1344 audit: None,
1345 });
1346 emit_event(&AgentEvent::WorkerUpdate {
1348 session_id: "other-session".into(),
1349 worker_id: "w2".into(),
1350 worker_name: "n2".into(),
1351 worker_task: "t2".into(),
1352 worker_mode: "delegated_stage".into(),
1353 event: WorkerEvent::WorkerCompleted,
1354 status: "completed".into(),
1355 metadata: serde_json::json!({}),
1356 audit: None,
1357 });
1358 let received = captured.lock().unwrap().clone();
1359 assert_eq!(received.len(), 1, "got: {received:?}");
1360 match &received[0] {
1361 AgentEvent::WorkerUpdate {
1362 session_id,
1363 worker_id,
1364 event,
1365 status,
1366 ..
1367 } => {
1368 assert_eq!(session_id, "worker-session-1");
1369 assert_eq!(worker_id, "worker_42");
1370 assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1371 assert_eq!(status, "awaiting_input");
1372 }
1373 other => panic!("expected WorkerUpdate, got {other:?}"),
1374 }
1375 reset_all_sinks();
1376 }
1377
1378 #[test]
1379 fn worker_update_event_serializes_to_canonical_shape() {
1380 let event = AgentEvent::WorkerUpdate {
1386 session_id: "s".into(),
1387 worker_id: "w".into(),
1388 worker_name: "n".into(),
1389 worker_task: "t".into(),
1390 worker_mode: "delegated_stage".into(),
1391 event: WorkerEvent::WorkerProgressed,
1392 status: "progressed".into(),
1393 metadata: serde_json::json!({"started_at": "0193..."}),
1394 audit: Some(serde_json::json!({"run_id": "run_x"})),
1395 };
1396 let value = serde_json::to_value(&event).unwrap();
1397 assert_eq!(value["type"], "worker_update");
1398 assert_eq!(value["session_id"], "s");
1399 assert_eq!(value["worker_id"], "w");
1400 assert_eq!(value["status"], "progressed");
1401 assert_eq!(value["audit"]["run_id"], "run_x");
1402
1403 let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1407 match recovered {
1408 AgentEvent::WorkerUpdate {
1409 event: recovered_event,
1410 ..
1411 } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1412 other => panic!("expected WorkerUpdate, got {other:?}"),
1413 }
1414 }
1415
1416 #[test]
1417 fn tool_call_update_includes_executor_when_present() {
1418 let event = AgentEvent::ToolCallUpdate {
1419 session_id: "s".into(),
1420 tool_call_id: "tc-1".into(),
1421 tool_name: "read".into(),
1422 status: ToolCallStatus::Completed,
1423 raw_output: None,
1424 error: None,
1425 duration_ms: None,
1426 execution_duration_ms: None,
1427 error_category: None,
1428 executor: Some(ToolExecutor::McpServer {
1429 server_name: "github".into(),
1430 }),
1431 parsing: None,
1432
1433 raw_input: None,
1434 raw_input_partial: None,
1435 audit: None,
1436 };
1437 let json = serde_json::to_value(&event).unwrap();
1438 assert_eq!(json["executor"]["kind"], "mcp_server");
1439 assert_eq!(json["executor"]["server_name"], "github");
1440 }
1441
1442 #[test]
1443 fn tool_call_update_omits_audit_when_absent() {
1444 let event = AgentEvent::ToolCallUpdate {
1445 session_id: "s".into(),
1446 tool_call_id: "tc-1".into(),
1447 tool_name: "read".into(),
1448 status: ToolCallStatus::Completed,
1449 raw_output: None,
1450 error: None,
1451 duration_ms: None,
1452 execution_duration_ms: None,
1453 error_category: None,
1454 executor: None,
1455 parsing: None,
1456 raw_input: None,
1457 raw_input_partial: None,
1458 audit: None,
1459 };
1460 let json = serde_json::to_value(&event).unwrap();
1461 assert!(json.get("audit").is_none(), "got: {json}");
1462 }
1463
1464 #[test]
1465 fn tool_call_update_includes_audit_when_present() {
1466 let audit = MutationSessionRecord {
1467 session_id: "session_42".into(),
1468 run_id: Some("run_42".into()),
1469 mutation_scope: "apply_workspace".into(),
1470 execution_kind: Some("worker".into()),
1471 ..Default::default()
1472 };
1473 let event = AgentEvent::ToolCallUpdate {
1474 session_id: "s".into(),
1475 tool_call_id: "tc-1".into(),
1476 tool_name: "edit_file".into(),
1477 status: ToolCallStatus::Completed,
1478 raw_output: None,
1479 error: None,
1480 duration_ms: None,
1481 execution_duration_ms: None,
1482 error_category: None,
1483 executor: Some(ToolExecutor::HostBridge),
1484 parsing: None,
1485 raw_input: None,
1486 raw_input_partial: None,
1487 audit: Some(audit),
1488 };
1489 let json = serde_json::to_value(&event).unwrap();
1490 assert_eq!(json["audit"]["session_id"], "session_42");
1491 assert_eq!(json["audit"]["run_id"], "run_42");
1492 assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1493 assert_eq!(json["audit"]["execution_kind"], "worker");
1494 }
1495
1496 #[test]
1497 fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1498 let raw = serde_json::json!({
1499 "type": "tool_call_update",
1500 "session_id": "s",
1501 "tool_call_id": "tc-1",
1502 "tool_name": "read",
1503 "status": "completed",
1504 "raw_output": null,
1505 "error": null,
1506 });
1507 let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1508 match event {
1509 AgentEvent::ToolCallUpdate { audit, .. } => {
1510 assert!(audit.is_none());
1511 }
1512 other => panic!("expected ToolCallUpdate, got {other:?}"),
1513 }
1514 }
1515}