use core::any::Any;
use core::fmt;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;
use arc_swap::{ArcSwap, ArcSwapOption};
use crate::HandlerId;
use crate::handler_id::HandlerIdGenerator;
use crate::panic::{PanicCallbackHolder, PanicInfo};
pub mod guard;
pub use guard::HandlerGuard;
type StoredHandler<E> = Arc<dyn Fn(&E) + Send + Sync + 'static>;
struct HandlerEntry<E: Send + Sync + 'static> {
id: HandlerId,
priority: i32,
handler: StoredHandler<E>,
}
impl<E: Send + Sync + 'static> Clone for HandlerEntry<E> {
#[inline]
fn clone(&self) -> Self {
Self {
id: self.id,
priority: self.priority,
handler: Arc::clone(&self.handler),
}
}
}
impl<E: Send + Sync + 'static> fmt::Debug for HandlerEntry<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HandlerEntry")
.field("id", &self.id)
.field("priority", &self.priority)
.finish_non_exhaustive()
}
}
pub struct SyncRegistry<E: Send + Sync + 'static> {
handlers: ArcSwap<Vec<HandlerEntry<E>>>,
id_generator: HandlerIdGenerator,
panic_callback: ArcSwapOption<PanicCallbackHolder>,
}
impl<E: Send + Sync + 'static> SyncRegistry<E> {
#[must_use]
pub fn new() -> Self {
Self {
handlers: ArcSwap::from_pointee(Vec::new()),
id_generator: HandlerIdGenerator::new(),
panic_callback: ArcSwapOption::empty(),
}
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
Self {
handlers: ArcSwap::from_pointee(Vec::with_capacity(capacity)),
id_generator: HandlerIdGenerator::new(),
panic_callback: ArcSwapOption::empty(),
}
}
pub fn register<F>(&self, handler: F) -> HandlerId
where
F: Fn(&E) + Send + Sync + 'static,
{
self.register_with_priority(0, handler)
}
pub fn register_with_priority<F>(&self, priority: i32, handler: F) -> HandlerId
where
F: Fn(&E) + Send + Sync + 'static,
{
let id = self.id_generator.next();
let entry = HandlerEntry {
id,
priority,
handler: Arc::new(handler),
};
drop(self.handlers.rcu(|current| {
let mut new_vec: Vec<HandlerEntry<E>> = Vec::with_capacity(current.len() + 1);
new_vec.extend(current.iter().cloned());
let pos = new_vec.partition_point(|e| e.priority >= entry.priority);
new_vec.insert(pos, entry.clone());
Arc::new(new_vec)
}));
id
}
pub fn register_guard<F>(self: &Arc<Self>, handler: F) -> HandlerGuard<E>
where
F: Fn(&E) + Send + Sync + 'static,
{
let id = self.register(handler);
HandlerGuard::new(id, Arc::downgrade(self))
}
pub fn register_guard_with_priority<F>(
self: &Arc<Self>,
priority: i32,
handler: F,
) -> HandlerGuard<E>
where
F: Fn(&E) + Send + Sync + 'static,
{
let id = self.register_with_priority(priority, handler);
HandlerGuard::new(id, Arc::downgrade(self))
}
pub fn unregister(&self, id: HandlerId) -> bool {
let mut removed = false;
drop(self.handlers.rcu(|current| {
let mut new_vec: Vec<HandlerEntry<E>> = Vec::with_capacity(current.len());
new_vec.extend(current.iter().filter(|e| e.id != id).cloned());
removed = new_vec.len() != current.len();
Arc::new(new_vec)
}));
removed
}
pub fn clear(&self) {
self.handlers.store(Arc::new(Vec::new()));
}
#[inline]
#[must_use]
pub fn handler_count(&self) -> usize {
self.handlers.load().len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.handlers.load().is_empty()
}
#[must_use]
pub fn contains(&self, id: HandlerId) -> bool {
self.handlers.load().iter().any(|e| e.id == id)
}
pub fn on_panic<F>(&self, callback: F)
where
F: Fn(&PanicInfo<'_>) + Send + Sync + 'static,
{
let holder = Arc::new(PanicCallbackHolder::new(callback));
self.panic_callback.store(Some(holder));
}
pub fn clear_panic_callback(&self) {
self.panic_callback.store(None);
}
#[inline]
pub fn notify(&self, event: &E) {
let snapshot = self.handlers.load();
for entry in snapshot.iter() {
let handler = &entry.handler;
let result = catch_unwind(AssertUnwindSafe(|| handler(event)));
if let Err(payload) = result {
self.handle_panic(entry.id, payload);
}
}
}
#[cold]
fn handle_panic(&self, handler_id: HandlerId, payload: Box<dyn Any + Send + 'static>) {
let guard = self.panic_callback.load();
if let Some(holder) = guard.as_ref() {
let info = PanicInfo::new(handler_id, payload.as_ref());
drop(catch_unwind(AssertUnwindSafe(|| holder.invoke(&info))));
}
drop(payload);
}
}
impl<E: Send + Sync + 'static> Default for SyncRegistry<E> {
fn default() -> Self {
Self::new()
}
}
impl<E: Send + Sync + 'static> fmt::Debug for SyncRegistry<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SyncRegistry")
.field("handler_count", &self.handlers.load().len())
.field("has_panic_callback", &self.panic_callback.load().is_some())
.finish_non_exhaustive()
}
}
#[allow(dead_code)]
const fn _assert_send_sync<E: Send + Sync + 'static>() {
const fn assert_send<T: Send>() {}
const fn assert_sync<T: Sync>() {}
assert_send::<SyncRegistry<E>>();
assert_sync::<SyncRegistry<E>>();
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::SyncRegistry;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
#[test]
fn empty_registry_has_no_handlers() {
let registry: SyncRegistry<u32> = SyncRegistry::new();
assert_eq!(registry.handler_count(), 0);
assert!(registry.is_empty());
}
#[test]
fn register_increments_count_and_returns_unique_ids() {
let registry: SyncRegistry<u32> = SyncRegistry::new();
let a = registry.register(|_| {});
let b = registry.register(|_| {});
assert_ne!(a, b);
assert_eq!(registry.handler_count(), 2);
}
#[test]
fn notify_fires_each_handler_once() {
let registry: SyncRegistry<u32> = SyncRegistry::new();
let count = Arc::new(AtomicU32::new(0));
for _ in 0..3 {
let c = Arc::clone(&count);
let _ = registry.register(move |_| {
let _ = c.fetch_add(1, Ordering::Relaxed);
});
}
registry.notify(&7);
assert_eq!(count.load(Ordering::Relaxed), 3);
}
#[test]
fn unregister_returns_false_for_unknown_id() {
let registry: SyncRegistry<u32> = SyncRegistry::new();
let id = registry.register(|_| {});
assert!(registry.unregister(id));
assert!(!registry.unregister(id));
}
#[test]
fn clear_removes_all_handlers() {
let registry: SyncRegistry<u32> = SyncRegistry::new();
for _ in 0..5 {
let _ = registry.register(|_| {});
}
registry.clear();
assert_eq!(registry.handler_count(), 0);
}
#[test]
fn debug_does_not_panic_and_omits_handler_internals() {
let registry: SyncRegistry<u32> = SyncRegistry::new();
let _ = registry.register(|_| {});
let s = format!("{registry:?}");
assert!(s.contains("SyncRegistry"));
assert!(s.contains("handler_count"));
}
}