Skip to main content

registry_io/async_registry/
mod.rs

1//! Asynchronous, lock-free event registry.
2//!
3//! [`AsyncRegistry`] mirrors [`crate::SyncRegistry`] for `async fn` handlers.
4//! Handlers return a future that the registry drives — either concurrently
5//! via [`AsyncRegistry::notify`] or sequentially via
6//! [`AsyncRegistry::notify_sequential`].
7//!
8//! Module gated behind the `async` feature flag.
9
10use core::any::Any;
11use core::fmt;
12use core::future::Future;
13use core::pin::Pin;
14use std::sync::Arc;
15
16use arc_swap::{ArcSwap, ArcSwapOption};
17
18use crate::HandlerId;
19use crate::future_ext::{CatchUnwind, JoinAll};
20use crate::handler_id::HandlerIdGenerator;
21use crate::panic::{PanicCallbackHolder, PanicInfo};
22
23/// `Pin<Box<dyn Future<Output = T> + Send + 'static>>` — the type-erased
24/// boxed future stored inside the registry. Defined locally rather than
25/// pulled from `futures-util` to keep the dependency surface minimal.
26type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
27
28pub mod guard;
29pub use guard::AsyncHandlerGuard;
30
31/// A boxed async handler closure stored inside the registry.
32///
33/// The returned future is `'static` so it must not borrow from the event.
34/// Handlers that need to retain event data should `.clone()` it inside the
35/// closure before returning the future.
36type StoredAsyncHandler<E> = Arc<dyn Fn(&E) -> BoxFuture<()> + Send + Sync + 'static>;
37
38/// One entry in the async handler list.
39struct AsyncHandlerEntry<E: Send + Sync + 'static> {
40    id: HandlerId,
41    priority: i32,
42    handler: StoredAsyncHandler<E>,
43}
44
45impl<E: Send + Sync + 'static> Clone for AsyncHandlerEntry<E> {
46    #[inline]
47    fn clone(&self) -> Self {
48        Self {
49            id: self.id,
50            priority: self.priority,
51            handler: Arc::clone(&self.handler),
52        }
53    }
54}
55
56impl<E: Send + Sync + 'static> fmt::Debug for AsyncHandlerEntry<E> {
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        f.debug_struct("AsyncHandlerEntry")
59            .field("id", &self.id)
60            .field("priority", &self.priority)
61            .finish_non_exhaustive()
62    }
63}
64
65/// Asynchronous event registry.
66///
67/// Same lock-free, `ArcSwap`-backed read path as [`crate::SyncRegistry`], but
68/// handlers return a future of `()`. Two dispatch modes are available:
69///
70/// - [`AsyncRegistry::notify`] — drives every handler concurrently via a
71///   crate-local `JoinAll` combinator. Faster total wall-clock for
72///   handlers that perform real `.await` work, since they make progress in
73///   parallel under the runtime.
74/// - [`AsyncRegistry::notify_sequential`] — awaits each handler in order.
75///   Use when downstream ordering or back-pressure between handlers matters.
76///
77/// Each handler future is wrapped in a crate-local `CatchUnwind` adapter
78/// so a panic during `poll` is isolated from sibling handlers and from the
79/// caller awaiting `notify`.
80///
81/// # Type parameter
82///
83/// `E` is the event type. Handlers receive `&E` but return a `'static`
84/// future, so they must `clone` whatever they need from `&E` before
85/// `async move { ... }`.
86///
87/// # Examples
88///
89/// ```
90/// # #[tokio::main(flavor = "current_thread")]
91/// # async fn main() {
92/// use std::sync::Arc;
93/// use std::sync::atomic::{AtomicU64, Ordering};
94/// use registry_io::r#async::AsyncRegistry;
95///
96/// let registry: AsyncRegistry<u64> = AsyncRegistry::new();
97/// let total = Arc::new(AtomicU64::new(0));
98///
99/// let sink = Arc::clone(&total);
100/// let _ = registry.register(move |value| {
101///     let sink = Arc::clone(&sink);
102///     let v = *value;
103///     async move {
104///         sink.fetch_add(v, Ordering::Relaxed);
105///     }
106/// });
107///
108/// registry.notify(&7).await;
109/// assert_eq!(total.load(Ordering::Relaxed), 7);
110/// # }
111/// ```
112pub struct AsyncRegistry<E: Send + Sync + 'static> {
113    handlers: ArcSwap<Vec<AsyncHandlerEntry<E>>>,
114    id_generator: HandlerIdGenerator,
115    panic_callback: ArcSwapOption<PanicCallbackHolder>,
116}
117
118impl<E: Send + Sync + 'static> AsyncRegistry<E> {
119    /// Create a new, empty async registry.
120    ///
121    /// # Examples
122    ///
123    /// ```
124    /// use registry_io::r#async::AsyncRegistry;
125    ///
126    /// let registry: AsyncRegistry<u32> = AsyncRegistry::new();
127    /// assert!(registry.is_empty());
128    /// ```
129    #[must_use]
130    pub fn new() -> Self {
131        Self {
132            handlers: ArcSwap::from_pointee(Vec::new()),
133            id_generator: HandlerIdGenerator::new(),
134            panic_callback: ArcSwapOption::empty(),
135        }
136    }
137
138    /// Create a new, empty async registry with pre-allocated handler
139    /// capacity.
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// use registry_io::r#async::AsyncRegistry;
145    ///
146    /// let registry: AsyncRegistry<u64> = AsyncRegistry::with_capacity(16);
147    /// assert!(registry.is_empty());
148    /// ```
149    #[must_use]
150    pub fn with_capacity(capacity: usize) -> Self {
151        Self {
152            handlers: ArcSwap::from_pointee(Vec::with_capacity(capacity)),
153            id_generator: HandlerIdGenerator::new(),
154            panic_callback: ArcSwapOption::empty(),
155        }
156    }
157
158    /// Register an async handler at the default priority (`0`).
159    ///
160    /// The handler is a closure `Fn(&E) -> impl Future<Output = ()>`. The
161    /// returned future must be `'static`: clone any data from `&E` you need
162    /// before the inner `async move { ... }`.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use registry_io::r#async::AsyncRegistry;
168    ///
169    /// let registry: AsyncRegistry<String> = AsyncRegistry::new();
170    /// let _ = registry.register(|event| {
171    ///     let owned = event.clone();
172    ///     async move {
173    ///         // Pretend we awaited something useful here.
174    ///         let _ = owned.len();
175    ///     }
176    /// });
177    /// ```
178    pub fn register<F, Fut>(&self, handler: F) -> HandlerId
179    where
180        F: Fn(&E) -> Fut + Send + Sync + 'static,
181        Fut: Future<Output = ()> + Send + 'static,
182    {
183        self.register_with_priority(0, handler)
184    }
185
186    /// Register an async handler with an explicit priority.
187    ///
188    /// Dispatch order at notify time follows the same rule as
189    /// [`crate::SyncRegistry::register_with_priority`]: higher priority
190    /// fires first, ties broken in registration order. In concurrent
191    /// dispatch ([`AsyncRegistry::notify`]) priority controls the order in
192    /// which futures are *spawned* into the join, not the order they
193    /// complete in.
194    ///
195    /// # Examples
196    ///
197    /// ```
198    /// use registry_io::r#async::AsyncRegistry;
199    ///
200    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
201    /// let _ = registry.register_with_priority(100, |_| async move {});
202    /// let _ = registry.register(|_| async move {});
203    /// let _ = registry.register_with_priority(-10, |_| async move {});
204    /// assert_eq!(registry.handler_count(), 3);
205    /// ```
206    pub fn register_with_priority<F, Fut>(&self, priority: i32, handler: F) -> HandlerId
207    where
208        F: Fn(&E) -> Fut + Send + Sync + 'static,
209        Fut: Future<Output = ()> + Send + 'static,
210    {
211        let id = self.id_generator.next();
212        let boxed: StoredAsyncHandler<E> = Arc::new(move |event: &E| {
213            let fut = handler(event);
214            Box::pin(fut) as BoxFuture<()>
215        });
216        let entry = AsyncHandlerEntry {
217            id,
218            priority,
219            handler: boxed,
220        };
221        drop(self.handlers.rcu(|current| {
222            let mut new_vec: Vec<AsyncHandlerEntry<E>> = Vec::with_capacity(current.len() + 1);
223            new_vec.extend(current.iter().cloned());
224            let pos = new_vec.partition_point(|e| e.priority >= entry.priority);
225            new_vec.insert(pos, entry.clone());
226            Arc::new(new_vec)
227        }));
228        id
229    }
230
231    /// Register an async handler and return a RAII
232    /// [`AsyncHandlerGuard`] that auto-unregisters when dropped.
233    ///
234    /// Requires the registry to be wrapped in [`Arc`] so the guard can hold
235    /// a [`std::sync::Weak`] reference.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use std::sync::Arc;
241    /// use registry_io::r#async::AsyncRegistry;
242    ///
243    /// let registry = Arc::new(AsyncRegistry::<u32>::new());
244    /// {
245    ///     let _guard = registry.register_guard(|_| async move {});
246    ///     assert_eq!(registry.handler_count(), 1);
247    /// }
248    /// assert_eq!(registry.handler_count(), 0);
249    /// ```
250    pub fn register_guard<F, Fut>(self: &Arc<Self>, handler: F) -> AsyncHandlerGuard<E>
251    where
252        F: Fn(&E) -> Fut + Send + Sync + 'static,
253        Fut: Future<Output = ()> + Send + 'static,
254    {
255        let id = self.register(handler);
256        AsyncHandlerGuard::new(id, Arc::downgrade(self))
257    }
258
259    /// Like [`AsyncRegistry::register_guard`] but with an explicit
260    /// priority value. Higher priorities fire first; ties broken in
261    /// registration order. See
262    /// [`AsyncRegistry::register_with_priority`].
263    ///
264    /// # Examples
265    ///
266    /// ```
267    /// use std::sync::Arc;
268    /// use registry_io::r#async::AsyncRegistry;
269    ///
270    /// let registry = Arc::new(AsyncRegistry::<&'static str>::new());
271    /// let _hi = registry.register_guard_with_priority(100, |evt| {
272    ///     let s = *evt;
273    ///     async move { let _ = s; }
274    /// });
275    /// let _lo = registry.register_guard_with_priority(-5, |evt| {
276    ///     let s = *evt;
277    ///     async move { let _ = s; }
278    /// });
279    /// assert_eq!(registry.handler_count(), 2);
280    /// ```
281    pub fn register_guard_with_priority<F, Fut>(
282        self: &Arc<Self>,
283        priority: i32,
284        handler: F,
285    ) -> AsyncHandlerGuard<E>
286    where
287        F: Fn(&E) -> Fut + Send + Sync + 'static,
288        Fut: Future<Output = ()> + Send + 'static,
289    {
290        let id = self.register_with_priority(priority, handler);
291        AsyncHandlerGuard::new(id, Arc::downgrade(self))
292    }
293
294    /// Unregister an async handler by id. Returns `true` if a handler was
295    /// found and removed.
296    ///
297    /// # Examples
298    ///
299    /// ```
300    /// use registry_io::r#async::AsyncRegistry;
301    ///
302    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
303    /// let id = registry.register(|_| async move {});
304    /// assert!(registry.unregister(id));
305    /// assert!(!registry.unregister(id));
306    /// ```
307    pub fn unregister(&self, id: HandlerId) -> bool {
308        let mut removed = false;
309        drop(self.handlers.rcu(|current| {
310            let mut new_vec: Vec<AsyncHandlerEntry<E>> = Vec::with_capacity(current.len());
311            new_vec.extend(current.iter().filter(|e| e.id != id).cloned());
312            removed = new_vec.len() != current.len();
313            Arc::new(new_vec)
314        }));
315        removed
316    }
317
318    /// Remove every registered handler.
319    ///
320    /// In-flight `notify*` calls that already loaded the snapshot still run
321    /// every handler in their snapshot to completion.
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// use registry_io::r#async::AsyncRegistry;
327    ///
328    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
329    /// for _ in 0..5 {
330    ///     let _ = registry.register(|_| async move {});
331    /// }
332    /// registry.clear();
333    /// assert_eq!(registry.handler_count(), 0);
334    /// ```
335    pub fn clear(&self) {
336        self.handlers.store(Arc::new(Vec::new()));
337    }
338
339    /// Current handler count. `O(1)` atomic snapshot.
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use registry_io::r#async::AsyncRegistry;
345    ///
346    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
347    /// assert_eq!(registry.handler_count(), 0);
348    /// let _ = registry.register(|_| async move {});
349    /// assert_eq!(registry.handler_count(), 1);
350    /// ```
351    #[inline]
352    #[must_use]
353    pub fn handler_count(&self) -> usize {
354        self.handlers.load().len()
355    }
356
357    /// `true` if no handlers are registered.
358    ///
359    /// # Examples
360    ///
361    /// ```
362    /// use registry_io::r#async::AsyncRegistry;
363    ///
364    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
365    /// assert!(registry.is_empty());
366    /// let _ = registry.register(|_| async move {});
367    /// assert!(!registry.is_empty());
368    /// ```
369    #[inline]
370    #[must_use]
371    pub fn is_empty(&self) -> bool {
372        self.handlers.load().is_empty()
373    }
374
375    /// `true` if a handler with `id` is currently registered.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// use registry_io::r#async::AsyncRegistry;
381    ///
382    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
383    /// let id = registry.register(|_| async move {});
384    /// assert!(registry.contains(id));
385    /// assert!(registry.unregister(id));
386    /// assert!(!registry.contains(id));
387    /// ```
388    #[must_use]
389    pub fn contains(&self, id: HandlerId) -> bool {
390        self.handlers.load().iter().any(|e| e.id == id)
391    }
392
393    /// Install a panic callback fired once per panicking handler future
394    /// during `notify*`. Replaces any previously installed callback.
395    /// Second-order panics inside the callback itself are caught and
396    /// discarded.
397    ///
398    /// # Examples
399    ///
400    /// ```
401    /// # #[tokio::main(flavor = "current_thread")]
402    /// # async fn main() {
403    /// use std::sync::Arc;
404    /// use std::sync::atomic::{AtomicUsize, Ordering};
405    /// use registry_io::r#async::AsyncRegistry;
406    ///
407    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
408    /// let count = Arc::new(AtomicUsize::new(0));
409    /// let sink = Arc::clone(&count);
410    /// registry.on_panic(move |_| {
411    ///     let _ = sink.fetch_add(1, Ordering::Relaxed);
412    /// });
413    ///
414    /// let _ = registry.register(|_| async move { panic!("oops") });
415    /// registry.notify(&()).await;
416    /// assert_eq!(count.load(Ordering::Relaxed), 1);
417    /// # }
418    /// ```
419    pub fn on_panic<F>(&self, callback: F)
420    where
421        F: Fn(&PanicInfo<'_>) + Send + Sync + 'static,
422    {
423        let holder = Arc::new(PanicCallbackHolder::new(callback));
424        self.panic_callback.store(Some(holder));
425    }
426
427    /// Remove any previously installed panic callback. Subsequent
428    /// handler panics during `notify*` become silent.
429    ///
430    /// # Examples
431    ///
432    /// ```
433    /// use registry_io::r#async::AsyncRegistry;
434    ///
435    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
436    /// registry.on_panic(|_| {});
437    /// registry.clear_panic_callback();
438    /// ```
439    pub fn clear_panic_callback(&self) {
440        self.panic_callback.store(None);
441    }
442
443    /// Dispatch `event` to every registered handler **concurrently**.
444    ///
445    /// Builds one future per handler, then awaits them all via the
446    /// crate-local `JoinAll` combinator. Each handler future is wrapped in
447    /// a crate-local `CatchUnwind` adapter so a panic in one handler does
448    /// not poison the join — its sibling handlers continue.
449    ///
450    /// # Examples
451    ///
452    /// ```
453    /// # #[tokio::main(flavor = "current_thread")]
454    /// # async fn main() {
455    /// use std::sync::Arc;
456    /// use std::sync::atomic::{AtomicU32, Ordering};
457    /// use registry_io::r#async::AsyncRegistry;
458    ///
459    /// let registry: AsyncRegistry<u32> = AsyncRegistry::new();
460    /// let total = Arc::new(AtomicU32::new(0));
461    /// for _ in 0..4 {
462    ///     let sink = Arc::clone(&total);
463    ///     let _ = registry.register(move |value| {
464    ///         let sink = Arc::clone(&sink);
465    ///         let v = *value;
466    ///         async move {
467    ///             sink.fetch_add(v, Ordering::Relaxed);
468    ///         }
469    ///     });
470    /// }
471    ///
472    /// registry.notify(&10).await;
473    /// assert_eq!(total.load(Ordering::Relaxed), 40);
474    /// # }
475    /// ```
476    pub async fn notify(&self, event: &E) {
477        let snapshot = self.handlers.load();
478        if snapshot.is_empty() {
479            return;
480        }
481
482        // Single pass over the snapshot, producing parallel `ids` and
483        // `wrapped` vectors so we can attribute each post-join outcome
484        // back to its originating handler. `JoinAll` preserves input
485        // order, so a positional zip is exact and saves the intermediate
486        // `pairs` allocation the prior implementation needed.
487        let n = snapshot.len();
488        let mut ids: Vec<HandlerId> = Vec::with_capacity(n);
489        let mut wrapped = Vec::with_capacity(n);
490        for entry in snapshot.iter() {
491            ids.push(entry.id);
492            wrapped.push(CatchUnwind::new((entry.handler)(event)));
493        }
494        let results = JoinAll::new(wrapped).await;
495
496        for (id, outcome) in ids.into_iter().zip(results) {
497            if let Err(payload) = outcome {
498                self.handle_panic(id, payload);
499            }
500        }
501    }
502
503    /// Dispatch `event` to every registered handler **sequentially**, in
504    /// priority order.
505    ///
506    /// Each handler's future is awaited to completion before the next one
507    /// starts. Use this when handlers must observe a strict happens-before
508    /// relationship with one another.
509    ///
510    /// # Examples
511    ///
512    /// ```
513    /// # #[tokio::main(flavor = "current_thread")]
514    /// # async fn main() {
515    /// use std::sync::{Arc, Mutex};
516    /// use registry_io::r#async::AsyncRegistry;
517    ///
518    /// let registry: AsyncRegistry<()> = AsyncRegistry::new();
519    /// let log: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
520    ///
521    /// let l = Arc::clone(&log);
522    /// let _ = registry.register_with_priority(10, move |_| {
523    ///     let l = Arc::clone(&l);
524    ///     async move { l.lock().unwrap().push("first"); }
525    /// });
526    /// let l = Arc::clone(&log);
527    /// let _ = registry.register(move |_| {
528    ///     let l = Arc::clone(&l);
529    ///     async move { l.lock().unwrap().push("second"); }
530    /// });
531    ///
532    /// registry.notify_sequential(&()).await;
533    /// assert_eq!(log.lock().unwrap().as_slice(), &["first", "second"]);
534    /// # }
535    /// ```
536    pub async fn notify_sequential(&self, event: &E) {
537        let snapshot = self.handlers.load();
538        for entry in snapshot.iter() {
539            let fut = (entry.handler)(event);
540            match CatchUnwind::new(fut).await {
541                Ok(()) => {}
542                Err(payload) => self.handle_panic(entry.id, payload),
543            }
544        }
545    }
546
547    /// Invoke the panic callback (if installed), then drop the payload.
548    /// Mirrors [`crate::SyncRegistry`]'s panic plumbing.
549    #[cold]
550    fn handle_panic(&self, handler_id: HandlerId, payload: Box<dyn Any + Send + 'static>) {
551        let guard = self.panic_callback.load();
552        if let Some(holder) = guard.as_ref() {
553            let info = PanicInfo::new(handler_id, payload.as_ref());
554            drop(std::panic::catch_unwind(std::panic::AssertUnwindSafe(
555                || {
556                    holder.invoke(&info);
557                },
558            )));
559        }
560        drop(payload);
561    }
562}
563
564impl<E: Send + Sync + 'static> Default for AsyncRegistry<E> {
565    fn default() -> Self {
566        Self::new()
567    }
568}
569
570impl<E: Send + Sync + 'static> fmt::Debug for AsyncRegistry<E> {
571    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
572        f.debug_struct("AsyncRegistry")
573            .field("handler_count", &self.handlers.load().len())
574            .field("has_panic_callback", &self.panic_callback.load().is_some())
575            .finish_non_exhaustive()
576    }
577}
578
579#[cfg(test)]
580#[allow(clippy::unwrap_used, clippy::expect_used)]
581mod tests {
582    use super::AsyncRegistry;
583    use std::sync::Arc;
584    use std::sync::atomic::{AtomicU32, Ordering};
585
586    #[tokio::test]
587    async fn empty_registry_notify_is_noop() {
588        let registry: AsyncRegistry<u32> = AsyncRegistry::new();
589        registry.notify(&42).await;
590        registry.notify_sequential(&42).await;
591    }
592
593    #[tokio::test]
594    async fn concurrent_notify_fires_every_handler_once() {
595        let registry: AsyncRegistry<u32> = AsyncRegistry::new();
596        let count = Arc::new(AtomicU32::new(0));
597        for _ in 0..5 {
598            let sink = Arc::clone(&count);
599            let _ = registry.register(move |_| {
600                let sink = Arc::clone(&sink);
601                async move {
602                    let _ = sink.fetch_add(1, Ordering::Relaxed);
603                }
604            });
605        }
606        registry.notify(&0).await;
607        assert_eq!(count.load(Ordering::Relaxed), 5);
608    }
609}