eventbus_tiny/
bus.rs

1//! A tiny pub/sub event bus
2
3use crate::sub::{self, AnySubscriber, Subscription};
4use std::any::Any;
5use std::sync::{Arc, RwLock};
6
7/// A tiny pub/sub event bus
8///
9/// # Performance and Locking
10/// While the event bus is not entirely lock-free, blocking write-locks are only used on registry updates; i.e.
11/// [`Self::subscribe`] and [`Self::shrink_to_fit`]. Event publication uses a non-blocking read-lock, and event
12/// listening and receiption is completely lock-free. In normal scenarios with moderate subscriber fluctuation, event
13/// passing is largely independent on mutex performance and only affected by the amount of event types and subscribers.
14///
15/// # Memory Allocation and Publication
16/// The event bus itself only (de-)allocates memory on registry updates; i.e. [`Self::subscribe`] and
17/// [`Self::shrink_to_fit`].
18///
19/// **However**, during event publication events are cloned for each subscriber to achieve strong decoupling. If
20/// [`Clone`] is not cheap for your event type or you expect a large number of subscribers, consider wrapping the event
21/// into an [`Arc`](std::sync::Arc) or similar referencing types to keep the performance and memory impact low.
22///
23/// # Many Subscribers
24/// Due to the type abstraction layer, each new event needs to be checked against every subscriber to see if the
25/// subscriber can handle this event. While this check is cheap, it may accumulate if you have a very high
26/// event-throughput with lots of subscribers.
27#[derive(Debug, Default)]
28pub struct EventBus {
29    /// A registry for event subscribers
30    registry: RwLock<Vec<Box<dyn AnySubscriber>>>,
31}
32impl EventBus {
33    /// Creates a new event bus
34    pub const fn new() -> Self {
35        let registry = RwLock::new(Vec::new());
36        Self { registry }
37    }
38
39    /// Publishes an event to all registered subscribers and returns the amount of subscribers addressed
40    pub fn publish<T>(&self, event: T) -> usize
41    where
42        T: Send + Clone + 'static,
43    {
44        // Broadcast message to subscribers
45        let registry = self.registry.read().expect("failed to lock registry");
46        registry.iter().fold(0, |count, subscriber| {
47            // Try to send message
48            #[allow(clippy::arithmetic_side_effects, reason = "Can never overflow")]
49            (count + subscriber.send(&event) as usize)
50        })
51    }
52
53    /// Subscribes to a given event type
54    #[must_use]
55    pub fn subscribe<T>(&self, backlog: usize) -> Subscription<T>
56    where
57        T: Send + Clone + 'static,
58    {
59        /// Identity filter-map function for `T`
60        fn filter_map<T>(event: &dyn Any) -> Option<T>
61        where
62            T: Clone + 'static,
63        {
64            event.downcast_ref::<T>().cloned()
65        }
66
67        // Create subscription
68        self.subscribe_where(backlog, filter_map)
69    }
70
71    /// Creates an aggregate subscriber for any event type `X` where `aggregate(&X) => Some(T)`
72    ///
73    /// # Performance
74    /// Please note that the mapping function is called for event for each subscriber to see if the event can be
75    /// delivered to the subscriber. If the mapping is expensive, it is recommended to add an early-abort check before
76    /// the real mapping begins to quickly reject invalid types.
77    ///
78    /// # See Also
79    /// See also [`crate::where_into`] and [`crate::where_try_into`] to create mappers for `Into` and `TryInto`
80    /// convertible types.
81    #[must_use]
82    pub fn subscribe_where<T, F>(&self, backlog: usize, filter_map: F) -> Subscription<T>
83    where
84        T: Send + Clone + 'static,
85        F: Fn(&dyn Any) -> Option<T> + Send + Sync + 'static,
86    {
87        // Create channel
88        let convert = Arc::new(filter_map);
89        let (subscriber, subscription) = sub::pair(backlog, convert);
90        let subscriber: Box<dyn AnySubscriber> = Box::new(subscriber);
91
92        // Register sender
93        let mut registry = self.registry.write().expect("failed to lock registry");
94        registry.push(subscriber);
95
96        // Return associated subscriber
97        subscription
98    }
99
100    /// Shrinks the allocated capacity as much as possible
101    pub fn shrink_to_fit(&self) {
102        // Remove all dead subscribers
103        let mut registry = self.registry.write().expect("failed to lock registry");
104        registry.retain(|subscriber| subscriber.is_alive());
105        registry.shrink_to_fit();
106    }
107}