1use crate::prelude::*;
18use axum::extract::ws::{Message, WebSocket};
19use cloudillo_types::utils::random_id;
20use futures::sink::SinkExt;
21use futures::stream::StreamExt;
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Value};
24use std::sync::Arc;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct BusMessage {
29 pub id: String,
31
32 pub cmd: String,
34
35 pub data: Value,
37}
38
39impl BusMessage {
40 pub fn new(cmd: impl Into<String>, data: Value) -> Self {
42 Self { id: random_id().unwrap_or_default(), cmd: cmd.into(), data }
43 }
44
45 pub fn ack(id: String, status: &str) -> Self {
47 Self { id, cmd: "ack".to_string(), data: json!({ "status": status }) }
48 }
49
50 pub fn to_ws_message(&self) -> Result<Message, serde_json::Error> {
52 let json = serde_json::to_string(self)?;
53 Ok(Message::Text(json.into()))
54 }
55
56 pub fn from_ws_message(msg: &Message) -> Result<Option<Self>, Box<dyn std::error::Error>> {
58 match msg {
59 Message::Text(text) => {
60 let parsed = serde_json::from_str::<BusMessage>(text)?;
61 Ok(Some(parsed))
62 }
63 Message::Close(_) | Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => {
64 Ok(None)
65 }
66 }
67 }
68}
69
70pub async fn handle_bus_connection(
72 ws: WebSocket,
73 user_id: String,
74 tn_id: TnId,
75 app: crate::app::App,
76) {
77 let connection_id = random_id().unwrap_or_default();
78 info!("Bus connection: {} (tn_id={}, conn={})", user_id, tn_id.0, &connection_id[..8]);
79
80 let user_rx = app.broadcast.register_user(tn_id, &user_id, &connection_id).await;
82 let user_rx = Arc::new(tokio::sync::Mutex::new(user_rx));
83
84 let (ws_tx, ws_rx) = ws.split();
86 let ws_tx: Arc<tokio::sync::Mutex<_>> = Arc::new(tokio::sync::Mutex::new(ws_tx));
87
88 let user_id_clone = user_id.clone();
90 let ws_tx_heartbeat = ws_tx.clone();
91 let heartbeat_task = tokio::spawn(async move {
92 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
93 loop {
94 interval.tick().await;
95 debug!("Bus heartbeat: {}", user_id_clone);
96
97 let mut tx = ws_tx_heartbeat.lock().await;
98 if tx.send(Message::Ping(vec![].into())).await.is_err() {
99 debug!("Client disconnected during heartbeat");
100 return;
101 }
102 }
103 });
104
105 let user_id_clone = user_id.clone();
107 let ws_tx_clone = ws_tx.clone();
108 let ws_recv_task = tokio::spawn(async move {
109 let mut ws_rx = ws_rx;
110 while let Some(msg) = ws_rx.next().await {
111 match msg {
112 Ok(ws_msg) => {
113 let msg = match BusMessage::from_ws_message(&ws_msg) {
114 Ok(Some(m)) => m,
115 Ok(None) => continue,
116 Err(e) => {
117 warn!("Failed to parse bus message: {}", e);
118 continue;
119 }
120 };
121
122 let response = handle_bus_command(&user_id_clone, &msg).await;
124 if let Ok(ws_response) = response.to_ws_message() {
125 let mut tx = ws_tx_clone.lock().await;
126 if tx.send(ws_response).await.is_err() {
127 warn!("Failed to send bus response");
128 break;
129 }
130 }
131 }
132 Err(e) => {
133 warn!("Bus connection error: {}", e);
134 break;
135 }
136 }
137 }
138 });
139
140 let ws_tx_clone = ws_tx.clone();
142 let message_task = tokio::spawn(async move {
143 let mut rx = user_rx.lock().await;
144 loop {
145 match rx.recv().await {
146 Ok(bcast_msg) => {
147 let response = BusMessage::new(bcast_msg.cmd, bcast_msg.data);
149
150 if let Ok(ws_response) = response.to_ws_message() {
151 let mut tx = ws_tx_clone.lock().await;
152 if tx.send(ws_response).await.is_err() {
153 debug!("Client disconnected while forwarding message");
154 return;
155 }
156 }
157 }
158 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
159 warn!("Bus receiver lagged, skipped {} messages", n);
160 }
161 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
162 debug!("User receiver channel closed");
163 return;
164 }
165 }
166 }
167 });
168
169 tokio::select! {
171 _ = ws_recv_task => {
172 debug!("WebSocket receive task ended");
173 }
174 _ = message_task => {
175 debug!("Message task ended");
176 }
177 }
178
179 app.broadcast.unregister_user(tn_id, &user_id, &connection_id).await;
181 heartbeat_task.abort();
182 info!("Bus connection closed: {} (conn={})", user_id, &connection_id[..8]);
183}
184
185async fn handle_bus_command(user_id: &str, msg: &BusMessage) -> BusMessage {
187 if msg.cmd.as_str() == "ping" {
188 BusMessage::ack(msg.id.clone(), "pong")
189 } else {
190 debug!("Bus command from {}: {}", user_id, msg.cmd);
191 BusMessage::ack(msg.id.clone(), "ok")
192 }
193}
194
195