deribit_websocket/connection/
ws_connection.rs

1//! WebSocket connection management
2
3use crate::error::WebSocketError;
4use futures_util::{SinkExt, StreamExt};
5use tokio::net::TcpStream;
6use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
7use url::Url;
8
/// WebSocket connection wrapper
///
/// Pairs a target [`Url`] with an optional live WebSocket stream. The
/// connection counts as "connected" exactly while `stream` is `Some`.
#[derive(Debug)]
pub struct WebSocketConnection {
    /// Endpoint handed to `connect_async` when connecting.
    url: Url,
    /// Live stream, if any. `None` until `connect` succeeds; reset to `None`
    /// by `disconnect` and whenever `send`/`receive` observe a failure or a
    /// closed connection.
    stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
15
16impl WebSocketConnection {
17    /// Create a new WebSocket connection
18    pub fn new(url: Url) -> Self {
19        Self { url, stream: None }
20    }
21
22    /// Connect to the WebSocket server
23    pub async fn connect(&mut self) -> Result<(), WebSocketError> {
24        match connect_async(self.url.as_str()).await {
25            Ok((stream, _response)) => {
26                self.stream = Some(stream);
27                Ok(())
28            }
29            Err(e) => Err(WebSocketError::ConnectionFailed(format!(
30                "Failed to connect: {}",
31                e
32            ))),
33        }
34    }
35
36    /// Disconnect from the WebSocket server
37    pub async fn disconnect(&mut self) -> Result<(), WebSocketError> {
38        self.stream = None;
39        Ok(())
40    }
41
42    /// Check if connected
43    pub fn is_connected(&self) -> bool {
44        self.stream.is_some()
45    }
46
47    /// Send a message
48    pub async fn send(&mut self, message: String) -> Result<(), WebSocketError> {
49        if let Some(stream) = &mut self.stream {
50            match stream.send(Message::Text(message.into())).await {
51                Ok(()) => Ok(()),
52                Err(e) => {
53                    self.stream = None;
54                    Err(WebSocketError::ConnectionFailed(format!(
55                        "Failed to send message: {}",
56                        e
57                    )))
58                }
59            }
60        } else {
61            Err(WebSocketError::ConnectionClosed)
62        }
63    }
64
65    /// Receive a message
66    pub async fn receive(&mut self) -> Result<String, WebSocketError> {
67        if let Some(stream) = &mut self.stream {
68            match stream.next().await {
69                Some(Ok(Message::Text(text))) => Ok(text.to_string()),
70                Some(Ok(Message::Close(_))) => {
71                    self.stream = None;
72                    Err(WebSocketError::ConnectionClosed)
73                }
74                Some(Ok(_)) => {
75                    // Skip non-text messages (binary, ping, pong) - try again
76                    Box::pin(self.receive()).await
77                }
78                Some(Err(e)) => {
79                    self.stream = None;
80                    Err(WebSocketError::ConnectionFailed(format!(
81                        "Failed to receive message: {}",
82                        e
83                    )))
84                }
85                None => {
86                    self.stream = None;
87                    Err(WebSocketError::ConnectionClosed)
88                }
89            }
90        } else {
91            Err(WebSocketError::ConnectionClosed)
92        }
93    }
94
95    /// Get the connection URL
96    pub fn url(&self) -> &Url {
97        &self.url
98    }
99}