Skip to main content

dynomite/events/
manager.rs

1//! [`EventManager`] - the publish side of the cluster events
2//! channel.
3//!
4//! Backed by [`tokio::sync::broadcast`]. The manager owns the
5//! sender; subscribers each hold a fresh receiver. Publishing
6//! is lock-free and never blocks: a slow subscriber will lag
7//! and surface [`super::SubscriberError::Lagged`] on its next
8//! [`Subscriber::recv`](super::Subscriber::recv) call.
9
10use tokio::sync::broadcast;
11
12use super::{ClusterEvent, Subscriber};
13
14/// Publisher side of the cluster events channel.
15///
16/// `EventManager` is cheap to clone (the inner
17/// [`broadcast::Sender`] is reference-counted). Hand a clone or
18/// an [`std::sync::Arc`] handle to every component that needs
19/// to publish.
20///
21/// # Examples
22///
23/// ```
24/// use dynomite::events::EventManager;
25/// let mgr = EventManager::new(8);
26/// let _sub = mgr.subscribe();
27/// assert_eq!(mgr.subscriber_count(), 1);
28/// ```
29#[derive(Clone, Debug)]
30pub struct EventManager {
31    tx: broadcast::Sender<ClusterEvent>,
32}
33
34impl EventManager {
35    /// Build a fresh manager backed by a broadcast channel of
36    /// the supplied capacity.
37    ///
38    /// A capacity of zero is rejected by tokio; this constructor
39    /// silently clamps it to one so embedding code does not have
40    /// to special-case the empty value.
41    ///
42    /// # Examples
43    ///
44    /// ```
45    /// use dynomite::events::EventManager;
46    /// let mgr = EventManager::new(64);
47    /// assert_eq!(mgr.subscriber_count(), 0);
48    /// ```
49    #[must_use]
50    pub fn new(buffer: usize) -> Self {
51        let cap = buffer.max(1);
52        let (tx, _) = broadcast::channel(cap);
53        Self { tx }
54    }
55
56    /// Publish an event to every live subscriber.
57    ///
58    /// Returns silently when there are no subscribers attached;
59    /// `tokio::sync::broadcast::Sender::send` reports that case
60    /// as `Err(SendError(_))`, which we explicitly drop because
61    /// "no observers" is normal during quiet periods and is
62    /// never the publisher's problem.
63    ///
64    /// # Examples
65    ///
66    /// ```
67    /// use std::time::SystemTime;
68    /// use dynomite::events::{ClusterEvent, EventManager};
69    /// let mgr = EventManager::new(4);
70    /// // No subscribers attached; publish is still infallible.
71    /// mgr.publish(ClusterEvent::RingChanged {
72    ///     tag: "init".into(),
73    ///     ts: SystemTime::now(),
74    /// });
75    /// ```
76    pub fn publish(&self, event: ClusterEvent) {
77        // The Err arm carries the event back; we deliberately
78        // discard it because "no subscribers" is not an error.
79        let _ = self.tx.send(event);
80    }
81
82    /// Subscribe a fresh receiver.
83    ///
84    /// Each call returns an independent [`Subscriber`] whose
85    /// queue starts empty; events published before the call are
86    /// not delivered to the new subscriber.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use dynomite::events::EventManager;
92    /// let mgr = EventManager::new(4);
93    /// let _a = mgr.subscribe();
94    /// let _b = mgr.subscribe();
95    /// assert_eq!(mgr.subscriber_count(), 2);
96    /// ```
97    #[must_use]
98    pub fn subscribe(&self) -> Subscriber {
99        Subscriber::new(self.tx.subscribe())
100    }
101
102    /// Number of attached subscribers.
103    #[must_use]
104    pub fn subscriber_count(&self) -> usize {
105        self.tx.receiver_count()
106    }
107}
108
109impl Default for EventManager {
110    /// Build a manager with a 64-event buffer.
111    ///
112    /// 64 matches the default capacity of [`crate::embed::events::EventBus`]
113    /// so embedders that swap one for the other do not see a
114    /// surprise change in lag behaviour.
115    fn default() -> Self {
116        Self::new(64)
117    }
118}