1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34 pub kind: String,
35 pub paths: Vec<String>,
36 pub relative_paths: Vec<String>,
37 pub raw_kind: String,
38 pub error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54 WorkerSpawned,
55 WorkerProgressed,
56 WorkerWaitingForInput,
57 WorkerCompleted,
58 WorkerFailed,
59 WorkerCancelled,
60}
61
62impl WorkerEvent {
63 pub fn as_status(self) -> &'static str {
70 match self {
71 Self::WorkerSpawned => "running",
72 Self::WorkerProgressed => "progressed",
73 Self::WorkerWaitingForInput => "awaiting_input",
74 Self::WorkerCompleted => "completed",
75 Self::WorkerFailed => "failed",
76 Self::WorkerCancelled => "cancelled",
77 }
78 }
79
80 pub fn as_str(self) -> &'static str {
81 match self {
82 Self::WorkerSpawned => "WorkerSpawned",
83 Self::WorkerProgressed => "WorkerProgressed",
84 Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85 Self::WorkerCompleted => "WorkerCompleted",
86 Self::WorkerFailed => "WorkerFailed",
87 Self::WorkerCancelled => "WorkerCancelled",
88 }
89 }
90
91 pub fn is_terminal(self) -> bool {
96 matches!(
97 self,
98 Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99 )
100 }
101}
102
103#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107 Pending,
109 InProgress,
111 Completed,
113 Failed,
115}
116
117#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub enum ToolCallErrorCategory {
126 SchemaValidation,
129 ToolError,
132 McpServerError,
134 HostBridgeError,
136 PermissionDenied,
139 RejectedLoop,
142 ParseAborted,
150 Timeout,
152 Network,
154 Cancelled,
156 Unknown,
158}
159
160impl ToolCallErrorCategory {
161 pub fn as_str(self) -> &'static str {
162 match self {
163 Self::SchemaValidation => "schema_validation",
164 Self::ToolError => "tool_error",
165 Self::McpServerError => "mcp_server_error",
166 Self::HostBridgeError => "host_bridge_error",
167 Self::PermissionDenied => "permission_denied",
168 Self::RejectedLoop => "rejected_loop",
169 Self::ParseAborted => "parse_aborted",
170 Self::Timeout => "timeout",
171 Self::Network => "network",
172 Self::Cancelled => "cancelled",
173 Self::Unknown => "unknown",
174 }
175 }
176
177 pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184 use crate::value::ErrorCategory as Internal;
185 match category {
186 Internal::Timeout => Self::Timeout,
187 Internal::RateLimit
188 | Internal::Overloaded
189 | Internal::ServerError
190 | Internal::TransientNetwork => Self::Network,
191 Internal::SchemaValidation => Self::SchemaValidation,
192 Internal::ToolError => Self::ToolError,
193 Internal::ToolRejected => Self::PermissionDenied,
194 Internal::Cancelled => Self::Cancelled,
195 Internal::Auth
196 | Internal::EgressBlocked
197 | Internal::NotFound
198 | Internal::CircuitOpen
199 | Internal::BudgetExceeded
200 | Internal::Generic => Self::HostBridgeError,
201 }
202 }
203}
204
205#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
216#[serde(tag = "kind", rename_all = "snake_case")]
217pub enum ToolExecutor {
218 HarnBuiltin,
221 HostBridge,
224 McpServer { server_name: String },
228 ProviderNative,
233}
234
235#[derive(Clone, Debug, Serialize, Deserialize)]
238#[serde(tag = "type", rename_all = "snake_case")]
239pub enum AgentEvent {
240 AgentMessageChunk {
241 session_id: String,
242 content: String,
243 },
244 AgentThoughtChunk {
245 session_id: String,
246 content: String,
247 },
248 ToolCall {
249 session_id: String,
250 tool_call_id: String,
251 tool_name: String,
252 kind: Option<ToolKind>,
253 status: ToolCallStatus,
254 raw_input: serde_json::Value,
255 #[serde(default, skip_serializing_if = "Option::is_none")]
268 parsing: Option<bool>,
269 #[serde(default, skip_serializing_if = "Option::is_none")]
273 audit: Option<MutationSessionRecord>,
274 },
275 ToolCallUpdate {
276 session_id: String,
277 tool_call_id: String,
278 tool_name: String,
279 status: ToolCallStatus,
280 raw_output: Option<serde_json::Value>,
281 error: Option<String>,
282 #[serde(default, skip_serializing_if = "Option::is_none")]
290 duration_ms: Option<u64>,
291 #[serde(default, skip_serializing_if = "Option::is_none")]
295 execution_duration_ms: Option<u64>,
296 #[serde(default, skip_serializing_if = "Option::is_none")]
302 error_category: Option<ToolCallErrorCategory>,
303 #[serde(default, skip_serializing_if = "Option::is_none")]
308 executor: Option<ToolExecutor>,
309 #[serde(default, skip_serializing_if = "Option::is_none")]
318 parsing: Option<bool>,
319 #[serde(default, skip_serializing_if = "Option::is_none")]
327 raw_input: Option<serde_json::Value>,
328 #[serde(default, skip_serializing_if = "Option::is_none")]
332 raw_input_partial: Option<String>,
333 #[serde(default, skip_serializing_if = "Option::is_none")]
338 audit: Option<MutationSessionRecord>,
339 },
340 Plan {
341 session_id: String,
342 plan: serde_json::Value,
343 },
344 TurnStart {
345 session_id: String,
346 iteration: usize,
347 },
348 TurnEnd {
349 session_id: String,
350 iteration: usize,
351 turn_info: serde_json::Value,
352 },
353 JudgeDecision {
354 session_id: String,
355 iteration: usize,
356 verdict: String,
357 reasoning: String,
358 next_step: Option<String>,
359 judge_duration_ms: u64,
360 },
361 TypedCheckpoint {
362 session_id: String,
363 checkpoint: serde_json::Value,
364 },
365 FeedbackInjected {
366 session_id: String,
367 kind: String,
368 content: String,
369 },
370 BudgetExhausted {
374 session_id: String,
375 max_iterations: usize,
376 },
377 LoopStuck {
381 session_id: String,
382 max_nudges: usize,
383 last_iteration: usize,
384 tail_excerpt: String,
385 },
386 DaemonWatchdogTripped {
391 session_id: String,
392 attempts: usize,
393 elapsed_ms: u64,
394 },
395 SkillActivated {
399 session_id: String,
400 skill_name: String,
401 iteration: usize,
402 reason: String,
403 },
404 SkillDeactivated {
407 session_id: String,
408 skill_name: String,
409 iteration: usize,
410 },
411 SkillScopeTools {
414 session_id: String,
415 skill_name: String,
416 allowed_tools: Vec<String>,
417 },
418 ToolSearchQuery {
426 session_id: String,
427 tool_use_id: String,
428 name: String,
429 query: serde_json::Value,
430 strategy: String,
431 mode: String,
432 },
433 ToolSearchResult {
437 session_id: String,
438 tool_use_id: String,
439 promoted: Vec<String>,
440 strategy: String,
441 mode: String,
442 },
443 TranscriptCompacted {
444 session_id: String,
445 mode: String,
446 strategy: String,
447 archived_messages: usize,
448 estimated_tokens_before: usize,
449 estimated_tokens_after: usize,
450 snapshot_asset_id: Option<String>,
451 },
452 Handoff {
453 session_id: String,
454 artifact_id: String,
455 handoff: Box<HandoffArtifact>,
456 },
457 FsWatch {
458 session_id: String,
459 subscription_id: String,
460 events: Vec<FsWatchEvent>,
461 },
462 WorkerUpdate {
478 session_id: String,
479 worker_id: String,
480 worker_name: String,
481 worker_task: String,
482 worker_mode: String,
483 event: WorkerEvent,
484 status: String,
485 metadata: serde_json::Value,
486 audit: Option<serde_json::Value>,
487 },
488 HitlRequested {
496 session_id: String,
497 request_id: String,
498 kind: String,
499 payload: serde_json::Value,
500 },
501 HitlResolved {
507 session_id: String,
508 request_id: String,
509 kind: String,
510 outcome: String,
511 },
512}
513
514impl AgentEvent {
515 pub fn session_id(&self) -> &str {
516 match self {
517 Self::AgentMessageChunk { session_id, .. }
518 | Self::AgentThoughtChunk { session_id, .. }
519 | Self::ToolCall { session_id, .. }
520 | Self::ToolCallUpdate { session_id, .. }
521 | Self::Plan { session_id, .. }
522 | Self::TurnStart { session_id, .. }
523 | Self::TurnEnd { session_id, .. }
524 | Self::JudgeDecision { session_id, .. }
525 | Self::TypedCheckpoint { session_id, .. }
526 | Self::FeedbackInjected { session_id, .. }
527 | Self::BudgetExhausted { session_id, .. }
528 | Self::LoopStuck { session_id, .. }
529 | Self::DaemonWatchdogTripped { session_id, .. }
530 | Self::SkillActivated { session_id, .. }
531 | Self::SkillDeactivated { session_id, .. }
532 | Self::SkillScopeTools { session_id, .. }
533 | Self::ToolSearchQuery { session_id, .. }
534 | Self::ToolSearchResult { session_id, .. }
535 | Self::TranscriptCompacted { session_id, .. }
536 | Self::Handoff { session_id, .. }
537 | Self::FsWatch { session_id, .. }
538 | Self::WorkerUpdate { session_id, .. }
539 | Self::HitlRequested { session_id, .. }
540 | Self::HitlResolved { session_id, .. } => session_id,
541 }
542 }
543}
544
545pub trait AgentEventSink: Send + Sync {
548 fn handle_event(&self, event: &AgentEvent);
549}
550
551#[derive(Clone, Debug, Serialize, Deserialize)]
558pub struct PersistedAgentEvent {
559 pub index: u64,
563 pub emitted_at_ms: i64,
567 pub frame_depth: Option<u32>,
571 #[serde(flatten)]
573 pub event: AgentEvent,
574}
575
576pub struct JsonlEventSink {
581 state: Mutex<JsonlEventSinkState>,
582 base_path: std::path::PathBuf,
583}
584
585struct JsonlEventSinkState {
586 writer: std::io::BufWriter<std::fs::File>,
587 index: u64,
588 bytes_written: u64,
589 rotation: u32,
590}
591
592impl JsonlEventSink {
593 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
597
598 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
602 let base_path = base_path.into();
603 if let Some(parent) = base_path.parent() {
604 std::fs::create_dir_all(parent)?;
605 }
606 let file = std::fs::OpenOptions::new()
607 .create(true)
608 .truncate(true)
609 .write(true)
610 .open(&base_path)?;
611 Ok(Arc::new(Self {
612 state: Mutex::new(JsonlEventSinkState {
613 writer: std::io::BufWriter::new(file),
614 index: 0,
615 bytes_written: 0,
616 rotation: 0,
617 }),
618 base_path,
619 }))
620 }
621
622 pub fn flush(&self) -> std::io::Result<()> {
625 use std::io::Write as _;
626 self.state
627 .lock()
628 .expect("jsonl sink mutex poisoned")
629 .writer
630 .flush()
631 }
632
633 pub fn event_count(&self) -> u64 {
636 self.state.lock().expect("jsonl sink mutex poisoned").index
637 }
638
639 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
640 use std::io::Write as _;
641 if state.bytes_written < Self::ROTATE_BYTES {
642 return Ok(());
643 }
644 state.writer.flush()?;
645 state.rotation += 1;
646 let suffix = format!("-{:06}", state.rotation);
647 let rotated = self.base_path.with_file_name({
648 let stem = self
649 .base_path
650 .file_stem()
651 .and_then(|s| s.to_str())
652 .unwrap_or("event_log");
653 let ext = self
654 .base_path
655 .extension()
656 .and_then(|e| e.to_str())
657 .unwrap_or("jsonl");
658 format!("{stem}{suffix}.{ext}")
659 });
660 let file = std::fs::OpenOptions::new()
661 .create(true)
662 .truncate(true)
663 .write(true)
664 .open(&rotated)?;
665 state.writer = std::io::BufWriter::new(file);
666 state.bytes_written = 0;
667 Ok(())
668 }
669}
670
671pub struct EventLogSink {
676 log: Arc<AnyEventLog>,
677 topic: Topic,
678 session_id: String,
679}
680
681impl EventLogSink {
682 pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
683 let session_id = session_id.into();
684 let topic = Topic::new(format!(
685 "observability.agent_events.{}",
686 crate::event_log::sanitize_topic_component(&session_id)
687 ))
688 .expect("session id should sanitize to a valid topic");
689 Arc::new(Self {
690 log,
691 topic,
692 session_id,
693 })
694 }
695}
696
697impl AgentEventSink for JsonlEventSink {
698 fn handle_event(&self, event: &AgentEvent) {
699 use std::io::Write as _;
700 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
701 let index = state.index;
702 state.index += 1;
703 let emitted_at_ms = std::time::SystemTime::now()
704 .duration_since(std::time::UNIX_EPOCH)
705 .map(|d| d.as_millis() as i64)
706 .unwrap_or(0);
707 let envelope = PersistedAgentEvent {
708 index,
709 emitted_at_ms,
710 frame_depth: None,
711 event: event.clone(),
712 };
713 if let Ok(line) = serde_json::to_string(&envelope) {
714 let _ = state.writer.write_all(line.as_bytes());
719 let _ = state.writer.write_all(b"\n");
720 state.bytes_written += line.len() as u64 + 1;
721 let _ = self.rotate_if_needed(&mut state);
722 }
723 }
724}
725
726impl AgentEventSink for EventLogSink {
727 fn handle_event(&self, event: &AgentEvent) {
728 let event_json = match serde_json::to_value(event) {
729 Ok(value) => value,
730 Err(_) => return,
731 };
732 let event_kind = event_json
733 .get("type")
734 .and_then(|value| value.as_str())
735 .unwrap_or("agent_event")
736 .to_string();
737 let payload = serde_json::json!({
738 "index_hint": now_ms(),
739 "session_id": self.session_id,
740 "event": event_json,
741 });
742 let mut headers = std::collections::BTreeMap::new();
743 headers.insert("session_id".to_string(), self.session_id.clone());
744 let log = self.log.clone();
745 let topic = self.topic.clone();
746 let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
747 if let Ok(handle) = tokio::runtime::Handle::try_current() {
748 handle.spawn(async move {
749 let _ = log.append(&topic, record).await;
750 });
751 } else {
752 let _ = futures::executor::block_on(log.append(&topic, record));
753 }
754 }
755}
756
757impl Drop for JsonlEventSink {
758 fn drop(&mut self) {
759 if let Ok(mut state) = self.state.lock() {
760 use std::io::Write as _;
761 let _ = state.writer.flush();
762 }
763 }
764}
765
766pub struct MultiSink {
768 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
769}
770
771impl MultiSink {
772 pub fn new() -> Self {
773 Self {
774 sinks: Mutex::new(Vec::new()),
775 }
776 }
777 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
778 self.sinks.lock().expect("sink mutex poisoned").push(sink);
779 }
780 pub fn len(&self) -> usize {
781 self.sinks.lock().expect("sink mutex poisoned").len()
782 }
783 pub fn is_empty(&self) -> bool {
784 self.len() == 0
785 }
786}
787
788impl Default for MultiSink {
789 fn default() -> Self {
790 Self::new()
791 }
792}
793
794impl AgentEventSink for MultiSink {
795 fn handle_event(&self, event: &AgentEvent) {
796 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
802 for sink in sinks {
803 sink.handle_event(event);
804 }
805 }
806}
807
808#[cfg(test)]
809#[derive(Clone)]
810struct RegisteredSink {
811 owner: std::thread::ThreadId,
812 sink: Arc<dyn AgentEventSink>,
813}
814
815#[cfg(not(test))]
816type RegisteredSink = Arc<dyn AgentEventSink>;
817
818type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
819
820fn external_sinks() -> &'static ExternalSinkRegistry {
821 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
822 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
823}
824
825pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
826 let session_id = session_id.into();
827 let mut reg = external_sinks().write().expect("sink registry poisoned");
828 #[cfg(test)]
829 let sink = RegisteredSink {
830 owner: std::thread::current().id(),
831 sink,
832 };
833 reg.entry(session_id).or_default().push(sink);
834}
835
836pub fn clear_session_sinks(session_id: &str) {
840 #[cfg(test)]
841 {
842 let owner = std::thread::current().id();
843 let mut reg = external_sinks().write().expect("sink registry poisoned");
844 if let Some(sinks) = reg.get_mut(session_id) {
845 sinks.retain(|sink| sink.owner != owner);
846 if sinks.is_empty() {
847 reg.remove(session_id);
848 }
849 }
850 }
851 #[cfg(not(test))]
852 {
853 external_sinks()
854 .write()
855 .expect("sink registry poisoned")
856 .remove(session_id);
857 }
858}
859
860pub fn reset_all_sinks() {
861 #[cfg(test)]
862 {
863 let owner = std::thread::current().id();
864 let mut reg = external_sinks().write().expect("sink registry poisoned");
865 reg.retain(|_, sinks| {
866 sinks.retain(|sink| sink.owner != owner);
867 !sinks.is_empty()
868 });
869 crate::agent_sessions::reset_session_store();
870 }
871 #[cfg(not(test))]
872 {
873 external_sinks()
874 .write()
875 .expect("sink registry poisoned")
876 .clear();
877 crate::agent_sessions::reset_session_store();
878 }
879}
880
881pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
888 if source_session_id.is_empty() || target_session_id.is_empty() {
889 return;
890 }
891 if source_session_id == target_session_id {
892 return;
893 }
894 let mut reg = external_sinks().write().expect("sink registry poisoned");
895 let Some(source_sinks) = reg.get(source_session_id).cloned() else {
896 return;
897 };
898 let target = reg.entry(target_session_id.to_string()).or_default();
899 #[cfg(test)]
900 {
901 for source in source_sinks {
902 let already_present = target
903 .iter()
904 .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
905 if !already_present {
906 target.push(source);
907 }
908 }
909 }
910 #[cfg(not(test))]
911 {
912 for source in source_sinks {
913 let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
914 if !already_present {
915 target.push(source);
916 }
917 }
918 }
919}
920
921pub fn emit_event(event: &AgentEvent) {
925 let sinks: Vec<Arc<dyn AgentEventSink>> = {
926 let reg = external_sinks().read().expect("sink registry poisoned");
927 #[cfg(test)]
928 {
929 let owner = std::thread::current().id();
930 reg.get(event.session_id())
931 .map(|sinks| {
932 sinks
933 .iter()
934 .filter(|sink| sink.owner == owner)
935 .map(|sink| sink.sink.clone())
936 .collect()
937 })
938 .unwrap_or_default()
939 }
940 #[cfg(not(test))]
941 {
942 reg.get(event.session_id()).cloned().unwrap_or_default()
943 }
944 };
945 for sink in sinks {
946 sink.handle_event(event);
947 }
948}
949
950fn now_ms() -> i64 {
951 std::time::SystemTime::now()
952 .duration_since(std::time::UNIX_EPOCH)
953 .map(|duration| duration.as_millis() as i64)
954 .unwrap_or(0)
955}
956
957pub fn session_external_sink_count(session_id: &str) -> usize {
958 #[cfg(test)]
959 {
960 let owner = std::thread::current().id();
961 return external_sinks()
962 .read()
963 .expect("sink registry poisoned")
964 .get(session_id)
965 .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
966 .unwrap_or(0);
967 }
968 #[cfg(not(test))]
969 {
970 external_sinks()
971 .read()
972 .expect("sink registry poisoned")
973 .get(session_id)
974 .map(|v| v.len())
975 .unwrap_or(0)
976 }
977}
978
979pub fn session_closure_subscriber_count(session_id: &str) -> usize {
980 crate::agent_sessions::subscriber_count(session_id)
981}
982
983#[cfg(test)]
984mod tests {
985 use super::*;
986 use std::sync::atomic::{AtomicUsize, Ordering};
987
988 struct CountingSink(Arc<AtomicUsize>);
989 impl AgentEventSink for CountingSink {
990 fn handle_event(&self, _event: &AgentEvent) {
991 self.0.fetch_add(1, Ordering::SeqCst);
992 }
993 }
994
995 #[test]
996 fn multi_sink_fans_out_in_order() {
997 let multi = MultiSink::new();
998 let a = Arc::new(AtomicUsize::new(0));
999 let b = Arc::new(AtomicUsize::new(0));
1000 multi.push(Arc::new(CountingSink(a.clone())));
1001 multi.push(Arc::new(CountingSink(b.clone())));
1002 let event = AgentEvent::TurnStart {
1003 session_id: "s1".into(),
1004 iteration: 1,
1005 };
1006 multi.handle_event(&event);
1007 assert_eq!(a.load(Ordering::SeqCst), 1);
1008 assert_eq!(b.load(Ordering::SeqCst), 1);
1009 }
1010
1011 #[test]
1012 fn session_scoped_sink_routing() {
1013 reset_all_sinks();
1014 let a = Arc::new(AtomicUsize::new(0));
1015 let b = Arc::new(AtomicUsize::new(0));
1016 register_sink("session-a", Arc::new(CountingSink(a.clone())));
1017 register_sink("session-b", Arc::new(CountingSink(b.clone())));
1018 emit_event(&AgentEvent::TurnStart {
1019 session_id: "session-a".into(),
1020 iteration: 0,
1021 });
1022 assert_eq!(a.load(Ordering::SeqCst), 1);
1023 assert_eq!(b.load(Ordering::SeqCst), 0);
1024 emit_event(&AgentEvent::TurnEnd {
1025 session_id: "session-b".into(),
1026 iteration: 0,
1027 turn_info: serde_json::json!({}),
1028 });
1029 assert_eq!(a.load(Ordering::SeqCst), 1);
1030 assert_eq!(b.load(Ordering::SeqCst), 1);
1031 clear_session_sinks("session-a");
1032 assert_eq!(session_external_sink_count("session-a"), 0);
1033 assert_eq!(session_external_sink_count("session-b"), 1);
1034 reset_all_sinks();
1035 }
1036
1037 #[test]
1038 fn newly_opened_child_session_inherits_current_external_sinks() {
1039 reset_all_sinks();
1040 let delivered = Arc::new(AtomicUsize::new(0));
1041 register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1042 {
1043 let _guard = crate::agent_sessions::enter_current_session("outer-session");
1044 let inner = crate::agent_sessions::open_or_create(None);
1045 assert_ne!(inner, "outer-session");
1046 emit_event(&AgentEvent::TurnStart {
1047 session_id: inner,
1048 iteration: 0,
1049 });
1050 }
1051 assert_eq!(delivered.load(Ordering::SeqCst), 1);
1052 reset_all_sinks();
1053 }
1054
1055 #[test]
1056 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1057 use std::io::{BufRead, BufReader};
1058 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1059 std::fs::create_dir_all(&dir).unwrap();
1060 let path = dir.join("event_log.jsonl");
1061 let sink = JsonlEventSink::open(&path).unwrap();
1062 for i in 0..5 {
1063 sink.handle_event(&AgentEvent::TurnStart {
1064 session_id: "s".into(),
1065 iteration: i,
1066 });
1067 }
1068 assert_eq!(sink.event_count(), 5);
1069 sink.flush().unwrap();
1070
1071 let file = std::fs::File::open(&path).unwrap();
1073 let mut last_idx: i64 = -1;
1074 let mut last_ts: i64 = 0;
1075 for line in BufReader::new(file).lines() {
1076 let line = line.unwrap();
1077 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1078 let idx = val["index"].as_i64().unwrap();
1079 let ts = val["emitted_at_ms"].as_i64().unwrap();
1080 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1081 assert!(ts >= last_ts, "timestamps must be non-decreasing");
1082 last_idx = idx;
1083 last_ts = ts;
1084 assert_eq!(val["type"], "turn_start");
1086 }
1087 assert_eq!(last_idx, 4);
1088 let _ = std::fs::remove_file(&path);
1089 }
1090
1091 #[test]
1092 fn judge_decision_round_trips_through_jsonl_sink() {
1093 use std::io::{BufRead, BufReader};
1094 let dir =
1095 std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1096 std::fs::create_dir_all(&dir).unwrap();
1097 let path = dir.join("event_log.jsonl");
1098 let sink = JsonlEventSink::open(&path).unwrap();
1099 sink.handle_event(&AgentEvent::JudgeDecision {
1100 session_id: "s".into(),
1101 iteration: 2,
1102 verdict: "continue".into(),
1103 reasoning: "needs a concrete next step".into(),
1104 next_step: Some("run the verifier".into()),
1105 judge_duration_ms: 17,
1106 });
1107 sink.flush().unwrap();
1108
1109 let file = std::fs::File::open(&path).unwrap();
1110 let line = BufReader::new(file).lines().next().unwrap().unwrap();
1111 let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1112 match recovered.event {
1113 AgentEvent::JudgeDecision {
1114 session_id,
1115 iteration,
1116 verdict,
1117 reasoning,
1118 next_step,
1119 judge_duration_ms,
1120 } => {
1121 assert_eq!(session_id, "s");
1122 assert_eq!(iteration, 2);
1123 assert_eq!(verdict, "continue");
1124 assert_eq!(reasoning, "needs a concrete next step");
1125 assert_eq!(next_step.as_deref(), Some("run the verifier"));
1126 assert_eq!(judge_duration_ms, 17);
1127 }
1128 other => panic!("expected JudgeDecision, got {other:?}"),
1129 }
1130 let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1131 assert_eq!(value["type"], "judge_decision");
1132 let _ = std::fs::remove_file(&path);
1133 let _ = std::fs::remove_dir(&dir);
1134 }
1135
1136 #[test]
1137 fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1138 let terminal = AgentEvent::ToolCallUpdate {
1143 session_id: "s".into(),
1144 tool_call_id: "tc-1".into(),
1145 tool_name: "read".into(),
1146 status: ToolCallStatus::Completed,
1147 raw_output: None,
1148 error: None,
1149 duration_ms: Some(42),
1150 execution_duration_ms: Some(7),
1151 error_category: None,
1152 executor: None,
1153 parsing: None,
1154
1155 raw_input: None,
1156 raw_input_partial: None,
1157 audit: None,
1158 };
1159 let value = serde_json::to_value(&terminal).unwrap();
1160 assert_eq!(value["duration_ms"], serde_json::json!(42));
1161 assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1162
1163 let intermediate = AgentEvent::ToolCallUpdate {
1167 session_id: "s".into(),
1168 tool_call_id: "tc-1".into(),
1169 tool_name: "read".into(),
1170 status: ToolCallStatus::InProgress,
1171 raw_output: None,
1172 error: None,
1173 duration_ms: None,
1174 execution_duration_ms: None,
1175 error_category: None,
1176 executor: None,
1177 parsing: None,
1178
1179 raw_input: None,
1180 raw_input_partial: None,
1181 audit: None,
1182 };
1183 let value = serde_json::to_value(&intermediate).unwrap();
1184 let object = value.as_object().expect("update serializes as object");
1185 assert!(
1186 !object.contains_key("duration_ms"),
1187 "duration_ms must be omitted when None: {value}"
1188 );
1189 assert!(
1190 !object.contains_key("execution_duration_ms"),
1191 "execution_duration_ms must be omitted when None: {value}"
1192 );
1193 }
1194
1195 #[test]
1196 fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1197 let raw = serde_json::json!({
1200 "type": "tool_call_update",
1201 "session_id": "s",
1202 "tool_call_id": "tc-1",
1203 "tool_name": "read",
1204 "status": "completed",
1205 "raw_output": null,
1206 "error": null,
1207 });
1208 let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1209 match event {
1210 AgentEvent::ToolCallUpdate {
1211 duration_ms,
1212 execution_duration_ms,
1213 ..
1214 } => {
1215 assert!(duration_ms.is_none());
1216 assert!(execution_duration_ms.is_none());
1217 }
1218 other => panic!("expected ToolCallUpdate, got {other:?}"),
1219 }
1220 }
1221
1222 #[test]
1223 fn tool_call_status_serde() {
1224 assert_eq!(
1225 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1226 "\"pending\""
1227 );
1228 assert_eq!(
1229 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1230 "\"in_progress\""
1231 );
1232 assert_eq!(
1233 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1234 "\"completed\""
1235 );
1236 assert_eq!(
1237 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1238 "\"failed\""
1239 );
1240 }
1241
1242 #[test]
1243 fn tool_call_error_category_serializes_as_snake_case() {
1244 let pairs = [
1245 (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1246 (ToolCallErrorCategory::ToolError, "tool_error"),
1247 (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1248 (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1249 (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1250 (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1251 (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1252 (ToolCallErrorCategory::Timeout, "timeout"),
1253 (ToolCallErrorCategory::Network, "network"),
1254 (ToolCallErrorCategory::Cancelled, "cancelled"),
1255 (ToolCallErrorCategory::Unknown, "unknown"),
1256 ];
1257 for (variant, wire) in pairs {
1258 let encoded = serde_json::to_string(&variant).unwrap();
1259 assert_eq!(encoded, format!("\"{wire}\""));
1260 assert_eq!(variant.as_str(), wire);
1261 let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1264 assert_eq!(decoded, variant);
1265 }
1266 }
1267
1268 #[test]
1269 fn tool_executor_round_trips_with_adjacent_tag() {
1270 for executor in [
1275 ToolExecutor::HarnBuiltin,
1276 ToolExecutor::HostBridge,
1277 ToolExecutor::McpServer {
1278 server_name: "linear".to_string(),
1279 },
1280 ToolExecutor::ProviderNative,
1281 ] {
1282 let json = serde_json::to_value(&executor).unwrap();
1283 let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1284 match &executor {
1285 ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1286 ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1287 ToolExecutor::McpServer { server_name } => {
1288 assert_eq!(kind, "mcp_server");
1289 assert_eq!(json["server_name"], *server_name);
1290 }
1291 ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1292 }
1293 let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1294 assert_eq!(recovered, executor);
1295 }
1296 }
1297
1298 #[test]
1299 fn tool_call_error_category_from_internal_collapses_transient_family() {
1300 use crate::value::ErrorCategory as Internal;
1301 assert_eq!(
1302 ToolCallErrorCategory::from_internal(&Internal::Timeout),
1303 ToolCallErrorCategory::Timeout
1304 );
1305 for net in [
1306 Internal::RateLimit,
1307 Internal::Overloaded,
1308 Internal::ServerError,
1309 Internal::TransientNetwork,
1310 ] {
1311 assert_eq!(
1312 ToolCallErrorCategory::from_internal(&net),
1313 ToolCallErrorCategory::Network,
1314 "{net:?} should map to Network",
1315 );
1316 }
1317 assert_eq!(
1318 ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1319 ToolCallErrorCategory::SchemaValidation
1320 );
1321 assert_eq!(
1322 ToolCallErrorCategory::from_internal(&Internal::ToolError),
1323 ToolCallErrorCategory::ToolError
1324 );
1325 assert_eq!(
1326 ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1327 ToolCallErrorCategory::PermissionDenied
1328 );
1329 assert_eq!(
1330 ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1331 ToolCallErrorCategory::Cancelled
1332 );
1333 for bridge in [
1334 Internal::Auth,
1335 Internal::EgressBlocked,
1336 Internal::NotFound,
1337 Internal::CircuitOpen,
1338 Internal::Generic,
1339 ] {
1340 assert_eq!(
1341 ToolCallErrorCategory::from_internal(&bridge),
1342 ToolCallErrorCategory::HostBridgeError,
1343 "{bridge:?} should map to HostBridgeError",
1344 );
1345 }
1346 }
1347
1348 #[test]
1349 fn tool_call_update_event_omits_error_category_when_none() {
1350 let event = AgentEvent::ToolCallUpdate {
1351 session_id: "s".into(),
1352 tool_call_id: "t".into(),
1353 tool_name: "read".into(),
1354 status: ToolCallStatus::Completed,
1355 raw_output: None,
1356 error: None,
1357 duration_ms: None,
1358 execution_duration_ms: None,
1359 error_category: None,
1360 executor: None,
1361 parsing: None,
1362
1363 raw_input: None,
1364 raw_input_partial: None,
1365 audit: None,
1366 };
1367 let v = serde_json::to_value(&event).unwrap();
1368 assert_eq!(v["type"], "tool_call_update");
1369 assert!(v.get("error_category").is_none());
1370 }
1371
1372 #[test]
1373 fn tool_call_update_event_serializes_error_category_when_set() {
1374 let event = AgentEvent::ToolCallUpdate {
1375 session_id: "s".into(),
1376 tool_call_id: "t".into(),
1377 tool_name: "read".into(),
1378 status: ToolCallStatus::Failed,
1379 raw_output: None,
1380 error: Some("missing required field".into()),
1381 duration_ms: None,
1382 execution_duration_ms: None,
1383 error_category: Some(ToolCallErrorCategory::SchemaValidation),
1384 executor: None,
1385 parsing: None,
1386
1387 raw_input: None,
1388 raw_input_partial: None,
1389 audit: None,
1390 };
1391 let v = serde_json::to_value(&event).unwrap();
1392 assert_eq!(v["error_category"], "schema_validation");
1393 assert_eq!(v["error"], "missing required field");
1394 }
1395
1396 #[test]
1397 fn tool_call_update_omits_executor_when_absent() {
1398 let event = AgentEvent::ToolCallUpdate {
1402 session_id: "s".into(),
1403 tool_call_id: "tc-1".into(),
1404 tool_name: "read".into(),
1405 status: ToolCallStatus::Completed,
1406 raw_output: None,
1407 error: None,
1408 duration_ms: None,
1409 execution_duration_ms: None,
1410 error_category: None,
1411 executor: None,
1412 parsing: None,
1413
1414 raw_input: None,
1415 raw_input_partial: None,
1416 audit: None,
1417 };
1418 let json = serde_json::to_value(&event).unwrap();
1419 assert!(json.get("executor").is_none(), "got: {json}");
1420 }
1421
1422 #[test]
1423 fn worker_event_status_strings_cover_all_variants() {
1424 assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1429 assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1430 assert_eq!(
1431 WorkerEvent::WorkerWaitingForInput.as_status(),
1432 "awaiting_input"
1433 );
1434 assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1435 assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1436 assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1437
1438 for terminal in [
1439 WorkerEvent::WorkerCompleted,
1440 WorkerEvent::WorkerFailed,
1441 WorkerEvent::WorkerCancelled,
1442 ] {
1443 assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1444 }
1445 for non_terminal in [
1446 WorkerEvent::WorkerSpawned,
1447 WorkerEvent::WorkerProgressed,
1448 WorkerEvent::WorkerWaitingForInput,
1449 ] {
1450 assert!(
1451 !non_terminal.is_terminal(),
1452 "{non_terminal:?} should not be terminal"
1453 );
1454 }
1455 }
1456
1457 #[test]
1458 fn worker_update_event_routes_through_session_keyed_sink() {
1459 reset_all_sinks();
1464 let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1465 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1466 impl AgentEventSink for CapturingSink {
1467 fn handle_event(&self, event: &AgentEvent) {
1468 self.0
1469 .lock()
1470 .expect("captured sink mutex poisoned")
1471 .push(event.clone());
1472 }
1473 }
1474 register_sink(
1475 "worker-session-1",
1476 Arc::new(CapturingSink(captured.clone())),
1477 );
1478 emit_event(&AgentEvent::WorkerUpdate {
1479 session_id: "worker-session-1".into(),
1480 worker_id: "worker_42".into(),
1481 worker_name: "review_captain".into(),
1482 worker_task: "review pr".into(),
1483 worker_mode: "delegated_stage".into(),
1484 event: WorkerEvent::WorkerWaitingForInput,
1485 status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1486 metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1487 audit: None,
1488 });
1489 emit_event(&AgentEvent::WorkerUpdate {
1491 session_id: "other-session".into(),
1492 worker_id: "w2".into(),
1493 worker_name: "n2".into(),
1494 worker_task: "t2".into(),
1495 worker_mode: "delegated_stage".into(),
1496 event: WorkerEvent::WorkerCompleted,
1497 status: "completed".into(),
1498 metadata: serde_json::json!({}),
1499 audit: None,
1500 });
1501 let received = captured.lock().unwrap().clone();
1502 assert_eq!(received.len(), 1, "got: {received:?}");
1503 match &received[0] {
1504 AgentEvent::WorkerUpdate {
1505 session_id,
1506 worker_id,
1507 event,
1508 status,
1509 ..
1510 } => {
1511 assert_eq!(session_id, "worker-session-1");
1512 assert_eq!(worker_id, "worker_42");
1513 assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1514 assert_eq!(status, "awaiting_input");
1515 }
1516 other => panic!("expected WorkerUpdate, got {other:?}"),
1517 }
1518 reset_all_sinks();
1519 }
1520
1521 #[test]
1522 fn worker_update_event_serializes_to_canonical_shape() {
1523 let event = AgentEvent::WorkerUpdate {
1529 session_id: "s".into(),
1530 worker_id: "w".into(),
1531 worker_name: "n".into(),
1532 worker_task: "t".into(),
1533 worker_mode: "delegated_stage".into(),
1534 event: WorkerEvent::WorkerProgressed,
1535 status: "progressed".into(),
1536 metadata: serde_json::json!({"started_at": "0193..."}),
1537 audit: Some(serde_json::json!({"run_id": "run_x"})),
1538 };
1539 let value = serde_json::to_value(&event).unwrap();
1540 assert_eq!(value["type"], "worker_update");
1541 assert_eq!(value["session_id"], "s");
1542 assert_eq!(value["worker_id"], "w");
1543 assert_eq!(value["status"], "progressed");
1544 assert_eq!(value["audit"]["run_id"], "run_x");
1545
1546 let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1550 match recovered {
1551 AgentEvent::WorkerUpdate {
1552 event: recovered_event,
1553 ..
1554 } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1555 other => panic!("expected WorkerUpdate, got {other:?}"),
1556 }
1557 }
1558
1559 #[test]
1560 fn tool_call_update_includes_executor_when_present() {
1561 let event = AgentEvent::ToolCallUpdate {
1562 session_id: "s".into(),
1563 tool_call_id: "tc-1".into(),
1564 tool_name: "read".into(),
1565 status: ToolCallStatus::Completed,
1566 raw_output: None,
1567 error: None,
1568 duration_ms: None,
1569 execution_duration_ms: None,
1570 error_category: None,
1571 executor: Some(ToolExecutor::McpServer {
1572 server_name: "github".into(),
1573 }),
1574 parsing: None,
1575
1576 raw_input: None,
1577 raw_input_partial: None,
1578 audit: None,
1579 };
1580 let json = serde_json::to_value(&event).unwrap();
1581 assert_eq!(json["executor"]["kind"], "mcp_server");
1582 assert_eq!(json["executor"]["server_name"], "github");
1583 }
1584
1585 #[test]
1586 fn tool_call_update_omits_audit_when_absent() {
1587 let event = AgentEvent::ToolCallUpdate {
1588 session_id: "s".into(),
1589 tool_call_id: "tc-1".into(),
1590 tool_name: "read".into(),
1591 status: ToolCallStatus::Completed,
1592 raw_output: None,
1593 error: None,
1594 duration_ms: None,
1595 execution_duration_ms: None,
1596 error_category: None,
1597 executor: None,
1598 parsing: None,
1599 raw_input: None,
1600 raw_input_partial: None,
1601 audit: None,
1602 };
1603 let json = serde_json::to_value(&event).unwrap();
1604 assert!(json.get("audit").is_none(), "got: {json}");
1605 }
1606
1607 #[test]
1608 fn tool_call_update_includes_audit_when_present() {
1609 let audit = MutationSessionRecord {
1610 session_id: "session_42".into(),
1611 run_id: Some("run_42".into()),
1612 mutation_scope: "apply_workspace".into(),
1613 execution_kind: Some("worker".into()),
1614 ..Default::default()
1615 };
1616 let event = AgentEvent::ToolCallUpdate {
1617 session_id: "s".into(),
1618 tool_call_id: "tc-1".into(),
1619 tool_name: "edit_file".into(),
1620 status: ToolCallStatus::Completed,
1621 raw_output: None,
1622 error: None,
1623 duration_ms: None,
1624 execution_duration_ms: None,
1625 error_category: None,
1626 executor: Some(ToolExecutor::HostBridge),
1627 parsing: None,
1628 raw_input: None,
1629 raw_input_partial: None,
1630 audit: Some(audit),
1631 };
1632 let json = serde_json::to_value(&event).unwrap();
1633 assert_eq!(json["audit"]["session_id"], "session_42");
1634 assert_eq!(json["audit"]["run_id"], "run_42");
1635 assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1636 assert_eq!(json["audit"]["execution_kind"], "worker");
1637 }
1638
1639 #[test]
1640 fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1641 let raw = serde_json::json!({
1642 "type": "tool_call_update",
1643 "session_id": "s",
1644 "tool_call_id": "tc-1",
1645 "tool_name": "read",
1646 "status": "completed",
1647 "raw_output": null,
1648 "error": null,
1649 });
1650 let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1651 match event {
1652 AgentEvent::ToolCallUpdate { audit, .. } => {
1653 assert!(audit.is_none());
1654 }
1655 other => panic!("expected ToolCallUpdate, got {other:?}"),
1656 }
1657 }
1658}