next_web_websocket/core/
handle_socket.rs1use std::{net::SocketAddr, sync::Arc};
2
3use axum::{
4 body::Bytes,
5 extract::ws::{Message, WebSocket},
6};
7use futures::{stream::StreamExt, SinkExt};
8use tracing::{debug, error, info};
9
10use super::{session::WebSocketSession, ws_context::WebSocketContext};
11
12pub async fn handle_socket(
14 mut socket: WebSocket,
15 ctx: Arc<WebSocketContext>,
16 remote_address: SocketAddr,
17 path: String,
18) {
19 debug!("Start processing WebSocket connections: {remote_address}, Path: {path}");
20
21 if socket
23 .send(Message::Ping(Bytes::from_static(&[1])))
24 .await
25 .is_err()
26 {
27 error!("Unable to send ping packet to client {remote_address}, The connection may have been disconnected");
28 return;
29 }
30
31 let handler = match ctx.get_handler(&path) {
33 Some(handler) => handler,
34 None => {
35 error!("Path not found {path} The corresponding processor's connection will be closed");
36 let _ = socket.close().await;
37 return;
38 }
39 };
40
41 let (msg_sender, msg_receiver) = flume::unbounded();
42 let session = WebSocketSession::new(msg_sender, remote_address);
43
44 let (mut stream_sender, mut stream_receiver) = socket.split();
47
48 if let Err(e) = handler.on_open(&session).await {
50 error!("Event on_open processing failed: {e}, Client: {remote_address}, Path: {path}");
51 return;
52 }
53
54 info!("WS Connection established: {remote_address}, Path: {path}");
55
56 tokio::spawn(async move {
58 while let Ok(msg) = msg_receiver.recv_async().await {
59 let close = if let Message::Close(_) = &msg {
60 true
61 } else {
62 false
63 };
64 if let Err(e) = stream_sender.send(msg).await {
65 error!("Sending message to client failed: {e}, Client: {remote_address}");
66 break;
67 } else {
68 if close {
69 break;
70 }
71 }
72 }
73 });
74
75 while let Some(result) = stream_receiver.next().await {
77 match result {
78 Ok(msg) => {
79 if let Err(e) = handler.on_message(&session, msg).await {
80 error!("Failed to process client message: {e}, Client: {remote_address}");
81 if let Err(e) = handler.on_error(&session, e).await {
82 error!("Failed to handle error event: {e}, Client: {remote_address}");
83 }
84 break;
85 }
86 }
87
88 Err(e) => {
89 error!("Failed to receive client message: {e}, Client: {remote_address}");
90 let boxed_err = Box::new(e);
91 let _ = handler.on_error(&session, boxed_err).await;
92 break;
93 }
94 }
95 }
96
97 if let Err(e) = handler.on_close(&session, None).await {
99 error!("Failed to handle connection closure event: {e}, Client: {remote_address}");
100 }
101}