// deribit_websocket/client.rs
use async_trait::async_trait;
use deribit_base::{DeribitClient, DeribitConfig, DeribitError, DeribitResult, DeribitUrls};
use futures_util::{SinkExt, StreamExt};
use serde_json::Value;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
9
10type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
11
12#[allow(dead_code)]
14pub struct DeribitWebSocketClient {
15 config: DeribitConfig,
16 ws_url: String,
17 ws_stream: Option<WsStream>,
18 connected: bool,
19}
20
21impl DeribitWebSocketClient {
22 pub fn new(config: DeribitConfig) -> Self {
24 let ws_url = DeribitUrls::get_ws_url(config.test_net).to_string();
25
26 Self {
27 config,
28 ws_url,
29 ws_stream: None,
30 connected: false,
31 }
32 }
33
34 pub async fn send_message(&mut self, message: Value) -> DeribitResult<()> {
36 if let Some(ws_stream) = &mut self.ws_stream {
37 let text = serde_json::to_string(&message).map_err(|e| {
38 DeribitError::Serialization(format!("Failed to serialize message: {e}"))
39 })?;
40 let msg = Message::Text(text.into());
41
42 ws_stream
43 .send(msg)
44 .await
45 .map_err(|e| DeribitError::Connection(format!("Failed to send message: {e}")))?;
46
47 Ok(())
48 } else {
49 Err(DeribitError::Connection("Not connected".to_string()))
50 }
51 }
52
53 pub async fn receive_message(&mut self) -> DeribitResult<Option<Value>> {
55 if let Some(ws_stream) = &mut self.ws_stream {
56 match ws_stream.next().await {
57 Some(Ok(Message::Text(text))) => {
58 let value: Value = serde_json::from_str(&text).map_err(|e| {
59 DeribitError::Serialization(format!("Failed to parse message: {e}"))
60 })?;
61 Ok(Some(value))
62 }
63 Some(Ok(Message::Close(_))) => {
64 self.connected = false;
65 Ok(None)
66 }
67 Some(Err(e)) => Err(DeribitError::Connection(format!("WebSocket error: {e}"))),
68 None => {
69 self.connected = false;
70 Ok(None)
71 }
72 _ => Ok(None), }
74 } else {
75 Err(DeribitError::Connection("Not connected".to_string()))
76 }
77 }
78}
79
80#[async_trait]
81impl DeribitClient for DeribitWebSocketClient {
82 type Error = DeribitError;
83
84 async fn connect(&mut self) -> Result<(), Self::Error> {
85 let (ws_stream, _) = connect_async(&self.ws_url).await.map_err(|e| {
86 DeribitError::Connection(format!("Failed to connect to WebSocket: {e}"))
87 })?;
88
89 self.ws_stream = Some(ws_stream);
90 self.connected = true;
91 Ok(())
92 }
93
94 async fn disconnect(&mut self) -> Result<(), Self::Error> {
95 if let Some(mut ws_stream) = self.ws_stream.take() {
96 let _ = ws_stream.close(None).await;
97 }
98 self.connected = false;
99 Ok(())
100 }
101
102 fn is_connected(&self) -> bool {
103 self.connected
104 }
105}