//! sentinel_driver/notify/channel.rs

1use std::collections::HashSet;
2
3use tokio::sync::broadcast;
4
5use crate::notify::Notification;
6
/// Number of notifications the broadcast buffer holds when a dispatcher is
/// created without an explicit capacity.
const DEFAULT_CAPACITY: usize = 256;
9
10/// A notification dispatcher that routes PG notifications to subscribers.
11///
12/// Wraps a tokio broadcast channel. Multiple receivers can subscribe,
13/// and each receives all notifications. Used internally by the listener
14/// to fan out notifications from a single dedicated connection.
15pub struct NotificationDispatcher {
16    sender: broadcast::Sender<Notification>,
17    channels: HashSet<String>,
18}
19
20impl NotificationDispatcher {
21    /// Create a new dispatcher with the default buffer capacity.
22    pub fn new() -> Self {
23        Self::with_capacity(DEFAULT_CAPACITY)
24    }
25
26    /// Create a new dispatcher with a specific buffer capacity.
27    pub fn with_capacity(capacity: usize) -> Self {
28        let (sender, _) = broadcast::channel(capacity);
29        Self {
30            sender,
31            channels: HashSet::new(),
32        }
33    }
34
35    /// Subscribe to receive notifications.
36    ///
37    /// Returns a receiver that will get all notifications dispatched
38    /// after this point.
39    pub fn subscribe(&self) -> NotificationReceiver {
40        NotificationReceiver {
41            receiver: self.sender.subscribe(),
42        }
43    }
44
45    /// Dispatch a notification to all subscribers.
46    ///
47    /// Returns the number of receivers that got the message.
48    /// Returns 0 if there are no active subscribers (which is fine).
49    pub fn dispatch(&self, notification: Notification) -> usize {
50        self.sender.send(notification).unwrap_or(0)
51    }
52
53    /// Track that we're listening on a channel.
54    pub fn add_channel(&mut self, channel: String) {
55        self.channels.insert(channel);
56    }
57
58    /// Remove a tracked channel.
59    pub fn remove_channel(&mut self, channel: &str) {
60        self.channels.remove(channel);
61    }
62
63    /// Get all tracked channel names (for re-subscribing on reconnect).
64    pub fn channels(&self) -> &HashSet<String> {
65        &self.channels
66    }
67
68    /// Number of active subscribers.
69    pub fn subscriber_count(&self) -> usize {
70        self.sender.receiver_count()
71    }
72}
73
74impl Default for NotificationDispatcher {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80/// A receiver for PG notifications.
81pub struct NotificationReceiver {
82    receiver: broadcast::Receiver<Notification>,
83}
84
85impl NotificationReceiver {
86    /// Wait for the next notification.
87    ///
88    /// Returns `None` if the dispatcher has been dropped (no more notifications).
89    /// Skips over lagged messages (if the receiver falls behind).
90    pub async fn recv(&mut self) -> Option<Notification> {
91        loop {
92            match self.receiver.recv().await {
93                Ok(notification) => return Some(notification),
94                Err(broadcast::error::RecvError::Lagged(n)) => {
95                    tracing::warn!(count = n, "notification receiver lagged, skipped messages");
96                }
97                Err(broadcast::error::RecvError::Closed) => return None,
98            }
99        }
100    }
101}