1use futures_util::{SinkExt, StreamExt, stream::SplitSink, stream::SplitStream};
2use std::sync::Arc;
3use tokio::net::TcpStream;
4use tokio::sync::Mutex;
5use tokio_tungstenite::{
6 MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
7};
8use tracing::{error, info};
9use uuid::Uuid;
10
11use crate::error::{WebsocKitError, WebsocKitResult};
12
13#[expect(clippy::module_name_repetitions)]
14pub struct WebsocKitClient {
15 writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
16 reader: Arc<Mutex<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
17}
18
19impl WebsocKitClient {
20 pub async fn new(url: &str) -> WebsocKitResult<Self> {
24 let (ws_stream, _) = connect_async(url).await?;
25 info!("WebSocket handshake has been successfully completed");
26
27 let (write, read) = ws_stream.split();
28
29 Ok(WebsocKitClient {
30 writer: Arc::new(Mutex::new(write)),
31 reader: Arc::new(Mutex::new(read)),
32 })
33 }
34
35 pub async fn send_message(&self, message: Vec<u8>) -> WebsocKitResult<()> {
39 let mut writer = self.writer.lock().await;
40 writer.send(Message::Binary(message)).await?;
41 Ok(())
42 }
43
44 pub async fn read_message(&self) -> WebsocKitResult<Option<Vec<u8>>> {
48 let mut reader = self.reader.lock().await;
49 match reader.next().await {
50 Some(Ok(message)) => {
51 match message {
52 Message::Binary(binary) => Ok(Some(binary)),
54 Message::Close(close) => {
55 close.map_or_else(
56 || {
57 info!("Received close frame.");
58 },
59 |close_frame| {
60 info!("Received close frame: {close_frame:?}");
61 },
62 );
63 Ok(None)
64 }
65
66 Message::Text(invalid_text_message) => {
68 Err(WebsocKitError::TextMessagesNotAllowed(
70 Uuid::nil().into(), invalid_text_message,
72 ))
73 }
74 Message::Ping(_ping) => {
75 Ok(None)
77 }
78 Message::Pong(_pong) => {
79 Ok(None)
81 }
82 Message::Frame(frame) => {
83 error!("unexpected frame: {frame}");
84 Ok(None)
85 }
86 }
87 }
88 _ => Ok(None),
89 }
90 }
91}