websoc_kit/
client.rs

1use futures_util::{SinkExt, StreamExt, stream::SplitSink, stream::SplitStream};
2use std::sync::Arc;
3use tokio::net::TcpStream;
4use tokio::sync::Mutex;
5use tokio_tungstenite::{
6    MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
7};
8use tracing::{error, info};
9use uuid::Uuid;
10
11use crate::error::{WebsocKitError, WebsocKitResult};
12
13#[expect(clippy::module_name_repetitions)]
14pub struct WebsocKitClient {
15    writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
16    reader: Arc<Mutex<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
17}
18
19impl WebsocKitClient {
20    /// # Errors
21    ///
22    /// TODO
23    pub async fn new(url: &str) -> WebsocKitResult<Self> {
24        let (ws_stream, _) = connect_async(url).await?;
25        info!("WebSocket handshake has been successfully completed");
26
27        let (write, read) = ws_stream.split();
28
29        Ok(WebsocKitClient {
30            writer: Arc::new(Mutex::new(write)),
31            reader: Arc::new(Mutex::new(read)),
32        })
33    }
34
35    /// # Errors
36    ///
37    /// TODO
38    pub async fn send_message(&self, message: Vec<u8>) -> WebsocKitResult<()> {
39        let mut writer = self.writer.lock().await;
40        writer.send(Message::Binary(message)).await?;
41        Ok(())
42    }
43
44    /// # Errors
45    ///
46    /// TODO
47    pub async fn read_message(&self) -> WebsocKitResult<Option<Vec<u8>>> {
48        let mut reader = self.reader.lock().await;
49        match reader.next().await {
50            Some(Ok(message)) => {
51                match message {
52                    // valid
53                    Message::Binary(binary) => Ok(Some(binary)),
54                    Message::Close(close) => {
55                        close.map_or_else(
56                            || {
57                                info!("Received close frame.");
58                            },
59                            |close_frame| {
60                                info!("Received close frame: {close_frame:?}");
61                            },
62                        );
63                        Ok(None)
64                    }
65
66                    // invalid
67                    Message::Text(invalid_text_message) => {
68                        // terminate the connection for not sending binary
69                        Err(WebsocKitError::TextMessagesNotAllowed(
70                            Uuid::nil().into(), // ConnectionId only makes sense in the context of WebsocKitManager
71                            invalid_text_message,
72                        ))
73                    }
74                    Message::Ping(_ping) => {
75                        // NOP - handled by Axum
76                        Ok(None)
77                    }
78                    Message::Pong(_pong) => {
79                        // NOP - handled by Axum
80                        Ok(None)
81                    }
82                    Message::Frame(frame) => {
83                        error!("unexpected frame: {frame}");
84                        Ok(None)
85                    }
86                }
87            }
88            _ => Ok(None),
89        }
90    }
91}