1use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9
10use tokio::sync::broadcast;
11
12const DEFAULT_CAPACITY: usize = 256;
14
15pub struct Hub<M> {
17 rooms: Arc<Mutex<HashMap<String, Arc<broadcast::Sender<M>>>>>,
18 capacity: usize,
19}
20
21impl<M> Clone for Hub<M> {
22 fn clone(&self) -> Self {
23 Self {
24 rooms: self.rooms.clone(),
25 capacity: self.capacity,
26 }
27 }
28}
29
30impl<M: Clone + Send + 'static> Default for Hub<M> {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl<M: Clone + Send + 'static> Hub<M> {
37 pub fn new() -> Self {
39 Self::with_capacity(DEFAULT_CAPACITY)
40 }
41
42 pub fn with_capacity(capacity: usize) -> Self {
44 Self {
45 rooms: Arc::new(Mutex::new(HashMap::new())),
46 capacity,
47 }
48 }
49
50 pub fn room(&self, id: impl Into<String>) -> Room<M> {
56 let id = id.into();
57 let mut rooms = self.rooms.lock().unwrap_or_else(|p| p.into_inner());
58 let sender = match rooms.get(&id) {
59 Some(sender) => Arc::clone(sender),
60 None => {
61 rooms.retain(|_, s| Arc::strong_count(s) > 1 || s.receiver_count() > 0);
65 let sender = Arc::new(broadcast::channel(self.capacity).0);
66 rooms.insert(id, Arc::clone(&sender));
67 sender
68 }
69 };
70 Room { sender }
71 }
72
73 pub fn room_count(&self) -> usize {
75 self.rooms.lock().unwrap_or_else(|p| p.into_inner()).len()
76 }
77}
78
79pub struct Room<M> {
81 sender: Arc<broadcast::Sender<M>>,
82}
83
84impl<M> Clone for Room<M> {
85 fn clone(&self) -> Self {
86 Self {
87 sender: self.sender.clone(),
88 }
89 }
90}
91
92impl<M: Clone + Send + 'static> Room<M> {
93 pub fn subscribe(&self) -> broadcast::Receiver<M> {
95 self.sender.subscribe()
96 }
97
98 pub fn broadcast(&self, message: M) -> usize {
100 self.sender.send(message).unwrap_or(0)
101 }
102
103 pub fn subscribers(&self) -> usize {
105 self.sender.receiver_count()
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use super::*;
112
113 #[tokio::test]
114 async fn broadcast_reaches_every_subscriber() {
115 let hub = Hub::<i32>::new();
116 let room = hub.room("general");
117 let mut first = room.subscribe();
118 let mut second = room.subscribe();
119
120 assert_eq!(room.subscribers(), 2);
121 assert_eq!(room.broadcast(42), 2);
122 assert_eq!(first.recv().await.unwrap(), 42);
123 assert_eq!(second.recv().await.unwrap(), 42);
124 }
125
126 #[tokio::test]
127 async fn the_same_id_returns_the_same_room() {
128 let hub = Hub::<i32>::new();
129 let mut receiver = hub.room("a").subscribe();
130 assert_eq!(hub.room("a").broadcast(7), 1);
132 assert_eq!(receiver.recv().await.unwrap(), 7);
133 }
134
135 #[test]
136 fn broadcast_with_no_subscribers_reaches_nobody() {
137 let hub = Hub::<i32>::new();
138 assert_eq!(hub.room("empty").broadcast(1), 0);
139 }
140
141 #[tokio::test]
142 async fn dead_rooms_are_evicted_when_a_new_room_is_created() {
143 let hub = Hub::<i32>::new();
144 {
145 let room = hub.room("a");
146 let _subscriber = room.subscribe();
147 assert_eq!(hub.room_count(), 1);
148 } let _b = hub.room("b");
152 assert_eq!(
153 hub.room_count(),
154 1,
155 "the dead room should have been evicted"
156 );
157 }
158
159 #[tokio::test]
160 async fn rooms_with_live_handles_or_subscribers_are_kept() {
161 let hub = Hub::<i32>::new();
162
163 let producer = hub.room("kept-handle");
165 let _subscriber = hub.room("kept-sub").subscribe();
167 drop(hub.room("dead"));
169
170 let _new = hub.room("new");
172 assert_eq!(hub.room_count(), 3, "kept-handle, kept-sub, and new remain");
173
174 let mut rx = hub.room("kept-handle").subscribe();
176 assert_eq!(producer.broadcast(7), 1);
177 assert_eq!(rx.recv().await.unwrap(), 7);
178 }
179}