sentinel_driver/notify/
channel.rs1use std::collections::HashSet;
2
3use tokio::sync::broadcast;
4
5use crate::notify::Notification;
6
7const DEFAULT_CAPACITY: usize = 256;
9
10pub struct NotificationDispatcher {
16 sender: broadcast::Sender<Notification>,
17 channels: HashSet<String>,
18}
19
20impl NotificationDispatcher {
21 pub fn new() -> Self {
23 Self::with_capacity(DEFAULT_CAPACITY)
24 }
25
26 pub fn with_capacity(capacity: usize) -> Self {
28 let (sender, _) = broadcast::channel(capacity);
29 Self {
30 sender,
31 channels: HashSet::new(),
32 }
33 }
34
35 pub fn subscribe(&self) -> NotificationReceiver {
40 NotificationReceiver {
41 receiver: self.sender.subscribe(),
42 }
43 }
44
45 pub fn dispatch(&self, notification: Notification) -> usize {
50 self.sender.send(notification).unwrap_or(0)
51 }
52
53 pub fn add_channel(&mut self, channel: String) {
55 self.channels.insert(channel);
56 }
57
58 pub fn remove_channel(&mut self, channel: &str) {
60 self.channels.remove(channel);
61 }
62
63 pub fn channels(&self) -> &HashSet<String> {
65 &self.channels
66 }
67
68 pub fn subscriber_count(&self) -> usize {
70 self.sender.receiver_count()
71 }
72}
73
74impl Default for NotificationDispatcher {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80pub struct NotificationReceiver {
82 receiver: broadcast::Receiver<Notification>,
83}
84
85impl NotificationReceiver {
86 pub async fn recv(&mut self) -> Option<Notification> {
91 loop {
92 match self.receiver.recv().await {
93 Ok(notification) => return Some(notification),
94 Err(broadcast::error::RecvError::Lagged(n)) => {
95 tracing::warn!(count = n, "notification receiver lagged, skipped messages");
96 }
97 Err(broadcast::error::RecvError::Closed) => return None,
98 }
99 }
100 }
101}