Skip to main content

cloudillo_core/
ws_bus.rs

1//! WebSocket Bus Handler - Direct user messaging
2//!
3//! The bus protocol (`/ws/bus`) provides direct user-to-user messaging.
4//!
5//! Message Format:
6//! ```json
7//! {
8//!   "id": "msg-123",
9//!   "cmd": "ACTION|presence|typing|...",
10//!   "data": { ... }
11//! }
12//! ```
13
14use crate::prelude::*;
15use axum::extract::ws::{Message, WebSocket};
16use cloudillo_types::utils::random_id;
17use futures::sink::SinkExt;
18use futures::stream::StreamExt;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use std::sync::Arc;
22
23/// A message in the bus protocol
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct BusMessage {
26	/// Unique message ID
27	pub id: String,
28
29	/// Command type (ACTION, presence, typing, etc.)
30	pub cmd: String,
31
32	/// Command data
33	pub data: Value,
34}
35
36impl BusMessage {
37	/// Create a new bus message
38	pub fn new(cmd: impl Into<String>, data: Value) -> Self {
39		Self { id: random_id().unwrap_or_default(), cmd: cmd.into(), data }
40	}
41
42	/// Create an ack response
43	pub fn ack(id: String, status: &str) -> Self {
44		Self { id, cmd: "ack".to_string(), data: json!({ "status": status }) }
45	}
46
47	/// Serialize to JSON and wrap in WebSocket message
48	pub fn to_ws_message(&self) -> Result<Message, serde_json::Error> {
49		let json = serde_json::to_string(self)?;
50		Ok(Message::Text(json.into()))
51	}
52
53	/// Parse from WebSocket message
54	pub fn from_ws_message(msg: &Message) -> Result<Option<Self>, Box<dyn std::error::Error>> {
55		match msg {
56			Message::Text(text) => {
57				let parsed = serde_json::from_str::<BusMessage>(text)?;
58				Ok(Some(parsed))
59			}
60			Message::Close(_) => Ok(None),
61			Message::Ping(_) | Message::Pong(_) => Ok(None),
62			_ => Ok(None),
63		}
64	}
65}
66
67/// Handle a bus connection
68pub async fn handle_bus_connection(
69	ws: WebSocket,
70	user_id: String,
71	tn_id: TnId,
72	app: crate::app::App,
73) {
74	let connection_id = random_id().unwrap_or_default();
75	info!("Bus connection: {} (tn_id={}, conn={})", user_id, tn_id.0, &connection_id[..8]);
76
77	// Register user for direct messaging
78	let user_rx = app.broadcast.register_user(tn_id, &user_id, &connection_id).await;
79	let user_rx = Arc::new(tokio::sync::Mutex::new(user_rx));
80
81	// Split WebSocket into sender and receiver
82	let (ws_tx, ws_rx) = ws.split();
83	let ws_tx: Arc<tokio::sync::Mutex<_>> = Arc::new(tokio::sync::Mutex::new(ws_tx));
84
85	// Heartbeat task - sends ping frames to keep connection alive
86	let user_id_clone = user_id.clone();
87	let ws_tx_heartbeat = ws_tx.clone();
88	let heartbeat_task = tokio::spawn(async move {
89		let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
90		loop {
91			interval.tick().await;
92			debug!("Bus heartbeat: {}", user_id_clone);
93
94			let mut tx = ws_tx_heartbeat.lock().await;
95			if tx.send(Message::Ping(vec![].into())).await.is_err() {
96				debug!("Client disconnected during heartbeat");
97				return;
98			}
99		}
100	});
101
102	// WebSocket receive task - handles incoming messages
103	let user_id_clone = user_id.clone();
104	let ws_tx_clone = ws_tx.clone();
105	let ws_recv_task = tokio::spawn(async move {
106		let mut ws_rx = ws_rx;
107		while let Some(msg) = ws_rx.next().await {
108			match msg {
109				Ok(ws_msg) => {
110					let msg = match BusMessage::from_ws_message(&ws_msg) {
111						Ok(Some(m)) => m,
112						Ok(None) => continue,
113						Err(e) => {
114							warn!("Failed to parse bus message: {}", e);
115							continue;
116						}
117					};
118
119					// Handle command and send ack
120					let response = handle_bus_command(&user_id_clone, &msg).await;
121					if let Ok(ws_response) = response.to_ws_message() {
122						let mut tx = ws_tx_clone.lock().await;
123						if tx.send(ws_response).await.is_err() {
124							warn!("Failed to send bus response");
125							break;
126						}
127					}
128				}
129				Err(e) => {
130					warn!("Bus connection error: {}", e);
131					break;
132				}
133			}
134		}
135	});
136
137	// Message receive task - forwards messages to WebSocket
138	let ws_tx_clone = ws_tx.clone();
139	let message_task = tokio::spawn(async move {
140		let mut rx = user_rx.lock().await;
141		loop {
142			match rx.recv().await {
143				Ok(bcast_msg) => {
144					// Forward message directly to WebSocket
145					let response = BusMessage::new(bcast_msg.cmd, bcast_msg.data);
146
147					if let Ok(ws_response) = response.to_ws_message() {
148						let mut tx = ws_tx_clone.lock().await;
149						if tx.send(ws_response).await.is_err() {
150							debug!("Client disconnected while forwarding message");
151							return;
152						}
153					}
154				}
155				Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
156					warn!("Bus receiver lagged, skipped {} messages", n);
157				}
158				Err(tokio::sync::broadcast::error::RecvError::Closed) => {
159					debug!("User receiver channel closed");
160					return;
161				}
162			}
163		}
164	});
165
166	// Wait for any task to complete
167	tokio::select! {
168		_ = ws_recv_task => {
169			debug!("WebSocket receive task ended");
170		}
171		_ = message_task => {
172			debug!("Message task ended");
173		}
174	}
175
176	// Cleanup
177	app.broadcast.unregister_user(tn_id, &user_id, &connection_id).await;
178	heartbeat_task.abort();
179	info!("Bus connection closed: {} (conn={})", user_id, &connection_id[..8]);
180}
181
182/// Handle a bus command
183async fn handle_bus_command(user_id: &str, msg: &BusMessage) -> BusMessage {
184	match msg.cmd.as_str() {
185		"ping" => BusMessage::ack(msg.id.clone(), "pong"),
186		_ => {
187			debug!("Bus command from {}: {}", user_id, msg.cmd);
188			BusMessage::ack(msg.id.clone(), "ok")
189		}
190	}
191}
192
193// vim: ts=4