deribit_websocket/
client.rs

1//! WebSocket client implementation for Deribit
2
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::model::SubscriptionChannel;
7use crate::{
8    callback::MessageHandler,
9    config::WebSocketConfig,
10    connection::WebSocketConnection,
11    error::WebSocketError,
12    message::{
13        notification::NotificationHandler, request::RequestBuilder, response::ResponseHandler,
14    },
15    model::{
16        quote::*,
17        subscription::SubscriptionManager,
18        ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
19    },
20    session::WebSocketSession,
21};
22
/// WebSocket client for Deribit.
///
/// Bundles the configuration, the connection, the session and the helper
/// objects (request builder, subscription manager, optional message handler)
/// that the methods of this type operate on. Shared parts are wrapped in
/// `Arc`, and the mutable ones additionally in an async `Mutex`.
#[derive(Debug)]
pub struct DeribitWebSocketClient {
    /// WebSocket configuration
    pub config: Arc<WebSocketConfig>,
    /// Connection used for every send and receive; each method locks it
    /// before touching the socket.
    connection: Arc<Mutex<WebSocketConnection>>,
    /// WebSocket session
    pub session: Arc<WebSocketSession>,
    /// Builder for the JSON-RPC requests; locked only while a request is built.
    request_builder: Arc<Mutex<RequestBuilder>>,
    // Constructed in `new` but not read by any method in this file.
    #[allow(dead_code)]
    response_handler: Arc<ResponseHandler>,
    // Constructed in `new` but not read by any method in this file.
    #[allow(dead_code)]
    notification_handler: Arc<NotificationHandler>,
    /// Local record of the channels passed to `subscribe` / `unsubscribe`.
    subscription_manager: Arc<Mutex<SubscriptionManager>>,
    // Sending half of the unbounded channel created in `new`; not used by any
    // method in this file.
    #[allow(dead_code)]
    message_sender: Option<mpsc::UnboundedSender<String>>,
    // Receiving half of the unbounded channel created in `new`; not used by
    // any method in this file.
    #[allow(dead_code)]
    message_receiver: Option<mpsc::UnboundedReceiver<String>>,
    /// Optional callbacks applied to messages read by
    /// `receive_and_process_message`; `None` until a handler is set.
    message_handler: Option<MessageHandler>,
}
43
44impl DeribitWebSocketClient {
45    /// Create a new WebSocket client
46    pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
47        let connection = Arc::new(Mutex::new(WebSocketConnection::new(config.ws_url.clone())));
48        let session = Arc::new(WebSocketSession::new(config.clone()));
49        let (tx, rx) = mpsc::unbounded_channel();
50
51        let config = Arc::new(config.clone());
52        Ok(Self {
53            config,
54            connection,
55            session,
56            request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
57            response_handler: Arc::new(ResponseHandler::new()),
58            notification_handler: Arc::new(NotificationHandler::new()),
59            subscription_manager: Arc::new(Mutex::new(SubscriptionManager::new())),
60            message_sender: Some(tx),
61            message_receiver: Some(rx),
62            message_handler: None,
63        })
64    }
65
66    /// Create a new WebSocket client with default configuration
67    pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
68        let config = WebSocketConfig::with_url(&ws_url)
69            .map_err(|e| WebSocketError::ConnectionFailed(format!("Invalid URL: {}", e)))?;
70        Self::new(&config)
71    }
72
73    /// Create a new WebSocket client for testnet
74    pub fn new_testnet() -> Result<Self, WebSocketError> {
75        Self::new_with_url("wss://test.deribit.com/ws/api/v2".to_string())
76    }
77
78    /// Create a new WebSocket client for production
79    pub fn new_production() -> Result<Self, WebSocketError> {
80        Self::new_with_url("wss://www.deribit.com/ws/api/v2".to_string())
81    }
82
83    /// Connect to the WebSocket server
84    pub async fn connect(&self) -> Result<(), WebSocketError> {
85        let mut connection = self.connection.lock().await;
86        connection.connect().await
87    }
88
89    /// Disconnect from the WebSocket server
90    pub async fn disconnect(&self) -> Result<(), WebSocketError> {
91        let mut connection = self.connection.lock().await;
92        connection.disconnect().await
93    }
94
95    /// Check if connected
96    pub async fn is_connected(&self) -> bool {
97        let connection = self.connection.lock().await;
98        connection.is_connected()
99    }
100
101    /// Authenticate with the server
102    pub async fn authenticate(
103        &self,
104        client_id: &str,
105        client_secret: &str,
106    ) -> Result<JsonRpcResponse, WebSocketError> {
107        let request = {
108            let mut builder = self.request_builder.lock().await;
109            builder.build_auth_request(client_id, client_secret)
110        };
111
112        self.send_request(request).await
113    }
114
115    /// Subscribe to channels
116    pub async fn subscribe(
117        &self,
118        channels: Vec<String>,
119    ) -> Result<JsonRpcResponse, WebSocketError> {
120        let request = {
121            let mut builder = self.request_builder.lock().await;
122            builder.build_subscribe_request(channels.clone())
123        };
124
125        // Update subscription manager
126        let mut sub_manager = self.subscription_manager.lock().await;
127        for channel in channels {
128            let channel_type = self.parse_channel_type(&channel);
129            let instrument = self.extract_instrument(&channel);
130            sub_manager.add_subscription(channel, channel_type, instrument);
131        }
132
133        self.send_request(request).await
134    }
135
136    /// Unsubscribe from channels
137    pub async fn unsubscribe(
138        &self,
139        channels: Vec<String>,
140    ) -> Result<JsonRpcResponse, WebSocketError> {
141        let request = {
142            let mut builder = self.request_builder.lock().await;
143            builder.build_unsubscribe_request(channels.clone())
144        };
145
146        // Update subscription manager
147        let mut sub_manager = self.subscription_manager.lock().await;
148        for channel in channels {
149            sub_manager.remove_subscription(&channel);
150        }
151
152        self.send_request(request).await
153    }
154
155    /// Send a JSON-RPC request
156    pub async fn send_request(
157        &self,
158        request: JsonRpcRequest,
159    ) -> Result<JsonRpcResponse, WebSocketError> {
160        let message = serde_json::to_string(&request).map_err(|e| {
161            WebSocketError::InvalidMessage(format!("Failed to serialize request: {}", e))
162        })?;
163
164        let mut connection = self.connection.lock().await;
165        connection.send(message).await?;
166
167        // Wait for response (simplified - in real implementation would match by ID)
168        let response_text = connection.receive().await?;
169
170        // Try to parse as JSON-RPC response first, then handle notifications
171        let response: JsonRpcResponse = match serde_json::from_str(&response_text) {
172            Ok(resp) => resp,
173            Err(e) => {
174                // Check if this might be a notification (missing id field)
175                if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&response_text)
176                    && json_val.get("method").is_some()
177                    && json_val.get("id").is_none()
178                {
179                    // This is a notification, create a synthetic response
180                    return Ok(JsonRpcResponse {
181                        jsonrpc: "2.0".to_string(),
182                        id: serde_json::Value::Null,
183                        result: crate::model::JsonRpcResult::Success { result: json_val },
184                    });
185                }
186                return Err(WebSocketError::InvalidMessage(format!(
187                    "Failed to parse response: {}",
188                    e
189                )));
190            }
191        };
192
193        Ok(response)
194    }
195
196    /// Send a raw message
197    pub async fn send_message(&self, message: String) -> Result<(), WebSocketError> {
198        let mut connection = self.connection.lock().await;
199        connection.send(message).await
200    }
201
202    /// Receive a message
203    pub async fn receive_message(&self) -> Result<String, WebSocketError> {
204        let mut connection = self.connection.lock().await;
205        connection.receive().await
206    }
207
208    /// Get active subscriptions
209    pub async fn get_subscriptions(&self) -> Vec<String> {
210        let sub_manager = self.subscription_manager.lock().await;
211        sub_manager.get_all_channels()
212    }
213
214    /// Test connection
215    pub async fn test_connection(&self) -> Result<JsonRpcResponse, WebSocketError> {
216        let request = {
217            let mut builder = self.request_builder.lock().await;
218            builder.build_test_request()
219        };
220
221        self.send_request(request).await
222    }
223
224    /// Get server time
225    pub async fn get_time(&self) -> Result<JsonRpcResponse, WebSocketError> {
226        let request = {
227            let mut builder = self.request_builder.lock().await;
228            builder.build_get_time_request()
229        };
230
231        self.send_request(request).await
232    }
233
234    /// Place mass quotes
235    pub async fn mass_quote(
236        &self,
237        request: MassQuoteRequest,
238    ) -> Result<MassQuoteResult, WebSocketError> {
239        // Validate the request first
240        request.validate().map_err(WebSocketError::InvalidMessage)?;
241
242        let json_request = {
243            let mut builder = self.request_builder.lock().await;
244            builder.build_mass_quote_request(request)
245        };
246
247        let response = self.send_request(json_request).await?;
248
249        // Parse the response using WsResponse structure
250        match response.result {
251            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
252                WebSocketError::InvalidMessage(format!(
253                    "Failed to parse mass quote response: {}",
254                    e
255                ))
256            }),
257            JsonRpcResult::Error { error } => {
258                Err(WebSocketError::ApiError(error.code, error.message))
259            }
260        }
261    }
262
263    /// Cancel quotes
264    pub async fn cancel_quotes(
265        &self,
266        request: CancelQuotesRequest,
267    ) -> Result<CancelQuotesResponse, WebSocketError> {
268        let json_request = {
269            let mut builder = self.request_builder.lock().await;
270            builder.build_cancel_quotes_request(request)
271        };
272
273        let response = self.send_request(json_request).await?;
274
275        // Parse the response using JsonRpcResult structure
276        match response.result {
277            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
278                WebSocketError::InvalidMessage(format!(
279                    "Failed to parse cancel quotes response: {}",
280                    e
281                ))
282            }),
283            JsonRpcResult::Error { error } => {
284                Err(WebSocketError::ApiError(error.code, error.message))
285            }
286        }
287    }
288
289    /// Set MMP group configuration
290    pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
291        let json_request = {
292            let mut builder = self.request_builder.lock().await;
293            builder.build_set_mmp_config_request(config)
294        };
295
296        let response = self.send_request(json_request).await?;
297
298        match response.result {
299            JsonRpcResult::Success { .. } => Ok(()),
300            JsonRpcResult::Error { error } => {
301                Err(WebSocketError::ApiError(error.code, error.message))
302            }
303        }
304    }
305
306    /// Get MMP group configuration
307    pub async fn get_mmp_config(
308        &self,
309        mmp_group: Option<String>,
310    ) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
311        let json_request = {
312            let mut builder = self.request_builder.lock().await;
313            builder.build_get_mmp_config_request(mmp_group)
314        };
315
316        let response = self.send_request(json_request).await?;
317
318        match response.result {
319            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
320                WebSocketError::InvalidMessage(format!(
321                    "Failed to parse MMP config response: {}",
322                    e
323                ))
324            }),
325            JsonRpcResult::Error { error } => {
326                Err(WebSocketError::ApiError(error.code, error.message))
327            }
328        }
329    }
330
331    /// Reset MMP group
332    pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
333        let json_request = {
334            let mut builder = self.request_builder.lock().await;
335            builder.build_reset_mmp_request(mmp_group)
336        };
337
338        let response = self.send_request(json_request).await?;
339
340        match response.result {
341            JsonRpcResult::Success { .. } => Ok(()),
342            JsonRpcResult::Error { error } => {
343                Err(WebSocketError::ApiError(error.code, error.message))
344            }
345        }
346    }
347
348    /// Get open orders (including quotes)
349    pub async fn get_open_orders(
350        &self,
351        currency: Option<String>,
352        kind: Option<String>,
353        type_filter: Option<String>,
354    ) -> Result<Vec<QuoteInfo>, WebSocketError> {
355        let json_request = {
356            let mut builder = self.request_builder.lock().await;
357            builder.build_get_open_orders_request(currency, kind, type_filter)
358        };
359
360        let response = self.send_request(json_request).await?;
361
362        match response.result {
363            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
364                WebSocketError::InvalidMessage(format!(
365                    "Failed to parse open orders response: {}",
366                    e
367                ))
368            }),
369            JsonRpcResult::Error { error } => {
370                Err(WebSocketError::ApiError(error.code, error.message))
371            }
372        }
373    }
374
375    /// Set message handler with callbacks
376    /// The message_callback processes each incoming message and returns Result<(), Error>
377    /// The error_callback is called only when message_callback returns an error
378    pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
379    where
380        F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
381        E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
382    {
383        self.message_handler = Some(MessageHandler::new(message_callback, error_callback));
384    }
385
386    /// Set message handler using builder pattern
387    pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
388        self.message_handler = Some(handler);
389    }
390
391    /// Remove the current message handler
392    pub fn clear_message_handler(&mut self) {
393        self.message_handler = None;
394    }
395
396    /// Check if message handler is set
397    pub fn has_message_handler(&self) -> bool {
398        self.message_handler.is_some()
399    }
400
401    /// Receive and process a message using the registered callbacks
402    /// This method will:
403    /// 1. Receive a message from the WebSocket
404    /// 2. Call the primary callback with the message
405    /// 3. If primary callback returns error, call error callback with message and error
406    pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
407        let message = self.receive_message().await?;
408
409        if let Some(handler) = &self.message_handler {
410            handler.handle_message(&message);
411        }
412
413        Ok(())
414    }
415
416    /// Start message processing loop with callbacks
417    /// This will continuously receive messages and process them using the registered callbacks
418    /// The loop will continue until an error occurs or the connection is closed
419    pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
420        if self.message_handler.is_none() {
421            return Err(WebSocketError::InvalidMessage(
422                "No message handler set. Use set_message_handler() first.".to_string(),
423            ));
424        }
425
426        loop {
427            match self.receive_and_process_message().await {
428                Ok(()) => {
429                    // Message processed successfully, continue
430                }
431                Err(WebSocketError::ConnectionClosed) => {
432                    // Connection closed, exit loop gracefully
433                    break;
434                }
435                Err(e) => {
436                    // Other error occurred, propagate it
437                    return Err(e);
438                }
439            }
440        }
441
442        Ok(())
443    }
444
445    // Helper methods
446
447    fn parse_channel_type(&self, channel: &str) -> SubscriptionChannel {
448        if channel.starts_with("ticker") {
449            SubscriptionChannel::Ticker(self.extract_instrument(channel).unwrap_or_default())
450        } else if channel.starts_with("book") {
451            SubscriptionChannel::OrderBook(self.extract_instrument(channel).unwrap_or_default())
452        } else if channel.starts_with("trades") {
453            SubscriptionChannel::Trades(self.extract_instrument(channel).unwrap_or_default())
454        } else if channel == "user.orders" {
455            SubscriptionChannel::UserOrders
456        } else if channel == "user.trades" {
457            SubscriptionChannel::UserTrades
458        } else {
459            SubscriptionChannel::Ticker(String::new()) // Default fallback
460        }
461    }
462
463    fn extract_instrument(&self, channel: &str) -> Option<String> {
464        channel
465            .find('.')
466            .map(|dot_pos| channel[dot_pos + 1..].to_string())
467    }
468}
469
470impl Default for DeribitWebSocketClient {
471    fn default() -> Self {
472        let config = WebSocketConfig::default();
473        Self::new(&config).unwrap()
474    }
475}