// ai_agent/utils/abort_controller.rs
use std::sync::atomic::Ordering;
use std::sync::Arc;
5
/// Listener limit applied when the caller does not supply one.
///
/// The limit is soft: `AbortSignal::add_event_listener` only logs a warning
/// once it is reached and still registers the listener.
const DEFAULT_MAX_LISTENERS: usize = 50;
8
9pub fn create_abort_controller(max_listeners: usize) -> AbortController {
19 AbortController::new(max_listeners)
20}
21
22pub fn create_abort_controller_default() -> AbortController {
24 create_abort_controller(DEFAULT_MAX_LISTENERS)
25}
26
27pub struct AbortController {
30 signal: Arc<AbortSignal>,
31}
32
33impl AbortController {
34 pub fn new(max_listeners: usize) -> Self {
36 Self {
37 signal: Arc::new(AbortSignal::new(max_listeners)),
38 }
39 }
40
41 pub fn signal(&self) -> &Arc<AbortSignal> {
43 &self.signal
44 }
45
46 pub fn abort(&self, reason: Option<Arc<dyn std::any::Any + Send + Sync>>) {
48 self.signal.abort(reason);
49 }
50
51 pub fn is_aborted(&self) -> bool {
53 self.signal.is_aborted()
54 }
55}
56
57impl Default for AbortController {
58 fn default() -> Self {
59 Self::new(DEFAULT_MAX_LISTENERS)
60 }
61}
62
63impl Clone for AbortController {
64 fn clone(&self) -> Self {
65 Self {
66 signal: Arc::clone(&self.signal),
67 }
68 }
69}
70
71pub struct AbortSignal {
73 aborted: std::sync::atomic::AtomicBool,
74 reason: std::sync::Mutex<Option<Arc<dyn std::any::Any + Send + Sync>>>,
75 listeners: std::sync::Mutex<Vec<AbortCallback>>,
76 max_listeners: usize,
77}
78
79pub type AbortCallback = Box<dyn Fn(Option<&dyn std::any::Any>) + Send + Sync>;
80
81impl AbortSignal {
82 pub fn new(max_listeners: usize) -> Self {
84 Self {
85 aborted: std::sync::atomic::AtomicBool::new(false),
86 reason: std::sync::Mutex::new(None),
87 listeners: std::sync::Mutex::new(Vec::new()),
88 max_listeners,
89 }
90 }
91
92 pub fn is_aborted(&self) -> bool {
94 self.aborted.load(Ordering::SeqCst)
95 }
96
97 pub fn reason(&self) -> Option<Arc<dyn std::any::Any + Send + Sync>> {
99 self.reason.lock().ok().and_then(|guard| guard.clone())
100 }
101
102 pub fn abort(&self, reason: Option<Arc<dyn std::any::Any + Send + Sync>>) {
104 if self.aborted.swap(true, Ordering::SeqCst) {
105 return; }
107
108 *self.reason.lock().unwrap() = reason.clone();
109
110 let reason_ref = reason.as_deref().map(|a| a as &dyn std::any::Any);
113 for listener in self.listeners.lock().unwrap().iter() {
114 listener(reason_ref);
115 }
116 }
117
118 pub fn add_event_listener(&self, callback: AbortCallback) -> usize {
121 let mut listeners = self.listeners.lock().unwrap();
122 if listeners.len() >= self.max_listeners {
123 log::warn!(
124 "Max listeners ({}) exceeded for AbortSignal",
125 self.max_listeners
126 );
127 }
128 listeners.push(callback);
129 listeners.len()
130 }
131
132 #[allow(dead_code)]
134 pub fn remove_event_listener(&self, _callback: &AbortCallback) {
135 }
138
139 #[allow(dead_code)]
141 pub fn listener_count(&self) -> usize {
142 self.listeners.lock().unwrap().len()
143 }
144}
145
146impl Default for AbortSignal {
147 fn default() -> Self {
148 Self::new(DEFAULT_MAX_LISTENERS)
149 }
150}
151
152impl Clone for AbortSignal {
153 fn clone(&self) -> Self {
154 Self {
155 aborted: std::sync::atomic::AtomicBool::new(self.aborted.load(Ordering::SeqCst)),
156 reason: std::sync::Mutex::new(self.reason.lock().ok().and_then(|g| g.clone())),
157 listeners: std::sync::Mutex::new(Vec::new()), max_listeners: self.max_listeners,
159 }
160 }
161}
162
163#[allow(dead_code)]
178pub fn create_child_abort_controller(
179 parent: &AbortController,
180 max_listeners: Option<usize>,
181) -> AbortController {
182 let max_listeners = max_listeners.unwrap_or(DEFAULT_MAX_LISTENERS);
183 let child = AbortController::new(max_listeners);
184
185 if parent.is_aborted() {
187 child.abort(parent.signal.reason());
188 return child;
189 }
190
191 let child_signal = Arc::clone(&child.signal);
193 let parent_signal = Arc::clone(parent.signal());
194
195 let reason = parent_signal.reason();
197
198 parent_signal.add_event_listener(Box::new(move |_reason| {
201 child_signal.abort(reason.clone());
203 }));
204
205 child
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a type-erased `String` abort reason.
    fn text_reason(text: &str) -> Arc<dyn std::any::Any + Send + Sync> {
        Arc::new(text.to_string())
    }

    /// Reads a signal's abort reason back as a `String`, if it is one.
    fn reason_text(signal: &AbortSignal) -> Option<String> {
        signal
            .reason()
            .as_deref()
            .and_then(|payload| payload.downcast_ref::<String>())
            .cloned()
    }

    #[test]
    fn test_create_abort_controller() {
        let controller = create_abort_controller(50);
        assert!(!controller.is_aborted());
        assert!(controller.signal().reason().is_none());
        assert_eq!(controller.signal().listener_count(), 0);
    }

    #[test]
    fn test_abort_controller_abort() {
        let controller = create_abort_controller(50);
        controller.abort(None);
        assert!(controller.is_aborted());
        assert!(controller.signal().reason().is_none());
    }

    #[test]
    fn test_abort_with_reason() {
        let controller = create_abort_controller(50);
        controller.abort(Some(text_reason("test reason")));

        assert!(controller.is_aborted());
        // Check the payload itself, not just that something was stored.
        assert_eq!(
            reason_text(controller.signal()).as_deref(),
            Some("test reason")
        );
    }

    #[test]
    fn test_abort_keeps_first_reason() {
        let controller = create_abort_controller(50);
        controller.abort(Some(text_reason("first")));
        controller.abort(Some(text_reason("second")));

        assert_eq!(reason_text(controller.signal()).as_deref(), Some("first"));
    }

    #[test]
    fn test_abort_listener() {
        let controller = create_abort_controller(50);
        let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let calls_in_listener = Arc::clone(&calls);

        let registered = controller
            .signal()
            .add_event_listener(Box::new(move |_reason| {
                calls_in_listener.fetch_add(1, Ordering::SeqCst);
            }));
        assert_eq!(registered, 1);
        assert_eq!(controller.signal().listener_count(), 1);

        controller.abort(None);
        // A second abort must not notify listeners again.
        controller.abort(None);

        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_clone_shares_signal() {
        let controller = create_abort_controller(50);
        let handle = controller.clone();

        handle.abort(None);

        assert!(controller.is_aborted());
        assert!(Arc::ptr_eq(controller.signal(), handle.signal()));
    }

    #[test]
    fn test_child_abort_controller() {
        let parent = create_abort_controller(50);
        let child = create_child_abort_controller(&parent, None);

        assert!(!parent.is_aborted());
        assert!(!child.is_aborted());

        parent.abort(None);

        assert!(parent.is_aborted());
        assert!(child.is_aborted());
    }

    #[test]
    fn test_child_abort_does_not_abort_parent() {
        let parent = create_abort_controller(50);
        let child = create_child_abort_controller(&parent, None);

        child.abort(None);

        assert!(child.is_aborted());
        assert!(!parent.is_aborted());
    }

    #[test]
    fn test_child_already_aborted_parent() {
        let parent = create_abort_controller(50);
        parent.abort(Some(text_reason("parent reason")));

        let child = create_child_abort_controller(&parent, None);

        assert!(child.is_aborted());
        assert_eq!(
            reason_text(child.signal()).as_deref(),
            Some("parent reason")
        );
    }
}