Skip to main content

freshblu_server/
hub.rs

1/// The MessageHub routes events to connected WebSocket/MQTT clients.
2/// It holds a map of connected device connections and delivers events.
3use std::sync::Arc;
4
5use dashmap::DashMap;
6use freshblu_core::message::DeviceEvent;
7use tokio::sync::broadcast;
8use uuid::Uuid;
9
10const DEFAULT_CHANNEL_CAPACITY: usize = 256;
11
12/// A sender handle for a connected device
13pub type EventSender = broadcast::Sender<DeviceEvent>;
14
15/// The central message hub
16#[derive(Clone)]
17pub struct MessageHub {
18    /// Map of connected device UUID -> broadcast sender
19    connections: Arc<DashMap<Uuid, EventSender>>,
20    channel_capacity: usize,
21}
22
23impl MessageHub {
24    pub fn new() -> Arc<Self> {
25        Arc::new(Self {
26            connections: Arc::new(DashMap::new()),
27            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
28        })
29    }
30
31    pub fn with_capacity(capacity: usize) -> Arc<Self> {
32        Arc::new(Self {
33            connections: Arc::new(DashMap::new()),
34            channel_capacity: capacity,
35        })
36    }
37
38    /// Get a clone of this hub that shares the same connection map.
39    /// Used by NatsBus to give its delivery listener access to connected devices.
40    pub fn clone_inner(&self) -> Self {
41        self.clone()
42    }
43
44    /// Register a new connection, returns a receiver
45    pub fn connect(&self, uuid: Uuid) -> broadcast::Receiver<DeviceEvent> {
46        // If already connected, just subscribe to existing channel
47        if let Some(sender) = self.connections.get(&uuid) {
48            return sender.subscribe();
49        }
50        let (tx, rx) = broadcast::channel(self.channel_capacity);
51        self.connections.insert(uuid, tx);
52        rx
53    }
54
55    /// Disconnect a device
56    pub fn disconnect(&self, uuid: &Uuid) {
57        self.connections.remove(uuid);
58    }
59
60    /// Deliver an event to a specific device
61    pub fn deliver(&self, uuid: &Uuid, event: DeviceEvent) {
62        if let Some(sender) = self.connections.get(uuid) {
63            let _ = sender.send(event);
64        }
65    }
66
67    /// Deliver an event to multiple devices
68    pub fn deliver_many(&self, uuids: &[Uuid], event: DeviceEvent) {
69        for uuid in uuids {
70            self.deliver(uuid, event.clone());
71        }
72    }
73
74    /// Check if a device is currently online (has an active connection)
75    pub fn is_online(&self, uuid: &Uuid) -> bool {
76        self.connections.contains_key(uuid)
77    }
78
79    /// Get count of online devices
80    pub fn online_count(&self) -> usize {
81        self.connections.len()
82    }
83
84    /// Get all online device UUIDs
85    pub fn online_devices(&self) -> Vec<Uuid> {
86        self.connections.iter().map(|e| *e.key()).collect()
87    }
88}
89
90impl Default for MessageHub {
91    fn default() -> Self {
92        Self {
93            connections: Arc::new(DashMap::new()),
94            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
95        }
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102    use freshblu_core::message::DeviceEvent;
103    use uuid::Uuid;
104
105    fn test_event() -> DeviceEvent {
106        DeviceEvent::Unregistered {
107            uuid: Uuid::new_v4(),
108        }
109    }
110
111    #[test]
112    fn connect_and_deliver() {
113        let hub = MessageHub::new();
114        let uuid = Uuid::new_v4();
115        let mut rx = hub.connect(uuid);
116
117        let event = test_event();
118        hub.deliver(&uuid, event.clone());
119
120        let received = rx.try_recv().expect("should receive event");
121        assert_eq!(received, event);
122    }
123
124    #[test]
125    fn disconnect_removes() {
126        let hub = MessageHub::new();
127        let uuid = Uuid::new_v4();
128        let _rx = hub.connect(uuid);
129
130        assert!(hub.is_online(&uuid));
131        hub.disconnect(&uuid);
132        assert!(!hub.is_online(&uuid));
133    }
134
135    #[test]
136    fn deliver_to_offline_no_panic() {
137        let hub = MessageHub::new();
138        let uuid = Uuid::new_v4();
139        // Delivering to a non-existent UUID should not panic
140        hub.deliver(&uuid, test_event());
141    }
142
143    #[test]
144    fn deliver_many() {
145        let hub = MessageHub::new();
146        let uuids: Vec<Uuid> = (0..3).map(|_| Uuid::new_v4()).collect();
147        let mut receivers: Vec<_> = uuids.iter().map(|u| hub.connect(*u)).collect();
148
149        let event = test_event();
150        hub.deliver_many(&uuids, event.clone());
151
152        for rx in receivers.iter_mut() {
153            let received = rx.try_recv().expect("should receive event");
154            assert_eq!(received, event);
155        }
156    }
157
158    #[test]
159    fn online_count() {
160        let hub = MessageHub::new();
161        let u1 = Uuid::new_v4();
162        let u2 = Uuid::new_v4();
163
164        let _rx1 = hub.connect(u1);
165        let _rx2 = hub.connect(u2);
166        assert_eq!(hub.online_count(), 2);
167
168        hub.disconnect(&u1);
169        assert_eq!(hub.online_count(), 1);
170    }
171
172    #[test]
173    fn multiple_subscribers_same_device() {
174        let hub = MessageHub::new();
175        let uuid = Uuid::new_v4();
176
177        let mut rx1 = hub.connect(uuid);
178        let mut rx2 = hub.connect(uuid); // second subscriber to the same channel
179
180        let event = test_event();
181        hub.deliver(&uuid, event.clone());
182
183        let r1 = rx1.try_recv().expect("rx1 should receive event");
184        let r2 = rx2.try_recv().expect("rx2 should receive event");
185        assert_eq!(r1, event);
186        assert_eq!(r2, event);
187    }
188}