1use std::cell::RefCell;
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27use crate::value::VmValue;
28
29#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32pub enum ToolCallStatus {
33 Pending,
35 InProgress,
37 Completed,
39 Failed,
41}
42
43#[derive(Clone, Debug, Serialize, Deserialize)]
46#[serde(tag = "type", rename_all = "snake_case")]
47pub enum AgentEvent {
48 AgentMessageChunk {
49 session_id: String,
50 content: String,
51 },
52 AgentThoughtChunk {
53 session_id: String,
54 content: String,
55 },
56 ToolCall {
57 session_id: String,
58 tool_call_id: String,
59 tool_name: String,
60 kind: Option<ToolKind>,
61 status: ToolCallStatus,
62 raw_input: serde_json::Value,
63 },
64 ToolCallUpdate {
65 session_id: String,
66 tool_call_id: String,
67 tool_name: String,
68 status: ToolCallStatus,
69 raw_output: Option<serde_json::Value>,
70 error: Option<String>,
71 },
72 Plan {
73 session_id: String,
74 plan: serde_json::Value,
75 },
76 TurnStart {
77 session_id: String,
78 iteration: usize,
79 },
80 TurnEnd {
81 session_id: String,
82 iteration: usize,
83 turn_info: serde_json::Value,
84 },
85 FeedbackInjected {
86 session_id: String,
87 kind: String,
88 content: String,
89 },
90 BudgetExhausted {
94 session_id: String,
95 max_iterations: usize,
96 },
97 LoopStuck {
101 session_id: String,
102 max_nudges: usize,
103 last_iteration: usize,
104 tail_excerpt: String,
105 },
106 DaemonWatchdogTripped {
111 session_id: String,
112 attempts: usize,
113 elapsed_ms: u64,
114 },
115}
116
117impl AgentEvent {
118 pub fn session_id(&self) -> &str {
119 match self {
120 Self::AgentMessageChunk { session_id, .. }
121 | Self::AgentThoughtChunk { session_id, .. }
122 | Self::ToolCall { session_id, .. }
123 | Self::ToolCallUpdate { session_id, .. }
124 | Self::Plan { session_id, .. }
125 | Self::TurnStart { session_id, .. }
126 | Self::TurnEnd { session_id, .. }
127 | Self::FeedbackInjected { session_id, .. }
128 | Self::BudgetExhausted { session_id, .. }
129 | Self::LoopStuck { session_id, .. }
130 | Self::DaemonWatchdogTripped { session_id, .. } => session_id,
131 }
132 }
133}
134
135pub trait AgentEventSink: Send + Sync {
138 fn handle_event(&self, event: &AgentEvent);
139}
140
141pub struct MultiSink {
143 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
144}
145
146impl MultiSink {
147 pub fn new() -> Self {
148 Self {
149 sinks: Mutex::new(Vec::new()),
150 }
151 }
152 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
153 self.sinks.lock().expect("sink mutex poisoned").push(sink);
154 }
155 pub fn len(&self) -> usize {
156 self.sinks.lock().expect("sink mutex poisoned").len()
157 }
158 pub fn is_empty(&self) -> bool {
159 self.len() == 0
160 }
161}
162
163impl Default for MultiSink {
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
169impl AgentEventSink for MultiSink {
170 fn handle_event(&self, event: &AgentEvent) {
171 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
177 for sink in sinks {
178 sink.handle_event(event);
179 }
180 }
181}
182
183type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
184
185fn external_sinks() -> &'static ExternalSinkRegistry {
186 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
187 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
188}
189
190thread_local! {
196 static CLOSURE_SUBSCRIBERS: RefCell<HashMap<String, Vec<VmValue>>> =
197 RefCell::new(HashMap::new());
198}
199
200pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
201 let session_id = session_id.into();
202 let mut reg = external_sinks().write().expect("sink registry poisoned");
203 reg.entry(session_id).or_default().push(sink);
204}
205
206pub fn register_closure_subscriber(session_id: impl Into<String>, closure: VmValue) {
207 let session_id = session_id.into();
208 CLOSURE_SUBSCRIBERS.with(|reg| {
209 reg.borrow_mut()
210 .entry(session_id)
211 .or_default()
212 .push(closure);
213 });
214}
215
216pub fn closure_subscribers_for(session_id: &str) -> Vec<VmValue> {
217 CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).cloned().unwrap_or_default())
218}
219
220pub fn clear_session_sinks(session_id: &str) {
221 external_sinks()
222 .write()
223 .expect("sink registry poisoned")
224 .remove(session_id);
225 CLOSURE_SUBSCRIBERS.with(|reg| {
226 reg.borrow_mut().remove(session_id);
227 });
228}
229
230pub fn reset_all_sinks() {
231 external_sinks()
232 .write()
233 .expect("sink registry poisoned")
234 .clear();
235 CLOSURE_SUBSCRIBERS.with(|reg| {
236 reg.borrow_mut().clear();
237 });
238}
239
240pub fn emit_event(event: &AgentEvent) {
244 let sinks: Vec<Arc<dyn AgentEventSink>> = {
245 let reg = external_sinks().read().expect("sink registry poisoned");
246 reg.get(event.session_id()).cloned().unwrap_or_default()
247 };
248 for sink in sinks {
249 sink.handle_event(event);
250 }
251}
252
253pub fn session_external_sink_count(session_id: &str) -> usize {
254 external_sinks()
255 .read()
256 .expect("sink registry poisoned")
257 .get(session_id)
258 .map(|v| v.len())
259 .unwrap_or(0)
260}
261
262pub fn session_closure_subscriber_count(session_id: &str) -> usize {
263 CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).map(|v| v.len()).unwrap_or(0))
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that counts how many events it has been handed.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Sink that appends its label to a shared log, so a test can observe the
    /// order in which sinks were invoked.
    struct LabelSink {
        label: &'static str,
        log: Arc<Mutex<Vec<&'static str>>>,
    }
    impl AgentEventSink for LabelSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.log
                .lock()
                .expect("log mutex poisoned")
                .push(self.label);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        assert!(multi.is_empty());

        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        let log = Arc::new(Mutex::new(Vec::new()));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(LabelSink {
            label: "first",
            log: log.clone(),
        }));
        multi.push(Arc::new(CountingSink(b.clone())));
        multi.push(Arc::new(LabelSink {
            label: "second",
            log: log.clone(),
        }));
        assert_eq!(multi.len(), 4);

        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);

        // Every sink saw the event exactly once...
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        // ...and the sinks ran in the order they were pushed.
        assert_eq!(
            *log.lock().expect("log mutex poisoned"),
            ["first", "second"]
        );
    }

    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));

        // An event for session-a reaches only session-a's sink.
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);

        // An event for session-b reaches only session-b's sink.
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);

        // Clearing one session leaves the other untouched.
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}