Skip to main content

cloudillo_core/
ws_broadcast.rs

1//! WebSocket User Messaging
2//!
3//! Manages direct user-to-user messaging via WebSocket connections.
4//! Supports multiple connections per user (multiple tabs/devices).
5
6use cloudillo_types::types::TnId;
7use cloudillo_types::utils::random_id;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{broadcast, RwLock};
12
13/// A message to send to a user
14#[derive(Clone, Debug)]
15pub struct BroadcastMessage {
16	pub id: String,
17	pub cmd: String,
18	pub data: Value,
19	pub sender: String,
20	pub timestamp: u64,
21}
22
23impl BroadcastMessage {
24	/// Create a new message
25	pub fn new(cmd: impl Into<String>, data: Value, sender: impl Into<String>) -> Self {
26		Self {
27			id: random_id().unwrap_or_default(),
28			cmd: cmd.into(),
29			data,
30			sender: sender.into(),
31			timestamp: now_timestamp(),
32		}
33	}
34}
35
36/// A user connection for direct messaging
37#[derive(Debug)]
38pub struct UserConnection {
39	/// User's id_tag
40	pub id_tag: Box<str>,
41	/// Tenant ID
42	pub tn_id: TnId,
43	/// Unique connection ID (UUID) - supports multiple tabs/devices
44	pub connection_id: Box<str>,
45	/// When this connection was established
46	pub connected_at: u64,
47	/// Sender for this connection
48	sender: broadcast::Sender<BroadcastMessage>,
49}
50
/// Outcome of attempting to send a message to a single user.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeliveryResult {
	/// The message was accepted by this many of the user's connections
	Delivered(usize),
	/// No connection accepted the message (the user is treated as offline)
	UserOffline,
}
59
60/// User registry statistics
61#[derive(Debug, Clone)]
62pub struct UserRegistryStats {
63	/// Number of unique online users
64	pub online_users: usize,
65	/// Total number of connections (may be > users if multiple tabs)
66	pub total_connections: usize,
67	/// Users per tenant
68	pub users_per_tenant: HashMap<TnId, usize>,
69}
70
71/// Type alias for the user registry map: TnId -> id_tag -> Vec<UserConnection>
72type UserRegistryMap = HashMap<TnId, HashMap<Box<str>, Vec<UserConnection>>>;
73
/// Tunables for the broadcast manager.
#[derive(Clone, Debug)]
pub struct BroadcastConfig {
	/// Capacity of each connection's message buffer, in messages
	pub buffer_size: usize,
}

impl Default for BroadcastConfig {
	/// Default configuration: a 128-message buffer per connection.
	fn default() -> Self {
		// Enough for typical WebSocket message bursts without excessive memory
		const DEFAULT_BUFFER_SIZE: usize = 128;

		BroadcastConfig { buffer_size: DEFAULT_BUFFER_SIZE }
	}
}
87
88/// Manages direct user messaging via WebSocket
89pub struct BroadcastManager {
90	/// User registry for direct messaging
91	users: Arc<RwLock<UserRegistryMap>>,
92	config: BroadcastConfig,
93}
94
95impl BroadcastManager {
96	/// Create a new manager with default config
97	pub fn new() -> Self {
98		Self::with_config(BroadcastConfig::default())
99	}
100
101	/// Create with custom config
102	pub fn with_config(config: BroadcastConfig) -> Self {
103		Self { users: Arc::new(RwLock::new(HashMap::new())), config }
104	}
105
106	/// Register a user connection for direct messaging
107	///
108	/// Returns a receiver for messages targeted at this user.
109	/// The connection_id should be a unique identifier (UUID) for this specific
110	/// connection, allowing multiple connections per user (multiple tabs/devices).
111	pub async fn register_user(
112		&self,
113		tn_id: TnId,
114		id_tag: &str,
115		connection_id: &str,
116	) -> broadcast::Receiver<BroadcastMessage> {
117		let (sender, receiver) = broadcast::channel(self.config.buffer_size);
118
119		let connection = UserConnection {
120			id_tag: id_tag.into(),
121			tn_id,
122			connection_id: connection_id.into(),
123			connected_at: now_timestamp(),
124			sender,
125		};
126
127		let mut users = self.users.write().await;
128		users
129			.entry(tn_id)
130			.or_default()
131			.entry(id_tag.into())
132			.or_default()
133			.push(connection);
134
135		tracing::debug!(tn_id = ?tn_id, id_tag = %id_tag, connection_id = %connection_id, "User registered");
136		receiver
137	}
138
139	/// Unregister a user connection
140	///
141	/// Removes the specific connection identified by connection_id.
142	/// Other connections for the same user (other tabs) are preserved.
143	pub async fn unregister_user(&self, tn_id: TnId, id_tag: &str, connection_id: &str) {
144		let mut users = self.users.write().await;
145
146		if let Some(tenant_users) = users.get_mut(&tn_id) {
147			if let Some(connections) = tenant_users.get_mut(id_tag) {
148				connections.retain(|conn| conn.connection_id.as_ref() != connection_id);
149
150				// Clean up empty entries
151				if connections.is_empty() {
152					tenant_users.remove(id_tag);
153				}
154			}
155
156			// Clean up empty tenant entries
157			if tenant_users.is_empty() {
158				users.remove(&tn_id);
159			}
160		}
161
162		tracing::debug!(tn_id = ?tn_id, id_tag = %id_tag, connection_id = %connection_id, "User unregistered");
163	}
164
165	/// Send a message to a specific user
166	///
167	/// Delivers the message to all connections for the user (multiple tabs/devices).
168	/// Returns `DeliveryResult::Delivered(n)` with the number of connections that
169	/// received the message, or `DeliveryResult::UserOffline` if the user has no
170	/// active connections.
171	pub async fn send_to_user(
172		&self,
173		tn_id: TnId,
174		id_tag: &str,
175		msg: BroadcastMessage,
176	) -> DeliveryResult {
177		let users = self.users.read().await;
178
179		if let Some(tenant_users) = users.get(&tn_id) {
180			if let Some(connections) = tenant_users.get(id_tag) {
181				let mut delivered = 0;
182				for conn in connections {
183					if conn.sender.send(msg.clone()).is_ok() {
184						delivered += 1;
185					}
186				}
187				if delivered > 0 {
188					return DeliveryResult::Delivered(delivered);
189				}
190			}
191		}
192
193		DeliveryResult::UserOffline
194	}
195
196	/// Send a message to all users in a tenant
197	///
198	/// Broadcasts the message to all connections for all users in the tenant.
199	/// Returns the total number of connections that received the message.
200	pub async fn send_to_tenant(&self, tn_id: TnId, msg: BroadcastMessage) -> usize {
201		let users = self.users.read().await;
202
203		let mut delivered = 0;
204		if let Some(tenant_users) = users.get(&tn_id) {
205			for connections in tenant_users.values() {
206				for conn in connections {
207					if conn.sender.send(msg.clone()).is_ok() {
208						delivered += 1;
209					}
210				}
211			}
212		}
213		delivered
214	}
215
216	/// Check if a user is currently online (has at least one connection)
217	pub async fn is_user_online(&self, tn_id: TnId, id_tag: &str) -> bool {
218		let users = self.users.read().await;
219
220		users
221			.get(&tn_id)
222			.and_then(|tenant_users| tenant_users.get(id_tag))
223			.is_some_and(|connections| !connections.is_empty())
224	}
225
226	/// Get list of all online users for a tenant
227	pub async fn online_users(&self, tn_id: TnId) -> Vec<Box<str>> {
228		let users = self.users.read().await;
229
230		users
231			.get(&tn_id)
232			.map(|tenant_users| tenant_users.keys().cloned().collect())
233			.unwrap_or_default()
234	}
235
236	/// Get user registry statistics
237	pub async fn user_stats(&self) -> UserRegistryStats {
238		let users = self.users.read().await;
239
240		let mut online_users = 0;
241		let mut total_connections = 0;
242		let mut users_per_tenant = HashMap::new();
243
244		for (tn_id, tenant_users) in users.iter() {
245			let tenant_user_count = tenant_users.len();
246			online_users += tenant_user_count;
247			users_per_tenant.insert(*tn_id, tenant_user_count);
248
249			for connections in tenant_users.values() {
250				total_connections += connections.len();
251			}
252		}
253
254		UserRegistryStats { online_users, total_connections, users_per_tenant }
255	}
256
257	/// Cleanup disconnected users (users with no active receivers)
258	pub async fn cleanup_users(&self) {
259		let mut users = self.users.write().await;
260
261		for tenant_users in users.values_mut() {
262			for connections in tenant_users.values_mut() {
263				connections.retain(|conn| conn.sender.receiver_count() > 0);
264			}
265			tenant_users.retain(|_, connections| !connections.is_empty());
266		}
267		users.retain(|_, tenant_users| !tenant_users.is_empty());
268	}
269}
270
271impl Default for BroadcastManager {
272	fn default() -> Self {
273		Self::new()
274	}
275}
276
/// Current wall-clock time as whole seconds since the UNIX epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_timestamp() -> u64 {
	use std::time::{SystemTime, UNIX_EPOCH};

	match SystemTime::now().duration_since(UNIX_EPOCH) {
		Ok(elapsed) => elapsed.as_secs(),
		// Clock set before 1970: fall back to zero rather than failing
		Err(_) => 0,
	}
}
284
#[cfg(test)]
mod tests {
	use super::*;

	/// A registered user is online and counted once; an unknown user is not.
	#[tokio::test]
	async fn test_register_user() {
		let tenant = TnId(1);
		let mgr = BroadcastManager::new();

		let _receiver = mgr.register_user(tenant, "alice", "conn-1").await;

		assert!(mgr.is_user_online(tenant, "alice").await);
		assert!(!mgr.is_user_online(tenant, "bob").await);

		let UserRegistryStats { online_users, total_connections, .. } = mgr.user_stats().await;
		assert_eq!(online_users, 1);
		assert_eq!(total_connections, 1);
	}

	/// Two connections of the same user count as one user, two connections.
	#[tokio::test]
	async fn test_multiple_connections_per_user() {
		let tenant = TnId(1);
		let mgr = BroadcastManager::new();

		let _first = mgr.register_user(tenant, "alice", "conn-1").await;
		let _second = mgr.register_user(tenant, "alice", "conn-2").await;

		let UserRegistryStats { online_users, total_connections, .. } = mgr.user_stats().await;
		assert_eq!(online_users, 1);
		assert_eq!(total_connections, 2);
	}

	/// A message sent to a registered user arrives on that user's receiver.
	#[tokio::test]
	async fn test_send_to_user() {
		let tenant = TnId(1);
		let mgr = BroadcastManager::new();

		let mut receiver = mgr.register_user(tenant, "alice", "conn-1").await;

		let payload = serde_json::json!({ "type": "MSG" });
		let outgoing = BroadcastMessage::new("ACTION", payload, "system");
		let outcome = mgr.send_to_user(tenant, "alice", outgoing).await;

		assert_eq!(outcome, DeliveryResult::Delivered(1));

		let BroadcastMessage { cmd, .. } = receiver.recv().await.unwrap();
		assert_eq!(cmd, "ACTION");
	}

	/// Sending to a user who never registered reports `UserOffline`.
	#[tokio::test]
	async fn test_send_to_offline_user() {
		let tenant = TnId(1);
		let mgr = BroadcastManager::new();

		let payload = serde_json::json!({ "type": "MSG" });
		let outgoing = BroadcastMessage::new("ACTION", payload, "system");
		let outcome = mgr.send_to_user(tenant, "bob", outgoing).await;

		assert_eq!(outcome, DeliveryResult::UserOffline);
	}

	/// Unregistering a user's only connection takes the user offline.
	#[tokio::test]
	async fn test_unregister_user() {
		let tenant = TnId(1);
		let mgr = BroadcastManager::new();

		let _receiver = mgr.register_user(tenant, "alice", "conn-1").await;
		assert!(mgr.is_user_online(tenant, "alice").await);

		mgr.unregister_user(tenant, "alice", "conn-1").await;
		assert!(!mgr.is_user_online(tenant, "alice").await);
	}

	/// The same `id_tag` in two tenants is tracked and addressed separately.
	#[tokio::test]
	async fn test_multi_tenant_isolation() {
		let first_tenant = TnId(1);
		let second_tenant = TnId(2);
		let mgr = BroadcastManager::new();

		let _first = mgr.register_user(first_tenant, "alice", "conn-1").await;
		let _second = mgr.register_user(second_tenant, "alice", "conn-2").await;

		assert!(mgr.is_user_online(first_tenant, "alice").await);
		assert!(mgr.is_user_online(second_tenant, "alice").await);

		let outgoing = BroadcastMessage::new("test", serde_json::json!({}), "system");
		let outcome = mgr.send_to_user(first_tenant, "alice", outgoing).await;
		assert_eq!(outcome, DeliveryResult::Delivered(1));

		let UserRegistryStats { online_users, .. } = mgr.user_stats().await;
		assert_eq!(online_users, 2);
	}
}
376
377// vim: ts=4