1use std::sync::Arc;
4
5use dashmap::DashMap;
6use freshblu_core::message::DeviceEvent;
7use tokio::sync::broadcast;
8use uuid::Uuid;
9
10const DEFAULT_CHANNEL_CAPACITY: usize = 256;
11
12pub type EventSender = broadcast::Sender<DeviceEvent>;
14
15#[derive(Clone)]
17pub struct MessageHub {
18 connections: Arc<DashMap<Uuid, EventSender>>,
20 channel_capacity: usize,
21}
22
23impl MessageHub {
24 pub fn new() -> Arc<Self> {
25 Arc::new(Self {
26 connections: Arc::new(DashMap::new()),
27 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
28 })
29 }
30
31 pub fn with_capacity(capacity: usize) -> Arc<Self> {
32 Arc::new(Self {
33 connections: Arc::new(DashMap::new()),
34 channel_capacity: capacity,
35 })
36 }
37
38 pub fn clone_inner(&self) -> Self {
41 self.clone()
42 }
43
44 pub fn connect(&self, uuid: Uuid) -> broadcast::Receiver<DeviceEvent> {
46 if let Some(sender) = self.connections.get(&uuid) {
48 return sender.subscribe();
49 }
50 let (tx, rx) = broadcast::channel(self.channel_capacity);
51 self.connections.insert(uuid, tx);
52 rx
53 }
54
55 pub fn disconnect(&self, uuid: &Uuid) {
57 self.connections.remove(uuid);
58 }
59
60 pub fn deliver(&self, uuid: &Uuid, event: DeviceEvent) {
62 if let Some(sender) = self.connections.get(uuid) {
63 let _ = sender.send(event);
64 }
65 }
66
67 pub fn deliver_many(&self, uuids: &[Uuid], event: DeviceEvent) {
69 for uuid in uuids {
70 self.deliver(uuid, event.clone());
71 }
72 }
73
74 pub fn is_online(&self, uuid: &Uuid) -> bool {
76 self.connections.contains_key(uuid)
77 }
78
79 pub fn online_count(&self) -> usize {
81 self.connections.len()
82 }
83
84 pub fn online_devices(&self) -> Vec<Uuid> {
86 self.connections.iter().map(|e| *e.key()).collect()
87 }
88}
89
90impl Default for MessageHub {
91 fn default() -> Self {
92 Self {
93 connections: Arc::new(DashMap::new()),
94 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
95 }
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102 use freshblu_core::message::DeviceEvent;
103 use uuid::Uuid;
104
105 fn test_event() -> DeviceEvent {
106 DeviceEvent::Unregistered {
107 uuid: Uuid::new_v4(),
108 }
109 }
110
111 #[test]
112 fn connect_and_deliver() {
113 let hub = MessageHub::new();
114 let uuid = Uuid::new_v4();
115 let mut rx = hub.connect(uuid);
116
117 let event = test_event();
118 hub.deliver(&uuid, event.clone());
119
120 let received = rx.try_recv().expect("should receive event");
121 assert_eq!(received, event);
122 }
123
124 #[test]
125 fn disconnect_removes() {
126 let hub = MessageHub::new();
127 let uuid = Uuid::new_v4();
128 let _rx = hub.connect(uuid);
129
130 assert!(hub.is_online(&uuid));
131 hub.disconnect(&uuid);
132 assert!(!hub.is_online(&uuid));
133 }
134
135 #[test]
136 fn deliver_to_offline_no_panic() {
137 let hub = MessageHub::new();
138 let uuid = Uuid::new_v4();
139 hub.deliver(&uuid, test_event());
141 }
142
143 #[test]
144 fn deliver_many() {
145 let hub = MessageHub::new();
146 let uuids: Vec<Uuid> = (0..3).map(|_| Uuid::new_v4()).collect();
147 let mut receivers: Vec<_> = uuids.iter().map(|u| hub.connect(*u)).collect();
148
149 let event = test_event();
150 hub.deliver_many(&uuids, event.clone());
151
152 for rx in receivers.iter_mut() {
153 let received = rx.try_recv().expect("should receive event");
154 assert_eq!(received, event);
155 }
156 }
157
158 #[test]
159 fn online_count() {
160 let hub = MessageHub::new();
161 let u1 = Uuid::new_v4();
162 let u2 = Uuid::new_v4();
163
164 let _rx1 = hub.connect(u1);
165 let _rx2 = hub.connect(u2);
166 assert_eq!(hub.online_count(), 2);
167
168 hub.disconnect(&u1);
169 assert_eq!(hub.online_count(), 1);
170 }
171
172 #[test]
173 fn multiple_subscribers_same_device() {
174 let hub = MessageHub::new();
175 let uuid = Uuid::new_v4();
176
177 let mut rx1 = hub.connect(uuid);
178 let mut rx2 = hub.connect(uuid); let event = test_event();
181 hub.deliver(&uuid, event.clone());
182
183 let r1 = rx1.try_recv().expect("rx1 should receive event");
184 let r2 = rx2.try_recv().expect("rx2 should receive event");
185 assert_eq!(r1, event);
186 assert_eq!(r2, event);
187 }
188}