deribit_websocket/
client.rs

1//! WebSocket client implementation for Deribit
2
3use async_trait::async_trait;
4use deribit_base::{DeribitClient, DeribitConfig, DeribitError, DeribitResult, DeribitUrls};
5use futures_util::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::net::TcpStream;
8use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
9
10type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
11
12/// WebSocket client for Deribit
13#[allow(dead_code)]
14pub struct DeribitWebSocketClient {
15    config: DeribitConfig,
16    ws_url: String,
17    ws_stream: Option<WsStream>,
18    connected: bool,
19}
20
21impl DeribitWebSocketClient {
22    /// Create a new WebSocket client
23    pub fn new(config: DeribitConfig) -> Self {
24        let ws_url = DeribitUrls::get_ws_url(config.test_net).to_string();
25
26        Self {
27            config,
28            ws_url,
29            ws_stream: None,
30            connected: false,
31        }
32    }
33
34    /// Send a message to the WebSocket
35    pub async fn send_message(&mut self, message: Value) -> DeribitResult<()> {
36        if let Some(ws_stream) = &mut self.ws_stream {
37            let text = serde_json::to_string(&message).map_err(|e| {
38                DeribitError::Serialization(format!("Failed to serialize message: {e}"))
39            })?;
40            let msg = Message::Text(text.into());
41
42            ws_stream
43                .send(msg)
44                .await
45                .map_err(|e| DeribitError::Connection(format!("Failed to send message: {e}")))?;
46
47            Ok(())
48        } else {
49            Err(DeribitError::Connection("Not connected".to_string()))
50        }
51    }
52
53    /// Receive a message from the WebSocket
54    pub async fn receive_message(&mut self) -> DeribitResult<Option<Value>> {
55        if let Some(ws_stream) = &mut self.ws_stream {
56            match ws_stream.next().await {
57                Some(Ok(Message::Text(text))) => {
58                    let value: Value = serde_json::from_str(&text).map_err(|e| {
59                        DeribitError::Serialization(format!("Failed to parse message: {e}"))
60                    })?;
61                    Ok(Some(value))
62                }
63                Some(Ok(Message::Close(_))) => {
64                    self.connected = false;
65                    Ok(None)
66                }
67                Some(Err(e)) => Err(DeribitError::Connection(format!("WebSocket error: {e}"))),
68                None => {
69                    self.connected = false;
70                    Ok(None)
71                }
72                _ => Ok(None), // Ignore other message types
73            }
74        } else {
75            Err(DeribitError::Connection("Not connected".to_string()))
76        }
77    }
78}
79
80#[async_trait]
81impl DeribitClient for DeribitWebSocketClient {
82    type Error = DeribitError;
83
84    async fn connect(&mut self) -> Result<(), Self::Error> {
85        let (ws_stream, _) = connect_async(&self.ws_url).await.map_err(|e| {
86            DeribitError::Connection(format!("Failed to connect to WebSocket: {e}"))
87        })?;
88
89        self.ws_stream = Some(ws_stream);
90        self.connected = true;
91        Ok(())
92    }
93
94    async fn disconnect(&mut self) -> Result<(), Self::Error> {
95        if let Some(mut ws_stream) = self.ws_stream.take() {
96            let _ = ws_stream.close(None).await;
97        }
98        self.connected = false;
99        Ok(())
100    }
101
102    fn is_connected(&self) -> bool {
103        self.connected
104    }
105}