Skip to main content

cloudillo_core/
ws_bus.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! WebSocket Bus Handler - Direct user messaging
5//!
6//! The bus protocol (`/ws/bus`) provides direct user-to-user messaging.
7//!
8//! Message Format:
9//! ```json
10//! {
11//!   "id": "msg-123",
12//!   "cmd": "ACTION|presence|typing|...",
13//!   "data": { ... }
14//! }
15//! ```
16
17use crate::prelude::*;
18use axum::extract::ws::{Message, WebSocket};
19use cloudillo_types::utils::random_id;
20use futures::sink::SinkExt;
21use futures::stream::StreamExt;
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Value};
24use std::sync::Arc;
25
/// A message in the bus protocol
///
/// Serialized to and from JSON through the derived `Serialize`/`Deserialize`
/// implementations, using the field names `id`, `cmd` and `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusMessage {
	/// Unique message ID
	///
	/// `BusMessage::new` fills this from `random_id()`; `BusMessage::ack`
	/// stores whatever ID the caller passes in.
	pub id: String,

	/// Command type (ACTION, presence, typing, etc.)
	///
	/// Acks built by `BusMessage::ack` use the literal command `"ack"`.
	pub cmd: String,

	/// Command data
	///
	/// Arbitrary JSON value; its shape is not constrained by this type.
	pub data: Value,
}
38
39impl BusMessage {
40	/// Create a new bus message
41	pub fn new(cmd: impl Into<String>, data: Value) -> Self {
42		Self { id: random_id().unwrap_or_default(), cmd: cmd.into(), data }
43	}
44
45	/// Create an ack response
46	pub fn ack(id: String, status: &str) -> Self {
47		Self { id, cmd: "ack".to_string(), data: json!({ "status": status }) }
48	}
49
50	/// Serialize to JSON and wrap in WebSocket message
51	pub fn to_ws_message(&self) -> Result<Message, serde_json::Error> {
52		let json = serde_json::to_string(self)?;
53		Ok(Message::Text(json.into()))
54	}
55
56	/// Parse from WebSocket message
57	pub fn from_ws_message(msg: &Message) -> Result<Option<Self>, Box<dyn std::error::Error>> {
58		match msg {
59			Message::Text(text) => {
60				let parsed = serde_json::from_str::<BusMessage>(text)?;
61				Ok(Some(parsed))
62			}
63			Message::Close(_) | Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => {
64				Ok(None)
65			}
66		}
67	}
68}
69
70/// Handle a bus connection
71pub async fn handle_bus_connection(
72	ws: WebSocket,
73	user_id: String,
74	tn_id: TnId,
75	app: crate::app::App,
76) {
77	let connection_id = random_id().unwrap_or_default();
78	info!("Bus connection: {} (tn_id={}, conn={})", user_id, tn_id.0, &connection_id[..8]);
79
80	// Register user for direct messaging
81	let user_rx = app.broadcast.register_user(tn_id, &user_id, &connection_id).await;
82	let user_rx = Arc::new(tokio::sync::Mutex::new(user_rx));
83
84	// Split WebSocket into sender and receiver
85	let (ws_tx, ws_rx) = ws.split();
86	let ws_tx: Arc<tokio::sync::Mutex<_>> = Arc::new(tokio::sync::Mutex::new(ws_tx));
87
88	// Heartbeat task - sends ping frames to keep connection alive
89	let user_id_clone = user_id.clone();
90	let ws_tx_heartbeat = ws_tx.clone();
91	let heartbeat_task = tokio::spawn(async move {
92		let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
93		loop {
94			interval.tick().await;
95			debug!("Bus heartbeat: {}", user_id_clone);
96
97			let mut tx = ws_tx_heartbeat.lock().await;
98			if tx.send(Message::Ping(vec![].into())).await.is_err() {
99				debug!("Client disconnected during heartbeat");
100				return;
101			}
102		}
103	});
104
105	// WebSocket receive task - handles incoming messages
106	let user_id_clone = user_id.clone();
107	let ws_tx_clone = ws_tx.clone();
108	let ws_recv_task = tokio::spawn(async move {
109		let mut ws_rx = ws_rx;
110		while let Some(msg) = ws_rx.next().await {
111			match msg {
112				Ok(ws_msg) => {
113					let msg = match BusMessage::from_ws_message(&ws_msg) {
114						Ok(Some(m)) => m,
115						Ok(None) => continue,
116						Err(e) => {
117							warn!("Failed to parse bus message: {}", e);
118							continue;
119						}
120					};
121
122					// Handle command and send ack
123					let response = handle_bus_command(&user_id_clone, &msg).await;
124					if let Ok(ws_response) = response.to_ws_message() {
125						let mut tx = ws_tx_clone.lock().await;
126						if tx.send(ws_response).await.is_err() {
127							warn!("Failed to send bus response");
128							break;
129						}
130					}
131				}
132				Err(e) => {
133					warn!("Bus connection error: {}", e);
134					break;
135				}
136			}
137		}
138	});
139
140	// Message receive task - forwards messages to WebSocket
141	let ws_tx_clone = ws_tx.clone();
142	let message_task = tokio::spawn(async move {
143		let mut rx = user_rx.lock().await;
144		loop {
145			match rx.recv().await {
146				Ok(bcast_msg) => {
147					// Forward message directly to WebSocket
148					let response = BusMessage::new(bcast_msg.cmd, bcast_msg.data);
149
150					if let Ok(ws_response) = response.to_ws_message() {
151						let mut tx = ws_tx_clone.lock().await;
152						if tx.send(ws_response).await.is_err() {
153							debug!("Client disconnected while forwarding message");
154							return;
155						}
156					}
157				}
158				Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
159					warn!("Bus receiver lagged, skipped {} messages", n);
160				}
161				Err(tokio::sync::broadcast::error::RecvError::Closed) => {
162					debug!("User receiver channel closed");
163					return;
164				}
165			}
166		}
167	});
168
169	// Wait for any task to complete
170	tokio::select! {
171		_ = ws_recv_task => {
172			debug!("WebSocket receive task ended");
173		}
174		_ = message_task => {
175			debug!("Message task ended");
176		}
177	}
178
179	// Cleanup
180	app.broadcast.unregister_user(tn_id, &user_id, &connection_id).await;
181	heartbeat_task.abort();
182	info!("Bus connection closed: {} (conn={})", user_id, &connection_id[..8]);
183}
184
185/// Handle a bus command
186async fn handle_bus_command(user_id: &str, msg: &BusMessage) -> BusMessage {
187	if msg.cmd.as_str() == "ping" {
188		BusMessage::ack(msg.id.clone(), "pong")
189	} else {
190		debug!("Bus command from {}: {}", user_id, msg.cmd);
191		BusMessage::ack(msg.id.clone(), "ok")
192	}
193}
194
195// vim: ts=4