eventbus_tiny/bus.rs
1//! A tiny pub/sub event bus
2
3use crate::sub::{self, AnySubscriber, Subscription};
4use std::any::Any;
5use std::sync::{Arc, RwLock};
6
7/// A tiny pub/sub event bus
8///
9/// # Performance and Locking
10/// While the event bus is not entirely lock-free, blocking write-locks are only used on registry updates; i.e.
11/// [`Self::subscribe`] and [`Self::shrink_to_fit`]. Event publication uses a non-blocking read-lock, and event
12/// listening and receiption is completely lock-free. In normal scenarios with moderate subscriber fluctuation, event
13/// passing is largely independent on mutex performance and only affected by the amount of event types and subscribers.
14///
15/// # Memory Allocation and Publication
16/// The event bus itself only (de-)allocates memory on registry updates; i.e. [`Self::subscribe`] and
17/// [`Self::shrink_to_fit`].
18///
19/// **However**, during event publication events are cloned for each subscriber to achieve strong decoupling. If
20/// [`Clone`] is not cheap for your event type or you expect a large number of subscribers, consider wrapping the event
21/// into an [`Arc`](std::sync::Arc) or similar referencing types to keep the performance and memory impact low.
22///
23/// # Many Subscribers
24/// Due to the type abstraction layer, each new event needs to be checked against every subscriber to see if the
25/// subscriber can handle this event. While this check is cheap, it may accumulate if you have a very high
26/// event-throughput with lots of subscribers.
27#[derive(Debug, Default)]
28pub struct EventBus {
29 /// A registry for event subscribers
30 registry: RwLock<Vec<Box<dyn AnySubscriber>>>,
31}
32impl EventBus {
33 /// Creates a new event bus
34 pub const fn new() -> Self {
35 let registry = RwLock::new(Vec::new());
36 Self { registry }
37 }
38
39 /// Publishes an event to all registered subscribers and returns the amount of subscribers addressed
40 pub fn publish<T>(&self, event: T) -> usize
41 where
42 T: Send + Clone + 'static,
43 {
44 // Broadcast message to subscribers
45 let registry = self.registry.read().expect("failed to lock registry");
46 registry.iter().fold(0, |count, subscriber| {
47 // Try to send message
48 #[allow(clippy::arithmetic_side_effects, reason = "Can never overflow")]
49 (count + subscriber.send(&event) as usize)
50 })
51 }
52
53 /// Subscribes to a given event type
54 #[must_use]
55 pub fn subscribe<T>(&self, backlog: usize) -> Subscription<T>
56 where
57 T: Send + Clone + 'static,
58 {
59 /// Identity filter-map function for `T`
60 fn filter_map<T>(event: &dyn Any) -> Option<T>
61 where
62 T: Clone + 'static,
63 {
64 event.downcast_ref::<T>().cloned()
65 }
66
67 // Create subscription
68 self.subscribe_where(backlog, filter_map)
69 }
70
71 /// Creates an aggregate subscriber for any event type `X` where `aggregate(&X) => Some(T)`
72 ///
73 /// # Performance
74 /// Please note that the mapping function is called for event for each subscriber to see if the event can be
75 /// delivered to the subscriber. If the mapping is expensive, it is recommended to add an early-abort check before
76 /// the real mapping begins to quickly reject invalid types.
77 ///
78 /// # See Also
79 /// See also [`crate::where_into`] and [`crate::where_try_into`] to create mappers for `Into` and `TryInto`
80 /// convertible types.
81 #[must_use]
82 pub fn subscribe_where<T, F>(&self, backlog: usize, filter_map: F) -> Subscription<T>
83 where
84 T: Send + Clone + 'static,
85 F: Fn(&dyn Any) -> Option<T> + Send + Sync + 'static,
86 {
87 // Create channel
88 let convert = Arc::new(filter_map);
89 let (subscriber, subscription) = sub::pair(backlog, convert);
90 let subscriber: Box<dyn AnySubscriber> = Box::new(subscriber);
91
92 // Register sender
93 let mut registry = self.registry.write().expect("failed to lock registry");
94 registry.push(subscriber);
95
96 // Return associated subscriber
97 subscription
98 }
99
100 /// Shrinks the allocated capacity as much as possible
101 pub fn shrink_to_fit(&self) {
102 // Remove all dead subscribers
103 let mut registry = self.registry.write().expect("failed to lock registry");
104 registry.retain(|subscriber| subscriber.is_alive());
105 registry.shrink_to_fit();
106 }
107}