1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
28#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum ToolCallStatus {
32 Pending,
34 InProgress,
36 Completed,
38 Failed,
40}
41
42#[derive(Clone, Debug, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum AgentEvent {
47 AgentMessageChunk {
48 session_id: String,
49 content: String,
50 },
51 AgentThoughtChunk {
52 session_id: String,
53 content: String,
54 },
55 ToolCall {
56 session_id: String,
57 tool_call_id: String,
58 tool_name: String,
59 kind: Option<ToolKind>,
60 status: ToolCallStatus,
61 raw_input: serde_json::Value,
62 },
63 ToolCallUpdate {
64 session_id: String,
65 tool_call_id: String,
66 tool_name: String,
67 status: ToolCallStatus,
68 raw_output: Option<serde_json::Value>,
69 error: Option<String>,
70 },
71 Plan {
72 session_id: String,
73 plan: serde_json::Value,
74 },
75 TurnStart {
76 session_id: String,
77 iteration: usize,
78 },
79 TurnEnd {
80 session_id: String,
81 iteration: usize,
82 turn_info: serde_json::Value,
83 },
84 FeedbackInjected {
85 session_id: String,
86 kind: String,
87 content: String,
88 },
89 BudgetExhausted {
93 session_id: String,
94 max_iterations: usize,
95 },
96 LoopStuck {
100 session_id: String,
101 max_nudges: usize,
102 last_iteration: usize,
103 tail_excerpt: String,
104 },
105 DaemonWatchdogTripped {
110 session_id: String,
111 attempts: usize,
112 elapsed_ms: u64,
113 },
114}
115
116impl AgentEvent {
117 pub fn session_id(&self) -> &str {
118 match self {
119 Self::AgentMessageChunk { session_id, .. }
120 | Self::AgentThoughtChunk { session_id, .. }
121 | Self::ToolCall { session_id, .. }
122 | Self::ToolCallUpdate { session_id, .. }
123 | Self::Plan { session_id, .. }
124 | Self::TurnStart { session_id, .. }
125 | Self::TurnEnd { session_id, .. }
126 | Self::FeedbackInjected { session_id, .. }
127 | Self::BudgetExhausted { session_id, .. }
128 | Self::LoopStuck { session_id, .. }
129 | Self::DaemonWatchdogTripped { session_id, .. } => session_id,
130 }
131 }
132}
133
134pub trait AgentEventSink: Send + Sync {
137 fn handle_event(&self, event: &AgentEvent);
138}
139
140pub struct MultiSink {
142 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
143}
144
145impl MultiSink {
146 pub fn new() -> Self {
147 Self {
148 sinks: Mutex::new(Vec::new()),
149 }
150 }
151 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
152 self.sinks.lock().expect("sink mutex poisoned").push(sink);
153 }
154 pub fn len(&self) -> usize {
155 self.sinks.lock().expect("sink mutex poisoned").len()
156 }
157 pub fn is_empty(&self) -> bool {
158 self.len() == 0
159 }
160}
161
162impl Default for MultiSink {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168impl AgentEventSink for MultiSink {
169 fn handle_event(&self, event: &AgentEvent) {
170 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
176 for sink in sinks {
177 sink.handle_event(event);
178 }
179 }
180}
181
182type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
183
184fn external_sinks() -> &'static ExternalSinkRegistry {
185 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
186 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
187}
188
189pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
190 let session_id = session_id.into();
191 let mut reg = external_sinks().write().expect("sink registry poisoned");
192 reg.entry(session_id).or_default().push(sink);
193}
194
195pub fn clear_session_sinks(session_id: &str) {
199 external_sinks()
200 .write()
201 .expect("sink registry poisoned")
202 .remove(session_id);
203}
204
205pub fn reset_all_sinks() {
206 external_sinks()
207 .write()
208 .expect("sink registry poisoned")
209 .clear();
210 crate::agent_sessions::reset_session_store();
211}
212
213pub fn emit_event(event: &AgentEvent) {
217 let sinks: Vec<Arc<dyn AgentEventSink>> = {
218 let reg = external_sinks().read().expect("sink registry poisoned");
219 reg.get(event.session_id()).cloned().unwrap_or_default()
220 };
221 for sink in sinks {
222 sink.handle_event(event);
223 }
224}
225
226pub fn session_external_sink_count(session_id: &str) -> usize {
227 external_sinks()
228 .read()
229 .expect("sink registry poisoned")
230 .get(session_id)
231 .map(|v| v.len())
232 .unwrap_or(0)
233}
234
235pub fn session_closure_subscriber_count(session_id: &str) -> usize {
236 crate::agent_sessions::subscriber_count(session_id)
237}
238
239#[cfg(test)]
240mod tests {
241 use super::*;
242 use std::sync::atomic::{AtomicUsize, Ordering};
243
244 struct CountingSink(Arc<AtomicUsize>);
245 impl AgentEventSink for CountingSink {
246 fn handle_event(&self, _event: &AgentEvent) {
247 self.0.fetch_add(1, Ordering::SeqCst);
248 }
249 }
250
251 #[test]
252 fn multi_sink_fans_out_in_order() {
253 let multi = MultiSink::new();
254 let a = Arc::new(AtomicUsize::new(0));
255 let b = Arc::new(AtomicUsize::new(0));
256 multi.push(Arc::new(CountingSink(a.clone())));
257 multi.push(Arc::new(CountingSink(b.clone())));
258 let event = AgentEvent::TurnStart {
259 session_id: "s1".into(),
260 iteration: 1,
261 };
262 multi.handle_event(&event);
263 assert_eq!(a.load(Ordering::SeqCst), 1);
264 assert_eq!(b.load(Ordering::SeqCst), 1);
265 }
266
267 #[test]
268 fn session_scoped_sink_routing() {
269 reset_all_sinks();
270 let a = Arc::new(AtomicUsize::new(0));
271 let b = Arc::new(AtomicUsize::new(0));
272 register_sink("session-a", Arc::new(CountingSink(a.clone())));
273 register_sink("session-b", Arc::new(CountingSink(b.clone())));
274 emit_event(&AgentEvent::TurnStart {
275 session_id: "session-a".into(),
276 iteration: 0,
277 });
278 assert_eq!(a.load(Ordering::SeqCst), 1);
279 assert_eq!(b.load(Ordering::SeqCst), 0);
280 emit_event(&AgentEvent::TurnEnd {
281 session_id: "session-b".into(),
282 iteration: 0,
283 turn_info: serde_json::json!({}),
284 });
285 assert_eq!(a.load(Ordering::SeqCst), 1);
286 assert_eq!(b.load(Ordering::SeqCst), 1);
287 clear_session_sinks("session-a");
288 assert_eq!(session_external_sink_count("session-a"), 0);
289 assert_eq!(session_external_sink_count("session-b"), 1);
290 reset_all_sinks();
291 }
292
293 #[test]
294 fn tool_call_status_serde() {
295 assert_eq!(
296 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
297 "\"pending\""
298 );
299 assert_eq!(
300 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
301 "\"in_progress\""
302 );
303 assert_eq!(
304 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
305 "\"completed\""
306 );
307 assert_eq!(
308 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
309 "\"failed\""
310 );
311 }
312}