Skip to main content

deribit_websocket/
client.rs

1//! WebSocket client implementation for Deribit.
2//!
3//! [`DeribitWebSocketClient`] is the public façade. It owns a shared,
4//! optional [`Dispatcher`] that runs the send/receive loop in a dedicated
5//! tokio task; request/response multiplexing and notification routing all
6//! happen inside that task.
7//!
8//! # Channel architecture
9//!
10//! The client–dispatcher split uses **two bounded `tokio::sync::mpsc`
11//! channels**, both using **Strategy A (await-send)**:
12//!
13//! 1. **Notification channel** (dispatcher → consumer). Carries every
14//!    server-pushed notification and every unmatched frame to the
15//!    consumer reading `next_notification` /
16//!    `start_message_processing_loop`. Depth is
17//!    [`WebSocketConfig::notification_channel_capacity`] (default 1024).
18//!    When full, the dispatcher task blocks on `send().await`, stops
19//!    polling the WebSocket stream, and the TCP recv buffer fills →
20//!    the Deribit server applies flow control. No frames are dropped
21//!    due to backpressure; if the receiver has been closed (for
22//!    example during shutdown or disconnect), the affected frames are
23//!    discarded. Every full-channel event emits a `tracing::warn!` so
24//!    slow consumers are visible in logs.
25//! 2. **Command channel** (client → dispatcher). Carries outbound
26//!    commands — request sends, cancel-request on timeout, shutdown —
27//!    from every client method to the dispatcher. Depth is
28//!    [`WebSocketConfig::dispatcher_command_capacity`]. When full, the
29//!    caller blocks; `request_timeout` on
30//!    [`DeribitWebSocketClient::send_request`] still applies, so the
31//!    caller surfaces [`WebSocketError::Timeout`] if the deadline
32//!    elapses while waiting on the channel.
33//!
34//! Both channels are bounded specifically so that a slow or stuck
35//! consumer can never cause unbounded memory growth in a long-running
36//! trading process. Strategy A (await-send) was chosen over drop-oldest
37//! / drop-newest variants because the notification stream carries
38//! private trading events (order updates, trade reports) where silent
39//! loss is unacceptable.
40
41use std::sync::Arc;
42use tokio::sync::Mutex;
43
44use crate::model::SubscriptionChannel;
45use crate::{
46    callback::MessageHandler,
47    config::WebSocketConfig,
48    connection::Dispatcher,
49    error::{WebSocketError, envelope::build_raw_error_response},
50    message::request::RequestBuilder,
51    model::{
52        quote::*,
53        subscription::SubscriptionManager,
54        ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
55    },
56    session::WebSocketSession,
57};
58
59/// WebSocket client for Deribit
60///
61/// Owns a shared, optional [`Dispatcher`] that runs the send/receive loop
62/// in a dedicated tokio task. All request/response multiplexing and
63/// notification routing happens inside that task; this façade only
64/// clones an `Arc<Dispatcher>` out of the slot and forwards calls to it.
65#[derive(Debug)]
66pub struct DeribitWebSocketClient {
67    /// WebSocket configuration
68    pub config: Arc<WebSocketConfig>,
69    /// Shared slot holding the live dispatcher, if any. The slot's mutex
70    /// is only held long enough to read/insert/remove the `Arc`, never
71    /// across a `send_request` await.
72    dispatcher: Arc<Mutex<Option<Arc<Dispatcher>>>>,
73    /// WebSocket session
74    pub session: Arc<WebSocketSession>,
75    request_builder: Arc<Mutex<RequestBuilder>>,
76    subscription_manager: Arc<Mutex<SubscriptionManager>>,
77    message_handler: Option<MessageHandler>,
78}
79
80impl DeribitWebSocketClient {
81    /// Create a new WebSocket client
82    ///
83    /// The caller's `&WebSocketConfig` is deep-copied once into an
84    /// `Arc<WebSocketConfig>` which is then shared with the inner
85    /// [`WebSocketSession`] via `Arc::clone` — no second struct clone,
86    /// no double allocation.
87    pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
88        let subscription_manager = Arc::new(Mutex::new(SubscriptionManager::new()));
89        // One unavoidable deep copy: we start from `&WebSocketConfig`
90        // and need an owned value to place inside the `Arc`. Every
91        // subsequent hand-off is a cheap `Arc::clone`.
92        let config = Arc::new(config.clone());
93        let session = Arc::new(WebSocketSession::new(
94            Arc::clone(&config),
95            Arc::clone(&subscription_manager),
96        ));
97
98        Ok(Self {
99            config,
100            dispatcher: Arc::new(Mutex::new(None)),
101            session,
102            request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
103            subscription_manager,
104            message_handler: None,
105        })
106    }
107
108    /// Returns a handle to the shared subscription manager. The same
109    /// handle is held by `self.session`, so all subscription state is
110    /// observable from either side.
111    #[must_use]
112    pub fn subscription_manager(&self) -> Arc<Mutex<SubscriptionManager>> {
113        Arc::clone(&self.subscription_manager)
114    }
115
116    /// Create a new WebSocket client with default configuration
117    pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
118        let config = WebSocketConfig::with_url(&ws_url)
119            .map_err(|e| WebSocketError::ConnectionFailed(format!("Invalid URL: {}", e)))?;
120        Self::new(&config)
121    }
122
123    /// Create a new WebSocket client for testnet
124    pub fn new_testnet() -> Result<Self, WebSocketError> {
125        Self::new_with_url("wss://test.deribit.com/ws/api/v2".to_string())
126    }
127
128    /// Create a new WebSocket client for production
129    pub fn new_production() -> Result<Self, WebSocketError> {
130        Self::new_with_url("wss://www.deribit.com/ws/api/v2".to_string())
131    }
132
133    /// Connect to the WebSocket server
134    ///
135    /// Spawns the dispatcher task that owns the WebSocket stream. If a
136    /// previous dispatcher is still installed, it is shut down first.
137    ///
138    /// The slot lock is held across the entire shutdown + connect_async +
139    /// install sequence so concurrent `connect()` calls are serialized.
140    /// Without this, two callers could each see an empty slot, each spawn
141    /// a dispatcher, and the loser's dispatcher task would leak. While a
142    /// connect is in flight, other client operations that touch the slot
143    /// (`send_request`, `disconnect`, `is_connected`) wait on the same
144    /// mutex — the desired semantics.
145    pub async fn connect(&self) -> Result<(), WebSocketError> {
146        let mut guard = self.dispatcher.lock().await;
147        if let Some(prev) = guard.take() {
148            let _ = prev.shutdown().await;
149        }
150        let dispatcher = Dispatcher::connect(
151            self.config.ws_url.clone(),
152            self.config.connection_timeout,
153            self.config.request_timeout,
154            self.config.notification_channel_capacity,
155            self.config.dispatcher_command_capacity,
156        )
157        .await?;
158        *guard = Some(Arc::new(dispatcher));
159        Ok(())
160    }
161
162    /// Disconnect from the WebSocket server
163    pub async fn disconnect(&self) -> Result<(), WebSocketError> {
164        // Take the Arc out under the lock so the lock is not held across
165        // the shutdown await.
166        let dispatcher = {
167            let mut guard = self.dispatcher.lock().await;
168            guard.take()
169        };
170        if let Some(dispatcher) = dispatcher {
171            dispatcher.shutdown().await?;
172        }
173        Ok(())
174    }
175
176    /// Check if connected (i.e., a dispatcher is currently installed).
177    pub async fn is_connected(&self) -> bool {
178        self.dispatcher.lock().await.is_some()
179    }
180
181    /// Authenticate with the server
182    ///
183    /// Authenticates the connection using API credentials and returns authentication
184    /// details including access token and refresh token.
185    ///
186    /// # Arguments
187    ///
188    /// * `client_id` - API client ID
189    /// * `client_secret` - API client secret
190    ///
191    /// # Returns
192    ///
193    /// Returns `AuthResponse` containing access token, token type, expiration, and scope
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if authentication fails or credentials are invalid
198    pub async fn authenticate(
199        &self,
200        client_id: &str,
201        client_secret: &str,
202    ) -> Result<crate::model::AuthResponse, WebSocketError> {
203        let request = {
204            let mut builder = self.request_builder.lock().await;
205            builder.build_auth_request(client_id, client_secret)
206        };
207
208        let request_ctx: &JsonRpcRequest = &request;
209        let response = self.send_request(&request).await?;
210
211        match response.result {
212            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
213                WebSocketError::InvalidMessage(format!(
214                    "Failed to parse authentication response: {}",
215                    e
216                ))
217            }),
218            JsonRpcResult::Error { error } => {
219                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
220                Err(WebSocketError::api_error_from_parts(
221                    request_ctx,
222                    error,
223                    Some(raw),
224                ))
225            }
226        }
227    }
228
229    /// Subscribe to channels.
230    ///
231    /// Local subscription state is reconciled against the server-confirmed
232    /// channel list carried by `response.result`, which may be a strict
233    /// subset of the requested channels when the server rejects individual
234    /// entries (unknown channel, permission denied, rate limit). Only the
235    /// channels the server actually acknowledged are added to the local
236    /// [`SubscriptionManager`]. Transport failures and API-error responses
237    /// leave the local view untouched so the caller can retry without
238    /// inconsistency.
239    ///
240    /// The response is parsed and validated outside the
241    /// `subscription_manager` mutex so the lock is only held for the
242    /// `HashMap` mutations themselves, keeping the critical section
243    /// minimal under contention from `get_subscriptions`, concurrent
244    /// subscribes, or the notification consumer.
245    ///
246    /// # Errors
247    ///
248    /// Returns [`WebSocketError::InvalidMessage`] if a `Success` response
249    /// carries a `result` that is not a JSON array of channel strings.
250    pub async fn subscribe(
251        &self,
252        channels: Vec<String>,
253    ) -> Result<JsonRpcResponse, WebSocketError> {
254        let request = {
255            let mut builder = self.request_builder.lock().await;
256            builder.build_subscribe_request(channels)
257        };
258
259        let response = self.send_request(&request).await?;
260
261        // Parse + validate the confirmed list outside the subscription
262        // mutex. Only acquire the lock once we have work to do.
263        if let Some(confirmed) = confirmed_channels(&response, "public/subscribe")? {
264            let mut sub_manager = self.subscription_manager.lock().await;
265            add_confirmed_channels(&mut sub_manager, confirmed);
266        }
267
268        Ok(response)
269    }
270
271    /// Unsubscribe from channels.
272    ///
273    /// Local subscription state is reconciled against the server-confirmed
274    /// channel list carried by `response.result`, which may be a strict
275    /// subset of the requested channels. Only the channels the server
276    /// actually acknowledged are removed from the local
277    /// [`SubscriptionManager`]. Transport failures and API-error responses
278    /// leave the local view untouched so the caller can retry without
279    /// inconsistency.
280    ///
281    /// The response is parsed and validated outside the
282    /// `subscription_manager` mutex; the lock is only held for the
283    /// `HashMap::remove` loop.
284    ///
285    /// # Errors
286    ///
287    /// Returns [`WebSocketError::InvalidMessage`] if a `Success` response
288    /// carries a `result` that is not a JSON array of channel strings.
289    pub async fn unsubscribe(
290        &self,
291        channels: Vec<String>,
292    ) -> Result<JsonRpcResponse, WebSocketError> {
293        let request = {
294            let mut builder = self.request_builder.lock().await;
295            builder.build_unsubscribe_request(channels)
296        };
297
298        let response = self.send_request(&request).await?;
299
300        // Parse + validate the confirmed list outside the subscription
301        // mutex. Only acquire the lock once we have work to do.
302        if let Some(confirmed) = confirmed_channels(&response, "public/unsubscribe")? {
303            let mut sub_manager = self.subscription_manager.lock().await;
304            remove_confirmed_channels(&mut sub_manager, confirmed);
305        }
306
307        Ok(response)
308    }
309
310    /// Unsubscribe from all public channels
311    ///
312    /// Unsubscribes from all public channels subscribed so far and clears
313    /// the local subscription manager state.
314    ///
315    /// # Returns
316    ///
317    /// Returns `"ok"` on success
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the request fails or the response cannot be parsed
322    pub async fn public_unsubscribe_all(&self) -> Result<String, WebSocketError> {
323        let request = {
324            let mut builder = self.request_builder.lock().await;
325            builder.build_public_unsubscribe_all_request()
326        };
327
328        let request_ctx: &JsonRpcRequest = &request;
329        let response = self.send_request(&request).await?;
330
331        // Clear the local subscription manager only after the server
332        // confirms success. On API error (e.g. not authenticated) we
333        // preserve the local view so the caller can retry without
334        // inconsistency.
335        match response.result {
336            JsonRpcResult::Success { result } => {
337                self.subscription_manager.lock().await.clear();
338                result.as_str().map(String::from).ok_or_else(|| {
339                    WebSocketError::InvalidMessage(
340                        "Expected string result from unsubscribe_all".to_string(),
341                    )
342                })
343            }
344            JsonRpcResult::Error { error } => {
345                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
346                Err(WebSocketError::api_error_from_parts(
347                    request_ctx,
348                    error,
349                    Some(raw),
350                ))
351            }
352        }
353    }
354
355    /// Unsubscribe from all private channels
356    ///
357    /// Unsubscribes from all private channels subscribed so far and clears
358    /// the local subscription manager state. Requires authentication.
359    ///
360    /// # Returns
361    ///
362    /// Returns `"ok"` on success
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the request fails or the response cannot be parsed
367    pub async fn private_unsubscribe_all(&self) -> Result<String, WebSocketError> {
368        let request = {
369            let mut builder = self.request_builder.lock().await;
370            builder.build_private_unsubscribe_all_request()
371        };
372
373        let request_ctx: &JsonRpcRequest = &request;
374        let response = self.send_request(&request).await?;
375
376        // Clear the local subscription manager only after the server
377        // confirms success. On API error we preserve the local view so
378        // the caller can retry without inconsistency.
379        match response.result {
380            JsonRpcResult::Success { result } => {
381                self.subscription_manager.lock().await.clear();
382                result.as_str().map(String::from).ok_or_else(|| {
383                    WebSocketError::InvalidMessage(
384                        "Expected string result from unsubscribe_all".to_string(),
385                    )
386                })
387            }
388            JsonRpcResult::Error { error } => {
389                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
390                Err(WebSocketError::api_error_from_parts(
391                    request_ctx,
392                    error,
393                    Some(raw),
394                ))
395            }
396        }
397    }
398
399    /// Send a JSON-RPC request and await the matching response.
400    ///
401    /// Forwards the request to the dispatcher, which serializes it,
402    /// writes it to the WebSocket sink, and routes the response back by
403    /// matching on the JSON-RPC `id` field. Notifications arriving
404    /// between the request and the response do not affect this call and
405    /// are routed to the notification channel instead.
406    ///
407    /// `request` is borrowed: callers retain ownership so they can
408    /// inspect the originating request after the call (for example to
409    /// build an enriched [`WebSocketError::ApiError`]) without paying
410    /// for a clone on the success path.
411    pub async fn send_request(
412        &self,
413        request: &JsonRpcRequest,
414    ) -> Result<JsonRpcResponse, WebSocketError> {
415        // Clone the Arc<Dispatcher> out under the short-lived slot lock,
416        // then drop the guard before awaiting on the dispatcher. This
417        // keeps the per-client mutex off the hot path so concurrent
418        // send_request calls do not serialize against each other.
419        let dispatcher = {
420            let guard = self.dispatcher.lock().await;
421            guard
422                .as_ref()
423                .map(Arc::clone)
424                .ok_or(WebSocketError::ConnectionClosed)?
425        };
426        dispatcher.send_request(request).await
427    }
428
429    /// Receive the next notification (or unmatched frame) from the server.
430    ///
431    /// Returns [`WebSocketError::ConnectionClosed`] if the dispatcher is
432    /// not running, or if its notification channel has been drained and
433    /// closed.
434    pub async fn receive_message(&self) -> Result<String, WebSocketError> {
435        let dispatcher = {
436            let guard = self.dispatcher.lock().await;
437            guard
438                .as_ref()
439                .map(Arc::clone)
440                .ok_or(WebSocketError::ConnectionClosed)?
441        };
442        dispatcher
443            .next_notification()
444            .await
445            .ok_or(WebSocketError::ConnectionClosed)
446    }
447
448    /// Get active subscriptions
449    pub async fn get_subscriptions(&self) -> Vec<String> {
450        let sub_manager = self.subscription_manager.lock().await;
451        sub_manager.get_all_channels()
452    }
453
454    /// Test connection
455    ///
456    /// Tests the WebSocket connection and returns API version information.
457    ///
458    /// # Returns
459    ///
460    /// Returns `TestResponse` containing the API version string
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the connection test fails
465    pub async fn test_connection(&self) -> Result<crate::model::TestResponse, WebSocketError> {
466        let request = {
467            let mut builder = self.request_builder.lock().await;
468            builder.build_test_request()
469        };
470
471        let request_ctx: &JsonRpcRequest = &request;
472        let response = self.send_request(&request).await?;
473
474        match response.result {
475            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
476                WebSocketError::InvalidMessage(format!("Failed to parse test response: {}", e))
477            }),
478            JsonRpcResult::Error { error } => {
479                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
480                Err(WebSocketError::api_error_from_parts(
481                    request_ctx,
482                    error,
483                    Some(raw),
484                ))
485            }
486        }
487    }
488
489    /// Get server time
490    ///
491    /// Returns the current server timestamp in milliseconds since Unix epoch.
492    ///
493    /// # Returns
494    ///
495    /// Returns `u64` timestamp in milliseconds
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if the request fails
500    pub async fn get_time(&self) -> Result<u64, WebSocketError> {
501        let request = {
502            let mut builder = self.request_builder.lock().await;
503            builder.build_get_time_request()
504        };
505
506        let request_ctx: &JsonRpcRequest = &request;
507        let response = self.send_request(&request).await?;
508
509        match response.result {
510            JsonRpcResult::Success { result } => result.as_u64().ok_or_else(|| {
511                WebSocketError::InvalidMessage(
512                    "Expected u64 timestamp in get_time response".to_string(),
513                )
514            }),
515            JsonRpcResult::Error { error } => {
516                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
517                Err(WebSocketError::api_error_from_parts(
518                    request_ctx,
519                    error,
520                    Some(raw),
521                ))
522            }
523        }
524    }
525
526    /// Enable heartbeat with specified interval
527    ///
528    /// The server will send a heartbeat message every `interval` seconds.
529    /// If heartbeat is enabled, the server will also send `test_request` notifications
530    /// which the client should respond to with `public/test` to keep the connection alive.
531    ///
532    /// # Arguments
533    ///
534    /// * `interval` - Heartbeat interval in seconds (10-3600)
535    ///
536    /// # Returns
537    ///
538    /// Returns `"ok"` on success
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if the request fails or the interval is invalid
543    pub async fn set_heartbeat(&self, interval: u64) -> Result<String, WebSocketError> {
544        let request = {
545            let mut builder = self.request_builder.lock().await;
546            builder.build_set_heartbeat_request(interval)
547        };
548
549        let request_ctx: &JsonRpcRequest = &request;
550        let response = self.send_request(&request).await?;
551
552        match response.result {
553            JsonRpcResult::Success { result } => {
554                result.as_str().map(String::from).ok_or_else(|| {
555                    WebSocketError::InvalidMessage(
556                        "Expected string result from set_heartbeat".to_string(),
557                    )
558                })
559            }
560            JsonRpcResult::Error { error } => {
561                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
562                Err(WebSocketError::api_error_from_parts(
563                    request_ctx,
564                    error,
565                    Some(raw),
566                ))
567            }
568        }
569    }
570
571    /// Disable heartbeat
572    ///
573    /// Stops the server from sending heartbeat messages and `test_request` notifications.
574    ///
575    /// # Returns
576    ///
577    /// Returns `"ok"` on success
578    ///
579    /// # Errors
580    ///
581    /// Returns an error if the request fails
582    pub async fn disable_heartbeat(&self) -> Result<String, WebSocketError> {
583        let request = {
584            let mut builder = self.request_builder.lock().await;
585            builder.build_disable_heartbeat_request()
586        };
587
588        let request_ctx: &JsonRpcRequest = &request;
589        let response = self.send_request(&request).await?;
590
591        match response.result {
592            JsonRpcResult::Success { result } => {
593                result.as_str().map(String::from).ok_or_else(|| {
594                    WebSocketError::InvalidMessage(
595                        "Expected string result from disable_heartbeat".to_string(),
596                    )
597                })
598            }
599            JsonRpcResult::Error { error } => {
600                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
601                Err(WebSocketError::api_error_from_parts(
602                    request_ctx,
603                    error,
604                    Some(raw),
605                ))
606            }
607        }
608    }
609
610    /// Send client identification to the server
611    ///
612    /// This method identifies the client to the server with its name and version.
613    /// It's recommended to call this after connecting to provide debugging information.
614    ///
615    /// # Arguments
616    ///
617    /// * `client_name` - Name of the client application
618    /// * `client_version` - Version of the client application
619    ///
620    /// # Returns
621    ///
622    /// Returns `HelloResponse` containing the API version information
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the request fails
627    pub async fn hello(
628        &self,
629        client_name: &str,
630        client_version: &str,
631    ) -> Result<crate::model::HelloResponse, WebSocketError> {
632        let request = {
633            let mut builder = self.request_builder.lock().await;
634            builder.build_hello_request(client_name, client_version)
635        };
636
637        let request_ctx: &JsonRpcRequest = &request;
638        let response = self.send_request(&request).await?;
639
640        match response.result {
641            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
642                WebSocketError::InvalidMessage(format!("Failed to parse hello response: {}", e))
643            }),
644            JsonRpcResult::Error { error } => {
645                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
646                Err(WebSocketError::api_error_from_parts(
647                    request_ctx,
648                    error,
649                    Some(raw),
650                ))
651            }
652        }
653    }
654
655    /// Enable automatic order cancellation on disconnect
656    ///
657    /// When enabled, all open orders will be automatically cancelled if the WebSocket
658    /// connection is lost. This is a safety feature to prevent unintended order
659    /// execution when the client loses connectivity.
660    ///
661    /// # Returns
662    ///
663    /// Returns `"ok"` on success
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if the request fails or requires authentication
668    pub async fn enable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
669        let request = {
670            let mut builder = self.request_builder.lock().await;
671            builder.build_enable_cancel_on_disconnect_request()
672        };
673
674        let request_ctx: &JsonRpcRequest = &request;
675        let response = self.send_request(&request).await?;
676
677        match response.result {
678            JsonRpcResult::Success { result } => {
679                result.as_str().map(String::from).ok_or_else(|| {
680                    WebSocketError::InvalidMessage(
681                        "Expected string result from enable_cancel_on_disconnect".to_string(),
682                    )
683                })
684            }
685            JsonRpcResult::Error { error } => {
686                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
687                Err(WebSocketError::api_error_from_parts(
688                    request_ctx,
689                    error,
690                    Some(raw),
691                ))
692            }
693        }
694    }
695
696    /// Disable automatic order cancellation on disconnect
697    ///
698    /// When disabled, orders will remain active even if the WebSocket connection
699    /// is lost.
700    ///
701    /// # Returns
702    ///
703    /// Returns `"ok"` on success
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the request fails or requires authentication
708    pub async fn disable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
709        let request = {
710            let mut builder = self.request_builder.lock().await;
711            builder.build_disable_cancel_on_disconnect_request()
712        };
713
714        let request_ctx: &JsonRpcRequest = &request;
715        let response = self.send_request(&request).await?;
716
717        match response.result {
718            JsonRpcResult::Success { result } => {
719                result.as_str().map(String::from).ok_or_else(|| {
720                    WebSocketError::InvalidMessage(
721                        "Expected string result from disable_cancel_on_disconnect".to_string(),
722                    )
723                })
724            }
725            JsonRpcResult::Error { error } => {
726                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
727                Err(WebSocketError::api_error_from_parts(
728                    request_ctx,
729                    error,
730                    Some(raw),
731                ))
732            }
733        }
734    }
735
736    /// Get current cancel-on-disconnect status
737    ///
738    /// Returns whether automatic order cancellation on disconnect is currently enabled.
739    ///
740    /// # Returns
741    ///
742    /// Returns `true` if cancel-on-disconnect is enabled, `false` otherwise
743    ///
744    /// # Errors
745    ///
746    /// Returns an error if the request fails or requires authentication
747    pub async fn get_cancel_on_disconnect(&self) -> Result<bool, WebSocketError> {
748        let request = {
749            let mut builder = self.request_builder.lock().await;
750            builder.build_get_cancel_on_disconnect_request()
751        };
752
753        let request_ctx: &JsonRpcRequest = &request;
754        let response = self.send_request(&request).await?;
755
756        match response.result {
757            JsonRpcResult::Success { result } => {
758                // The result contains "enabled" field
759                result
760                    .get("enabled")
761                    .and_then(|v| v.as_bool())
762                    .ok_or_else(|| {
763                        WebSocketError::InvalidMessage(
764                            "Expected 'enabled' boolean in get_cancel_on_disconnect response"
765                                .to_string(),
766                        )
767                    })
768            }
769            JsonRpcResult::Error { error } => {
770                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
771                Err(WebSocketError::api_error_from_parts(
772                    request_ctx,
773                    error,
774                    Some(raw),
775                ))
776            }
777        }
778    }
779
780    /// Place mass quotes
781    pub async fn mass_quote(
782        &self,
783        request: MassQuoteRequest,
784    ) -> Result<MassQuoteResult, WebSocketError> {
785        // Validate the request first
786        request.validate().map_err(WebSocketError::InvalidMessage)?;
787
788        let json_request = {
789            let mut builder = self.request_builder.lock().await;
790            builder.build_mass_quote_request(request)?
791        };
792
793        let request_ctx: &JsonRpcRequest = &json_request;
794        let response = self.send_request(&json_request).await?;
795
796        // Parse the response using WsResponse structure
797        match response.result {
798            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
799                WebSocketError::InvalidMessage(format!(
800                    "Failed to parse mass quote response: {}",
801                    e
802                ))
803            }),
804            JsonRpcResult::Error { error } => {
805                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
806                Err(WebSocketError::api_error_from_parts(
807                    request_ctx,
808                    error,
809                    Some(raw),
810                ))
811            }
812        }
813    }
814
815    /// Cancel quotes
816    pub async fn cancel_quotes(
817        &self,
818        request: CancelQuotesRequest,
819    ) -> Result<CancelQuotesResponse, WebSocketError> {
820        let json_request = {
821            let mut builder = self.request_builder.lock().await;
822            builder.build_cancel_quotes_request(request)?
823        };
824
825        let request_ctx: &JsonRpcRequest = &json_request;
826        let response = self.send_request(&json_request).await?;
827
828        // Parse the response using JsonRpcResult structure
829        match response.result {
830            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
831                WebSocketError::InvalidMessage(format!(
832                    "Failed to parse cancel quotes response: {}",
833                    e
834                ))
835            }),
836            JsonRpcResult::Error { error } => {
837                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
838                Err(WebSocketError::api_error_from_parts(
839                    request_ctx,
840                    error,
841                    Some(raw),
842                ))
843            }
844        }
845    }
846
847    /// Set MMP group configuration
848    pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
849        let json_request = {
850            let mut builder = self.request_builder.lock().await;
851            builder.build_set_mmp_config_request(config)?
852        };
853
854        let request_ctx: &JsonRpcRequest = &json_request;
855        let response = self.send_request(&json_request).await?;
856
857        match response.result {
858            JsonRpcResult::Success { .. } => Ok(()),
859            JsonRpcResult::Error { error } => {
860                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
861                Err(WebSocketError::api_error_from_parts(
862                    request_ctx,
863                    error,
864                    Some(raw),
865                ))
866            }
867        }
868    }
869
870    /// Get MMP group configuration
871    pub async fn get_mmp_config(
872        &self,
873        mmp_group: Option<String>,
874    ) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
875        let json_request = {
876            let mut builder = self.request_builder.lock().await;
877            builder.build_get_mmp_config_request(mmp_group)
878        };
879
880        let request_ctx: &JsonRpcRequest = &json_request;
881        let response = self.send_request(&json_request).await?;
882
883        match response.result {
884            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
885                WebSocketError::InvalidMessage(format!(
886                    "Failed to parse MMP config response: {}",
887                    e
888                ))
889            }),
890            JsonRpcResult::Error { error } => {
891                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
892                Err(WebSocketError::api_error_from_parts(
893                    request_ctx,
894                    error,
895                    Some(raw),
896                ))
897            }
898        }
899    }
900
901    /// Reset MMP group
902    pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
903        let json_request = {
904            let mut builder = self.request_builder.lock().await;
905            builder.build_reset_mmp_request(mmp_group)
906        };
907
908        let request_ctx: &JsonRpcRequest = &json_request;
909        let response = self.send_request(&json_request).await?;
910
911        match response.result {
912            JsonRpcResult::Success { .. } => Ok(()),
913            JsonRpcResult::Error { error } => {
914                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
915                Err(WebSocketError::api_error_from_parts(
916                    request_ctx,
917                    error,
918                    Some(raw),
919                ))
920            }
921        }
922    }
923
924    /// Get open orders (including quotes)
925    pub async fn get_open_orders(
926        &self,
927        currency: Option<String>,
928        kind: Option<String>,
929        type_filter: Option<String>,
930    ) -> Result<Vec<QuoteInfo>, WebSocketError> {
931        let json_request = {
932            let mut builder = self.request_builder.lock().await;
933            builder.build_get_open_orders_request(currency, kind, type_filter)
934        };
935
936        let request_ctx: &JsonRpcRequest = &json_request;
937        let response = self.send_request(&json_request).await?;
938
939        match response.result {
940            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
941                WebSocketError::InvalidMessage(format!(
942                    "Failed to parse open orders response: {}",
943                    e
944                ))
945            }),
946            JsonRpcResult::Error { error } => {
947                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
948                Err(WebSocketError::api_error_from_parts(
949                    request_ctx,
950                    error,
951                    Some(raw),
952                ))
953            }
954        }
955    }
956
957    /// Place a buy order
958    ///
959    /// # Arguments
960    ///
961    /// * `request` - The order request parameters
962    ///
963    /// # Returns
964    ///
965    /// Returns `OrderResponse` containing order info and any immediate trades
966    pub async fn buy(
967        &self,
968        request: crate::model::trading::OrderRequest,
969    ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
970        let json_request = {
971            let mut builder = self.request_builder.lock().await;
972            builder.build_buy_request(&request)?
973        };
974
975        let request_ctx: &JsonRpcRequest = &json_request;
976        let response = self.send_request(&json_request).await?;
977
978        match response.result {
979            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
980                WebSocketError::InvalidMessage(format!("Failed to parse buy response: {}", e))
981            }),
982            JsonRpcResult::Error { error } => {
983                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
984                Err(WebSocketError::api_error_from_parts(
985                    request_ctx,
986                    error,
987                    Some(raw),
988                ))
989            }
990        }
991    }
992
993    /// Place a sell order
994    ///
995    /// # Arguments
996    ///
997    /// * `request` - The order request parameters
998    ///
999    /// # Returns
1000    ///
1001    /// Returns `OrderResponse` containing order info and any immediate trades
1002    pub async fn sell(
1003        &self,
1004        request: crate::model::trading::OrderRequest,
1005    ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
1006        let json_request = {
1007            let mut builder = self.request_builder.lock().await;
1008            builder.build_sell_request(&request)?
1009        };
1010
1011        let request_ctx: &JsonRpcRequest = &json_request;
1012        let response = self.send_request(&json_request).await?;
1013
1014        match response.result {
1015            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1016                WebSocketError::InvalidMessage(format!("Failed to parse sell response: {}", e))
1017            }),
1018            JsonRpcResult::Error { error } => {
1019                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1020                Err(WebSocketError::api_error_from_parts(
1021                    request_ctx,
1022                    error,
1023                    Some(raw),
1024                ))
1025            }
1026        }
1027    }
1028
1029    /// Cancel an order by ID
1030    ///
1031    /// # Arguments
1032    ///
1033    /// * `order_id` - The order ID to cancel
1034    ///
1035    /// # Returns
1036    ///
1037    /// Returns `OrderInfo` for the cancelled order
1038    pub async fn cancel(
1039        &self,
1040        order_id: &str,
1041    ) -> Result<crate::model::trading::OrderInfo, WebSocketError> {
1042        let json_request = {
1043            let mut builder = self.request_builder.lock().await;
1044            builder.build_cancel_request(order_id)
1045        };
1046
1047        let request_ctx: &JsonRpcRequest = &json_request;
1048        let response = self.send_request(&json_request).await?;
1049
1050        match response.result {
1051            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1052                WebSocketError::InvalidMessage(format!("Failed to parse cancel response: {}", e))
1053            }),
1054            JsonRpcResult::Error { error } => {
1055                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1056                Err(WebSocketError::api_error_from_parts(
1057                    request_ctx,
1058                    error,
1059                    Some(raw),
1060                ))
1061            }
1062        }
1063    }
1064
1065    /// Cancel all orders
1066    ///
1067    /// # Returns
1068    ///
1069    /// Returns the number of orders cancelled
1070    pub async fn cancel_all(&self) -> Result<u32, WebSocketError> {
1071        let json_request = {
1072            let mut builder = self.request_builder.lock().await;
1073            builder.build_cancel_all_request()
1074        };
1075
1076        let request_ctx: &JsonRpcRequest = &json_request;
1077        let response = self.send_request(&json_request).await?;
1078
1079        match response.result {
1080            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1081                WebSocketError::InvalidMessage(format!(
1082                    "Failed to parse cancel_all response: {}",
1083                    e
1084                ))
1085            }),
1086            JsonRpcResult::Error { error } => {
1087                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1088                Err(WebSocketError::api_error_from_parts(
1089                    request_ctx,
1090                    error,
1091                    Some(raw),
1092                ))
1093            }
1094        }
1095    }
1096
1097    /// Cancel all orders by currency
1098    ///
1099    /// # Arguments
1100    ///
1101    /// * `currency` - Currency to cancel orders for (e.g., "BTC", "ETH")
1102    ///
1103    /// # Returns
1104    ///
1105    /// Returns the number of orders cancelled
1106    pub async fn cancel_all_by_currency(&self, currency: &str) -> Result<u32, WebSocketError> {
1107        let json_request = {
1108            let mut builder = self.request_builder.lock().await;
1109            builder.build_cancel_all_by_currency_request(currency)
1110        };
1111
1112        let request_ctx: &JsonRpcRequest = &json_request;
1113        let response = self.send_request(&json_request).await?;
1114
1115        match response.result {
1116            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1117                WebSocketError::InvalidMessage(format!(
1118                    "Failed to parse cancel_all_by_currency response: {}",
1119                    e
1120                ))
1121            }),
1122            JsonRpcResult::Error { error } => {
1123                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1124                Err(WebSocketError::api_error_from_parts(
1125                    request_ctx,
1126                    error,
1127                    Some(raw),
1128                ))
1129            }
1130        }
1131    }
1132
1133    /// Cancel all orders by instrument
1134    ///
1135    /// # Arguments
1136    ///
1137    /// * `instrument_name` - Instrument name to cancel orders for (e.g., "BTC-PERPETUAL")
1138    ///
1139    /// # Returns
1140    ///
1141    /// Returns the number of orders cancelled
1142    pub async fn cancel_all_by_instrument(
1143        &self,
1144        instrument_name: &str,
1145    ) -> Result<u32, WebSocketError> {
1146        let json_request = {
1147            let mut builder = self.request_builder.lock().await;
1148            builder.build_cancel_all_by_instrument_request(instrument_name)
1149        };
1150
1151        let request_ctx: &JsonRpcRequest = &json_request;
1152        let response = self.send_request(&json_request).await?;
1153
1154        match response.result {
1155            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1156                WebSocketError::InvalidMessage(format!(
1157                    "Failed to parse cancel_all_by_instrument response: {}",
1158                    e
1159                ))
1160            }),
1161            JsonRpcResult::Error { error } => {
1162                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1163                Err(WebSocketError::api_error_from_parts(
1164                    request_ctx,
1165                    error,
1166                    Some(raw),
1167                ))
1168            }
1169        }
1170    }
1171
1172    /// Edit an existing order
1173    ///
1174    /// # Arguments
1175    ///
1176    /// * `request` - The edit order request parameters
1177    ///
1178    /// # Returns
1179    ///
1180    /// Returns `OrderResponse` containing updated order info and any trades
1181    pub async fn edit(
1182        &self,
1183        request: crate::model::trading::EditOrderRequest,
1184    ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
1185        let json_request = {
1186            let mut builder = self.request_builder.lock().await;
1187            builder.build_edit_request(&request)?
1188        };
1189
1190        let request_ctx: &JsonRpcRequest = &json_request;
1191        let response = self.send_request(&json_request).await?;
1192
1193        match response.result {
1194            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1195                WebSocketError::InvalidMessage(format!("Failed to parse edit response: {}", e))
1196            }),
1197            JsonRpcResult::Error { error } => {
1198                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1199                Err(WebSocketError::api_error_from_parts(
1200                    request_ctx,
1201                    error,
1202                    Some(raw),
1203                ))
1204            }
1205        }
1206    }
1207
1208    // Account methods
1209
1210    /// Get positions for the specified currency and kind
1211    ///
1212    /// Retrieves user positions filtered by currency and/or instrument kind.
1213    ///
1214    /// # Arguments
1215    ///
1216    /// * `currency` - Currency filter (BTC, ETH, USDC, etc.) - optional
1217    /// * `kind` - Kind filter (future, option, spot, etc.) - optional
1218    ///
1219    /// # Returns
1220    ///
1221    /// A vector of positions matching the filter criteria
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns an error if the request fails or the response cannot be parsed
1226    pub async fn get_positions(
1227        &self,
1228        currency: Option<&str>,
1229        kind: Option<&str>,
1230    ) -> Result<Vec<crate::model::Position>, WebSocketError> {
1231        let json_request = {
1232            let mut builder = self.request_builder.lock().await;
1233            builder.build_get_positions_request(currency, kind)
1234        };
1235
1236        let request_ctx: &JsonRpcRequest = &json_request;
1237        let response = self.send_request(&json_request).await?;
1238
1239        match response.result {
1240            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1241                WebSocketError::InvalidMessage(format!("Failed to parse positions response: {}", e))
1242            }),
1243            JsonRpcResult::Error { error } => {
1244                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1245                Err(WebSocketError::api_error_from_parts(
1246                    request_ctx,
1247                    error,
1248                    Some(raw),
1249                ))
1250            }
1251        }
1252    }
1253
1254    /// Get account summary for the specified currency
1255    ///
1256    /// Retrieves account summary information including balance, margin, and other account details.
1257    ///
1258    /// # Arguments
1259    ///
1260    /// * `currency` - Currency to get summary for (BTC, ETH, USDC, etc.)
1261    /// * `extended` - Whether to include extended information
1262    ///
1263    /// # Returns
1264    ///
1265    /// Account summary for the specified currency
1266    ///
1267    /// # Errors
1268    ///
1269    /// Returns an error if the request fails or the response cannot be parsed
1270    pub async fn get_account_summary(
1271        &self,
1272        currency: &str,
1273        extended: Option<bool>,
1274    ) -> Result<crate::model::AccountSummary, WebSocketError> {
1275        let json_request = {
1276            let mut builder = self.request_builder.lock().await;
1277            builder.build_get_account_summary_request(currency, extended)
1278        };
1279
1280        let request_ctx: &JsonRpcRequest = &json_request;
1281        let response = self.send_request(&json_request).await?;
1282
1283        match response.result {
1284            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1285                WebSocketError::InvalidMessage(format!(
1286                    "Failed to parse account summary response: {}",
1287                    e
1288                ))
1289            }),
1290            JsonRpcResult::Error { error } => {
1291                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1292                Err(WebSocketError::api_error_from_parts(
1293                    request_ctx,
1294                    error,
1295                    Some(raw),
1296                ))
1297            }
1298        }
1299    }
1300
1301    /// Get the state of an order
1302    ///
1303    /// Retrieves detailed information about a specific order.
1304    ///
1305    /// # Arguments
1306    ///
1307    /// * `order_id` - The order ID to get state for
1308    ///
1309    /// # Returns
1310    ///
1311    /// Order information for the specified order
1312    ///
1313    /// # Errors
1314    ///
1315    /// Returns an error if the request fails or the response cannot be parsed
1316    pub async fn get_order_state(
1317        &self,
1318        order_id: &str,
1319    ) -> Result<crate::model::OrderInfo, WebSocketError> {
1320        let json_request = {
1321            let mut builder = self.request_builder.lock().await;
1322            builder.build_get_order_state_request(order_id)
1323        };
1324
1325        let request_ctx: &JsonRpcRequest = &json_request;
1326        let response = self.send_request(&json_request).await?;
1327
1328        match response.result {
1329            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1330                WebSocketError::InvalidMessage(format!(
1331                    "Failed to parse order state response: {}",
1332                    e
1333                ))
1334            }),
1335            JsonRpcResult::Error { error } => {
1336                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1337                Err(WebSocketError::api_error_from_parts(
1338                    request_ctx,
1339                    error,
1340                    Some(raw),
1341                ))
1342            }
1343        }
1344    }
1345
1346    /// Get order history by currency
1347    ///
1348    /// Retrieves historical orders for the specified currency.
1349    ///
1350    /// # Arguments
1351    ///
1352    /// * `currency` - Currency to get order history for
1353    /// * `kind` - Kind filter (future, option, spot, etc.) - optional
1354    /// * `count` - Number of items to return - optional
1355    ///
1356    /// # Returns
1357    ///
1358    /// A vector of historical orders matching the filter criteria
1359    ///
1360    /// # Errors
1361    ///
1362    /// Returns an error if the request fails or the response cannot be parsed
1363    pub async fn get_order_history_by_currency(
1364        &self,
1365        currency: &str,
1366        kind: Option<&str>,
1367        count: Option<u32>,
1368    ) -> Result<Vec<crate::model::OrderInfo>, WebSocketError> {
1369        let json_request = {
1370            let mut builder = self.request_builder.lock().await;
1371            builder.build_get_order_history_by_currency_request(currency, kind, count)
1372        };
1373
1374        let request_ctx: &JsonRpcRequest = &json_request;
1375        let response = self.send_request(&json_request).await?;
1376
1377        match response.result {
1378            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1379                WebSocketError::InvalidMessage(format!(
1380                    "Failed to parse order history response: {}",
1381                    e
1382                ))
1383            }),
1384            JsonRpcResult::Error { error } => {
1385                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1386                Err(WebSocketError::api_error_from_parts(
1387                    request_ctx,
1388                    error,
1389                    Some(raw),
1390                ))
1391            }
1392        }
1393    }
1394
1395    // Position management methods
1396
1397    /// Close an existing position
1398    ///
1399    /// Places a reduce-only order to close an existing position.
1400    ///
1401    /// # Arguments
1402    ///
1403    /// * `instrument_name` - The instrument to close position for
1404    /// * `order_type` - Order type: "limit" or "market"
1405    /// * `price` - Price for limit orders (required if order_type is "limit")
1406    ///
1407    /// # Returns
1408    ///
1409    /// Response containing the order and any trades executed
1410    ///
1411    /// # Errors
1412    ///
1413    /// Returns an error if the request fails or the response cannot be parsed
1414    pub async fn close_position(
1415        &self,
1416        instrument_name: &str,
1417        order_type: &str,
1418        price: Option<f64>,
1419    ) -> Result<crate::model::ClosePositionResponse, WebSocketError> {
1420        let json_request = {
1421            let mut builder = self.request_builder.lock().await;
1422            builder.build_close_position_request(instrument_name, order_type, price)?
1423        };
1424
1425        let request_ctx: &JsonRpcRequest = &json_request;
1426        let response = self.send_request(&json_request).await?;
1427
1428        match response.result {
1429            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1430                WebSocketError::InvalidMessage(format!(
1431                    "Failed to parse close position response: {}",
1432                    e
1433                ))
1434            }),
1435            JsonRpcResult::Error { error } => {
1436                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1437                Err(WebSocketError::api_error_from_parts(
1438                    request_ctx,
1439                    error,
1440                    Some(raw),
1441                ))
1442            }
1443        }
1444    }
1445
1446    /// Move positions between subaccounts
1447    ///
1448    /// Transfers positions from one subaccount to another within the same main account.
1449    ///
1450    /// # Arguments
1451    ///
1452    /// * `currency` - Currency for the positions (BTC, ETH, etc.)
1453    /// * `source_uid` - Source subaccount ID
1454    /// * `target_uid` - Target subaccount ID
1455    /// * `trades` - List of positions to move
1456    ///
1457    /// # Returns
1458    ///
1459    /// A vector of results for each position moved
1460    ///
1461    /// # Errors
1462    ///
1463    /// Returns an error if the request fails or the response cannot be parsed
1464    pub async fn move_positions(
1465        &self,
1466        currency: &str,
1467        source_uid: u64,
1468        target_uid: u64,
1469        trades: &[crate::model::MovePositionTrade],
1470    ) -> Result<Vec<crate::model::MovePositionResult>, WebSocketError> {
1471        let json_request = {
1472            let mut builder = self.request_builder.lock().await;
1473            builder.build_move_positions_request(currency, source_uid, target_uid, trades)?
1474        };
1475
1476        let request_ctx: &JsonRpcRequest = &json_request;
1477        let response = self.send_request(&json_request).await?;
1478
1479        match response.result {
1480            JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1481                WebSocketError::InvalidMessage(format!(
1482                    "Failed to parse move positions response: {}",
1483                    e
1484                ))
1485            }),
1486            JsonRpcResult::Error { error } => {
1487                let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1488                Err(WebSocketError::api_error_from_parts(
1489                    request_ctx,
1490                    error,
1491                    Some(raw),
1492                ))
1493            }
1494        }
1495    }
1496
1497    /// Set message handler with callbacks
1498    /// The message_callback processes each incoming message and returns Result<(), Error>
1499    /// The error_callback is called only when message_callback returns an error
1500    pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
1501    where
1502        F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
1503        E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
1504    {
1505        self.message_handler = Some(MessageHandler::new(message_callback, error_callback));
1506    }
1507
1508    /// Set message handler using builder pattern
1509    pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
1510        self.message_handler = Some(handler);
1511    }
1512
1513    /// Remove the current message handler
1514    pub fn clear_message_handler(&mut self) {
1515        self.message_handler = None;
1516    }
1517
1518    /// Check if message handler is set
1519    pub fn has_message_handler(&self) -> bool {
1520        self.message_handler.is_some()
1521    }
1522
1523    /// Receive and process a message using the registered callbacks
1524    /// This method will:
1525    /// 1. Receive a message from the WebSocket
1526    /// 2. Call the primary callback with the message
1527    /// 3. If primary callback returns error, call error callback with message and error
1528    pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
1529        let message = self.receive_message().await?;
1530
1531        if let Some(handler) = &self.message_handler {
1532            handler.handle_message(&message);
1533        }
1534
1535        Ok(())
1536    }
1537
1538    /// Start message processing loop with callbacks
1539    /// This will continuously receive messages and process them using the registered callbacks
1540    /// The loop will continue until an error occurs or the connection is closed
1541    pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
1542        if self.message_handler.is_none() {
1543            return Err(WebSocketError::InvalidMessage(
1544                "No message handler set. Use set_message_handler() first.".to_string(),
1545            ));
1546        }
1547
1548        loop {
1549            match self.receive_and_process_message().await {
1550                Ok(()) => {
1551                    // Message processed successfully, continue
1552                }
1553                Err(WebSocketError::ConnectionClosed) => {
1554                    // Connection closed, exit loop gracefully
1555                    break;
1556                }
1557                Err(e) => {
1558                    // Other error occurred, propagate it
1559                    return Err(e);
1560                }
1561            }
1562        }
1563
1564        Ok(())
1565    }
1566}
1567
1568impl Default for DeribitWebSocketClient {
1569    fn default() -> Self {
1570        let config = WebSocketConfig::default();
1571        // `Default` cannot return `Result`; `Self::new` only fails on invalid
1572        // URL parsing which cannot happen for `WebSocketConfig::default()`.
1573        // Tracked separately for a fallible-only constructor redesign.
1574        #[allow(clippy::unwrap_used)]
1575        Self::new(&config).unwrap()
1576    }
1577}
1578
1579/// Add a pre-parsed list of server-confirmed channels to `manager`.
1580///
1581/// Computes the typed [`SubscriptionChannel`] variant and the instrument
1582/// token for each channel and records the subscription in the manager.
1583/// The caller is expected to invoke this while holding the
1584/// `subscription_manager` lock; parsing and validation of the raw
1585/// response should happen outside the lock (via [`confirmed_channels`]).
1586fn add_confirmed_channels(manager: &mut SubscriptionManager, channels: Vec<String>) {
1587    for channel in channels {
1588        let channel_type = SubscriptionChannel::from_string(&channel);
1589        let instrument = instrument_from_channel(&channel);
1590        manager.add_subscription(channel, channel_type, instrument);
1591    }
1592}
1593
1594/// Remove a pre-parsed list of server-confirmed channels from `manager`.
1595///
1596/// The caller is expected to invoke this while holding the
1597/// `subscription_manager` lock; parsing and validation of the raw
1598/// response should happen outside the lock (via [`confirmed_channels`]).
1599fn remove_confirmed_channels(manager: &mut SubscriptionManager, channels: Vec<String>) {
1600    for channel in channels {
1601        manager.remove_subscription(&channel);
1602    }
1603}
1604
1605/// Extract the confirmed channel list from a subscribe/unsubscribe response.
1606///
1607/// - `Success` with a JSON array of strings → `Ok(Some(list))`.
1608/// - `Success` with any other shape → `Err(InvalidMessage)`.
1609/// - `Error` → `Ok(None)`, signalling the caller to leave local state
1610///   untouched. The caller is expected to return the [`JsonRpcResponse`]
1611///   to its own caller so the API error surfaces.
1612fn confirmed_channels(
1613    response: &JsonRpcResponse,
1614    method: &'static str,
1615) -> Result<Option<Vec<String>>, WebSocketError> {
1616    match &response.result {
1617        JsonRpcResult::Success { result } => serde_json::from_value::<Vec<String>>(result.clone())
1618            .map(Some)
1619            .map_err(|e| {
1620                WebSocketError::InvalidMessage(format!(
1621                    "expected array of confirmed channel strings in {} response: {}",
1622                    method, e
1623                ))
1624            }),
1625        JsonRpcResult::Error { .. } => Ok(None),
1626    }
1627}
1628
1629/// Extract the instrument/currency/index token carried by a channel name.
1630///
1631/// Plain-function counterpart of `DeribitWebSocketClient::extract_instrument`
1632/// so the reconciliation helpers can run without a client instance (and
1633/// therefore be unit-tested in isolation). Keeps the same pattern-match
1634/// shape as the method to avoid drift.
1635fn instrument_from_channel(channel: &str) -> Option<String> {
1636    let parts: Vec<&str> = channel.split('.').collect();
1637    match parts.as_slice() {
1638        ["ticker", instrument] | ["ticker", instrument, _] => Some((*instrument).to_string()),
1639        ["book", instrument, ..] => Some((*instrument).to_string()),
1640        ["trades", instrument, ..] => Some((*instrument).to_string()),
1641        ["chart", "trades", instrument, _] => Some((*instrument).to_string()),
1642        ["user", "changes", instrument, _] => Some((*instrument).to_string()),
1643        ["estimated_expiration_price", instrument] => Some((*instrument).to_string()),
1644        ["markprice", "options", instrument] => Some((*instrument).to_string()),
1645        ["perpetual", instrument, _] => Some((*instrument).to_string()),
1646        ["quote", instrument] => Some((*instrument).to_string()),
1647        ["incremental_ticker", instrument] => Some((*instrument).to_string()),
1648        ["deribit_price_index", index_name]
1649        | ["deribit_price_ranking", index_name]
1650        | ["deribit_price_statistics", index_name]
1651        | ["deribit_volatility_index", index_name] => Some((*index_name).to_string()),
1652        ["instrument", "state", _kind, currency] => Some((*currency).to_string()),
1653        ["block_rfq", "trades", currency] => Some((*currency).to_string()),
1654        ["block_trade_confirmations", currency] => Some((*currency).to_string()),
1655        ["user", "mmp_trigger", index_name] => Some((*index_name).to_string()),
1656        _ => None,
1657    }
1658}
1659
1660#[cfg(test)]
1661#[allow(clippy::unwrap_used, clippy::expect_used)]
1662mod tests {
1663    //! Reconciliation tests for `subscribe` / `unsubscribe` (issue #62).
1664    //!
1665    //! The bulk are stubbed-response tests that drive the pure sync
1666    //! helpers ([`confirmed_channels`] / [`add_confirmed_channels`] /
1667    //! [`remove_confirmed_channels`]) directly with hand-crafted
1668    //! [`JsonRpcResponse`] values and a bare [`SubscriptionManager`]. One
1669    //! end-to-end test stands up a mock WebSocket server and exercises
1670    //! the full [`DeribitWebSocketClient::subscribe`] path to prove the
1671    //! acceptance criterion from the issue.
1672    use super::*;
1673    use crate::model::ws_types::JsonRpcError;
1674    use serde_json::json;
1675
1676    /// Build a `Success` response carrying `result`.
1677    fn success(result: serde_json::Value) -> JsonRpcResponse {
1678        JsonRpcResponse::success(json!(1), result)
1679    }
1680
1681    /// Build an `Error` response with a realistic Deribit-style shape.
1682    fn api_error(code: i32, message: &str) -> JsonRpcResponse {
1683        JsonRpcResponse::error(
1684            json!(1),
1685            JsonRpcError {
1686                code,
1687                message: message.to_string(),
1688                data: None,
1689            },
1690        )
1691    }
1692
1693    /// Drive the same control flow that `subscribe()` uses: parse the
1694    /// response outside the lock, then hand the confirmed list to the
1695    /// mutator. Returns `Ok(())` on success (including API-error
1696    /// responses, which are a deliberate no-op) and the parse error on
1697    /// malformed `Success` responses.
1698    fn reconcile_subscribe(
1699        manager: &mut SubscriptionManager,
1700        response: &JsonRpcResponse,
1701    ) -> Result<(), WebSocketError> {
1702        if let Some(confirmed) = confirmed_channels(response, "public/subscribe")? {
1703            add_confirmed_channels(manager, confirmed);
1704        }
1705        Ok(())
1706    }
1707
1708    /// Mirror of [`reconcile_subscribe`] for the unsubscribe path.
1709    fn reconcile_unsubscribe(
1710        manager: &mut SubscriptionManager,
1711        response: &JsonRpcResponse,
1712    ) -> Result<(), WebSocketError> {
1713        if let Some(confirmed) = confirmed_channels(response, "public/unsubscribe")? {
1714            remove_confirmed_channels(manager, confirmed);
1715        }
1716        Ok(())
1717    }
1718
1719    // ---------------------------------------------------------------
1720    // Stubbed-response unit tests: subscribe reconciliation
1721    // ---------------------------------------------------------------
1722
1723    #[test]
1724    fn test_reconcile_subscribe_adds_only_server_confirmed_channels() {
1725        // Core acceptance criterion for issue #62: caller requested two
1726        // channels, server accepted only one. Local view must reflect
1727        // the server-confirmed subset — never the input.
1728        let mut manager = SubscriptionManager::new();
1729        let response = success(json!(["ticker.BTC-PERPETUAL"]));
1730
1731        reconcile_subscribe(&mut manager, &response)
1732            .expect("well-formed success response reconciles");
1733
1734        let channels = manager.get_all_channels();
1735        assert_eq!(channels, vec!["ticker.BTC-PERPETUAL".to_string()]);
1736        assert!(
1737            manager.get_subscription("ticker.INVALID").is_none(),
1738            "rejected input channel must not leak into local state"
1739        );
1740    }
1741
1742    #[test]
1743    fn test_reconcile_subscribe_happy_path_input_equals_response() {
1744        // Regression guard: when the server confirms everything the
1745        // caller asked for, every requested channel lands in the manager.
1746        let mut manager = SubscriptionManager::new();
1747        let response = success(json!(["ticker.BTC-PERPETUAL", "book.ETH-PERPETUAL.raw"]));
1748
1749        reconcile_subscribe(&mut manager, &response).expect("happy-path response reconciles");
1750
1751        let mut channels = manager.get_all_channels();
1752        channels.sort();
1753        assert_eq!(
1754            channels,
1755            vec![
1756                "book.ETH-PERPETUAL.raw".to_string(),
1757                "ticker.BTC-PERPETUAL".to_string(),
1758            ]
1759        );
1760        // Instrument extraction should populate the typed side too.
1761        let ticker = manager
1762            .get_subscription("ticker.BTC-PERPETUAL")
1763            .expect("ticker subscription tracked");
1764        assert_eq!(ticker.instrument.as_deref(), Some("BTC-PERPETUAL"));
1765    }
1766
1767    #[test]
1768    fn test_reconcile_subscribe_empty_result_is_noop() {
1769        // Server accepted zero of the requested channels. The function
1770        // succeeds but makes no entries.
1771        let mut manager = SubscriptionManager::new();
1772        let response = success(json!([] as [&str; 0]));
1773
1774        reconcile_subscribe(&mut manager, &response).expect("empty confirmation is valid");
1775
1776        assert!(manager.get_all_channels().is_empty());
1777    }
1778
1779    #[test]
1780    fn test_reconcile_subscribe_api_error_is_noop() {
1781        // API-error responses are surfaced to the caller verbatim and
1782        // must leave the local view untouched so the caller can retry.
1783        let mut manager = SubscriptionManager::new();
1784        manager.add_subscription(
1785            "ticker.BTC-PERPETUAL".to_string(),
1786            SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1787            Some("BTC-PERPETUAL".to_string()),
1788        );
1789        let before = manager.get_all_channels();
1790        let response = api_error(-32000, "subscription rejected");
1791
1792        // Must resolve to Ok(None) — no mutation attempted.
1793        assert!(
1794            matches!(confirmed_channels(&response, "public/subscribe"), Ok(None)),
1795            "api-error response must yield Ok(None)"
1796        );
1797        reconcile_subscribe(&mut manager, &response)
1798            .expect("api-error response must not return Err");
1799
1800        assert_eq!(
1801            manager.get_all_channels(),
1802            before,
1803            "api-error response must not mutate the manager"
1804        );
1805    }
1806
1807    #[test]
1808    fn test_reconcile_subscribe_non_array_result_returns_invalid_message() {
1809        // A `Success` whose `result` is not an array of strings is a
1810        // protocol violation — we surface it as `InvalidMessage` rather
1811        // than silently skipping reconciliation.
1812        let mut manager = SubscriptionManager::new();
1813        let response = success(json!({ "channels": ["ticker.BTC-PERPETUAL"] }));
1814
1815        let err = reconcile_subscribe(&mut manager, &response)
1816            .expect_err("object result must not parse as Vec<String>");
1817        assert!(
1818            matches!(err, WebSocketError::InvalidMessage(_)),
1819            "expected InvalidMessage, got {:?}",
1820            err
1821        );
1822        assert!(
1823            manager.get_all_channels().is_empty(),
1824            "failed reconciliation must not partially mutate the manager"
1825        );
1826    }
1827
1828    // ---------------------------------------------------------------
1829    // Stubbed-response unit tests: unsubscribe reconciliation
1830    // ---------------------------------------------------------------
1831
1832    #[test]
1833    fn test_reconcile_unsubscribe_removes_only_server_confirmed_channels() {
1834        // Mirror of the subscribe subset test: two channels live in the
1835        // manager; the server confirms only one was unsubscribed. The
1836        // other must stay.
1837        let mut manager = SubscriptionManager::new();
1838        manager.add_subscription(
1839            "ticker.BTC-PERPETUAL".to_string(),
1840            SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1841            Some("BTC-PERPETUAL".to_string()),
1842        );
1843        manager.add_subscription(
1844            "ticker.ETH-PERPETUAL".to_string(),
1845            SubscriptionChannel::Ticker("ETH-PERPETUAL".to_string()),
1846            Some("ETH-PERPETUAL".to_string()),
1847        );
1848        let response = success(json!(["ticker.BTC-PERPETUAL"]));
1849
1850        reconcile_unsubscribe(&mut manager, &response)
1851            .expect("well-formed unsubscribe response reconciles");
1852
1853        let channels = manager.get_all_channels();
1854        assert_eq!(channels, vec!["ticker.ETH-PERPETUAL".to_string()]);
1855    }
1856
1857    #[test]
1858    fn test_reconcile_unsubscribe_happy_path() {
1859        // Regression guard: server confirms everything the caller asked
1860        // to drop; the manager ends empty.
1861        let mut manager = SubscriptionManager::new();
1862        manager.add_subscription(
1863            "ticker.BTC-PERPETUAL".to_string(),
1864            SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1865            Some("BTC-PERPETUAL".to_string()),
1866        );
1867        manager.add_subscription(
1868            "book.ETH-PERPETUAL.raw".to_string(),
1869            SubscriptionChannel::OrderBook("ETH-PERPETUAL".to_string()),
1870            Some("ETH-PERPETUAL".to_string()),
1871        );
1872        let response = success(json!(["ticker.BTC-PERPETUAL", "book.ETH-PERPETUAL.raw"]));
1873
1874        reconcile_unsubscribe(&mut manager, &response).expect("happy-path unsubscribe reconciles");
1875
1876        assert!(manager.get_all_channels().is_empty());
1877    }
1878
1879    #[test]
1880    fn test_reconcile_unsubscribe_api_error_is_noop() {
1881        let mut manager = SubscriptionManager::new();
1882        manager.add_subscription(
1883            "ticker.BTC-PERPETUAL".to_string(),
1884            SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1885            Some("BTC-PERPETUAL".to_string()),
1886        );
1887        let before = manager.get_all_channels();
1888        let response = api_error(-32000, "unsubscribe rejected");
1889
1890        reconcile_unsubscribe(&mut manager, &response)
1891            .expect("api-error response must not return Err");
1892
1893        assert_eq!(
1894            manager.get_all_channels(),
1895            before,
1896            "api-error response must not mutate the manager"
1897        );
1898    }
1899
1900    #[test]
1901    fn test_reconcile_unsubscribe_non_array_result_returns_invalid_message() {
1902        let mut manager = SubscriptionManager::new();
1903        manager.add_subscription(
1904            "ticker.BTC-PERPETUAL".to_string(),
1905            SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1906            Some("BTC-PERPETUAL".to_string()),
1907        );
1908        let response = success(json!("not an array"));
1909
1910        let err = reconcile_unsubscribe(&mut manager, &response)
1911            .expect_err("string result must not parse as Vec<String>");
1912        assert!(
1913            matches!(err, WebSocketError::InvalidMessage(_)),
1914            "expected InvalidMessage, got {:?}",
1915            err
1916        );
1917        assert_eq!(
1918            manager.get_all_channels(),
1919            vec!["ticker.BTC-PERPETUAL".to_string()],
1920            "failed reconciliation must not partially mutate the manager"
1921        );
1922    }
1923
1924    // ---------------------------------------------------------------
1925    // End-to-end mock WebSocket server test (acceptance criterion #1)
1926    // ---------------------------------------------------------------
1927
1928    /// Spawn a single-shot mock WebSocket server on an ephemeral port.
1929    ///
1930    /// The `scenario` closure receives the split sink/stream for the one
1931    /// accepted connection and drives the test-specific server-side
1932    /// behaviour. Returns the bound address and a join handle the test
1933    /// must await at the end to surface any panic from the task.
1934    async fn spawn_mock_server<F, Fut>(
1935        scenario: F,
1936    ) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>)
1937    where
1938        F: FnOnce(
1939                futures_util::stream::SplitSink<
1940                    tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
1941                    tokio_tungstenite::tungstenite::Message,
1942                >,
1943                futures_util::stream::SplitStream<
1944                    tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
1945                >,
1946            ) -> Fut
1947            + Send
1948            + 'static,
1949        Fut: std::future::Future<Output = ()> + Send,
1950    {
1951        use futures_util::StreamExt;
1952        use tokio::net::TcpListener;
1953        use tokio_tungstenite::accept_async;
1954
1955        let listener = TcpListener::bind("127.0.0.1:0")
1956            .await
1957            .expect("bind localhost ephemeral port");
1958        let addr = listener
1959            .local_addr()
1960            .expect("read local addr of bound listener");
1961        let handle = tokio::spawn(async move {
1962            let (socket, _peer) = match listener.accept().await {
1963                Ok(pair) => pair,
1964                Err(_) => return,
1965            };
1966            let ws = match accept_async(socket).await {
1967                Ok(ws) => ws,
1968                Err(_) => return,
1969            };
1970            let (sink, stream) = ws.split();
1971            scenario(sink, stream).await;
1972        });
1973        (addr, handle)
1974    }
1975
1976    #[tokio::test]
1977    async fn test_subscribe_reconciles_local_state_with_server_subset() {
1978        // End-to-end proof of the issue #62 acceptance criterion:
1979        //
1980        //   client.subscribe(["ticker.INVALID", "ticker.BTC-PERPETUAL"])
1981        //
1982        // against a server that replies with `["ticker.BTC-PERPETUAL"]`
1983        // leaves only BTC-PERPETUAL in the local manager.
1984        use futures_util::{SinkExt, StreamExt};
1985        use tokio_tungstenite::tungstenite::Message;
1986
1987        let (addr, server) = spawn_mock_server(|mut sink, mut stream| async move {
1988            // Read the subscribe request, echo back only the BTC channel.
1989            if let Some(Ok(Message::Text(t))) = stream.next().await {
1990                let req: serde_json::Value =
1991                    serde_json::from_str(&t).expect("server parses request");
1992                let id = req.get("id").cloned().unwrap_or(serde_json::Value::Null);
1993                let resp = json!({
1994                    "jsonrpc": "2.0",
1995                    "id": id,
1996                    "result": ["ticker.BTC-PERPETUAL"],
1997                });
1998                let _ = sink.send(Message::Text(resp.to_string().into())).await;
1999            }
2000            // Hold the socket open long enough for the client to read.
2001            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2002        })
2003        .await;
2004
2005        let config = WebSocketConfig::with_url(&format!("ws://{}/", addr)).expect("valid ws url");
2006        let client = DeribitWebSocketClient::new(&config).expect("client construction");
2007        client.connect().await.expect("client connects to mock");
2008
2009        let response = client
2010            .subscribe(vec![
2011                "ticker.INVALID".to_string(),
2012                "ticker.BTC-PERPETUAL".to_string(),
2013            ])
2014            .await
2015            .expect("subscribe returns the server-confirmed response");
2016
2017        // Server-confirmed response is surfaced verbatim.
2018        let JsonRpcResult::Success { result } = response.result else {
2019            panic!("expected Success result, got {:?}", response.result);
2020        };
2021        assert_eq!(result, json!(["ticker.BTC-PERPETUAL"]));
2022
2023        // Local manager reflects the server-confirmed subset — only BTC,
2024        // never the rejected INVALID channel.
2025        let manager = client.subscription_manager();
2026        let channels = manager.lock().await.get_all_channels();
2027        assert_eq!(
2028            channels,
2029            vec!["ticker.BTC-PERPETUAL".to_string()],
2030            "local manager must drop rejected channels from the input"
2031        );
2032
2033        client.disconnect().await.expect("client disconnects");
2034        server.await.expect("server task did not panic");
2035    }
2036}