next_web_websocket/core/
handle_socket.rs

1use std::{net::SocketAddr, sync::Arc};
2
3use axum::{
4    body::Bytes,
5    extract::ws::{Message, WebSocket},
6};
7use futures::{stream::StreamExt, SinkExt};
8use tracing::{debug, error, info};
9
10use super::{session::WebSocketSession, ws_context::WebSocketContext};
11
12/// Actual websocket statemachine (one will be spawned per connection)
13pub async fn handle_socket(
14    mut socket: WebSocket,
15    ctx: Arc<WebSocketContext>,
16    remote_address: SocketAddr,
17    path: String,
18) {
19    debug!("Start processing WebSocket connections: {remote_address}, Path: {path}");
20
21    // send a ping (unsupported by some browsers) just to kick things off and get a response
22    if socket
23        .send(Message::Ping(Bytes::from_static(&[1])))
24        .await
25        .is_err()
26    {
27        error!("Unable to send ping packet to client {remote_address}, The connection may have been disconnected");
28        return;
29    }
30
31    // match the corresponding processor through path matching
32    let handler = match ctx.get_handler(&path) {
33        Some(handler) => handler,
34        None => {
35            error!("Path not found {path} The corresponding processor's connection will be closed");
36            let _ = socket.close().await;
37            return;
38        }
39    };
40
41    let (msg_sender, msg_receiver) = flume::unbounded();
42    let session = WebSocketSession::new(msg_sender, remote_address);
43
44    // By splitting socket we can send and receive at the same time. In this example we will send
45    // unsolicited messages to client based on some sort of server's internal event (i.e .timer).
46    let (mut stream_sender, mut stream_receiver) = socket.split();
47
48    // on_open handle
49    if let Err(e) = handler.on_open(&session).await {
50        error!("Event on_open processing failed: {e}, Client: {remote_address}, Path: {path}");
51        return;
52    }
53
54    info!("WS Connection established: {remote_address}, Path: {path}");
55
56    // send message to client
57    tokio::spawn(async move {
58        while let Ok(msg) = msg_receiver.recv_async().await {
59            let close = if let Message::Close(_) = &msg {
60                true
61            } else {
62                false
63            };
64            if let Err(e) = stream_sender.send(msg).await {
65                error!("Sending message to client failed: {e}, Client: {remote_address}");
66                break;
67            } else {
68                if close {
69                    break;
70                }
71            }
72        }
73    });
74
75    // receive client messages
76    while let Some(result) = stream_receiver.next().await {
77        match result {
78            Ok(msg) => {
79                if let Err(e) = handler.on_message(&session, msg).await {
80                    error!("Failed to process client message: {e}, Client: {remote_address}");
81                    if let Err(e) = handler.on_error(&session, e).await {
82                        error!("Failed to handle error event: {e}, Client: {remote_address}");
83                    }
84                    break;
85                }
86            }
87
88            Err(e) => {
89                error!("Failed to receive client message: {e}, Client: {remote_address}");
90                let boxed_err = Box::new(e);
91                let _ = handler.on_error(&session, boxed_err).await;
92                break;
93            }
94        }
95    }
96
97    // Connection closed
98    if let Err(e) = handler.on_close(&session, None).await {
99        error!("Failed to handle connection closure event: {e}, Client: {remote_address}");
100    }
101}