Skip to main content

deribit_websocket/
client.rs

1//! WebSocket client implementation for Deribit
2
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::model::SubscriptionChannel;
7use crate::{
8    callback::MessageHandler,
9    config::WebSocketConfig,
10    connection::WebSocketConnection,
11    error::WebSocketError,
12    message::{
13        notification::NotificationHandler, request::RequestBuilder, response::ResponseHandler,
14    },
15    model::{
16        quote::*,
17        subscription::SubscriptionManager,
18        ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
19    },
20    session::WebSocketSession,
21};
22
/// WebSocket client for Deribit
///
/// Bundles the configuration, the underlying connection, the session and the
/// request/subscription bookkeeping that the client's methods operate on.
#[derive(Debug)]
pub struct DeribitWebSocketClient {
    /// WebSocket configuration
    pub config: Arc<WebSocketConfig>,
    /// Underlying connection, guarded by an async mutex so `&self` methods can
    /// send and receive on it.
    connection: Arc<Mutex<WebSocketConnection>>,
    /// WebSocket session
    pub session: Arc<WebSocketSession>,
    /// Builder used to create every outgoing JSON-RPC request.
    request_builder: Arc<Mutex<RequestBuilder>>,
    /// Response handler created in `new`.
    // NOTE(review): `dead_code` is allowed because nothing in the visible part of
    // this file reads the field — confirm against the rest of the file.
    #[allow(dead_code)]
    response_handler: Arc<ResponseHandler>,
    /// Notification handler created in `new`.
    // NOTE(review): same `dead_code` caveat as `response_handler`.
    #[allow(dead_code)]
    notification_handler: Arc<NotificationHandler>,
    /// Local record of the channels this client has subscribed to.
    subscription_manager: Arc<Mutex<SubscriptionManager>>,
    /// Sending half of the unbounded string channel created in `new`.
    #[allow(dead_code)]
    message_sender: Option<mpsc::UnboundedSender<String>>,
    /// Receiving half of the unbounded string channel created in `new`.
    #[allow(dead_code)]
    message_receiver: Option<mpsc::UnboundedReceiver<String>>,
    /// Optional message handler; `new` initialises it to `None`.
    message_handler: Option<MessageHandler>,
}
43
44impl DeribitWebSocketClient {
45    /// Create a new WebSocket client
46    pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
47        let connection = Arc::new(Mutex::new(WebSocketConnection::new(config.ws_url.clone())));
48        let session = Arc::new(WebSocketSession::new(config.clone()));
49        let (tx, rx) = mpsc::unbounded_channel();
50
51        let config = Arc::new(config.clone());
52        Ok(Self {
53            config,
54            connection,
55            session,
56            request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
57            response_handler: Arc::new(ResponseHandler::new()),
58            notification_handler: Arc::new(NotificationHandler::new()),
59            subscription_manager: Arc::new(Mutex::new(SubscriptionManager::new())),
60            message_sender: Some(tx),
61            message_receiver: Some(rx),
62            message_handler: None,
63        })
64    }
65
66    /// Create a new WebSocket client with default configuration
67    pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
68        let config = WebSocketConfig::with_url(&ws_url)
69            .map_err(|e| WebSocketError::ConnectionFailed(format!("Invalid URL: {}", e)))?;
70        Self::new(&config)
71    }
72
73    /// Create a new WebSocket client for testnet
74    pub fn new_testnet() -> Result<Self, WebSocketError> {
75        Self::new_with_url("wss://test.deribit.com/ws/api/v2".to_string())
76    }
77
78    /// Create a new WebSocket client for production
79    pub fn new_production() -> Result<Self, WebSocketError> {
80        Self::new_with_url("wss://www.deribit.com/ws/api/v2".to_string())
81    }
82
83    /// Connect to the WebSocket server
84    pub async fn connect(&self) -> Result<(), WebSocketError> {
85        let mut connection = self.connection.lock().await;
86        connection.connect().await
87    }
88
89    /// Disconnect from the WebSocket server
90    pub async fn disconnect(&self) -> Result<(), WebSocketError> {
91        let mut connection = self.connection.lock().await;
92        connection.disconnect().await
93    }
94
95    /// Check if connected
96    pub async fn is_connected(&self) -> bool {
97        let connection = self.connection.lock().await;
98        connection.is_connected()
99    }
100
101    /// Authenticate with the server
102    ///
103    /// Authenticates the connection using API credentials and returns authentication
104    /// details including access token and refresh token.
105    ///
106    /// # Arguments
107    ///
108    /// * `client_id` - API client ID
109    /// * `client_secret` - API client secret
110    ///
111    /// # Returns
112    ///
113    /// Returns `AuthResponse` containing access token, token type, expiration, and scope
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if authentication fails or credentials are invalid
118    pub async fn authenticate(
119        &self,
120        client_id: &str,
121        client_secret: &str,
122    ) -> Result<crate::model::AuthResponse, WebSocketError> {
123        let request = {
124            let mut builder = self.request_builder.lock().await;
125            builder.build_auth_request(client_id, client_secret)
126        };
127
128        let response = self.send_request(request).await?;
129
130        match response.result {
131            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
132                WebSocketError::InvalidMessage(format!(
133                    "Failed to parse authentication response: {}",
134                    e
135                ))
136            }),
137            JsonRpcResult::Error { error } => {
138                Err(WebSocketError::ApiError(error.code, error.message))
139            }
140        }
141    }
142
143    /// Subscribe to channels
144    pub async fn subscribe(
145        &self,
146        channels: Vec<String>,
147    ) -> Result<JsonRpcResponse, WebSocketError> {
148        let request = {
149            let mut builder = self.request_builder.lock().await;
150            builder.build_subscribe_request(channels.clone())
151        };
152
153        // Update subscription manager
154        let mut sub_manager = self.subscription_manager.lock().await;
155        for channel in channels {
156            let channel_type = self.parse_channel_type(&channel);
157            let instrument = self.extract_instrument(&channel);
158            sub_manager.add_subscription(channel, channel_type, instrument);
159        }
160
161        self.send_request(request).await
162    }
163
164    /// Unsubscribe from channels
165    pub async fn unsubscribe(
166        &self,
167        channels: Vec<String>,
168    ) -> Result<JsonRpcResponse, WebSocketError> {
169        let request = {
170            let mut builder = self.request_builder.lock().await;
171            builder.build_unsubscribe_request(channels.clone())
172        };
173
174        // Update subscription manager
175        let mut sub_manager = self.subscription_manager.lock().await;
176        for channel in channels {
177            sub_manager.remove_subscription(&channel);
178        }
179
180        self.send_request(request).await
181    }
182
183    /// Unsubscribe from all public channels
184    ///
185    /// Unsubscribes from all public channels subscribed so far and clears
186    /// the local subscription manager state.
187    ///
188    /// # Returns
189    ///
190    /// Returns `"ok"` on success
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the request fails or the response cannot be parsed
195    pub async fn public_unsubscribe_all(&self) -> Result<String, WebSocketError> {
196        let request = {
197            let mut builder = self.request_builder.lock().await;
198            builder.build_public_unsubscribe_all_request()
199        };
200
201        let response = self.send_request(request).await?;
202
203        // Clear subscription manager
204        let mut sub_manager = self.subscription_manager.lock().await;
205        sub_manager.clear();
206
207        match response.result {
208            JsonRpcResult::Success { result } => {
209                result.as_str().map(String::from).ok_or_else(|| {
210                    WebSocketError::InvalidMessage(
211                        "Expected string result from unsubscribe_all".to_string(),
212                    )
213                })
214            }
215            JsonRpcResult::Error { error } => {
216                Err(WebSocketError::ApiError(error.code, error.message))
217            }
218        }
219    }
220
221    /// Unsubscribe from all private channels
222    ///
223    /// Unsubscribes from all private channels subscribed so far and clears
224    /// the local subscription manager state. Requires authentication.
225    ///
226    /// # Returns
227    ///
228    /// Returns `"ok"` on success
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if the request fails or the response cannot be parsed
233    pub async fn private_unsubscribe_all(&self) -> Result<String, WebSocketError> {
234        let request = {
235            let mut builder = self.request_builder.lock().await;
236            builder.build_private_unsubscribe_all_request()
237        };
238
239        let response = self.send_request(request).await?;
240
241        // Clear subscription manager
242        let mut sub_manager = self.subscription_manager.lock().await;
243        sub_manager.clear();
244
245        match response.result {
246            JsonRpcResult::Success { result } => {
247                result.as_str().map(String::from).ok_or_else(|| {
248                    WebSocketError::InvalidMessage(
249                        "Expected string result from unsubscribe_all".to_string(),
250                    )
251                })
252            }
253            JsonRpcResult::Error { error } => {
254                Err(WebSocketError::ApiError(error.code, error.message))
255            }
256        }
257    }
258
259    /// Send a JSON-RPC request
260    pub async fn send_request(
261        &self,
262        request: JsonRpcRequest,
263    ) -> Result<JsonRpcResponse, WebSocketError> {
264        let message = serde_json::to_string(&request).map_err(|e| {
265            WebSocketError::InvalidMessage(format!("Failed to serialize request: {}", e))
266        })?;
267
268        let mut connection = self.connection.lock().await;
269        connection.send(message).await?;
270
271        // Wait for response (simplified - in real implementation would match by ID)
272        let response_text = connection.receive().await?;
273
274        // Try to parse as JSON-RPC response first, then handle notifications
275        let response: JsonRpcResponse = match serde_json::from_str(&response_text) {
276            Ok(resp) => resp,
277            Err(e) => {
278                // Check if this might be a notification (missing id field)
279                if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&response_text)
280                    && json_val.get("method").is_some()
281                    && json_val.get("id").is_none()
282                {
283                    // This is a notification, create a synthetic response
284                    return Ok(JsonRpcResponse {
285                        jsonrpc: "2.0".to_string(),
286                        id: serde_json::Value::Null,
287                        result: crate::model::JsonRpcResult::Success { result: json_val },
288                    });
289                }
290                return Err(WebSocketError::InvalidMessage(format!(
291                    "Failed to parse response: {}",
292                    e
293                )));
294            }
295        };
296
297        Ok(response)
298    }
299
300    /// Send a raw message
301    pub async fn send_message(&self, message: String) -> Result<(), WebSocketError> {
302        let mut connection = self.connection.lock().await;
303        connection.send(message).await
304    }
305
306    /// Receive a message
307    pub async fn receive_message(&self) -> Result<String, WebSocketError> {
308        let mut connection = self.connection.lock().await;
309        connection.receive().await
310    }
311
312    /// Get active subscriptions
313    pub async fn get_subscriptions(&self) -> Vec<String> {
314        let sub_manager = self.subscription_manager.lock().await;
315        sub_manager.get_all_channels()
316    }
317
318    /// Test connection
319    ///
320    /// Tests the WebSocket connection and returns API version information.
321    ///
322    /// # Returns
323    ///
324    /// Returns `TestResponse` containing the API version string
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the connection test fails
329    pub async fn test_connection(&self) -> Result<crate::model::TestResponse, WebSocketError> {
330        let request = {
331            let mut builder = self.request_builder.lock().await;
332            builder.build_test_request()
333        };
334
335        let response = self.send_request(request).await?;
336
337        match response.result {
338            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
339                WebSocketError::InvalidMessage(format!("Failed to parse test response: {}", e))
340            }),
341            JsonRpcResult::Error { error } => {
342                Err(WebSocketError::ApiError(error.code, error.message))
343            }
344        }
345    }
346
347    /// Get server time
348    ///
349    /// Returns the current server timestamp in milliseconds since Unix epoch.
350    ///
351    /// # Returns
352    ///
353    /// Returns `u64` timestamp in milliseconds
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the request fails
358    pub async fn get_time(&self) -> Result<u64, WebSocketError> {
359        let request = {
360            let mut builder = self.request_builder.lock().await;
361            builder.build_get_time_request()
362        };
363
364        let response = self.send_request(request).await?;
365
366        match response.result {
367            JsonRpcResult::Success { result } => result.as_u64().ok_or_else(|| {
368                WebSocketError::InvalidMessage(
369                    "Expected u64 timestamp in get_time response".to_string(),
370                )
371            }),
372            JsonRpcResult::Error { error } => {
373                Err(WebSocketError::ApiError(error.code, error.message))
374            }
375        }
376    }
377
378    /// Enable heartbeat with specified interval
379    ///
380    /// The server will send a heartbeat message every `interval` seconds.
381    /// If heartbeat is enabled, the server will also send `test_request` notifications
382    /// which the client should respond to with `public/test` to keep the connection alive.
383    ///
384    /// # Arguments
385    ///
386    /// * `interval` - Heartbeat interval in seconds (10-3600)
387    ///
388    /// # Returns
389    ///
390    /// Returns `"ok"` on success
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the request fails or the interval is invalid
395    pub async fn set_heartbeat(&self, interval: u64) -> Result<String, WebSocketError> {
396        let request = {
397            let mut builder = self.request_builder.lock().await;
398            builder.build_set_heartbeat_request(interval)
399        };
400
401        let response = self.send_request(request).await?;
402
403        match response.result {
404            JsonRpcResult::Success { result } => {
405                result.as_str().map(String::from).ok_or_else(|| {
406                    WebSocketError::InvalidMessage(
407                        "Expected string result from set_heartbeat".to_string(),
408                    )
409                })
410            }
411            JsonRpcResult::Error { error } => {
412                Err(WebSocketError::ApiError(error.code, error.message))
413            }
414        }
415    }
416
417    /// Disable heartbeat
418    ///
419    /// Stops the server from sending heartbeat messages and `test_request` notifications.
420    ///
421    /// # Returns
422    ///
423    /// Returns `"ok"` on success
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the request fails
428    pub async fn disable_heartbeat(&self) -> Result<String, WebSocketError> {
429        let request = {
430            let mut builder = self.request_builder.lock().await;
431            builder.build_disable_heartbeat_request()
432        };
433
434        let response = self.send_request(request).await?;
435
436        match response.result {
437            JsonRpcResult::Success { result } => {
438                result.as_str().map(String::from).ok_or_else(|| {
439                    WebSocketError::InvalidMessage(
440                        "Expected string result from disable_heartbeat".to_string(),
441                    )
442                })
443            }
444            JsonRpcResult::Error { error } => {
445                Err(WebSocketError::ApiError(error.code, error.message))
446            }
447        }
448    }
449
450    /// Send client identification to the server
451    ///
452    /// This method identifies the client to the server with its name and version.
453    /// It's recommended to call this after connecting to provide debugging information.
454    ///
455    /// # Arguments
456    ///
457    /// * `client_name` - Name of the client application
458    /// * `client_version` - Version of the client application
459    ///
460    /// # Returns
461    ///
462    /// Returns `HelloResponse` containing the API version information
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the request fails
467    pub async fn hello(
468        &self,
469        client_name: &str,
470        client_version: &str,
471    ) -> Result<crate::model::HelloResponse, WebSocketError> {
472        let request = {
473            let mut builder = self.request_builder.lock().await;
474            builder.build_hello_request(client_name, client_version)
475        };
476
477        let response = self.send_request(request).await?;
478
479        match response.result {
480            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
481                WebSocketError::InvalidMessage(format!("Failed to parse hello response: {}", e))
482            }),
483            JsonRpcResult::Error { error } => {
484                Err(WebSocketError::ApiError(error.code, error.message))
485            }
486        }
487    }
488
489    /// Enable automatic order cancellation on disconnect
490    ///
491    /// When enabled, all open orders will be automatically cancelled if the WebSocket
492    /// connection is lost. This is a safety feature to prevent unintended order
493    /// execution when the client loses connectivity.
494    ///
495    /// # Returns
496    ///
497    /// Returns `"ok"` on success
498    ///
499    /// # Errors
500    ///
501    /// Returns an error if the request fails or requires authentication
502    pub async fn enable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
503        let request = {
504            let mut builder = self.request_builder.lock().await;
505            builder.build_enable_cancel_on_disconnect_request()
506        };
507
508        let response = self.send_request(request).await?;
509
510        match response.result {
511            JsonRpcResult::Success { result } => {
512                result.as_str().map(String::from).ok_or_else(|| {
513                    WebSocketError::InvalidMessage(
514                        "Expected string result from enable_cancel_on_disconnect".to_string(),
515                    )
516                })
517            }
518            JsonRpcResult::Error { error } => {
519                Err(WebSocketError::ApiError(error.code, error.message))
520            }
521        }
522    }
523
524    /// Disable automatic order cancellation on disconnect
525    ///
526    /// When disabled, orders will remain active even if the WebSocket connection
527    /// is lost.
528    ///
529    /// # Returns
530    ///
531    /// Returns `"ok"` on success
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if the request fails or requires authentication
536    pub async fn disable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
537        let request = {
538            let mut builder = self.request_builder.lock().await;
539            builder.build_disable_cancel_on_disconnect_request()
540        };
541
542        let response = self.send_request(request).await?;
543
544        match response.result {
545            JsonRpcResult::Success { result } => {
546                result.as_str().map(String::from).ok_or_else(|| {
547                    WebSocketError::InvalidMessage(
548                        "Expected string result from disable_cancel_on_disconnect".to_string(),
549                    )
550                })
551            }
552            JsonRpcResult::Error { error } => {
553                Err(WebSocketError::ApiError(error.code, error.message))
554            }
555        }
556    }
557
558    /// Get current cancel-on-disconnect status
559    ///
560    /// Returns whether automatic order cancellation on disconnect is currently enabled.
561    ///
562    /// # Returns
563    ///
564    /// Returns `true` if cancel-on-disconnect is enabled, `false` otherwise
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if the request fails or requires authentication
569    pub async fn get_cancel_on_disconnect(&self) -> Result<bool, WebSocketError> {
570        let request = {
571            let mut builder = self.request_builder.lock().await;
572            builder.build_get_cancel_on_disconnect_request()
573        };
574
575        let response = self.send_request(request).await?;
576
577        match response.result {
578            JsonRpcResult::Success { result } => {
579                // The result contains "enabled" field
580                result
581                    .get("enabled")
582                    .and_then(|v| v.as_bool())
583                    .ok_or_else(|| {
584                        WebSocketError::InvalidMessage(
585                            "Expected 'enabled' boolean in get_cancel_on_disconnect response"
586                                .to_string(),
587                        )
588                    })
589            }
590            JsonRpcResult::Error { error } => {
591                Err(WebSocketError::ApiError(error.code, error.message))
592            }
593        }
594    }
595
596    /// Place mass quotes
597    pub async fn mass_quote(
598        &self,
599        request: MassQuoteRequest,
600    ) -> Result<MassQuoteResult, WebSocketError> {
601        // Validate the request first
602        request.validate().map_err(WebSocketError::InvalidMessage)?;
603
604        let json_request = {
605            let mut builder = self.request_builder.lock().await;
606            builder.build_mass_quote_request(request)
607        };
608
609        let response = self.send_request(json_request).await?;
610
611        // Parse the response using WsResponse structure
612        match response.result {
613            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
614                WebSocketError::InvalidMessage(format!(
615                    "Failed to parse mass quote response: {}",
616                    e
617                ))
618            }),
619            JsonRpcResult::Error { error } => {
620                Err(WebSocketError::ApiError(error.code, error.message))
621            }
622        }
623    }
624
625    /// Cancel quotes
626    pub async fn cancel_quotes(
627        &self,
628        request: CancelQuotesRequest,
629    ) -> Result<CancelQuotesResponse, WebSocketError> {
630        let json_request = {
631            let mut builder = self.request_builder.lock().await;
632            builder.build_cancel_quotes_request(request)
633        };
634
635        let response = self.send_request(json_request).await?;
636
637        // Parse the response using JsonRpcResult structure
638        match response.result {
639            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
640                WebSocketError::InvalidMessage(format!(
641                    "Failed to parse cancel quotes response: {}",
642                    e
643                ))
644            }),
645            JsonRpcResult::Error { error } => {
646                Err(WebSocketError::ApiError(error.code, error.message))
647            }
648        }
649    }
650
651    /// Set MMP group configuration
652    pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
653        let json_request = {
654            let mut builder = self.request_builder.lock().await;
655            builder.build_set_mmp_config_request(config)
656        };
657
658        let response = self.send_request(json_request).await?;
659
660        match response.result {
661            JsonRpcResult::Success { .. } => Ok(()),
662            JsonRpcResult::Error { error } => {
663                Err(WebSocketError::ApiError(error.code, error.message))
664            }
665        }
666    }
667
668    /// Get MMP group configuration
669    pub async fn get_mmp_config(
670        &self,
671        mmp_group: Option<String>,
672    ) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
673        let json_request = {
674            let mut builder = self.request_builder.lock().await;
675            builder.build_get_mmp_config_request(mmp_group)
676        };
677
678        let response = self.send_request(json_request).await?;
679
680        match response.result {
681            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
682                WebSocketError::InvalidMessage(format!(
683                    "Failed to parse MMP config response: {}",
684                    e
685                ))
686            }),
687            JsonRpcResult::Error { error } => {
688                Err(WebSocketError::ApiError(error.code, error.message))
689            }
690        }
691    }
692
693    /// Reset MMP group
694    pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
695        let json_request = {
696            let mut builder = self.request_builder.lock().await;
697            builder.build_reset_mmp_request(mmp_group)
698        };
699
700        let response = self.send_request(json_request).await?;
701
702        match response.result {
703            JsonRpcResult::Success { .. } => Ok(()),
704            JsonRpcResult::Error { error } => {
705                Err(WebSocketError::ApiError(error.code, error.message))
706            }
707        }
708    }
709
710    /// Get open orders (including quotes)
711    pub async fn get_open_orders(
712        &self,
713        currency: Option<String>,
714        kind: Option<String>,
715        type_filter: Option<String>,
716    ) -> Result<Vec<QuoteInfo>, WebSocketError> {
717        let json_request = {
718            let mut builder = self.request_builder.lock().await;
719            builder.build_get_open_orders_request(currency, kind, type_filter)
720        };
721
722        let response = self.send_request(json_request).await?;
723
724        match response.result {
725            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
726                WebSocketError::InvalidMessage(format!(
727                    "Failed to parse open orders response: {}",
728                    e
729                ))
730            }),
731            JsonRpcResult::Error { error } => {
732                Err(WebSocketError::ApiError(error.code, error.message))
733            }
734        }
735    }
736
737    /// Place a buy order
738    ///
739    /// # Arguments
740    ///
741    /// * `request` - The order request parameters
742    ///
743    /// # Returns
744    ///
745    /// Returns `OrderResponse` containing order info and any immediate trades
746    pub async fn buy(
747        &self,
748        request: crate::model::trading::OrderRequest,
749    ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
750        let json_request = {
751            let mut builder = self.request_builder.lock().await;
752            builder.build_buy_request(&request)
753        };
754
755        let response = self.send_request(json_request).await?;
756
757        match response.result {
758            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
759                WebSocketError::InvalidMessage(format!("Failed to parse buy response: {}", e))
760            }),
761            JsonRpcResult::Error { error } => {
762                Err(WebSocketError::ApiError(error.code, error.message))
763            }
764        }
765    }
766
767    /// Place a sell order
768    ///
769    /// # Arguments
770    ///
771    /// * `request` - The order request parameters
772    ///
773    /// # Returns
774    ///
775    /// Returns `OrderResponse` containing order info and any immediate trades
776    pub async fn sell(
777        &self,
778        request: crate::model::trading::OrderRequest,
779    ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
780        let json_request = {
781            let mut builder = self.request_builder.lock().await;
782            builder.build_sell_request(&request)
783        };
784
785        let response = self.send_request(json_request).await?;
786
787        match response.result {
788            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
789                WebSocketError::InvalidMessage(format!("Failed to parse sell response: {}", e))
790            }),
791            JsonRpcResult::Error { error } => {
792                Err(WebSocketError::ApiError(error.code, error.message))
793            }
794        }
795    }
796
797    /// Cancel an order by ID
798    ///
799    /// # Arguments
800    ///
801    /// * `order_id` - The order ID to cancel
802    ///
803    /// # Returns
804    ///
805    /// Returns `OrderInfo` for the cancelled order
806    pub async fn cancel(
807        &self,
808        order_id: &str,
809    ) -> Result<crate::model::trading::OrderInfo, WebSocketError> {
810        let json_request = {
811            let mut builder = self.request_builder.lock().await;
812            builder.build_cancel_request(order_id)
813        };
814
815        let response = self.send_request(json_request).await?;
816
817        match response.result {
818            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
819                WebSocketError::InvalidMessage(format!("Failed to parse cancel response: {}", e))
820            }),
821            JsonRpcResult::Error { error } => {
822                Err(WebSocketError::ApiError(error.code, error.message))
823            }
824        }
825    }
826
827    /// Cancel all orders
828    ///
829    /// # Returns
830    ///
831    /// Returns the number of orders cancelled
832    pub async fn cancel_all(&self) -> Result<u32, WebSocketError> {
833        let json_request = {
834            let mut builder = self.request_builder.lock().await;
835            builder.build_cancel_all_request()
836        };
837
838        let response = self.send_request(json_request).await?;
839
840        match response.result {
841            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
842                WebSocketError::InvalidMessage(format!(
843                    "Failed to parse cancel_all response: {}",
844                    e
845                ))
846            }),
847            JsonRpcResult::Error { error } => {
848                Err(WebSocketError::ApiError(error.code, error.message))
849            }
850        }
851    }
852
853    /// Cancel all orders by currency
854    ///
855    /// # Arguments
856    ///
857    /// * `currency` - Currency to cancel orders for (e.g., "BTC", "ETH")
858    ///
859    /// # Returns
860    ///
861    /// Returns the number of orders cancelled
862    pub async fn cancel_all_by_currency(&self, currency: &str) -> Result<u32, WebSocketError> {
863        let json_request = {
864            let mut builder = self.request_builder.lock().await;
865            builder.build_cancel_all_by_currency_request(currency)
866        };
867
868        let response = self.send_request(json_request).await?;
869
870        match response.result {
871            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
872                WebSocketError::InvalidMessage(format!(
873                    "Failed to parse cancel_all_by_currency response: {}",
874                    e
875                ))
876            }),
877            JsonRpcResult::Error { error } => {
878                Err(WebSocketError::ApiError(error.code, error.message))
879            }
880        }
881    }
882
883    /// Cancel all orders by instrument
884    ///
885    /// # Arguments
886    ///
887    /// * `instrument_name` - Instrument name to cancel orders for (e.g., "BTC-PERPETUAL")
888    ///
889    /// # Returns
890    ///
891    /// Returns the number of orders cancelled
892    pub async fn cancel_all_by_instrument(
893        &self,
894        instrument_name: &str,
895    ) -> Result<u32, WebSocketError> {
896        let json_request = {
897            let mut builder = self.request_builder.lock().await;
898            builder.build_cancel_all_by_instrument_request(instrument_name)
899        };
900
901        let response = self.send_request(json_request).await?;
902
903        match response.result {
904            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
905                WebSocketError::InvalidMessage(format!(
906                    "Failed to parse cancel_all_by_instrument response: {}",
907                    e
908                ))
909            }),
910            JsonRpcResult::Error { error } => {
911                Err(WebSocketError::ApiError(error.code, error.message))
912            }
913        }
914    }
915
916    /// Edit an existing order
917    ///
918    /// # Arguments
919    ///
920    /// * `request` - The edit order request parameters
921    ///
922    /// # Returns
923    ///
924    /// Returns `OrderResponse` containing updated order info and any trades
925    pub async fn edit(
926        &self,
927        request: crate::model::trading::EditOrderRequest,
928    ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
929        let json_request = {
930            let mut builder = self.request_builder.lock().await;
931            builder.build_edit_request(&request)
932        };
933
934        let response = self.send_request(json_request).await?;
935
936        match response.result {
937            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
938                WebSocketError::InvalidMessage(format!("Failed to parse edit response: {}", e))
939            }),
940            JsonRpcResult::Error { error } => {
941                Err(WebSocketError::ApiError(error.code, error.message))
942            }
943        }
944    }
945
946    // Account methods
947
948    /// Get positions for the specified currency and kind
949    ///
950    /// Retrieves user positions filtered by currency and/or instrument kind.
951    ///
952    /// # Arguments
953    ///
954    /// * `currency` - Currency filter (BTC, ETH, USDC, etc.) - optional
955    /// * `kind` - Kind filter (future, option, spot, etc.) - optional
956    ///
957    /// # Returns
958    ///
959    /// A vector of positions matching the filter criteria
960    ///
961    /// # Errors
962    ///
963    /// Returns an error if the request fails or the response cannot be parsed
964    pub async fn get_positions(
965        &self,
966        currency: Option<&str>,
967        kind: Option<&str>,
968    ) -> Result<Vec<crate::model::Position>, WebSocketError> {
969        let json_request = {
970            let mut builder = self.request_builder.lock().await;
971            builder.build_get_positions_request(currency, kind)
972        };
973
974        let response = self.send_request(json_request).await?;
975
976        match response.result {
977            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
978                WebSocketError::InvalidMessage(format!("Failed to parse positions response: {}", e))
979            }),
980            JsonRpcResult::Error { error } => {
981                Err(WebSocketError::ApiError(error.code, error.message))
982            }
983        }
984    }
985
986    /// Get account summary for the specified currency
987    ///
988    /// Retrieves account summary information including balance, margin, and other account details.
989    ///
990    /// # Arguments
991    ///
992    /// * `currency` - Currency to get summary for (BTC, ETH, USDC, etc.)
993    /// * `extended` - Whether to include extended information
994    ///
995    /// # Returns
996    ///
997    /// Account summary for the specified currency
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the request fails or the response cannot be parsed
1002    pub async fn get_account_summary(
1003        &self,
1004        currency: &str,
1005        extended: Option<bool>,
1006    ) -> Result<crate::model::AccountSummary, WebSocketError> {
1007        let json_request = {
1008            let mut builder = self.request_builder.lock().await;
1009            builder.build_get_account_summary_request(currency, extended)
1010        };
1011
1012        let response = self.send_request(json_request).await?;
1013
1014        match response.result {
1015            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1016                WebSocketError::InvalidMessage(format!(
1017                    "Failed to parse account summary response: {}",
1018                    e
1019                ))
1020            }),
1021            JsonRpcResult::Error { error } => {
1022                Err(WebSocketError::ApiError(error.code, error.message))
1023            }
1024        }
1025    }
1026
1027    /// Get the state of an order
1028    ///
1029    /// Retrieves detailed information about a specific order.
1030    ///
1031    /// # Arguments
1032    ///
1033    /// * `order_id` - The order ID to get state for
1034    ///
1035    /// # Returns
1036    ///
1037    /// Order information for the specified order
1038    ///
1039    /// # Errors
1040    ///
1041    /// Returns an error if the request fails or the response cannot be parsed
1042    pub async fn get_order_state(
1043        &self,
1044        order_id: &str,
1045    ) -> Result<crate::model::OrderInfo, WebSocketError> {
1046        let json_request = {
1047            let mut builder = self.request_builder.lock().await;
1048            builder.build_get_order_state_request(order_id)
1049        };
1050
1051        let response = self.send_request(json_request).await?;
1052
1053        match response.result {
1054            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1055                WebSocketError::InvalidMessage(format!(
1056                    "Failed to parse order state response: {}",
1057                    e
1058                ))
1059            }),
1060            JsonRpcResult::Error { error } => {
1061                Err(WebSocketError::ApiError(error.code, error.message))
1062            }
1063        }
1064    }
1065
1066    /// Get order history by currency
1067    ///
1068    /// Retrieves historical orders for the specified currency.
1069    ///
1070    /// # Arguments
1071    ///
1072    /// * `currency` - Currency to get order history for
1073    /// * `kind` - Kind filter (future, option, spot, etc.) - optional
1074    /// * `count` - Number of items to return - optional
1075    ///
1076    /// # Returns
1077    ///
1078    /// A vector of historical orders matching the filter criteria
1079    ///
1080    /// # Errors
1081    ///
1082    /// Returns an error if the request fails or the response cannot be parsed
1083    pub async fn get_order_history_by_currency(
1084        &self,
1085        currency: &str,
1086        kind: Option<&str>,
1087        count: Option<u32>,
1088    ) -> Result<Vec<crate::model::OrderInfo>, WebSocketError> {
1089        let json_request = {
1090            let mut builder = self.request_builder.lock().await;
1091            builder.build_get_order_history_by_currency_request(currency, kind, count)
1092        };
1093
1094        let response = self.send_request(json_request).await?;
1095
1096        match response.result {
1097            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1098                WebSocketError::InvalidMessage(format!(
1099                    "Failed to parse order history response: {}",
1100                    e
1101                ))
1102            }),
1103            JsonRpcResult::Error { error } => {
1104                Err(WebSocketError::ApiError(error.code, error.message))
1105            }
1106        }
1107    }
1108
1109    // Position management methods
1110
1111    /// Close an existing position
1112    ///
1113    /// Places a reduce-only order to close an existing position.
1114    ///
1115    /// # Arguments
1116    ///
1117    /// * `instrument_name` - The instrument to close position for
1118    /// * `order_type` - Order type: "limit" or "market"
1119    /// * `price` - Price for limit orders (required if order_type is "limit")
1120    ///
1121    /// # Returns
1122    ///
1123    /// Response containing the order and any trades executed
1124    ///
1125    /// # Errors
1126    ///
1127    /// Returns an error if the request fails or the response cannot be parsed
1128    pub async fn close_position(
1129        &self,
1130        instrument_name: &str,
1131        order_type: &str,
1132        price: Option<f64>,
1133    ) -> Result<crate::model::ClosePositionResponse, WebSocketError> {
1134        let json_request = {
1135            let mut builder = self.request_builder.lock().await;
1136            builder.build_close_position_request(instrument_name, order_type, price)
1137        };
1138
1139        let response = self.send_request(json_request).await?;
1140
1141        match response.result {
1142            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1143                WebSocketError::InvalidMessage(format!(
1144                    "Failed to parse close position response: {}",
1145                    e
1146                ))
1147            }),
1148            JsonRpcResult::Error { error } => {
1149                Err(WebSocketError::ApiError(error.code, error.message))
1150            }
1151        }
1152    }
1153
1154    /// Move positions between subaccounts
1155    ///
1156    /// Transfers positions from one subaccount to another within the same main account.
1157    ///
1158    /// # Arguments
1159    ///
1160    /// * `currency` - Currency for the positions (BTC, ETH, etc.)
1161    /// * `source_uid` - Source subaccount ID
1162    /// * `target_uid` - Target subaccount ID
1163    /// * `trades` - List of positions to move
1164    ///
1165    /// # Returns
1166    ///
1167    /// A vector of results for each position moved
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if the request fails or the response cannot be parsed
1172    pub async fn move_positions(
1173        &self,
1174        currency: &str,
1175        source_uid: u64,
1176        target_uid: u64,
1177        trades: &[crate::model::MovePositionTrade],
1178    ) -> Result<Vec<crate::model::MovePositionResult>, WebSocketError> {
1179        let json_request = {
1180            let mut builder = self.request_builder.lock().await;
1181            builder.build_move_positions_request(currency, source_uid, target_uid, trades)
1182        };
1183
1184        let response = self.send_request(json_request).await?;
1185
1186        match response.result {
1187            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1188                WebSocketError::InvalidMessage(format!(
1189                    "Failed to parse move positions response: {}",
1190                    e
1191                ))
1192            }),
1193            JsonRpcResult::Error { error } => {
1194                Err(WebSocketError::ApiError(error.code, error.message))
1195            }
1196        }
1197    }
1198
1199    /// Set message handler with callbacks
1200    /// The message_callback processes each incoming message and returns Result<(), Error>
1201    /// The error_callback is called only when message_callback returns an error
1202    pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
1203    where
1204        F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
1205        E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
1206    {
1207        self.message_handler = Some(MessageHandler::new(message_callback, error_callback));
1208    }
1209
1210    /// Set message handler using builder pattern
1211    pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
1212        self.message_handler = Some(handler);
1213    }
1214
1215    /// Remove the current message handler
1216    pub fn clear_message_handler(&mut self) {
1217        self.message_handler = None;
1218    }
1219
1220    /// Check if message handler is set
1221    pub fn has_message_handler(&self) -> bool {
1222        self.message_handler.is_some()
1223    }
1224
1225    /// Receive and process a message using the registered callbacks
1226    /// This method will:
1227    /// 1. Receive a message from the WebSocket
1228    /// 2. Call the primary callback with the message
1229    /// 3. If primary callback returns error, call error callback with message and error
1230    pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
1231        let message = self.receive_message().await?;
1232
1233        if let Some(handler) = &self.message_handler {
1234            handler.handle_message(&message);
1235        }
1236
1237        Ok(())
1238    }
1239
1240    /// Start message processing loop with callbacks
1241    /// This will continuously receive messages and process them using the registered callbacks
1242    /// The loop will continue until an error occurs or the connection is closed
1243    pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
1244        if self.message_handler.is_none() {
1245            return Err(WebSocketError::InvalidMessage(
1246                "No message handler set. Use set_message_handler() first.".to_string(),
1247            ));
1248        }
1249
1250        loop {
1251            match self.receive_and_process_message().await {
1252                Ok(()) => {
1253                    // Message processed successfully, continue
1254                }
1255                Err(WebSocketError::ConnectionClosed) => {
1256                    // Connection closed, exit loop gracefully
1257                    break;
1258                }
1259                Err(e) => {
1260                    // Other error occurred, propagate it
1261                    return Err(e);
1262                }
1263            }
1264        }
1265
1266        Ok(())
1267    }
1268
1269    // Helper methods
1270
1271    /// Parse a channel string into a `SubscriptionChannel` variant
1272    ///
1273    /// Uses `SubscriptionChannel::from_string()` to properly detect all channel types.
1274    /// Unknown channels are returned as `SubscriptionChannel::Unknown(String)`.
1275    fn parse_channel_type(&self, channel: &str) -> SubscriptionChannel {
1276        SubscriptionChannel::from_string(channel)
1277    }
1278
1279    fn extract_instrument(&self, channel: &str) -> Option<String> {
1280        let parts: Vec<&str> = channel.split('.').collect();
1281        match parts.as_slice() {
1282            ["ticker", instrument] | ["ticker", instrument, _] => Some(instrument.to_string()),
1283            ["book", instrument, ..] => Some(instrument.to_string()),
1284            ["trades", instrument, ..] => Some(instrument.to_string()),
1285            ["chart", "trades", instrument, _] => Some(instrument.to_string()),
1286            ["user", "changes", instrument, _] => Some(instrument.to_string()),
1287            ["estimated_expiration_price", instrument] => Some(instrument.to_string()),
1288            ["markprice", "options", instrument] => Some(instrument.to_string()),
1289            ["perpetual", instrument, _] => Some(instrument.to_string()),
1290            ["quote", instrument] => Some(instrument.to_string()),
1291            ["incremental_ticker", instrument] => Some(instrument.to_string()),
1292            ["deribit_price_index", index_name]
1293            | ["deribit_price_ranking", index_name]
1294            | ["deribit_price_statistics", index_name]
1295            | ["deribit_volatility_index", index_name] => Some(index_name.to_string()),
1296            ["instrument", "state", _kind, currency] => Some(currency.to_string()),
1297            ["block_rfq", "trades", currency] => Some(currency.to_string()),
1298            ["block_trade_confirmations", currency] => Some(currency.to_string()),
1299            ["user", "mmp_trigger", index_name] => Some(index_name.to_string()),
1300            ["platform_state"]
1301            | ["platform_state", "public_methods_state"]
1302            | ["block_trade_confirmations"]
1303            | ["user", "access_log"]
1304            | ["user", "lock"] => None,
1305            _ => None,
1306        }
1307    }
1308}
1309
1310impl Default for DeribitWebSocketClient {
1311    fn default() -> Self {
1312        let config = WebSocketConfig::default();
1313        Self::new(&config).unwrap()
1314    }
1315}