1use crate::prelude::*;
15use axum::extract::ws::{Message, WebSocket};
16use cloudillo_types::utils::random_id;
17use futures::sink::SinkExt;
18use futures::stream::StreamExt;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use std::sync::Arc;
22
/// A message exchanged over the bus WebSocket, serialized to and from JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusMessage {
    /// Message identifier; an acknowledgement reuses the id of the message it answers.
    pub id: String,

    /// Command name, e.g. `"ping"` or `"ack"`.
    pub cmd: String,

    /// Command payload as an arbitrary JSON value.
    pub data: Value,
}
35
36impl BusMessage {
37 pub fn new(cmd: impl Into<String>, data: Value) -> Self {
39 Self { id: random_id().unwrap_or_default(), cmd: cmd.into(), data }
40 }
41
42 pub fn ack(id: String, status: &str) -> Self {
44 Self { id, cmd: "ack".to_string(), data: json!({ "status": status }) }
45 }
46
47 pub fn to_ws_message(&self) -> Result<Message, serde_json::Error> {
49 let json = serde_json::to_string(self)?;
50 Ok(Message::Text(json.into()))
51 }
52
53 pub fn from_ws_message(msg: &Message) -> Result<Option<Self>, Box<dyn std::error::Error>> {
55 match msg {
56 Message::Text(text) => {
57 let parsed = serde_json::from_str::<BusMessage>(text)?;
58 Ok(Some(parsed))
59 }
60 Message::Close(_) => Ok(None),
61 Message::Ping(_) | Message::Pong(_) => Ok(None),
62 _ => Ok(None),
63 }
64 }
65}
66
67pub async fn handle_bus_connection(
69 ws: WebSocket,
70 user_id: String,
71 tn_id: TnId,
72 app: crate::app::App,
73) {
74 let connection_id = random_id().unwrap_or_default();
75 info!("Bus connection: {} (tn_id={}, conn={})", user_id, tn_id.0, &connection_id[..8]);
76
77 let user_rx = app.broadcast.register_user(tn_id, &user_id, &connection_id).await;
79 let user_rx = Arc::new(tokio::sync::Mutex::new(user_rx));
80
81 let (ws_tx, ws_rx) = ws.split();
83 let ws_tx: Arc<tokio::sync::Mutex<_>> = Arc::new(tokio::sync::Mutex::new(ws_tx));
84
85 let user_id_clone = user_id.clone();
87 let ws_tx_heartbeat = ws_tx.clone();
88 let heartbeat_task = tokio::spawn(async move {
89 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
90 loop {
91 interval.tick().await;
92 debug!("Bus heartbeat: {}", user_id_clone);
93
94 let mut tx = ws_tx_heartbeat.lock().await;
95 if tx.send(Message::Ping(vec![].into())).await.is_err() {
96 debug!("Client disconnected during heartbeat");
97 return;
98 }
99 }
100 });
101
102 let user_id_clone = user_id.clone();
104 let ws_tx_clone = ws_tx.clone();
105 let ws_recv_task = tokio::spawn(async move {
106 let mut ws_rx = ws_rx;
107 while let Some(msg) = ws_rx.next().await {
108 match msg {
109 Ok(ws_msg) => {
110 let msg = match BusMessage::from_ws_message(&ws_msg) {
111 Ok(Some(m)) => m,
112 Ok(None) => continue,
113 Err(e) => {
114 warn!("Failed to parse bus message: {}", e);
115 continue;
116 }
117 };
118
119 let response = handle_bus_command(&user_id_clone, &msg).await;
121 if let Ok(ws_response) = response.to_ws_message() {
122 let mut tx = ws_tx_clone.lock().await;
123 if tx.send(ws_response).await.is_err() {
124 warn!("Failed to send bus response");
125 break;
126 }
127 }
128 }
129 Err(e) => {
130 warn!("Bus connection error: {}", e);
131 break;
132 }
133 }
134 }
135 });
136
137 let ws_tx_clone = ws_tx.clone();
139 let message_task = tokio::spawn(async move {
140 let mut rx = user_rx.lock().await;
141 loop {
142 match rx.recv().await {
143 Ok(bcast_msg) => {
144 let response = BusMessage::new(bcast_msg.cmd, bcast_msg.data);
146
147 if let Ok(ws_response) = response.to_ws_message() {
148 let mut tx = ws_tx_clone.lock().await;
149 if tx.send(ws_response).await.is_err() {
150 debug!("Client disconnected while forwarding message");
151 return;
152 }
153 }
154 }
155 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
156 warn!("Bus receiver lagged, skipped {} messages", n);
157 }
158 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
159 debug!("User receiver channel closed");
160 return;
161 }
162 }
163 }
164 });
165
166 tokio::select! {
168 _ = ws_recv_task => {
169 debug!("WebSocket receive task ended");
170 }
171 _ = message_task => {
172 debug!("Message task ended");
173 }
174 }
175
176 app.broadcast.unregister_user(tn_id, &user_id, &connection_id).await;
178 heartbeat_task.abort();
179 info!("Bus connection closed: {} (conn={})", user_id, &connection_id[..8]);
180}
181
182async fn handle_bus_command(user_id: &str, msg: &BusMessage) -> BusMessage {
184 match msg.cmd.as_str() {
185 "ping" => BusMessage::ack(msg.id.clone(), "pong"),
186 _ => {
187 debug!("Bus command from {}: {}", user_id, msg.cmd);
188 BusMessage::ack(msg.id.clone(), "ok")
189 }
190 }
191}
192
193