dynomite/events/manager.rs
1//! [`EventManager`] - the publish side of the cluster events
2//! channel.
3//!
4//! Backed by [`tokio::sync::broadcast`]. The manager owns the
5//! sender; subscribers each hold a fresh receiver. Publishing
6//! is lock-free and never blocks: a slow subscriber will lag
7//! and surface [`super::SubscriberError::Lagged`] on its next
8//! [`Subscriber::recv`](super::Subscriber::recv) call.
9
10use tokio::sync::broadcast;
11
12use super::{ClusterEvent, Subscriber};
13
14/// Publisher side of the cluster events channel.
15///
16/// `EventManager` is cheap to clone (the inner
17/// [`broadcast::Sender`] is reference-counted). Hand a clone or
18/// an [`std::sync::Arc`] handle to every component that needs
19/// to publish.
20///
21/// # Examples
22///
23/// ```
24/// use dynomite::events::EventManager;
25/// let mgr = EventManager::new(8);
26/// let _sub = mgr.subscribe();
27/// assert_eq!(mgr.subscriber_count(), 1);
28/// ```
29#[derive(Clone, Debug)]
30pub struct EventManager {
31 tx: broadcast::Sender<ClusterEvent>,
32}
33
34impl EventManager {
35 /// Build a fresh manager backed by a broadcast channel of
36 /// the supplied capacity.
37 ///
38 /// A capacity of zero is rejected by tokio; this constructor
39 /// silently clamps it to one so embedding code does not have
40 /// to special-case the empty value.
41 ///
42 /// # Examples
43 ///
44 /// ```
45 /// use dynomite::events::EventManager;
46 /// let mgr = EventManager::new(64);
47 /// assert_eq!(mgr.subscriber_count(), 0);
48 /// ```
49 #[must_use]
50 pub fn new(buffer: usize) -> Self {
51 let cap = buffer.max(1);
52 let (tx, _) = broadcast::channel(cap);
53 Self { tx }
54 }
55
56 /// Publish an event to every live subscriber.
57 ///
58 /// Returns silently when there are no subscribers attached;
59 /// `tokio::sync::broadcast::Sender::send` reports that case
60 /// as `Err(SendError(_))`, which we explicitly drop because
61 /// "no observers" is normal during quiet periods and is
62 /// never the publisher's problem.
63 ///
64 /// # Examples
65 ///
66 /// ```
67 /// use std::time::SystemTime;
68 /// use dynomite::events::{ClusterEvent, EventManager};
69 /// let mgr = EventManager::new(4);
70 /// // No subscribers attached; publish is still infallible.
71 /// mgr.publish(ClusterEvent::RingChanged {
72 /// tag: "init".into(),
73 /// ts: SystemTime::now(),
74 /// });
75 /// ```
76 pub fn publish(&self, event: ClusterEvent) {
77 // The Err arm carries the event back; we deliberately
78 // discard it because "no subscribers" is not an error.
79 let _ = self.tx.send(event);
80 }
81
82 /// Subscribe a fresh receiver.
83 ///
84 /// Each call returns an independent [`Subscriber`] whose
85 /// queue starts empty; events published before the call are
86 /// not delivered to the new subscriber.
87 ///
88 /// # Examples
89 ///
90 /// ```
91 /// use dynomite::events::EventManager;
92 /// let mgr = EventManager::new(4);
93 /// let _a = mgr.subscribe();
94 /// let _b = mgr.subscribe();
95 /// assert_eq!(mgr.subscriber_count(), 2);
96 /// ```
97 #[must_use]
98 pub fn subscribe(&self) -> Subscriber {
99 Subscriber::new(self.tx.subscribe())
100 }
101
102 /// Number of attached subscribers.
103 #[must_use]
104 pub fn subscriber_count(&self) -> usize {
105 self.tx.receiver_count()
106 }
107}
108
109impl Default for EventManager {
110 /// Build a manager with a 64-event buffer.
111 ///
112 /// 64 matches the default capacity of [`crate::embed::events::EventBus`]
113 /// so embedders that swap one for the other do not see a
114 /// surprise change in lag behaviour.
115 fn default() -> Self {
116 Self::new(64)
117 }
118}