1use std::sync::Arc;
42use tokio::sync::Mutex;
43
44use crate::model::SubscriptionChannel;
45use crate::{
46 callback::MessageHandler,
47 config::WebSocketConfig,
48 connection::Dispatcher,
49 error::{WebSocketError, envelope::build_raw_error_response},
50 message::request::RequestBuilder,
51 model::{
52 quote::*,
53 subscription::SubscriptionManager,
54 ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
55 },
56 session::WebSocketSession,
57};
58
/// WebSocket client that groups the configuration, session, request builder,
/// subscription tracking and an optional message handler around a single,
/// replaceable [`Dispatcher`].
#[derive(Debug)]
pub struct DeribitWebSocketClient {
    /// Shared configuration; holds a clone of the value passed to `new`.
    pub config: Arc<WebSocketConfig>,
    /// Currently installed dispatcher. `None` after construction, after
    /// `disconnect`, and after a failed `connect`.
    dispatcher: Arc<Mutex<Option<Arc<Dispatcher>>>>,
    /// Session created from the same configuration and subscription manager
    /// as this client.
    pub session: Arc<WebSocketSession>,
    /// Builder used to create every outgoing JSON-RPC request.
    request_builder: Arc<Mutex<RequestBuilder>>,
    /// Local record of subscribed channels; the same `Arc` is handed to `session`.
    subscription_manager: Arc<Mutex<SubscriptionManager>>,
    /// Handler used by `receive_and_process_message`; `None` until one is set.
    message_handler: Option<MessageHandler>,
}
79
80impl DeribitWebSocketClient {
81 pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
88 let subscription_manager = Arc::new(Mutex::new(SubscriptionManager::new()));
89 let config = Arc::new(config.clone());
93 let session = Arc::new(WebSocketSession::new(
94 Arc::clone(&config),
95 Arc::clone(&subscription_manager),
96 ));
97
98 Ok(Self {
99 config,
100 dispatcher: Arc::new(Mutex::new(None)),
101 session,
102 request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
103 subscription_manager,
104 message_handler: None,
105 })
106 }
107
    /// Returns a new `Arc` handle to the client's subscription manager.
    ///
    /// The handle points at the same mutex-protected manager the client
    /// itself updates in `subscribe` / `unsubscribe`.
    #[must_use]
    pub fn subscription_manager(&self) -> Arc<Mutex<SubscriptionManager>> {
        Arc::clone(&self.subscription_manager)
    }
115
116 pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
118 let config = WebSocketConfig::with_url(&ws_url)
119 .map_err(|e| WebSocketError::ConnectionFailed(format!("Invalid URL: {}", e)))?;
120 Self::new(&config)
121 }
122
123 pub fn new_testnet() -> Result<Self, WebSocketError> {
125 Self::new_with_url("wss://test.deribit.com/ws/api/v2".to_string())
126 }
127
128 pub fn new_production() -> Result<Self, WebSocketError> {
130 Self::new_with_url("wss://www.deribit.com/ws/api/v2".to_string())
131 }
132
133 pub async fn connect(&self) -> Result<(), WebSocketError> {
146 let mut guard = self.dispatcher.lock().await;
147 if let Some(prev) = guard.take() {
148 let _ = prev.shutdown().await;
149 }
150 let dispatcher = Dispatcher::connect(
151 self.config.ws_url.clone(),
152 self.config.connection_timeout,
153 self.config.request_timeout,
154 self.config.notification_channel_capacity,
155 self.config.dispatcher_command_capacity,
156 )
157 .await?;
158 *guard = Some(Arc::new(dispatcher));
159 Ok(())
160 }
161
162 pub async fn disconnect(&self) -> Result<(), WebSocketError> {
164 let dispatcher = {
167 let mut guard = self.dispatcher.lock().await;
168 guard.take()
169 };
170 if let Some(dispatcher) = dispatcher {
171 dispatcher.shutdown().await?;
172 }
173 Ok(())
174 }
175
176 pub async fn is_connected(&self) -> bool {
178 self.dispatcher.lock().await.is_some()
179 }
180
181 pub async fn authenticate(
199 &self,
200 client_id: &str,
201 client_secret: &str,
202 ) -> Result<crate::model::AuthResponse, WebSocketError> {
203 let request = {
204 let mut builder = self.request_builder.lock().await;
205 builder.build_auth_request(client_id, client_secret)
206 };
207
208 let request_ctx: &JsonRpcRequest = &request;
209 let response = self.send_request(&request).await?;
210
211 match response.result {
212 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
213 WebSocketError::InvalidMessage(format!(
214 "Failed to parse authentication response: {}",
215 e
216 ))
217 }),
218 JsonRpcResult::Error { error } => {
219 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
220 Err(WebSocketError::api_error_from_parts(
221 request_ctx,
222 error,
223 Some(raw),
224 ))
225 }
226 }
227 }
228
229 pub async fn subscribe(
251 &self,
252 channels: Vec<String>,
253 ) -> Result<JsonRpcResponse, WebSocketError> {
254 let request = {
255 let mut builder = self.request_builder.lock().await;
256 builder.build_subscribe_request(channels)
257 };
258
259 let response = self.send_request(&request).await?;
260
261 if let Some(confirmed) = confirmed_channels(&response, "public/subscribe")? {
264 let mut sub_manager = self.subscription_manager.lock().await;
265 add_confirmed_channels(&mut sub_manager, confirmed);
266 }
267
268 Ok(response)
269 }
270
271 pub async fn unsubscribe(
290 &self,
291 channels: Vec<String>,
292 ) -> Result<JsonRpcResponse, WebSocketError> {
293 let request = {
294 let mut builder = self.request_builder.lock().await;
295 builder.build_unsubscribe_request(channels)
296 };
297
298 let response = self.send_request(&request).await?;
299
300 if let Some(confirmed) = confirmed_channels(&response, "public/unsubscribe")? {
303 let mut sub_manager = self.subscription_manager.lock().await;
304 remove_confirmed_channels(&mut sub_manager, confirmed);
305 }
306
307 Ok(response)
308 }
309
310 pub async fn public_unsubscribe_all(&self) -> Result<String, WebSocketError> {
323 let request = {
324 let mut builder = self.request_builder.lock().await;
325 builder.build_public_unsubscribe_all_request()
326 };
327
328 let request_ctx: &JsonRpcRequest = &request;
329 let response = self.send_request(&request).await?;
330
331 match response.result {
336 JsonRpcResult::Success { result } => {
337 self.subscription_manager.lock().await.clear();
338 result.as_str().map(String::from).ok_or_else(|| {
339 WebSocketError::InvalidMessage(
340 "Expected string result from unsubscribe_all".to_string(),
341 )
342 })
343 }
344 JsonRpcResult::Error { error } => {
345 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
346 Err(WebSocketError::api_error_from_parts(
347 request_ctx,
348 error,
349 Some(raw),
350 ))
351 }
352 }
353 }
354
355 pub async fn private_unsubscribe_all(&self) -> Result<String, WebSocketError> {
368 let request = {
369 let mut builder = self.request_builder.lock().await;
370 builder.build_private_unsubscribe_all_request()
371 };
372
373 let request_ctx: &JsonRpcRequest = &request;
374 let response = self.send_request(&request).await?;
375
376 match response.result {
380 JsonRpcResult::Success { result } => {
381 self.subscription_manager.lock().await.clear();
382 result.as_str().map(String::from).ok_or_else(|| {
383 WebSocketError::InvalidMessage(
384 "Expected string result from unsubscribe_all".to_string(),
385 )
386 })
387 }
388 JsonRpcResult::Error { error } => {
389 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
390 Err(WebSocketError::api_error_from_parts(
391 request_ctx,
392 error,
393 Some(raw),
394 ))
395 }
396 }
397 }
398
399 pub async fn send_request(
412 &self,
413 request: &JsonRpcRequest,
414 ) -> Result<JsonRpcResponse, WebSocketError> {
415 let dispatcher = {
420 let guard = self.dispatcher.lock().await;
421 guard
422 .as_ref()
423 .map(Arc::clone)
424 .ok_or(WebSocketError::ConnectionClosed)?
425 };
426 dispatcher.send_request(request).await
427 }
428
429 pub async fn receive_message(&self) -> Result<String, WebSocketError> {
435 let dispatcher = {
436 let guard = self.dispatcher.lock().await;
437 guard
438 .as_ref()
439 .map(Arc::clone)
440 .ok_or(WebSocketError::ConnectionClosed)?
441 };
442 dispatcher
443 .next_notification()
444 .await
445 .ok_or(WebSocketError::ConnectionClosed)
446 }
447
448 pub async fn get_subscriptions(&self) -> Vec<String> {
450 let sub_manager = self.subscription_manager.lock().await;
451 sub_manager.get_all_channels()
452 }
453
454 pub async fn test_connection(&self) -> Result<crate::model::TestResponse, WebSocketError> {
466 let request = {
467 let mut builder = self.request_builder.lock().await;
468 builder.build_test_request()
469 };
470
471 let request_ctx: &JsonRpcRequest = &request;
472 let response = self.send_request(&request).await?;
473
474 match response.result {
475 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
476 WebSocketError::InvalidMessage(format!("Failed to parse test response: {}", e))
477 }),
478 JsonRpcResult::Error { error } => {
479 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
480 Err(WebSocketError::api_error_from_parts(
481 request_ctx,
482 error,
483 Some(raw),
484 ))
485 }
486 }
487 }
488
489 pub async fn get_time(&self) -> Result<u64, WebSocketError> {
501 let request = {
502 let mut builder = self.request_builder.lock().await;
503 builder.build_get_time_request()
504 };
505
506 let request_ctx: &JsonRpcRequest = &request;
507 let response = self.send_request(&request).await?;
508
509 match response.result {
510 JsonRpcResult::Success { result } => result.as_u64().ok_or_else(|| {
511 WebSocketError::InvalidMessage(
512 "Expected u64 timestamp in get_time response".to_string(),
513 )
514 }),
515 JsonRpcResult::Error { error } => {
516 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
517 Err(WebSocketError::api_error_from_parts(
518 request_ctx,
519 error,
520 Some(raw),
521 ))
522 }
523 }
524 }
525
526 pub async fn set_heartbeat(&self, interval: u64) -> Result<String, WebSocketError> {
544 let request = {
545 let mut builder = self.request_builder.lock().await;
546 builder.build_set_heartbeat_request(interval)
547 };
548
549 let request_ctx: &JsonRpcRequest = &request;
550 let response = self.send_request(&request).await?;
551
552 match response.result {
553 JsonRpcResult::Success { result } => {
554 result.as_str().map(String::from).ok_or_else(|| {
555 WebSocketError::InvalidMessage(
556 "Expected string result from set_heartbeat".to_string(),
557 )
558 })
559 }
560 JsonRpcResult::Error { error } => {
561 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
562 Err(WebSocketError::api_error_from_parts(
563 request_ctx,
564 error,
565 Some(raw),
566 ))
567 }
568 }
569 }
570
571 pub async fn disable_heartbeat(&self) -> Result<String, WebSocketError> {
583 let request = {
584 let mut builder = self.request_builder.lock().await;
585 builder.build_disable_heartbeat_request()
586 };
587
588 let request_ctx: &JsonRpcRequest = &request;
589 let response = self.send_request(&request).await?;
590
591 match response.result {
592 JsonRpcResult::Success { result } => {
593 result.as_str().map(String::from).ok_or_else(|| {
594 WebSocketError::InvalidMessage(
595 "Expected string result from disable_heartbeat".to_string(),
596 )
597 })
598 }
599 JsonRpcResult::Error { error } => {
600 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
601 Err(WebSocketError::api_error_from_parts(
602 request_ctx,
603 error,
604 Some(raw),
605 ))
606 }
607 }
608 }
609
610 pub async fn hello(
628 &self,
629 client_name: &str,
630 client_version: &str,
631 ) -> Result<crate::model::HelloResponse, WebSocketError> {
632 let request = {
633 let mut builder = self.request_builder.lock().await;
634 builder.build_hello_request(client_name, client_version)
635 };
636
637 let request_ctx: &JsonRpcRequest = &request;
638 let response = self.send_request(&request).await?;
639
640 match response.result {
641 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
642 WebSocketError::InvalidMessage(format!("Failed to parse hello response: {}", e))
643 }),
644 JsonRpcResult::Error { error } => {
645 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
646 Err(WebSocketError::api_error_from_parts(
647 request_ctx,
648 error,
649 Some(raw),
650 ))
651 }
652 }
653 }
654
655 pub async fn enable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
669 let request = {
670 let mut builder = self.request_builder.lock().await;
671 builder.build_enable_cancel_on_disconnect_request()
672 };
673
674 let request_ctx: &JsonRpcRequest = &request;
675 let response = self.send_request(&request).await?;
676
677 match response.result {
678 JsonRpcResult::Success { result } => {
679 result.as_str().map(String::from).ok_or_else(|| {
680 WebSocketError::InvalidMessage(
681 "Expected string result from enable_cancel_on_disconnect".to_string(),
682 )
683 })
684 }
685 JsonRpcResult::Error { error } => {
686 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
687 Err(WebSocketError::api_error_from_parts(
688 request_ctx,
689 error,
690 Some(raw),
691 ))
692 }
693 }
694 }
695
696 pub async fn disable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
709 let request = {
710 let mut builder = self.request_builder.lock().await;
711 builder.build_disable_cancel_on_disconnect_request()
712 };
713
714 let request_ctx: &JsonRpcRequest = &request;
715 let response = self.send_request(&request).await?;
716
717 match response.result {
718 JsonRpcResult::Success { result } => {
719 result.as_str().map(String::from).ok_or_else(|| {
720 WebSocketError::InvalidMessage(
721 "Expected string result from disable_cancel_on_disconnect".to_string(),
722 )
723 })
724 }
725 JsonRpcResult::Error { error } => {
726 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
727 Err(WebSocketError::api_error_from_parts(
728 request_ctx,
729 error,
730 Some(raw),
731 ))
732 }
733 }
734 }
735
736 pub async fn get_cancel_on_disconnect(&self) -> Result<bool, WebSocketError> {
748 let request = {
749 let mut builder = self.request_builder.lock().await;
750 builder.build_get_cancel_on_disconnect_request()
751 };
752
753 let request_ctx: &JsonRpcRequest = &request;
754 let response = self.send_request(&request).await?;
755
756 match response.result {
757 JsonRpcResult::Success { result } => {
758 result
760 .get("enabled")
761 .and_then(|v| v.as_bool())
762 .ok_or_else(|| {
763 WebSocketError::InvalidMessage(
764 "Expected 'enabled' boolean in get_cancel_on_disconnect response"
765 .to_string(),
766 )
767 })
768 }
769 JsonRpcResult::Error { error } => {
770 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
771 Err(WebSocketError::api_error_from_parts(
772 request_ctx,
773 error,
774 Some(raw),
775 ))
776 }
777 }
778 }
779
780 pub async fn mass_quote(
782 &self,
783 request: MassQuoteRequest,
784 ) -> Result<MassQuoteResult, WebSocketError> {
785 request.validate().map_err(WebSocketError::InvalidMessage)?;
787
788 let json_request = {
789 let mut builder = self.request_builder.lock().await;
790 builder.build_mass_quote_request(request)?
791 };
792
793 let request_ctx: &JsonRpcRequest = &json_request;
794 let response = self.send_request(&json_request).await?;
795
796 match response.result {
798 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
799 WebSocketError::InvalidMessage(format!(
800 "Failed to parse mass quote response: {}",
801 e
802 ))
803 }),
804 JsonRpcResult::Error { error } => {
805 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
806 Err(WebSocketError::api_error_from_parts(
807 request_ctx,
808 error,
809 Some(raw),
810 ))
811 }
812 }
813 }
814
815 pub async fn cancel_quotes(
817 &self,
818 request: CancelQuotesRequest,
819 ) -> Result<CancelQuotesResponse, WebSocketError> {
820 let json_request = {
821 let mut builder = self.request_builder.lock().await;
822 builder.build_cancel_quotes_request(request)?
823 };
824
825 let request_ctx: &JsonRpcRequest = &json_request;
826 let response = self.send_request(&json_request).await?;
827
828 match response.result {
830 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
831 WebSocketError::InvalidMessage(format!(
832 "Failed to parse cancel quotes response: {}",
833 e
834 ))
835 }),
836 JsonRpcResult::Error { error } => {
837 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
838 Err(WebSocketError::api_error_from_parts(
839 request_ctx,
840 error,
841 Some(raw),
842 ))
843 }
844 }
845 }
846
847 pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
849 let json_request = {
850 let mut builder = self.request_builder.lock().await;
851 builder.build_set_mmp_config_request(config)?
852 };
853
854 let request_ctx: &JsonRpcRequest = &json_request;
855 let response = self.send_request(&json_request).await?;
856
857 match response.result {
858 JsonRpcResult::Success { .. } => Ok(()),
859 JsonRpcResult::Error { error } => {
860 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
861 Err(WebSocketError::api_error_from_parts(
862 request_ctx,
863 error,
864 Some(raw),
865 ))
866 }
867 }
868 }
869
870 pub async fn get_mmp_config(
872 &self,
873 mmp_group: Option<String>,
874 ) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
875 let json_request = {
876 let mut builder = self.request_builder.lock().await;
877 builder.build_get_mmp_config_request(mmp_group)
878 };
879
880 let request_ctx: &JsonRpcRequest = &json_request;
881 let response = self.send_request(&json_request).await?;
882
883 match response.result {
884 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
885 WebSocketError::InvalidMessage(format!(
886 "Failed to parse MMP config response: {}",
887 e
888 ))
889 }),
890 JsonRpcResult::Error { error } => {
891 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
892 Err(WebSocketError::api_error_from_parts(
893 request_ctx,
894 error,
895 Some(raw),
896 ))
897 }
898 }
899 }
900
901 pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
903 let json_request = {
904 let mut builder = self.request_builder.lock().await;
905 builder.build_reset_mmp_request(mmp_group)
906 };
907
908 let request_ctx: &JsonRpcRequest = &json_request;
909 let response = self.send_request(&json_request).await?;
910
911 match response.result {
912 JsonRpcResult::Success { .. } => Ok(()),
913 JsonRpcResult::Error { error } => {
914 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
915 Err(WebSocketError::api_error_from_parts(
916 request_ctx,
917 error,
918 Some(raw),
919 ))
920 }
921 }
922 }
923
924 pub async fn get_open_orders(
926 &self,
927 currency: Option<String>,
928 kind: Option<String>,
929 type_filter: Option<String>,
930 ) -> Result<Vec<QuoteInfo>, WebSocketError> {
931 let json_request = {
932 let mut builder = self.request_builder.lock().await;
933 builder.build_get_open_orders_request(currency, kind, type_filter)
934 };
935
936 let request_ctx: &JsonRpcRequest = &json_request;
937 let response = self.send_request(&json_request).await?;
938
939 match response.result {
940 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
941 WebSocketError::InvalidMessage(format!(
942 "Failed to parse open orders response: {}",
943 e
944 ))
945 }),
946 JsonRpcResult::Error { error } => {
947 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
948 Err(WebSocketError::api_error_from_parts(
949 request_ctx,
950 error,
951 Some(raw),
952 ))
953 }
954 }
955 }
956
957 pub async fn buy(
967 &self,
968 request: crate::model::trading::OrderRequest,
969 ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
970 let json_request = {
971 let mut builder = self.request_builder.lock().await;
972 builder.build_buy_request(&request)?
973 };
974
975 let request_ctx: &JsonRpcRequest = &json_request;
976 let response = self.send_request(&json_request).await?;
977
978 match response.result {
979 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
980 WebSocketError::InvalidMessage(format!("Failed to parse buy response: {}", e))
981 }),
982 JsonRpcResult::Error { error } => {
983 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
984 Err(WebSocketError::api_error_from_parts(
985 request_ctx,
986 error,
987 Some(raw),
988 ))
989 }
990 }
991 }
992
993 pub async fn sell(
1003 &self,
1004 request: crate::model::trading::OrderRequest,
1005 ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
1006 let json_request = {
1007 let mut builder = self.request_builder.lock().await;
1008 builder.build_sell_request(&request)?
1009 };
1010
1011 let request_ctx: &JsonRpcRequest = &json_request;
1012 let response = self.send_request(&json_request).await?;
1013
1014 match response.result {
1015 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1016 WebSocketError::InvalidMessage(format!("Failed to parse sell response: {}", e))
1017 }),
1018 JsonRpcResult::Error { error } => {
1019 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1020 Err(WebSocketError::api_error_from_parts(
1021 request_ctx,
1022 error,
1023 Some(raw),
1024 ))
1025 }
1026 }
1027 }
1028
1029 pub async fn cancel(
1039 &self,
1040 order_id: &str,
1041 ) -> Result<crate::model::trading::OrderInfo, WebSocketError> {
1042 let json_request = {
1043 let mut builder = self.request_builder.lock().await;
1044 builder.build_cancel_request(order_id)
1045 };
1046
1047 let request_ctx: &JsonRpcRequest = &json_request;
1048 let response = self.send_request(&json_request).await?;
1049
1050 match response.result {
1051 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1052 WebSocketError::InvalidMessage(format!("Failed to parse cancel response: {}", e))
1053 }),
1054 JsonRpcResult::Error { error } => {
1055 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1056 Err(WebSocketError::api_error_from_parts(
1057 request_ctx,
1058 error,
1059 Some(raw),
1060 ))
1061 }
1062 }
1063 }
1064
1065 pub async fn cancel_all(&self) -> Result<u32, WebSocketError> {
1071 let json_request = {
1072 let mut builder = self.request_builder.lock().await;
1073 builder.build_cancel_all_request()
1074 };
1075
1076 let request_ctx: &JsonRpcRequest = &json_request;
1077 let response = self.send_request(&json_request).await?;
1078
1079 match response.result {
1080 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1081 WebSocketError::InvalidMessage(format!(
1082 "Failed to parse cancel_all response: {}",
1083 e
1084 ))
1085 }),
1086 JsonRpcResult::Error { error } => {
1087 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1088 Err(WebSocketError::api_error_from_parts(
1089 request_ctx,
1090 error,
1091 Some(raw),
1092 ))
1093 }
1094 }
1095 }
1096
1097 pub async fn cancel_all_by_currency(&self, currency: &str) -> Result<u32, WebSocketError> {
1107 let json_request = {
1108 let mut builder = self.request_builder.lock().await;
1109 builder.build_cancel_all_by_currency_request(currency)
1110 };
1111
1112 let request_ctx: &JsonRpcRequest = &json_request;
1113 let response = self.send_request(&json_request).await?;
1114
1115 match response.result {
1116 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1117 WebSocketError::InvalidMessage(format!(
1118 "Failed to parse cancel_all_by_currency response: {}",
1119 e
1120 ))
1121 }),
1122 JsonRpcResult::Error { error } => {
1123 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1124 Err(WebSocketError::api_error_from_parts(
1125 request_ctx,
1126 error,
1127 Some(raw),
1128 ))
1129 }
1130 }
1131 }
1132
1133 pub async fn cancel_all_by_instrument(
1143 &self,
1144 instrument_name: &str,
1145 ) -> Result<u32, WebSocketError> {
1146 let json_request = {
1147 let mut builder = self.request_builder.lock().await;
1148 builder.build_cancel_all_by_instrument_request(instrument_name)
1149 };
1150
1151 let request_ctx: &JsonRpcRequest = &json_request;
1152 let response = self.send_request(&json_request).await?;
1153
1154 match response.result {
1155 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1156 WebSocketError::InvalidMessage(format!(
1157 "Failed to parse cancel_all_by_instrument response: {}",
1158 e
1159 ))
1160 }),
1161 JsonRpcResult::Error { error } => {
1162 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1163 Err(WebSocketError::api_error_from_parts(
1164 request_ctx,
1165 error,
1166 Some(raw),
1167 ))
1168 }
1169 }
1170 }
1171
1172 pub async fn edit(
1182 &self,
1183 request: crate::model::trading::EditOrderRequest,
1184 ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
1185 let json_request = {
1186 let mut builder = self.request_builder.lock().await;
1187 builder.build_edit_request(&request)?
1188 };
1189
1190 let request_ctx: &JsonRpcRequest = &json_request;
1191 let response = self.send_request(&json_request).await?;
1192
1193 match response.result {
1194 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1195 WebSocketError::InvalidMessage(format!("Failed to parse edit response: {}", e))
1196 }),
1197 JsonRpcResult::Error { error } => {
1198 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1199 Err(WebSocketError::api_error_from_parts(
1200 request_ctx,
1201 error,
1202 Some(raw),
1203 ))
1204 }
1205 }
1206 }
1207
1208 pub async fn get_positions(
1227 &self,
1228 currency: Option<&str>,
1229 kind: Option<&str>,
1230 ) -> Result<Vec<crate::model::Position>, WebSocketError> {
1231 let json_request = {
1232 let mut builder = self.request_builder.lock().await;
1233 builder.build_get_positions_request(currency, kind)
1234 };
1235
1236 let request_ctx: &JsonRpcRequest = &json_request;
1237 let response = self.send_request(&json_request).await?;
1238
1239 match response.result {
1240 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1241 WebSocketError::InvalidMessage(format!("Failed to parse positions response: {}", e))
1242 }),
1243 JsonRpcResult::Error { error } => {
1244 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1245 Err(WebSocketError::api_error_from_parts(
1246 request_ctx,
1247 error,
1248 Some(raw),
1249 ))
1250 }
1251 }
1252 }
1253
1254 pub async fn get_account_summary(
1271 &self,
1272 currency: &str,
1273 extended: Option<bool>,
1274 ) -> Result<crate::model::AccountSummary, WebSocketError> {
1275 let json_request = {
1276 let mut builder = self.request_builder.lock().await;
1277 builder.build_get_account_summary_request(currency, extended)
1278 };
1279
1280 let request_ctx: &JsonRpcRequest = &json_request;
1281 let response = self.send_request(&json_request).await?;
1282
1283 match response.result {
1284 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1285 WebSocketError::InvalidMessage(format!(
1286 "Failed to parse account summary response: {}",
1287 e
1288 ))
1289 }),
1290 JsonRpcResult::Error { error } => {
1291 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1292 Err(WebSocketError::api_error_from_parts(
1293 request_ctx,
1294 error,
1295 Some(raw),
1296 ))
1297 }
1298 }
1299 }
1300
1301 pub async fn get_order_state(
1317 &self,
1318 order_id: &str,
1319 ) -> Result<crate::model::OrderInfo, WebSocketError> {
1320 let json_request = {
1321 let mut builder = self.request_builder.lock().await;
1322 builder.build_get_order_state_request(order_id)
1323 };
1324
1325 let request_ctx: &JsonRpcRequest = &json_request;
1326 let response = self.send_request(&json_request).await?;
1327
1328 match response.result {
1329 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1330 WebSocketError::InvalidMessage(format!(
1331 "Failed to parse order state response: {}",
1332 e
1333 ))
1334 }),
1335 JsonRpcResult::Error { error } => {
1336 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1337 Err(WebSocketError::api_error_from_parts(
1338 request_ctx,
1339 error,
1340 Some(raw),
1341 ))
1342 }
1343 }
1344 }
1345
1346 pub async fn get_order_history_by_currency(
1364 &self,
1365 currency: &str,
1366 kind: Option<&str>,
1367 count: Option<u32>,
1368 ) -> Result<Vec<crate::model::OrderInfo>, WebSocketError> {
1369 let json_request = {
1370 let mut builder = self.request_builder.lock().await;
1371 builder.build_get_order_history_by_currency_request(currency, kind, count)
1372 };
1373
1374 let request_ctx: &JsonRpcRequest = &json_request;
1375 let response = self.send_request(&json_request).await?;
1376
1377 match response.result {
1378 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1379 WebSocketError::InvalidMessage(format!(
1380 "Failed to parse order history response: {}",
1381 e
1382 ))
1383 }),
1384 JsonRpcResult::Error { error } => {
1385 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1386 Err(WebSocketError::api_error_from_parts(
1387 request_ctx,
1388 error,
1389 Some(raw),
1390 ))
1391 }
1392 }
1393 }
1394
1395 pub async fn close_position(
1415 &self,
1416 instrument_name: &str,
1417 order_type: &str,
1418 price: Option<f64>,
1419 ) -> Result<crate::model::ClosePositionResponse, WebSocketError> {
1420 let json_request = {
1421 let mut builder = self.request_builder.lock().await;
1422 builder.build_close_position_request(instrument_name, order_type, price)?
1423 };
1424
1425 let request_ctx: &JsonRpcRequest = &json_request;
1426 let response = self.send_request(&json_request).await?;
1427
1428 match response.result {
1429 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1430 WebSocketError::InvalidMessage(format!(
1431 "Failed to parse close position response: {}",
1432 e
1433 ))
1434 }),
1435 JsonRpcResult::Error { error } => {
1436 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1437 Err(WebSocketError::api_error_from_parts(
1438 request_ctx,
1439 error,
1440 Some(raw),
1441 ))
1442 }
1443 }
1444 }
1445
1446 pub async fn move_positions(
1465 &self,
1466 currency: &str,
1467 source_uid: u64,
1468 target_uid: u64,
1469 trades: &[crate::model::MovePositionTrade],
1470 ) -> Result<Vec<crate::model::MovePositionResult>, WebSocketError> {
1471 let json_request = {
1472 let mut builder = self.request_builder.lock().await;
1473 builder.build_move_positions_request(currency, source_uid, target_uid, trades)?
1474 };
1475
1476 let request_ctx: &JsonRpcRequest = &json_request;
1477 let response = self.send_request(&json_request).await?;
1478
1479 match response.result {
1480 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1481 WebSocketError::InvalidMessage(format!(
1482 "Failed to parse move positions response: {}",
1483 e
1484 ))
1485 }),
1486 JsonRpcResult::Error { error } => {
1487 let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
1488 Err(WebSocketError::api_error_from_parts(
1489 request_ctx,
1490 error,
1491 Some(raw),
1492 ))
1493 }
1494 }
1495 }
1496
1497 pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
1501 where
1502 F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
1503 E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
1504 {
1505 self.message_handler = Some(MessageHandler::new(message_callback, error_callback));
1506 }
1507
    /// Installs an already constructed `handler`, replacing any handler that
    /// was set before.
    pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
        self.message_handler = Some(handler);
    }
1512
    /// Removes the installed message handler, if any.
    pub fn clear_message_handler(&mut self) {
        self.message_handler = None;
    }
1517
    /// Returns `true` when a message handler is currently installed.
    pub fn has_message_handler(&self) -> bool {
        self.message_handler.is_some()
    }
1522
1523 pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
1529 let message = self.receive_message().await?;
1530
1531 if let Some(handler) = &self.message_handler {
1532 handler.handle_message(&message);
1533 }
1534
1535 Ok(())
1536 }
1537
1538 pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
1542 if self.message_handler.is_none() {
1543 return Err(WebSocketError::InvalidMessage(
1544 "No message handler set. Use set_message_handler() first.".to_string(),
1545 ));
1546 }
1547
1548 loop {
1549 match self.receive_and_process_message().await {
1550 Ok(()) => {
1551 }
1553 Err(WebSocketError::ConnectionClosed) => {
1554 break;
1556 }
1557 Err(e) => {
1558 return Err(e);
1560 }
1561 }
1562 }
1563
1564 Ok(())
1565 }
1566}
1567
1568impl Default for DeribitWebSocketClient {
1569 fn default() -> Self {
1570 let config = WebSocketConfig::default();
1571 #[allow(clippy::unwrap_used)]
1575 Self::new(&config).unwrap()
1576 }
1577}
1578
1579fn add_confirmed_channels(manager: &mut SubscriptionManager, channels: Vec<String>) {
1587 for channel in channels {
1588 let channel_type = SubscriptionChannel::from_string(&channel);
1589 let instrument = instrument_from_channel(&channel);
1590 manager.add_subscription(channel, channel_type, instrument);
1591 }
1592}
1593
1594fn remove_confirmed_channels(manager: &mut SubscriptionManager, channels: Vec<String>) {
1600 for channel in channels {
1601 manager.remove_subscription(&channel);
1602 }
1603}
1604
1605fn confirmed_channels(
1613 response: &JsonRpcResponse,
1614 method: &'static str,
1615) -> Result<Option<Vec<String>>, WebSocketError> {
1616 match &response.result {
1617 JsonRpcResult::Success { result } => serde_json::from_value::<Vec<String>>(result.clone())
1618 .map(Some)
1619 .map_err(|e| {
1620 WebSocketError::InvalidMessage(format!(
1621 "expected array of confirmed channel strings in {} response: {}",
1622 method, e
1623 ))
1624 }),
1625 JsonRpcResult::Error { .. } => Ok(None),
1626 }
1627}
1628
/// Extracts the instrument, index name, or currency segment from a
/// dot-separated channel name.
///
/// The channel is split on `.` and the resulting segments are matched
/// against the known channel layouts. The segment picked for each layout is
/// returned as an owned `String`.
///
/// # Returns
///
/// `Some(segment)` for a recognised layout, `None` for anything else
/// (including the empty string).
///
/// NOTE(review): the `book` and `trades` layouts accept any number of
/// trailing segments, so a name such as `trades.future.BTC.raw` yields
/// `future`. Confirm this is intended.
fn instrument_from_channel(channel: &str) -> Option<String> {
    let segments: Vec<&str> = channel.split('.').collect();

    let subject: &str = match segments.as_slice() {
        // Layouts whose selected segment is an instrument.
        ["ticker", name]
        | ["ticker", name, _]
        | ["book", name, ..]
        | ["trades", name, ..]
        | ["chart", "trades", name, _]
        | ["user", "changes", name, _]
        | ["estimated_expiration_price", name]
        | ["markprice", "options", name]
        | ["perpetual", name, _]
        | ["quote", name]
        | ["incremental_ticker", name] => *name,
        // Layouts whose selected segment is an index name.
        ["deribit_price_index", name]
        | ["deribit_price_ranking", name]
        | ["deribit_price_statistics", name]
        | ["deribit_volatility_index", name]
        | ["user", "mmp_trigger", name] => *name,
        // Layouts whose selected segment is a currency.
        ["instrument", "state", _, name]
        | ["block_rfq", "trades", name]
        | ["block_trade_confirmations", name] => *name,
        _ => return None,
    };

    Some(subject.to_string())
}
1659
1660#[cfg(test)]
1661#[allow(clippy::unwrap_used, clippy::expect_used)]
1662mod tests {
1663 use super::*;
1673 use crate::model::ws_types::JsonRpcError;
1674 use serde_json::json;
1675
1676 fn success(result: serde_json::Value) -> JsonRpcResponse {
1678 JsonRpcResponse::success(json!(1), result)
1679 }
1680
1681 fn api_error(code: i32, message: &str) -> JsonRpcResponse {
1683 JsonRpcResponse::error(
1684 json!(1),
1685 JsonRpcError {
1686 code,
1687 message: message.to_string(),
1688 data: None,
1689 },
1690 )
1691 }
1692
1693 fn reconcile_subscribe(
1699 manager: &mut SubscriptionManager,
1700 response: &JsonRpcResponse,
1701 ) -> Result<(), WebSocketError> {
1702 if let Some(confirmed) = confirmed_channels(response, "public/subscribe")? {
1703 add_confirmed_channels(manager, confirmed);
1704 }
1705 Ok(())
1706 }
1707
1708 fn reconcile_unsubscribe(
1710 manager: &mut SubscriptionManager,
1711 response: &JsonRpcResponse,
1712 ) -> Result<(), WebSocketError> {
1713 if let Some(confirmed) = confirmed_channels(response, "public/unsubscribe")? {
1714 remove_confirmed_channels(manager, confirmed);
1715 }
1716 Ok(())
1717 }
1718
1719 #[test]
1724 fn test_reconcile_subscribe_adds_only_server_confirmed_channels() {
1725 let mut manager = SubscriptionManager::new();
1729 let response = success(json!(["ticker.BTC-PERPETUAL"]));
1730
1731 reconcile_subscribe(&mut manager, &response)
1732 .expect("well-formed success response reconciles");
1733
1734 let channels = manager.get_all_channels();
1735 assert_eq!(channels, vec!["ticker.BTC-PERPETUAL".to_string()]);
1736 assert!(
1737 manager.get_subscription("ticker.INVALID").is_none(),
1738 "rejected input channel must not leak into local state"
1739 );
1740 }
1741
1742 #[test]
1743 fn test_reconcile_subscribe_happy_path_input_equals_response() {
1744 let mut manager = SubscriptionManager::new();
1747 let response = success(json!(["ticker.BTC-PERPETUAL", "book.ETH-PERPETUAL.raw"]));
1748
1749 reconcile_subscribe(&mut manager, &response).expect("happy-path response reconciles");
1750
1751 let mut channels = manager.get_all_channels();
1752 channels.sort();
1753 assert_eq!(
1754 channels,
1755 vec![
1756 "book.ETH-PERPETUAL.raw".to_string(),
1757 "ticker.BTC-PERPETUAL".to_string(),
1758 ]
1759 );
1760 let ticker = manager
1762 .get_subscription("ticker.BTC-PERPETUAL")
1763 .expect("ticker subscription tracked");
1764 assert_eq!(ticker.instrument.as_deref(), Some("BTC-PERPETUAL"));
1765 }
1766
1767 #[test]
1768 fn test_reconcile_subscribe_empty_result_is_noop() {
1769 let mut manager = SubscriptionManager::new();
1772 let response = success(json!([] as [&str; 0]));
1773
1774 reconcile_subscribe(&mut manager, &response).expect("empty confirmation is valid");
1775
1776 assert!(manager.get_all_channels().is_empty());
1777 }
1778
1779 #[test]
1780 fn test_reconcile_subscribe_api_error_is_noop() {
1781 let mut manager = SubscriptionManager::new();
1784 manager.add_subscription(
1785 "ticker.BTC-PERPETUAL".to_string(),
1786 SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1787 Some("BTC-PERPETUAL".to_string()),
1788 );
1789 let before = manager.get_all_channels();
1790 let response = api_error(-32000, "subscription rejected");
1791
1792 assert!(
1794 matches!(confirmed_channels(&response, "public/subscribe"), Ok(None)),
1795 "api-error response must yield Ok(None)"
1796 );
1797 reconcile_subscribe(&mut manager, &response)
1798 .expect("api-error response must not return Err");
1799
1800 assert_eq!(
1801 manager.get_all_channels(),
1802 before,
1803 "api-error response must not mutate the manager"
1804 );
1805 }
1806
1807 #[test]
1808 fn test_reconcile_subscribe_non_array_result_returns_invalid_message() {
1809 let mut manager = SubscriptionManager::new();
1813 let response = success(json!({ "channels": ["ticker.BTC-PERPETUAL"] }));
1814
1815 let err = reconcile_subscribe(&mut manager, &response)
1816 .expect_err("object result must not parse as Vec<String>");
1817 assert!(
1818 matches!(err, WebSocketError::InvalidMessage(_)),
1819 "expected InvalidMessage, got {:?}",
1820 err
1821 );
1822 assert!(
1823 manager.get_all_channels().is_empty(),
1824 "failed reconciliation must not partially mutate the manager"
1825 );
1826 }
1827
1828 #[test]
1833 fn test_reconcile_unsubscribe_removes_only_server_confirmed_channels() {
1834 let mut manager = SubscriptionManager::new();
1838 manager.add_subscription(
1839 "ticker.BTC-PERPETUAL".to_string(),
1840 SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1841 Some("BTC-PERPETUAL".to_string()),
1842 );
1843 manager.add_subscription(
1844 "ticker.ETH-PERPETUAL".to_string(),
1845 SubscriptionChannel::Ticker("ETH-PERPETUAL".to_string()),
1846 Some("ETH-PERPETUAL".to_string()),
1847 );
1848 let response = success(json!(["ticker.BTC-PERPETUAL"]));
1849
1850 reconcile_unsubscribe(&mut manager, &response)
1851 .expect("well-formed unsubscribe response reconciles");
1852
1853 let channels = manager.get_all_channels();
1854 assert_eq!(channels, vec!["ticker.ETH-PERPETUAL".to_string()]);
1855 }
1856
1857 #[test]
1858 fn test_reconcile_unsubscribe_happy_path() {
1859 let mut manager = SubscriptionManager::new();
1862 manager.add_subscription(
1863 "ticker.BTC-PERPETUAL".to_string(),
1864 SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1865 Some("BTC-PERPETUAL".to_string()),
1866 );
1867 manager.add_subscription(
1868 "book.ETH-PERPETUAL.raw".to_string(),
1869 SubscriptionChannel::OrderBook("ETH-PERPETUAL".to_string()),
1870 Some("ETH-PERPETUAL".to_string()),
1871 );
1872 let response = success(json!(["ticker.BTC-PERPETUAL", "book.ETH-PERPETUAL.raw"]));
1873
1874 reconcile_unsubscribe(&mut manager, &response).expect("happy-path unsubscribe reconciles");
1875
1876 assert!(manager.get_all_channels().is_empty());
1877 }
1878
1879 #[test]
1880 fn test_reconcile_unsubscribe_api_error_is_noop() {
1881 let mut manager = SubscriptionManager::new();
1882 manager.add_subscription(
1883 "ticker.BTC-PERPETUAL".to_string(),
1884 SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1885 Some("BTC-PERPETUAL".to_string()),
1886 );
1887 let before = manager.get_all_channels();
1888 let response = api_error(-32000, "unsubscribe rejected");
1889
1890 reconcile_unsubscribe(&mut manager, &response)
1891 .expect("api-error response must not return Err");
1892
1893 assert_eq!(
1894 manager.get_all_channels(),
1895 before,
1896 "api-error response must not mutate the manager"
1897 );
1898 }
1899
1900 #[test]
1901 fn test_reconcile_unsubscribe_non_array_result_returns_invalid_message() {
1902 let mut manager = SubscriptionManager::new();
1903 manager.add_subscription(
1904 "ticker.BTC-PERPETUAL".to_string(),
1905 SubscriptionChannel::Ticker("BTC-PERPETUAL".to_string()),
1906 Some("BTC-PERPETUAL".to_string()),
1907 );
1908 let response = success(json!("not an array"));
1909
1910 let err = reconcile_unsubscribe(&mut manager, &response)
1911 .expect_err("string result must not parse as Vec<String>");
1912 assert!(
1913 matches!(err, WebSocketError::InvalidMessage(_)),
1914 "expected InvalidMessage, got {:?}",
1915 err
1916 );
1917 assert_eq!(
1918 manager.get_all_channels(),
1919 vec!["ticker.BTC-PERPETUAL".to_string()],
1920 "failed reconciliation must not partially mutate the manager"
1921 );
1922 }
1923
1924 async fn spawn_mock_server<F, Fut>(
1935 scenario: F,
1936 ) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>)
1937 where
1938 F: FnOnce(
1939 futures_util::stream::SplitSink<
1940 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
1941 tokio_tungstenite::tungstenite::Message,
1942 >,
1943 futures_util::stream::SplitStream<
1944 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
1945 >,
1946 ) -> Fut
1947 + Send
1948 + 'static,
1949 Fut: std::future::Future<Output = ()> + Send,
1950 {
1951 use futures_util::StreamExt;
1952 use tokio::net::TcpListener;
1953 use tokio_tungstenite::accept_async;
1954
1955 let listener = TcpListener::bind("127.0.0.1:0")
1956 .await
1957 .expect("bind localhost ephemeral port");
1958 let addr = listener
1959 .local_addr()
1960 .expect("read local addr of bound listener");
1961 let handle = tokio::spawn(async move {
1962 let (socket, _peer) = match listener.accept().await {
1963 Ok(pair) => pair,
1964 Err(_) => return,
1965 };
1966 let ws = match accept_async(socket).await {
1967 Ok(ws) => ws,
1968 Err(_) => return,
1969 };
1970 let (sink, stream) = ws.split();
1971 scenario(sink, stream).await;
1972 });
1973 (addr, handle)
1974 }
1975
1976 #[tokio::test]
1977 async fn test_subscribe_reconciles_local_state_with_server_subset() {
1978 use futures_util::{SinkExt, StreamExt};
1985 use tokio_tungstenite::tungstenite::Message;
1986
1987 let (addr, server) = spawn_mock_server(|mut sink, mut stream| async move {
1988 if let Some(Ok(Message::Text(t))) = stream.next().await {
1990 let req: serde_json::Value =
1991 serde_json::from_str(&t).expect("server parses request");
1992 let id = req.get("id").cloned().unwrap_or(serde_json::Value::Null);
1993 let resp = json!({
1994 "jsonrpc": "2.0",
1995 "id": id,
1996 "result": ["ticker.BTC-PERPETUAL"],
1997 });
1998 let _ = sink.send(Message::Text(resp.to_string().into())).await;
1999 }
2000 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2002 })
2003 .await;
2004
2005 let config = WebSocketConfig::with_url(&format!("ws://{}/", addr)).expect("valid ws url");
2006 let client = DeribitWebSocketClient::new(&config).expect("client construction");
2007 client.connect().await.expect("client connects to mock");
2008
2009 let response = client
2010 .subscribe(vec![
2011 "ticker.INVALID".to_string(),
2012 "ticker.BTC-PERPETUAL".to_string(),
2013 ])
2014 .await
2015 .expect("subscribe returns the server-confirmed response");
2016
2017 let JsonRpcResult::Success { result } = response.result else {
2019 panic!("expected Success result, got {:?}", response.result);
2020 };
2021 assert_eq!(result, json!(["ticker.BTC-PERPETUAL"]));
2022
2023 let manager = client.subscription_manager();
2026 let channels = manager.lock().await.get_all_channels();
2027 assert_eq!(
2028 channels,
2029 vec!["ticker.BTC-PERPETUAL".to_string()],
2030 "local manager must drop rejected channels from the input"
2031 );
2032
2033 client.disconnect().await.expect("client disconnects");
2034 server.await.expect("server task did not panic");
2035 }
2036}