// freshblu_server/hub.rs

//! The `MessageHub` routes events to connected WebSocket/MQTT clients.
//! It holds a map of connected device connections and delivers events to them.
3use std::sync::Arc;
4
5use dashmap::DashMap;
6use freshblu_core::message::DeviceEvent;
7use tokio::sync::broadcast;
8use uuid::Uuid;
9
10const CHANNEL_CAPACITY: usize = 256;
11
12/// A sender handle for a connected device
13pub type EventSender = broadcast::Sender<DeviceEvent>;
14
15/// The central message hub
16#[derive(Clone)]
17pub struct MessageHub {
18    /// Map of connected device UUID -> broadcast sender
19    connections: Arc<DashMap<Uuid, EventSender>>,
20}
21
22impl MessageHub {
23    pub fn new() -> Arc<Self> {
24        Arc::new(Self {
25            connections: Arc::new(DashMap::new()),
26        })
27    }
28
29    /// Get a clone of this hub that shares the same connection map.
30    /// Used by NatsBus to give its delivery listener access to connected devices.
31    pub fn clone_inner(&self) -> Self {
32        self.clone()
33    }
34
35    /// Register a new connection, returns a receiver
36    pub fn connect(&self, uuid: Uuid) -> broadcast::Receiver<DeviceEvent> {
37        // If already connected, just subscribe to existing channel
38        if let Some(sender) = self.connections.get(&uuid) {
39            return sender.subscribe();
40        }
41        let (tx, rx) = broadcast::channel(CHANNEL_CAPACITY);
42        self.connections.insert(uuid, tx);
43        rx
44    }
45
46    /// Disconnect a device
47    pub fn disconnect(&self, uuid: &Uuid) {
48        self.connections.remove(uuid);
49    }
50
51    /// Deliver an event to a specific device
52    pub fn deliver(&self, uuid: &Uuid, event: DeviceEvent) {
53        if let Some(sender) = self.connections.get(uuid) {
54            let _ = sender.send(event);
55        }
56    }
57
58    /// Deliver an event to multiple devices
59    pub fn deliver_many(&self, uuids: &[Uuid], event: DeviceEvent) {
60        for uuid in uuids {
61            self.deliver(uuid, event.clone());
62        }
63    }
64
65    /// Check if a device is currently online (has an active connection)
66    pub fn is_online(&self, uuid: &Uuid) -> bool {
67        self.connections.contains_key(uuid)
68    }
69
70    /// Get count of online devices
71    pub fn online_count(&self) -> usize {
72        self.connections.len()
73    }
74
75    /// Get all online device UUIDs
76    pub fn online_devices(&self) -> Vec<Uuid> {
77        self.connections.iter().map(|e| *e.key()).collect()
78    }
79}
80
81impl Default for MessageHub {
82    fn default() -> Self {
83        Self {
84            connections: Arc::new(DashMap::new()),
85        }
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use freshblu_core::message::DeviceEvent;
    use uuid::Uuid;

    /// Builds a throwaway event that carries a freshly generated UUID.
    fn test_event() -> DeviceEvent {
        let uuid = Uuid::new_v4();
        DeviceEvent::Unregistered { uuid }
    }

    /// An event delivered to a connected device shows up on its receiver.
    #[test]
    fn connect_and_deliver() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        let mut receiver = hub.connect(device);

        let sent = test_event();
        hub.deliver(&device, sent.clone());

        let got = receiver.try_recv().expect("should receive event");
        assert_eq!(got, sent);
    }

    /// Disconnecting flips a device from online to offline.
    #[test]
    fn disconnect_removes() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        // Keep the receiver alive for the duration of the test.
        let _receiver = hub.connect(device);

        assert!(hub.is_online(&device));
        hub.disconnect(&device);
        assert!(!hub.is_online(&device));
    }

    /// Delivering to a UUID that never connected must not panic.
    #[test]
    fn deliver_to_offline_no_panic() {
        let hub = MessageHub::new();
        let stranger = Uuid::new_v4();
        hub.deliver(&stranger, test_event());
    }

    /// `deliver_many` hands the same event to every listed device.
    #[test]
    fn deliver_many() {
        let hub = MessageHub::new();
        let uuids: Vec<Uuid> = std::iter::repeat_with(Uuid::new_v4).take(3).collect();
        let mut receivers: Vec<_> = uuids.iter().copied().map(|id| hub.connect(id)).collect();

        let sent = test_event();
        hub.deliver_many(&uuids, sent.clone());

        for receiver in &mut receivers {
            let got = receiver.try_recv().expect("should receive event");
            assert_eq!(got, sent);
        }
    }

    /// The online count follows connects and disconnects.
    #[test]
    fn online_count() {
        let hub = MessageHub::new();
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        let _first_rx = hub.connect(first);
        let _second_rx = hub.connect(second);
        assert_eq!(hub.online_count(), 2);

        hub.disconnect(&first);
        assert_eq!(hub.online_count(), 1);
    }

    /// Two connections for one UUID both receive a delivered event.
    #[test]
    fn multiple_subscribers_same_device() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();

        let mut primary = hub.connect(device);
        // Connecting again subscribes to the channel that already exists.
        let mut secondary = hub.connect(device);

        let sent = test_event();
        hub.deliver(&device, sent.clone());

        let from_primary = primary.try_recv().expect("rx1 should receive event");
        let from_secondary = secondary.try_recv().expect("rx2 should receive event");
        assert_eq!(from_primary, sent);
        assert_eq!(from_secondary, sent);
    }
}