// websoc_kit/client.rs

use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tungstenite::{
    connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
};
use tracing::{error, info};
use uuid::Uuid;

use crate::error::{WebsocKitError, WebsocKitResult};

/// The client-side WebSocket connection type shared by both halves of the split stream.
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

/// WebSocket client that keeps the write half and the read half of one connection in
/// separate fields, each wrapped in its own `Arc<Mutex<_>>`.
#[expect(clippy::module_name_repetitions)]
pub struct WebsocKitClient {
    /// Sink half of the connection; locked for the duration of each send.
    writer: Arc<Mutex<SplitSink<WsStream, Message>>>,
    /// Stream half of the connection; locked while waiting for the next incoming message.
    reader: Arc<Mutex<SplitStream<WsStream>>>,
}

impl WebsocKitClient {
    /// Connects to the WebSocket endpoint at `url` and splits the resulting stream into a
    /// write half and a read half, each stored behind its own mutex.
    ///
    /// # Errors
    ///
    /// Returns an error if `connect_async` fails, i.e. the connection cannot be established
    /// or the WebSocket handshake does not complete. The underlying error is converted into
    /// the crate error type by `?`.
    pub async fn new(url: &str) -> WebsocKitResult<Self> {
        let (ws_stream, _) = connect_async(url).await?;
        info!("WebSocket handshake has been successfully completed");

        let (write, read) = ws_stream.split();

        Ok(Self {
            writer: Arc::new(Mutex::new(write)),
            reader: Arc::new(Mutex::new(read)),
        })
    }

    /// Sends `message` to the peer as a single binary WebSocket message.
    ///
    /// The writer lock is held until the send has completed.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying sink rejects or fails to send the message.
    pub async fn send_message(&self, message: Vec<u8>) -> WebsocKitResult<()> {
        let mut writer = self.writer.lock().await;
        writer.send(Message::Binary(message)).await?;
        Ok(())
    }

    /// Waits for the next incoming WebSocket message and returns its payload if it is binary.
    ///
    /// Returns `Ok(Some(bytes))` for a binary message. Returns `Ok(None)` when the stream has
    /// ended, when a close frame was received, and also for ping, pong and raw frames, so
    /// `None` on its own does not tell the caller that the connection is closed.
    ///
    /// # Errors
    ///
    /// Returns [`WebsocKitError::TextMessagesNotAllowed`] if the peer sends a text message,
    /// and propagates any error yielded by the underlying stream while reading.
    pub async fn read_message(&self) -> WebsocKitResult<Option<Vec<u8>>> {
        // Hold the reader lock only for the read itself, not while logging or matching.
        let next = {
            let mut reader = self.reader.lock().await;
            reader.next().await
        };

        // A read error must reach the caller; previously it was reported as `Ok(None)`,
        // which made a failed connection indistinguishable from a finished stream.
        let Some(message) = next.transpose()? else {
            // The stream is exhausted.
            return Ok(None);
        };

        match message {
            // valid
            Message::Binary(binary) => Ok(Some(binary)),
            Message::Close(close) => {
                match close {
                    Some(close_frame) => info!("Received close frame: {close_frame:?}"),
                    None => info!("Received close frame."),
                }
                Ok(None)
            }

            // invalid
            Message::Text(invalid_text_message) => {
                // Only binary payloads are accepted; report the offending text to the caller.
                Err(WebsocKitError::TextMessagesNotAllowed(
                    Uuid::nil().into(), // ConnectionId only makes sense in the context of WebsocKitManager
                    invalid_text_message,
                ))
            }
            Message::Ping(_) | Message::Pong(_) => {
                // NOP - nothing to do at this level. NOTE(review): keep-alive replies are
                // expected to be queued by the tungstenite layer itself (this client does
                // not go through Axum) - confirm against the tungstenite version in use.
                Ok(None)
            }
            Message::Frame(frame) => {
                error!("unexpected frame: {frame}");
                Ok(None)
            }
        }
    }
}