1use std::sync::Arc;
4
5use dashmap::DashMap;
6use freshblu_core::message::DeviceEvent;
7use tokio::sync::broadcast;
8use uuid::Uuid;
9
/// Capacity passed to `broadcast::channel` when a device's channel is created.
///
/// In tokio's broadcast channel this bounds how many events are retained for
/// receivers that have not yet read them; a receiver that falls further behind
/// is told it lagged instead of slowing the sender down.
const CHANNEL_CAPACITY: usize = 256;
11
/// Sending half of a tokio broadcast channel that carries [`DeviceEvent`]s.
pub type EventSender = broadcast::Sender<DeviceEvent>;
14
/// Registry that maps a device UUID to the broadcast sender used to push
/// [`DeviceEvent`]s to that device's subscribers.
///
/// Cloning is cheap: the map lives behind an [`Arc`], so every clone reads and
/// writes the same set of connections.
#[derive(Clone)]
pub struct MessageHub {
    /// One broadcast sender per device UUID, shared by all clones of the hub.
    connections: Arc<DashMap<Uuid, EventSender>>,
}
21
22impl MessageHub {
23 pub fn new() -> Arc<Self> {
24 Arc::new(Self {
25 connections: Arc::new(DashMap::new()),
26 })
27 }
28
29 pub fn clone_inner(&self) -> Self {
32 self.clone()
33 }
34
35 pub fn connect(&self, uuid: Uuid) -> broadcast::Receiver<DeviceEvent> {
37 if let Some(sender) = self.connections.get(&uuid) {
39 return sender.subscribe();
40 }
41 let (tx, rx) = broadcast::channel(CHANNEL_CAPACITY);
42 self.connections.insert(uuid, tx);
43 rx
44 }
45
46 pub fn disconnect(&self, uuid: &Uuid) {
48 self.connections.remove(uuid);
49 }
50
51 pub fn deliver(&self, uuid: &Uuid, event: DeviceEvent) {
53 if let Some(sender) = self.connections.get(uuid) {
54 let _ = sender.send(event);
55 }
56 }
57
58 pub fn deliver_many(&self, uuids: &[Uuid], event: DeviceEvent) {
60 for uuid in uuids {
61 self.deliver(uuid, event.clone());
62 }
63 }
64
65 pub fn is_online(&self, uuid: &Uuid) -> bool {
67 self.connections.contains_key(uuid)
68 }
69
70 pub fn online_count(&self) -> usize {
72 self.connections.len()
73 }
74
75 pub fn online_devices(&self) -> Vec<Uuid> {
77 self.connections.iter().map(|e| *e.key()).collect()
78 }
79}
80
81impl Default for MessageHub {
82 fn default() -> Self {
83 Self {
84 connections: Arc::new(DashMap::new()),
85 }
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use freshblu_core::message::DeviceEvent;
    use uuid::Uuid;

    /// Builds a throwaway event carrying a freshly generated UUID.
    fn test_event() -> DeviceEvent {
        let uuid = Uuid::new_v4();
        DeviceEvent::Unregistered { uuid }
    }

    /// An event delivered to a connected device arrives on its receiver.
    #[test]
    fn connect_and_deliver() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        let mut inbox = hub.connect(device);

        let sent = test_event();
        hub.deliver(&device, sent.clone());

        let got = inbox.try_recv().expect("should receive event");
        assert_eq!(got, sent);
    }

    /// Disconnecting a device makes the hub report it as offline.
    #[test]
    fn disconnect_removes() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        // Keep the receiver alive so only `disconnect` can change the state.
        let _inbox = hub.connect(device);
        assert!(hub.is_online(&device));

        hub.disconnect(&device);
        assert!(!hub.is_online(&device));
    }

    /// Delivering to a device that never connected must be a silent no-op.
    #[test]
    fn deliver_to_offline_no_panic() {
        let hub = MessageHub::new();
        let stranger = Uuid::new_v4();
        hub.deliver(&stranger, test_event());
    }

    /// `deliver_many` reaches every listed device with the same event.
    #[test]
    fn deliver_many() {
        let hub = MessageHub::new();
        let uuids: Vec<Uuid> = std::iter::repeat_with(Uuid::new_v4).take(3).collect();

        let mut receivers = Vec::with_capacity(uuids.len());
        for id in &uuids {
            receivers.push(hub.connect(*id));
        }

        let sent = test_event();
        hub.deliver_many(&uuids, sent.clone());

        for inbox in &mut receivers {
            let got = inbox.try_recv().expect("should receive event");
            assert_eq!(got, sent);
        }
    }

    /// The online count follows connects and disconnects.
    #[test]
    fn online_count() {
        let hub = MessageHub::new();
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        let _inbox_first = hub.connect(first);
        let _inbox_second = hub.connect(second);
        assert_eq!(hub.online_count(), 2);

        hub.disconnect(&first);
        assert_eq!(hub.online_count(), 1);
    }

    /// Two subscribers of the same device each get their own copy of an event.
    #[test]
    fn multiple_subscribers_same_device() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();

        let mut inbox_a = hub.connect(device);
        let mut inbox_b = hub.connect(device);

        let sent = test_event();
        hub.deliver(&device, sent.clone());

        let got_a = inbox_a.try_recv().expect("rx1 should receive event");
        let got_b = inbox_b.try_recv().expect("rx2 should receive event");
        assert_eq!(got_a, sent);
        assert_eq!(got_b, sent);
    }
}