use std::sync::Arc;
use dashmap::DashMap;
use freshblu_core::message::DeviceEvent;
use tokio::sync::broadcast;
use uuid::Uuid;
/// Broadcast channel capacity used for a device when the caller does not supply one.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
/// Sending half of the broadcast channel that carries `DeviceEvent`s.
pub type EventSender = broadcast::Sender<DeviceEvent>;
/// Registry of per-device broadcast channels, keyed by device UUID.
///
/// The connection table lives behind an `Arc`, so a clone of the hub refers
/// to the same table rather than to a copy of it.
#[derive(Clone)]
pub struct MessageHub {
/// Broadcast sender for every device currently registered with the hub.
connections: Arc<DashMap<Uuid, EventSender>>,
/// Capacity handed to `broadcast::channel` when a device's channel is created.
channel_capacity: usize,
}
impl MessageHub {
    /// Creates a shared hub whose per-device channels use
    /// `DEFAULT_CHANNEL_CAPACITY`.
    pub fn new() -> Arc<Self> {
        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
    }

    /// Creates a shared hub whose per-device channels buffer up to `capacity`
    /// events.
    ///
    /// A `capacity` of zero is raised to one: `broadcast::channel` panics on a
    /// zero capacity, which would otherwise make every later [`connect`] call
    /// on this hub panic.
    ///
    /// [`connect`]: MessageHub::connect
    pub fn with_capacity(capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            connections: Arc::new(DashMap::new()),
            channel_capacity: capacity.max(1),
        })
    }

    /// Returns an owned handle that shares this hub's connection table.
    pub fn clone_inner(&self) -> Self {
        self.clone()
    }

    /// Registers a subscriber for `uuid` and returns its receiver.
    ///
    /// The first subscriber for a device creates the device's channel; later
    /// subscribers attach to the existing one, so every receiver for the same
    /// UUID observes the same events.
    pub fn connect(&self, uuid: Uuid) -> broadcast::Receiver<DeviceEvent> {
        // The entry API performs the lookup and the insertion as one step.
        // A separate `get` followed by `insert` lets two concurrent first
        // connects each create a channel, and the later `insert` would drop
        // the earlier sender and leave that caller with a closed receiver.
        self.connections
            .entry(uuid)
            .or_insert_with(|| broadcast::channel(self.channel_capacity).0)
            .subscribe()
    }

    /// Removes the channel registered for `uuid`, if there is one.
    ///
    /// The entry is removed unconditionally, even when several receivers were
    /// obtained for the same UUID; the hub's sender is dropped with it.
    pub fn disconnect(&self, uuid: &Uuid) {
        self.connections.remove(uuid);
    }

    /// Sends `event` to every current subscriber of `uuid`.
    ///
    /// Delivery is best effort: nothing happens when the device has no
    /// registered channel, and a failed send is ignored.
    pub fn deliver(&self, uuid: &Uuid, event: DeviceEvent) {
        if let Some(sender) = self.connections.get(uuid) {
            // `send` reports an error when no receiver is listening; that is
            // not a failure for a fire-and-forget notification.
            let _ = sender.send(event);
        }
    }

    /// Sends a copy of `event` to each device listed in `uuids`.
    ///
    /// Devices without a registered channel are skipped, and the event is
    /// only cloned for devices that actually have one.
    pub fn deliver_many(&self, uuids: &[Uuid], event: DeviceEvent) {
        for uuid in uuids {
            if let Some(sender) = self.connections.get(uuid) {
                // Best effort, as in `deliver`.
                let _ = sender.send(event.clone());
            }
        }
    }

    /// Reports whether a channel is currently registered for `uuid`.
    pub fn is_online(&self, uuid: &Uuid) -> bool {
        self.connections.contains_key(uuid)
    }

    /// Returns the number of devices with a registered channel.
    pub fn online_count(&self) -> usize {
        self.connections.len()
    }

    /// Returns the UUIDs of all devices with a registered channel, in the
    /// map's iteration order.
    pub fn online_devices(&self) -> Vec<Uuid> {
        self.connections.iter().map(|entry| *entry.key()).collect()
    }
}
impl Default for MessageHub {
    /// Builds an empty hub (not wrapped in an `Arc`) that uses
    /// `DEFAULT_CHANNEL_CAPACITY` for its channels.
    fn default() -> Self {
        let connections = Arc::new(DashMap::new());
        Self {
            connections,
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use freshblu_core::message::DeviceEvent;
    use uuid::Uuid;

    /// Builds a throwaway event carrying a freshly generated UUID.
    fn test_event() -> DeviceEvent {
        let uuid = Uuid::new_v4();
        DeviceEvent::Unregistered { uuid }
    }

    /// A connected device receives the event delivered to it.
    #[test]
    fn connect_and_deliver() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        let mut receiver = hub.connect(device);

        let sent = test_event();
        hub.deliver(&device, sent.clone());

        let got = receiver.try_recv().expect("should receive event");
        assert_eq!(got, sent);
    }

    /// Disconnecting takes the device offline.
    #[test]
    fn disconnect_removes() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        let _receiver = hub.connect(device);
        assert!(hub.is_online(&device));

        hub.disconnect(&device);
        assert!(!hub.is_online(&device));
    }

    /// Delivering to a device that never connected is a silent no-op.
    #[test]
    fn deliver_to_offline_no_panic() {
        let hub = MessageHub::new();
        let stranger = Uuid::new_v4();
        hub.deliver(&stranger, test_event());
    }

    /// Every listed device receives its own copy of the event.
    #[test]
    fn deliver_many() {
        let hub = MessageHub::new();
        let devices: Vec<Uuid> = std::iter::repeat_with(Uuid::new_v4).take(3).collect();
        let mut receivers: Vec<_> = devices
            .iter()
            .copied()
            .map(|device| hub.connect(device))
            .collect();

        let sent = test_event();
        hub.deliver_many(&devices, sent.clone());

        for receiver in &mut receivers {
            let got = receiver.try_recv().expect("should receive event");
            assert_eq!(got, sent);
        }
    }

    /// The online count tracks connects and disconnects.
    #[test]
    fn online_count() {
        let hub = MessageHub::new();
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();
        let _first_rx = hub.connect(first);
        let _second_rx = hub.connect(second);
        assert_eq!(hub.online_count(), 2);

        hub.disconnect(&first);
        assert_eq!(hub.online_count(), 1);
    }

    /// Two subscribers on the same device both observe a delivered event.
    #[test]
    fn multiple_subscribers_same_device() {
        let hub = MessageHub::new();
        let device = Uuid::new_v4();
        let mut first_rx = hub.connect(device);
        let mut second_rx = hub.connect(device);

        let sent = test_event();
        hub.deliver(&device, sent.clone());

        let from_first = first_rx.try_recv().expect("rx1 should receive event");
        let from_second = second_rx.try_recv().expect("rx2 should receive event");
        assert_eq!(from_first, sent);
        assert_eq!(from_second, sent);
    }
}