1use std::cell::RefCell;
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27use crate::value::VmValue;
28
/// Status value carried by the tool-call events ([`AgentEvent::ToolCall`] and
/// [`AgentEvent::ToolCallUpdate`]).
///
/// Serialized in `snake_case`, so `InProgress` is written as `"in_progress"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Serialized as `"pending"`. Presumably: the call is known but has not started yet.
    Pending,
    /// Serialized as `"in_progress"`. Presumably: the call is currently executing.
    InProgress,
    /// Serialized as `"completed"`. Presumably: the call finished successfully.
    Completed,
    /// Serialized as `"failed"`. Presumably: the call finished with an error.
    Failed,
}
42
/// Events describing the activity of an agent session.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under a `"type"` key next to the variant's fields. Every
/// variant carries the `session_id` it belongs to (see [`AgentEvent::session_id`]).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A piece of agent message text (presumably streamed incrementally).
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A piece of agent "thought" text (presumably reasoning output, streamed incrementally).
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// Announces a tool call together with its raw JSON input.
    ToolCall {
        session_id: String,
        /// Identifier shared with the matching [`AgentEvent::ToolCallUpdate`] events
        /// (assumed from the identical field name; confirm against the emitter).
        tool_call_id: String,
        tool_name: String,
        /// Optional tool classification; `None` when no kind is known.
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
    },
    /// Reports a status change for a tool call, with optional output and error text.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
    },
    /// Carries a plan as an opaque JSON value; its schema is not defined here.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Marks the start of a turn, identified by `iteration`.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Marks the end of a turn; `turn_info` is an opaque JSON payload.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Records feedback that was injected into the session; `kind` is a free-form label.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Presumably emitted when the session hit its iteration limit (`max_iterations`).
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Presumably emitted when the loop stopped making progress after `max_nudges`
    /// nudges; `tail_excerpt` looks like a snippet of the most recent output.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Presumably emitted when a daemon watchdog fired after `attempts` attempts
    /// and `elapsed_ms` milliseconds.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
}
117
118impl AgentEvent {
119 pub fn session_id(&self) -> &str {
120 match self {
121 Self::AgentMessageChunk { session_id, .. }
122 | Self::AgentThoughtChunk { session_id, .. }
123 | Self::ToolCall { session_id, .. }
124 | Self::ToolCallUpdate { session_id, .. }
125 | Self::Plan { session_id, .. }
126 | Self::TurnStart { session_id, .. }
127 | Self::TurnEnd { session_id, .. }
128 | Self::FeedbackInjected { session_id, .. }
129 | Self::BudgetExhausted { session_id, .. }
130 | Self::LoopStuck { session_id, .. }
131 | Self::DaemonWatchdogTripped { session_id, .. } => session_id,
132 }
133 }
134}
135
/// Receiver of [`AgentEvent`]s.
///
/// Implementations must be `Send + Sync`; in this module they are stored and
/// shared as `Arc<dyn AgentEventSink>`.
pub trait AgentEventSink: Send + Sync {
    /// Called once per delivered event. Takes `&self`, so a sink that keeps
    /// state needs interior mutability (atomics, a mutex, ...).
    fn handle_event(&self, event: &AgentEvent);
}
141
/// A sink that forwards every event to a list of child sinks.
pub struct MultiSink {
    /// Child sinks in registration order, behind a mutex so sinks can be
    /// added through a shared reference.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
146
147impl MultiSink {
148 pub fn new() -> Self {
149 Self {
150 sinks: Mutex::new(Vec::new()),
151 }
152 }
153 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
154 self.sinks.lock().expect("sink mutex poisoned").push(sink);
155 }
156 pub fn len(&self) -> usize {
157 self.sinks.lock().expect("sink mutex poisoned").len()
158 }
159 pub fn is_empty(&self) -> bool {
160 self.len() == 0
161 }
162}
163
164impl Default for MultiSink {
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170impl AgentEventSink for MultiSink {
171 fn handle_event(&self, event: &AgentEvent) {
172 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
178 for sink in sinks {
179 sink.handle_event(event);
180 }
181 }
182}
183
/// Map from session id to the sinks registered for that session, guarded by
/// an `RwLock` so lookups can run concurrently with each other.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
187
188fn external_sinks() -> &'static ExternalSinkRegistry {
189 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
190 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
191}
192
thread_local! {
    /// Per-thread map from session id to the closure values subscribed to
    /// that session. Being thread-local, entries registered on one thread are
    /// not visible from any other thread.
    static CLOSURE_SUBSCRIBERS: RefCell<HashMap<String, Vec<VmValue>>> =
        RefCell::new(HashMap::new());
}
202
203pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
205 let session_id = session_id.into();
206 let mut reg = external_sinks().write().expect("sink registry poisoned");
207 reg.entry(session_id).or_default().push(sink);
208}
209
210pub fn register_closure_subscriber(session_id: impl Into<String>, closure: VmValue) {
211 let session_id = session_id.into();
212 CLOSURE_SUBSCRIBERS.with(|reg| {
213 reg.borrow_mut()
214 .entry(session_id)
215 .or_default()
216 .push(closure);
217 });
218}
219
220pub fn closure_subscribers_for(session_id: &str) -> Vec<VmValue> {
221 CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).cloned().unwrap_or_default())
222}
223
224pub fn clear_session_sinks(session_id: &str) {
225 external_sinks()
226 .write()
227 .expect("sink registry poisoned")
228 .remove(session_id);
229 CLOSURE_SUBSCRIBERS.with(|reg| {
230 reg.borrow_mut().remove(session_id);
231 });
232}
233
234pub fn reset_all_sinks() {
235 external_sinks()
236 .write()
237 .expect("sink registry poisoned")
238 .clear();
239 CLOSURE_SUBSCRIBERS.with(|reg| {
240 reg.borrow_mut().clear();
241 });
242}
243
244pub fn emit_event(event: &AgentEvent) {
248 let sinks: Vec<Arc<dyn AgentEventSink>> = {
249 let reg = external_sinks().read().expect("sink registry poisoned");
250 reg.get(event.session_id()).cloned().unwrap_or_default()
251 };
252 for sink in sinks {
253 sink.handle_event(event);
254 }
255}
256
257pub fn session_external_sink_count(session_id: &str) -> usize {
258 external_sinks()
259 .read()
260 .expect("sink registry poisoned")
261 .get(session_id)
262 .map(|v| v.len())
263 .unwrap_or(0)
264}
265
266pub fn session_closure_subscriber_count(session_id: &str) -> usize {
267 CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).map(|v| v.len()).unwrap_or(0))
268}
269
// Unit tests for the fan-out sink, the session-scoped registry and the
// serialized form of `ToolCallStatus`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Test sink that counts how many events it has received.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    // One event pushed through a `MultiSink` must reach both children once.
    // NOTE(review): despite the name, only the per-sink counts are asserted;
    // the delivery order itself is not checked.
    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    // Events must only reach sinks registered for their own session, and
    // clearing one session must leave the other session's sinks in place.
    #[test]
    fn session_scoped_sink_routing() {
        // The registry is process-wide, so start from a clean slate.
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        // An event for session-a reaches only sink `a`.
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        // An event for session-b reaches only sink `b`.
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        // Leave the shared registry empty for whatever runs next.
        reset_all_sinks();
    }

    // Each status must serialize to its snake_case string.
    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}