registry_io/async_registry/mod.rs
1//! Asynchronous, lock-free event registry.
2//!
3//! [`AsyncRegistry`] mirrors [`crate::SyncRegistry`] for `async fn` handlers.
4//! Handlers return a future that the registry drives — either concurrently
5//! via [`AsyncRegistry::notify`] or sequentially via
6//! [`AsyncRegistry::notify_sequential`].
7//!
8//! Module gated behind the `async` feature flag.
9
10use core::any::Any;
11use core::fmt;
12use core::future::Future;
13use core::pin::Pin;
14use std::sync::Arc;
15
16use arc_swap::{ArcSwap, ArcSwapOption};
17
18use crate::HandlerId;
19use crate::future_ext::{CatchUnwind, JoinAll};
20use crate::handler_id::HandlerIdGenerator;
21use crate::panic::{PanicCallbackHolder, PanicInfo};
22
23/// `Pin<Box<dyn Future<Output = T> + Send + 'static>>` — the type-erased
24/// boxed future stored inside the registry. Defined locally rather than
25/// pulled from `futures-util` to keep the dependency surface minimal.
26type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
27
28pub mod guard;
29pub use guard::AsyncHandlerGuard;
30
31/// A boxed async handler closure stored inside the registry.
32///
33/// The returned future is `'static` so it must not borrow from the event.
34/// Handlers that need to retain event data should `.clone()` it inside the
35/// closure before returning the future.
36type StoredAsyncHandler<E> = Arc<dyn Fn(&E) -> BoxFuture<()> + Send + Sync + 'static>;
37
38/// One entry in the async handler list.
39struct AsyncHandlerEntry<E: Send + Sync + 'static> {
40 id: HandlerId,
41 priority: i32,
42 handler: StoredAsyncHandler<E>,
43}
44
45impl<E: Send + Sync + 'static> Clone for AsyncHandlerEntry<E> {
46 #[inline]
47 fn clone(&self) -> Self {
48 Self {
49 id: self.id,
50 priority: self.priority,
51 handler: Arc::clone(&self.handler),
52 }
53 }
54}
55
56impl<E: Send + Sync + 'static> fmt::Debug for AsyncHandlerEntry<E> {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 f.debug_struct("AsyncHandlerEntry")
59 .field("id", &self.id)
60 .field("priority", &self.priority)
61 .finish_non_exhaustive()
62 }
63}
64
65/// Asynchronous event registry.
66///
67/// Same lock-free, `ArcSwap`-backed read path as [`crate::SyncRegistry`], but
68/// handlers return a future of `()`. Two dispatch modes are available:
69///
70/// - [`AsyncRegistry::notify`] — drives every handler concurrently via a
71/// crate-local `JoinAll` combinator. Faster total wall-clock for
72/// handlers that perform real `.await` work, since they make progress in
73/// parallel under the runtime.
74/// - [`AsyncRegistry::notify_sequential`] — awaits each handler in order.
75/// Use when downstream ordering or back-pressure between handlers matters.
76///
77/// Each handler future is wrapped in a crate-local `CatchUnwind` adapter
78/// so a panic during `poll` is isolated from sibling handlers and from the
79/// caller awaiting `notify`.
80///
81/// # Type parameter
82///
83/// `E` is the event type. Handlers receive `&E` but return a `'static`
84/// future, so they must `clone` whatever they need from `&E` before
85/// `async move { ... }`.
86///
87/// # Examples
88///
89/// ```
90/// # #[tokio::main(flavor = "current_thread")]
91/// # async fn main() {
92/// use std::sync::Arc;
93/// use std::sync::atomic::{AtomicU64, Ordering};
94/// use registry_io::r#async::AsyncRegistry;
95///
96/// let registry: AsyncRegistry<u64> = AsyncRegistry::new();
97/// let total = Arc::new(AtomicU64::new(0));
98///
99/// let sink = Arc::clone(&total);
100/// let _ = registry.register(move |value| {
101/// let sink = Arc::clone(&sink);
102/// let v = *value;
103/// async move {
104/// sink.fetch_add(v, Ordering::Relaxed);
105/// }
106/// });
107///
108/// registry.notify(&7).await;
109/// assert_eq!(total.load(Ordering::Relaxed), 7);
110/// # }
111/// ```
112pub struct AsyncRegistry<E: Send + Sync + 'static> {
113 handlers: ArcSwap<Vec<AsyncHandlerEntry<E>>>,
114 id_generator: HandlerIdGenerator,
115 panic_callback: ArcSwapOption<PanicCallbackHolder>,
116}
117
118impl<E: Send + Sync + 'static> AsyncRegistry<E> {
119 /// Create a new, empty async registry.
120 ///
121 /// # Examples
122 ///
123 /// ```
124 /// use registry_io::r#async::AsyncRegistry;
125 ///
126 /// let registry: AsyncRegistry<u32> = AsyncRegistry::new();
127 /// assert!(registry.is_empty());
128 /// ```
129 #[must_use]
130 pub fn new() -> Self {
131 Self {
132 handlers: ArcSwap::from_pointee(Vec::new()),
133 id_generator: HandlerIdGenerator::new(),
134 panic_callback: ArcSwapOption::empty(),
135 }
136 }
137
138 /// Create a new, empty async registry with pre-allocated handler
139 /// capacity.
140 ///
141 /// # Examples
142 ///
143 /// ```
144 /// use registry_io::r#async::AsyncRegistry;
145 ///
146 /// let registry: AsyncRegistry<u64> = AsyncRegistry::with_capacity(16);
147 /// assert!(registry.is_empty());
148 /// ```
149 #[must_use]
150 pub fn with_capacity(capacity: usize) -> Self {
151 Self {
152 handlers: ArcSwap::from_pointee(Vec::with_capacity(capacity)),
153 id_generator: HandlerIdGenerator::new(),
154 panic_callback: ArcSwapOption::empty(),
155 }
156 }
157
158 /// Register an async handler at the default priority (`0`).
159 ///
160 /// The handler is a closure `Fn(&E) -> impl Future<Output = ()>`. The
161 /// returned future must be `'static`: clone any data from `&E` you need
162 /// before the inner `async move { ... }`.
163 ///
164 /// # Examples
165 ///
166 /// ```
167 /// use registry_io::r#async::AsyncRegistry;
168 ///
169 /// let registry: AsyncRegistry<String> = AsyncRegistry::new();
170 /// let _ = registry.register(|event| {
171 /// let owned = event.clone();
172 /// async move {
173 /// // Pretend we awaited something useful here.
174 /// let _ = owned.len();
175 /// }
176 /// });
177 /// ```
178 pub fn register<F, Fut>(&self, handler: F) -> HandlerId
179 where
180 F: Fn(&E) -> Fut + Send + Sync + 'static,
181 Fut: Future<Output = ()> + Send + 'static,
182 {
183 self.register_with_priority(0, handler)
184 }
185
186 /// Register an async handler with an explicit priority.
187 ///
188 /// Dispatch order at notify time follows the same rule as
189 /// [`crate::SyncRegistry::register_with_priority`]: higher priority
190 /// fires first, ties broken in registration order. In concurrent
191 /// dispatch ([`AsyncRegistry::notify`]) priority controls the order in
192 /// which futures are *spawned* into the join, not the order they
193 /// complete in.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use registry_io::r#async::AsyncRegistry;
199 ///
200 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
201 /// let _ = registry.register_with_priority(100, |_| async move {});
202 /// let _ = registry.register(|_| async move {});
203 /// let _ = registry.register_with_priority(-10, |_| async move {});
204 /// assert_eq!(registry.handler_count(), 3);
205 /// ```
206 pub fn register_with_priority<F, Fut>(&self, priority: i32, handler: F) -> HandlerId
207 where
208 F: Fn(&E) -> Fut + Send + Sync + 'static,
209 Fut: Future<Output = ()> + Send + 'static,
210 {
211 let id = self.id_generator.next();
212 let boxed: StoredAsyncHandler<E> = Arc::new(move |event: &E| {
213 let fut = handler(event);
214 Box::pin(fut) as BoxFuture<()>
215 });
216 let entry = AsyncHandlerEntry {
217 id,
218 priority,
219 handler: boxed,
220 };
221 drop(self.handlers.rcu(|current| {
222 let mut new_vec: Vec<AsyncHandlerEntry<E>> = Vec::with_capacity(current.len() + 1);
223 new_vec.extend(current.iter().cloned());
224 let pos = new_vec.partition_point(|e| e.priority >= entry.priority);
225 new_vec.insert(pos, entry.clone());
226 Arc::new(new_vec)
227 }));
228 id
229 }
230
231 /// Register an async handler and return a RAII
232 /// [`AsyncHandlerGuard`] that auto-unregisters when dropped.
233 ///
234 /// Requires the registry to be wrapped in [`Arc`] so the guard can hold
235 /// a [`std::sync::Weak`] reference.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use std::sync::Arc;
241 /// use registry_io::r#async::AsyncRegistry;
242 ///
243 /// let registry = Arc::new(AsyncRegistry::<u32>::new());
244 /// {
245 /// let _guard = registry.register_guard(|_| async move {});
246 /// assert_eq!(registry.handler_count(), 1);
247 /// }
248 /// assert_eq!(registry.handler_count(), 0);
249 /// ```
250 pub fn register_guard<F, Fut>(self: &Arc<Self>, handler: F) -> AsyncHandlerGuard<E>
251 where
252 F: Fn(&E) -> Fut + Send + Sync + 'static,
253 Fut: Future<Output = ()> + Send + 'static,
254 {
255 let id = self.register(handler);
256 AsyncHandlerGuard::new(id, Arc::downgrade(self))
257 }
258
259 /// Like [`AsyncRegistry::register_guard`] but with an explicit
260 /// priority value. Higher priorities fire first; ties broken in
261 /// registration order. See
262 /// [`AsyncRegistry::register_with_priority`].
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// use std::sync::Arc;
268 /// use registry_io::r#async::AsyncRegistry;
269 ///
270 /// let registry = Arc::new(AsyncRegistry::<&'static str>::new());
271 /// let _hi = registry.register_guard_with_priority(100, |evt| {
272 /// let s = *evt;
273 /// async move { let _ = s; }
274 /// });
275 /// let _lo = registry.register_guard_with_priority(-5, |evt| {
276 /// let s = *evt;
277 /// async move { let _ = s; }
278 /// });
279 /// assert_eq!(registry.handler_count(), 2);
280 /// ```
281 pub fn register_guard_with_priority<F, Fut>(
282 self: &Arc<Self>,
283 priority: i32,
284 handler: F,
285 ) -> AsyncHandlerGuard<E>
286 where
287 F: Fn(&E) -> Fut + Send + Sync + 'static,
288 Fut: Future<Output = ()> + Send + 'static,
289 {
290 let id = self.register_with_priority(priority, handler);
291 AsyncHandlerGuard::new(id, Arc::downgrade(self))
292 }
293
294 /// Unregister an async handler by id. Returns `true` if a handler was
295 /// found and removed.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use registry_io::r#async::AsyncRegistry;
301 ///
302 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
303 /// let id = registry.register(|_| async move {});
304 /// assert!(registry.unregister(id));
305 /// assert!(!registry.unregister(id));
306 /// ```
307 pub fn unregister(&self, id: HandlerId) -> bool {
308 let mut removed = false;
309 drop(self.handlers.rcu(|current| {
310 let mut new_vec: Vec<AsyncHandlerEntry<E>> = Vec::with_capacity(current.len());
311 new_vec.extend(current.iter().filter(|e| e.id != id).cloned());
312 removed = new_vec.len() != current.len();
313 Arc::new(new_vec)
314 }));
315 removed
316 }
317
318 /// Remove every registered handler.
319 ///
320 /// In-flight `notify*` calls that already loaded the snapshot still run
321 /// every handler in their snapshot to completion.
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// use registry_io::r#async::AsyncRegistry;
327 ///
328 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
329 /// for _ in 0..5 {
330 /// let _ = registry.register(|_| async move {});
331 /// }
332 /// registry.clear();
333 /// assert_eq!(registry.handler_count(), 0);
334 /// ```
335 pub fn clear(&self) {
336 self.handlers.store(Arc::new(Vec::new()));
337 }
338
339 /// Current handler count. `O(1)` atomic snapshot.
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use registry_io::r#async::AsyncRegistry;
345 ///
346 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
347 /// assert_eq!(registry.handler_count(), 0);
348 /// let _ = registry.register(|_| async move {});
349 /// assert_eq!(registry.handler_count(), 1);
350 /// ```
351 #[inline]
352 #[must_use]
353 pub fn handler_count(&self) -> usize {
354 self.handlers.load().len()
355 }
356
357 /// `true` if no handlers are registered.
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// use registry_io::r#async::AsyncRegistry;
363 ///
364 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
365 /// assert!(registry.is_empty());
366 /// let _ = registry.register(|_| async move {});
367 /// assert!(!registry.is_empty());
368 /// ```
369 #[inline]
370 #[must_use]
371 pub fn is_empty(&self) -> bool {
372 self.handlers.load().is_empty()
373 }
374
375 /// `true` if a handler with `id` is currently registered.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// use registry_io::r#async::AsyncRegistry;
381 ///
382 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
383 /// let id = registry.register(|_| async move {});
384 /// assert!(registry.contains(id));
385 /// assert!(registry.unregister(id));
386 /// assert!(!registry.contains(id));
387 /// ```
388 #[must_use]
389 pub fn contains(&self, id: HandlerId) -> bool {
390 self.handlers.load().iter().any(|e| e.id == id)
391 }
392
393 /// Install a panic callback fired once per panicking handler future
394 /// during `notify*`. Replaces any previously installed callback.
395 /// Second-order panics inside the callback itself are caught and
396 /// discarded.
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// # #[tokio::main(flavor = "current_thread")]
402 /// # async fn main() {
403 /// use std::sync::Arc;
404 /// use std::sync::atomic::{AtomicUsize, Ordering};
405 /// use registry_io::r#async::AsyncRegistry;
406 ///
407 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
408 /// let count = Arc::new(AtomicUsize::new(0));
409 /// let sink = Arc::clone(&count);
410 /// registry.on_panic(move |_| {
411 /// let _ = sink.fetch_add(1, Ordering::Relaxed);
412 /// });
413 ///
414 /// let _ = registry.register(|_| async move { panic!("oops") });
415 /// registry.notify(&()).await;
416 /// assert_eq!(count.load(Ordering::Relaxed), 1);
417 /// # }
418 /// ```
419 pub fn on_panic<F>(&self, callback: F)
420 where
421 F: Fn(&PanicInfo<'_>) + Send + Sync + 'static,
422 {
423 let holder = Arc::new(PanicCallbackHolder::new(callback));
424 self.panic_callback.store(Some(holder));
425 }
426
427 /// Remove any previously installed panic callback. Subsequent
428 /// handler panics during `notify*` become silent.
429 ///
430 /// # Examples
431 ///
432 /// ```
433 /// use registry_io::r#async::AsyncRegistry;
434 ///
435 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
436 /// registry.on_panic(|_| {});
437 /// registry.clear_panic_callback();
438 /// ```
439 pub fn clear_panic_callback(&self) {
440 self.panic_callback.store(None);
441 }
442
443 /// Dispatch `event` to every registered handler **concurrently**.
444 ///
445 /// Builds one future per handler, then awaits them all via the
446 /// crate-local `JoinAll` combinator. Each handler future is wrapped in
447 /// a crate-local `CatchUnwind` adapter so a panic in one handler does
448 /// not poison the join — its sibling handlers continue.
449 ///
450 /// # Examples
451 ///
452 /// ```
453 /// # #[tokio::main(flavor = "current_thread")]
454 /// # async fn main() {
455 /// use std::sync::Arc;
456 /// use std::sync::atomic::{AtomicU32, Ordering};
457 /// use registry_io::r#async::AsyncRegistry;
458 ///
459 /// let registry: AsyncRegistry<u32> = AsyncRegistry::new();
460 /// let total = Arc::new(AtomicU32::new(0));
461 /// for _ in 0..4 {
462 /// let sink = Arc::clone(&total);
463 /// let _ = registry.register(move |value| {
464 /// let sink = Arc::clone(&sink);
465 /// let v = *value;
466 /// async move {
467 /// sink.fetch_add(v, Ordering::Relaxed);
468 /// }
469 /// });
470 /// }
471 ///
472 /// registry.notify(&10).await;
473 /// assert_eq!(total.load(Ordering::Relaxed), 40);
474 /// # }
475 /// ```
476 pub async fn notify(&self, event: &E) {
477 let snapshot = self.handlers.load();
478 if snapshot.is_empty() {
479 return;
480 }
481
482 // Single pass over the snapshot, producing parallel `ids` and
483 // `wrapped` vectors so we can attribute each post-join outcome
484 // back to its originating handler. `JoinAll` preserves input
485 // order, so a positional zip is exact and saves the intermediate
486 // `pairs` allocation the prior implementation needed.
487 let n = snapshot.len();
488 let mut ids: Vec<HandlerId> = Vec::with_capacity(n);
489 let mut wrapped = Vec::with_capacity(n);
490 for entry in snapshot.iter() {
491 ids.push(entry.id);
492 wrapped.push(CatchUnwind::new((entry.handler)(event)));
493 }
494 let results = JoinAll::new(wrapped).await;
495
496 for (id, outcome) in ids.into_iter().zip(results) {
497 if let Err(payload) = outcome {
498 self.handle_panic(id, payload);
499 }
500 }
501 }
502
503 /// Dispatch `event` to every registered handler **sequentially**, in
504 /// priority order.
505 ///
506 /// Each handler's future is awaited to completion before the next one
507 /// starts. Use this when handlers must observe a strict happens-before
508 /// relationship with one another.
509 ///
510 /// # Examples
511 ///
512 /// ```
513 /// # #[tokio::main(flavor = "current_thread")]
514 /// # async fn main() {
515 /// use std::sync::{Arc, Mutex};
516 /// use registry_io::r#async::AsyncRegistry;
517 ///
518 /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
519 /// let log: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
520 ///
521 /// let l = Arc::clone(&log);
522 /// let _ = registry.register_with_priority(10, move |_| {
523 /// let l = Arc::clone(&l);
524 /// async move { l.lock().unwrap().push("first"); }
525 /// });
526 /// let l = Arc::clone(&log);
527 /// let _ = registry.register(move |_| {
528 /// let l = Arc::clone(&l);
529 /// async move { l.lock().unwrap().push("second"); }
530 /// });
531 ///
532 /// registry.notify_sequential(&()).await;
533 /// assert_eq!(log.lock().unwrap().as_slice(), &["first", "second"]);
534 /// # }
535 /// ```
536 pub async fn notify_sequential(&self, event: &E) {
537 let snapshot = self.handlers.load();
538 for entry in snapshot.iter() {
539 let fut = (entry.handler)(event);
540 match CatchUnwind::new(fut).await {
541 Ok(()) => {}
542 Err(payload) => self.handle_panic(entry.id, payload),
543 }
544 }
545 }
546
547 /// Invoke the panic callback (if installed), then drop the payload.
548 /// Mirrors [`crate::SyncRegistry`]'s panic plumbing.
549 #[cold]
550 fn handle_panic(&self, handler_id: HandlerId, payload: Box<dyn Any + Send + 'static>) {
551 let guard = self.panic_callback.load();
552 if let Some(holder) = guard.as_ref() {
553 let info = PanicInfo::new(handler_id, payload.as_ref());
554 drop(std::panic::catch_unwind(std::panic::AssertUnwindSafe(
555 || {
556 holder.invoke(&info);
557 },
558 )));
559 }
560 drop(payload);
561 }
562}
563
564impl<E: Send + Sync + 'static> Default for AsyncRegistry<E> {
565 fn default() -> Self {
566 Self::new()
567 }
568}
569
570impl<E: Send + Sync + 'static> fmt::Debug for AsyncRegistry<E> {
571 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
572 f.debug_struct("AsyncRegistry")
573 .field("handler_count", &self.handlers.load().len())
574 .field("has_panic_callback", &self.panic_callback.load().is_some())
575 .finish_non_exhaustive()
576 }
577}
578
579#[cfg(test)]
580#[allow(clippy::unwrap_used, clippy::expect_used)]
581mod tests {
582 use super::AsyncRegistry;
583 use std::sync::Arc;
584 use std::sync::atomic::{AtomicU32, Ordering};
585
586 #[tokio::test]
587 async fn empty_registry_notify_is_noop() {
588 let registry: AsyncRegistry<u32> = AsyncRegistry::new();
589 registry.notify(&42).await;
590 registry.notify_sequential(&42).await;
591 }
592
593 #[tokio::test]
594 async fn concurrent_notify_fires_every_handler_once() {
595 let registry: AsyncRegistry<u32> = AsyncRegistry::new();
596 let count = Arc::new(AtomicU32::new(0));
597 for _ in 0..5 {
598 let sink = Arc::clone(&count);
599 let _ = registry.register(move |_| {
600 let sink = Arc::clone(&sink);
601 async move {
602 let _ = sink.fetch_add(1, Ordering::Relaxed);
603 }
604 });
605 }
606 registry.notify(&0).await;
607 assert_eq!(count.load(Ordering::Relaxed), 5);
608 }
609}