1use cloudillo_types::types::TnId;
7use cloudillo_types::utils::random_id;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{broadcast, RwLock};
12
13#[derive(Clone, Debug)]
15pub struct BroadcastMessage {
16 pub id: String,
17 pub cmd: String,
18 pub data: Value,
19 pub sender: String,
20 pub timestamp: u64,
21}
22
23impl BroadcastMessage {
24 pub fn new(cmd: impl Into<String>, data: Value, sender: impl Into<String>) -> Self {
26 Self {
27 id: random_id().unwrap_or_default(),
28 cmd: cmd.into(),
29 data,
30 sender: sender.into(),
31 timestamp: now_timestamp(),
32 }
33 }
34}
35
36#[derive(Debug)]
38pub struct UserConnection {
39 pub id_tag: Box<str>,
41 pub tn_id: TnId,
43 pub connection_id: Box<str>,
45 pub connected_at: u64,
47 sender: broadcast::Sender<BroadcastMessage>,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum DeliveryResult {
54 Delivered(usize),
56 UserOffline,
58}
59
60#[derive(Debug, Clone)]
62pub struct UserRegistryStats {
63 pub online_users: usize,
65 pub total_connections: usize,
67 pub users_per_tenant: HashMap<TnId, usize>,
69}
70
71type UserRegistryMap = HashMap<TnId, HashMap<Box<str>, Vec<UserConnection>>>;
73
74#[derive(Clone, Debug)]
76pub struct BroadcastConfig {
77 pub buffer_size: usize,
79}
80
81impl Default for BroadcastConfig {
82 fn default() -> Self {
83 Self { buffer_size: 128 }
85 }
86}
87
88pub struct BroadcastManager {
90 users: Arc<RwLock<UserRegistryMap>>,
92 config: BroadcastConfig,
93}
94
95impl BroadcastManager {
96 pub fn new() -> Self {
98 Self::with_config(BroadcastConfig::default())
99 }
100
101 pub fn with_config(config: BroadcastConfig) -> Self {
103 Self { users: Arc::new(RwLock::new(HashMap::new())), config }
104 }
105
106 pub async fn register_user(
112 &self,
113 tn_id: TnId,
114 id_tag: &str,
115 connection_id: &str,
116 ) -> broadcast::Receiver<BroadcastMessage> {
117 let (sender, receiver) = broadcast::channel(self.config.buffer_size);
118
119 let connection = UserConnection {
120 id_tag: id_tag.into(),
121 tn_id,
122 connection_id: connection_id.into(),
123 connected_at: now_timestamp(),
124 sender,
125 };
126
127 let mut users = self.users.write().await;
128 users
129 .entry(tn_id)
130 .or_default()
131 .entry(id_tag.into())
132 .or_default()
133 .push(connection);
134
135 tracing::debug!(tn_id = ?tn_id, id_tag = %id_tag, connection_id = %connection_id, "User registered");
136 receiver
137 }
138
139 pub async fn unregister_user(&self, tn_id: TnId, id_tag: &str, connection_id: &str) {
144 let mut users = self.users.write().await;
145
146 if let Some(tenant_users) = users.get_mut(&tn_id) {
147 if let Some(connections) = tenant_users.get_mut(id_tag) {
148 connections.retain(|conn| conn.connection_id.as_ref() != connection_id);
149
150 if connections.is_empty() {
152 tenant_users.remove(id_tag);
153 }
154 }
155
156 if tenant_users.is_empty() {
158 users.remove(&tn_id);
159 }
160 }
161
162 tracing::debug!(tn_id = ?tn_id, id_tag = %id_tag, connection_id = %connection_id, "User unregistered");
163 }
164
165 pub async fn send_to_user(
172 &self,
173 tn_id: TnId,
174 id_tag: &str,
175 msg: BroadcastMessage,
176 ) -> DeliveryResult {
177 let users = self.users.read().await;
178
179 if let Some(tenant_users) = users.get(&tn_id) {
180 if let Some(connections) = tenant_users.get(id_tag) {
181 let mut delivered = 0;
182 for conn in connections {
183 if conn.sender.send(msg.clone()).is_ok() {
184 delivered += 1;
185 }
186 }
187 if delivered > 0 {
188 return DeliveryResult::Delivered(delivered);
189 }
190 }
191 }
192
193 DeliveryResult::UserOffline
194 }
195
196 pub async fn send_to_tenant(&self, tn_id: TnId, msg: BroadcastMessage) -> usize {
201 let users = self.users.read().await;
202
203 let mut delivered = 0;
204 if let Some(tenant_users) = users.get(&tn_id) {
205 for connections in tenant_users.values() {
206 for conn in connections {
207 if conn.sender.send(msg.clone()).is_ok() {
208 delivered += 1;
209 }
210 }
211 }
212 }
213 delivered
214 }
215
216 pub async fn is_user_online(&self, tn_id: TnId, id_tag: &str) -> bool {
218 let users = self.users.read().await;
219
220 users
221 .get(&tn_id)
222 .and_then(|tenant_users| tenant_users.get(id_tag))
223 .is_some_and(|connections| !connections.is_empty())
224 }
225
226 pub async fn online_users(&self, tn_id: TnId) -> Vec<Box<str>> {
228 let users = self.users.read().await;
229
230 users
231 .get(&tn_id)
232 .map(|tenant_users| tenant_users.keys().cloned().collect())
233 .unwrap_or_default()
234 }
235
236 pub async fn user_stats(&self) -> UserRegistryStats {
238 let users = self.users.read().await;
239
240 let mut online_users = 0;
241 let mut total_connections = 0;
242 let mut users_per_tenant = HashMap::new();
243
244 for (tn_id, tenant_users) in users.iter() {
245 let tenant_user_count = tenant_users.len();
246 online_users += tenant_user_count;
247 users_per_tenant.insert(*tn_id, tenant_user_count);
248
249 for connections in tenant_users.values() {
250 total_connections += connections.len();
251 }
252 }
253
254 UserRegistryStats { online_users, total_connections, users_per_tenant }
255 }
256
257 pub async fn cleanup_users(&self) {
259 let mut users = self.users.write().await;
260
261 for tenant_users in users.values_mut() {
262 for connections in tenant_users.values_mut() {
263 connections.retain(|conn| conn.sender.receiver_count() > 0);
264 }
265 tenant_users.retain(|_, connections| !connections.is_empty());
266 }
267 users.retain(|_, tenant_users| !tenant_users.is_empty());
268 }
269}
270
271impl Default for BroadcastManager {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277fn now_timestamp() -> u64 {
279 std::time::SystemTime::now()
280 .duration_since(std::time::UNIX_EPOCH)
281 .unwrap_or_default()
282 .as_secs()
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288
289 #[tokio::test]
290 async fn test_register_user() {
291 let manager = BroadcastManager::new();
292 let tn_id = TnId(1);
293
294 let _rx = manager.register_user(tn_id, "alice", "conn-1").await;
295
296 assert!(manager.is_user_online(tn_id, "alice").await);
297 assert!(!manager.is_user_online(tn_id, "bob").await);
298
299 let stats = manager.user_stats().await;
300 assert_eq!(stats.online_users, 1);
301 assert_eq!(stats.total_connections, 1);
302 }
303
304 #[tokio::test]
305 async fn test_multiple_connections_per_user() {
306 let manager = BroadcastManager::new();
307 let tn_id = TnId(1);
308
309 let _rx1 = manager.register_user(tn_id, "alice", "conn-1").await;
310 let _rx2 = manager.register_user(tn_id, "alice", "conn-2").await;
311
312 let stats = manager.user_stats().await;
313 assert_eq!(stats.online_users, 1);
314 assert_eq!(stats.total_connections, 2);
315 }
316
317 #[tokio::test]
318 async fn test_send_to_user() {
319 let manager = BroadcastManager::new();
320 let tn_id = TnId(1);
321
322 let mut rx = manager.register_user(tn_id, "alice", "conn-1").await;
323
324 let msg = BroadcastMessage::new("ACTION", serde_json::json!({ "type": "MSG" }), "system");
325 let result = manager.send_to_user(tn_id, "alice", msg).await;
326
327 assert_eq!(result, DeliveryResult::Delivered(1));
328
329 let received = rx.recv().await.unwrap();
330 assert_eq!(received.cmd, "ACTION");
331 }
332
333 #[tokio::test]
334 async fn test_send_to_offline_user() {
335 let manager = BroadcastManager::new();
336 let tn_id = TnId(1);
337
338 let msg = BroadcastMessage::new("ACTION", serde_json::json!({ "type": "MSG" }), "system");
339 let result = manager.send_to_user(tn_id, "bob", msg).await;
340
341 assert_eq!(result, DeliveryResult::UserOffline);
342 }
343
344 #[tokio::test]
345 async fn test_unregister_user() {
346 let manager = BroadcastManager::new();
347 let tn_id = TnId(1);
348
349 let _rx = manager.register_user(tn_id, "alice", "conn-1").await;
350 assert!(manager.is_user_online(tn_id, "alice").await);
351
352 manager.unregister_user(tn_id, "alice", "conn-1").await;
353 assert!(!manager.is_user_online(tn_id, "alice").await);
354 }
355
356 #[tokio::test]
357 async fn test_multi_tenant_isolation() {
358 let manager = BroadcastManager::new();
359 let tn1 = TnId(1);
360 let tn2 = TnId(2);
361
362 let _rx1 = manager.register_user(tn1, "alice", "conn-1").await;
363 let _rx2 = manager.register_user(tn2, "alice", "conn-2").await;
364
365 assert!(manager.is_user_online(tn1, "alice").await);
366 assert!(manager.is_user_online(tn2, "alice").await);
367
368 let msg = BroadcastMessage::new("test", serde_json::json!({}), "system");
369 let result = manager.send_to_user(tn1, "alice", msg).await;
370 assert_eq!(result, DeliveryResult::Delivered(1));
371
372 let stats = manager.user_stats().await;
373 assert_eq!(stats.online_users, 2);
374 }
375}
376
377