Skip to main content

tork_core/
realtime.rs

1//! A small broadcast hub for fan-out messaging (chat rooms, live feeds).
2//!
3//! A [`Hub`] holds named [`Room`]s, each backed by a `tokio::sync::broadcast`
4//! channel: every message sent to a room reaches all of its current subscribers.
5//! A `Hub` is cheap to clone, so it is typically held as an injected resource.
6
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9
10use tokio::sync::broadcast;
11
/// Per-room channel capacity used by [`Hub::new`]: how many messages a room
/// buffers before a slow subscriber starts to lag.
const DEFAULT_CAPACITY: usize = 256;
14
15/// A registry of broadcast [`Room`]s keyed by name.
16pub struct Hub<M> {
17    rooms: Arc<Mutex<HashMap<String, Arc<broadcast::Sender<M>>>>>,
18    capacity: usize,
19}
20
21impl<M> Clone for Hub<M> {
22    fn clone(&self) -> Self {
23        Self {
24            rooms: self.rooms.clone(),
25            capacity: self.capacity,
26        }
27    }
28}
29
30impl<M: Clone + Send + 'static> Default for Hub<M> {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl<M: Clone + Send + 'static> Hub<M> {
37    /// Creates an empty hub with the default room capacity.
38    pub fn new() -> Self {
39        Self::with_capacity(DEFAULT_CAPACITY)
40    }
41
42    /// Creates an empty hub whose rooms buffer up to `capacity` messages.
43    pub fn with_capacity(capacity: usize) -> Self {
44        Self {
45            rooms: Arc::new(Mutex::new(HashMap::new())),
46            capacity,
47        }
48    }
49
50    /// Returns the room with the given id, creating it if it does not exist.
51    ///
52    /// Creating a new room first evicts any rooms that have become dead — no
53    /// subscribers and no outstanding [`Room`] handles — so a server that mints
54    /// rooms with dynamic names (per user or session) does not grow unboundedly.
55    pub fn room(&self, id: impl Into<String>) -> Room<M> {
56        let id = id.into();
57        let mut rooms = self.rooms.lock().unwrap_or_else(|p| p.into_inner());
58        let sender = match rooms.get(&id) {
59            Some(sender) => Arc::clone(sender),
60            None => {
61                // A dead room has only the map's reference (strong_count == 1)
62                // and no receivers; keeping any with live handles or subscribers
63                // avoids dropping a room another task is about to use or send to.
64                rooms.retain(|_, s| Arc::strong_count(s) > 1 || s.receiver_count() > 0);
65                let sender = Arc::new(broadcast::channel(self.capacity).0);
66                rooms.insert(id, Arc::clone(&sender));
67                sender
68            }
69        };
70        Room { sender }
71    }
72
73    /// Returns the number of rooms currently held by the hub.
74    pub fn room_count(&self) -> usize {
75        self.rooms.lock().unwrap_or_else(|p| p.into_inner()).len()
76    }
77}
78
79/// A single broadcast room: send to all subscribers, or subscribe to receive.
80pub struct Room<M> {
81    sender: Arc<broadcast::Sender<M>>,
82}
83
84impl<M> Clone for Room<M> {
85    fn clone(&self) -> Self {
86        Self {
87            sender: self.sender.clone(),
88        }
89    }
90}
91
92impl<M: Clone + Send + 'static> Room<M> {
93    /// Subscribes to the room, receiving every subsequent broadcast.
94    pub fn subscribe(&self) -> broadcast::Receiver<M> {
95        self.sender.subscribe()
96    }
97
98    /// Broadcasts a message, returning the number of subscribers it reached.
99    pub fn broadcast(&self, message: M) -> usize {
100        self.sender.send(message).unwrap_or(0)
101    }
102
103    /// Returns the current number of subscribers.
104    pub fn subscribers(&self) -> usize {
105        self.sender.receiver_count()
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Two subscribers on one room both receive a single broadcast.
    #[tokio::test]
    async fn broadcast_reaches_every_subscriber() {
        let hub: Hub<i32> = Hub::new();
        let general = hub.room("general");
        let mut rx_one = general.subscribe();
        let mut rx_two = general.subscribe();

        assert_eq!(general.subscribers(), 2);

        let reached = general.broadcast(42);
        assert_eq!(reached, 2);

        let got_one = rx_one.recv().await.unwrap();
        let got_two = rx_two.recv().await.unwrap();
        assert_eq!(got_one, 42);
        assert_eq!(got_two, 42);
    }

    /// Asking for the same id twice yields handles onto one shared channel.
    #[tokio::test]
    async fn the_same_id_returns_the_same_room() {
        let hub: Hub<i32> = Hub::new();
        let mut rx = hub.room("a").subscribe();

        // A second, independent lookup of "a" must reach the subscriber above.
        let reached = hub.room("a").broadcast(7);
        assert_eq!(reached, 1);
        assert_eq!(rx.recv().await.unwrap(), 7);
    }

    /// Broadcasting into a room nobody listens to reports zero recipients.
    #[test]
    fn broadcast_with_no_subscribers_reaches_nobody() {
        let hub: Hub<i32> = Hub::new();
        let reached = hub.room("empty").broadcast(1);
        assert_eq!(reached, 0);
    }

    /// A room with no handle and no subscriber disappears when another room
    /// is created.
    #[tokio::test]
    async fn dead_rooms_are_evicted_when_a_new_room_is_created() {
        let hub: Hub<i32> = Hub::new();

        let room = hub.room("a");
        let subscriber = room.subscribe();
        assert_eq!(hub.room_count(), 1);
        // Release both the receiver and the handle so that "a" becomes dead.
        drop(subscriber);
        drop(room);

        // Creating a different room evicts the dead one first.
        let _b = hub.room("b");
        let remaining = hub.room_count();
        assert_eq!(remaining, 1, "the dead room should have been evicted");
    }

    /// Eviction spares rooms that still have a handle or a subscriber.
    #[tokio::test]
    async fn rooms_with_live_handles_or_subscribers_are_kept() {
        let hub: Hub<i32> = Hub::new();

        // Kept alive by an outstanding handle (a producer).
        let producer = hub.room("kept-handle");
        // Kept alive by a subscriber alone; its handle is dropped right away.
        let _subscriber = hub.room("kept-sub").subscribe();
        // Neither handle nor subscriber survives: this room is dead.
        let dead = hub.room("dead");
        drop(dead);

        // Minting a new room runs eviction, which removes only the dead room.
        let _new = hub.room("new");
        assert_eq!(hub.room_count(), 3, "kept-handle, kept-sub, and new remain");

        // The producer's original handle still talks to the registered channel.
        let mut rx = hub.room("kept-handle").subscribe();
        let reached = producer.broadcast(7);
        assert_eq!(reached, 1);
        assert_eq!(rx.recv().await.unwrap(), 7);
    }
}