1use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::model::SubscriptionChannel;
7use crate::{
8 callback::MessageHandler,
9 config::WebSocketConfig,
10 connection::WebSocketConnection,
11 error::WebSocketError,
12 message::{
13 notification::NotificationHandler, request::RequestBuilder, response::ResponseHandler,
14 },
15 model::{
16 quote::*,
17 subscription::SubscriptionManager,
18 ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
19 },
20 session::WebSocketSession,
21};
22
/// Asynchronous WebSocket client for the Deribit JSON-RPC API.
///
/// The client owns a single [`WebSocketConnection`] guarded by an async mutex and
/// exposes request/response style helpers on top of it. Locally tracked
/// subscriptions live in a [`SubscriptionManager`], and an optional
/// [`MessageHandler`] can be installed to process incoming messages through callbacks.
#[derive(Debug)]
pub struct DeribitWebSocketClient {
    /// Configuration the client was created from (shared, immutable).
    pub config: Arc<WebSocketConfig>,
    /// Underlying WebSocket connection; the async mutex serializes all access to it.
    connection: Arc<Mutex<WebSocketConnection>>,
    /// Session object constructed from the same configuration.
    pub session: Arc<WebSocketSession>,
    /// Builder producing the JSON-RPC requests sent by this client.
    request_builder: Arc<Mutex<RequestBuilder>>,
    /// Response handler; constructed in `new` but not read in the code visible here.
    // NOTE(review): the lint is silenced without explanation — confirm the field is still needed.
    #[allow(dead_code)]
    response_handler: Arc<ResponseHandler>,
    /// Notification handler; constructed in `new` but not read in the code visible here.
    // NOTE(review): the lint is silenced without explanation — confirm the field is still needed.
    #[allow(dead_code)]
    notification_handler: Arc<NotificationHandler>,
    /// Local record of the channels this client has subscribed to.
    subscription_manager: Arc<Mutex<SubscriptionManager>>,
    /// Sending half of an unbounded string channel created in `new`; not used in the code visible here.
    #[allow(dead_code)]
    message_sender: Option<mpsc::UnboundedSender<String>>,
    /// Receiving half of the same channel; not used in the code visible here.
    #[allow(dead_code)]
    message_receiver: Option<mpsc::UnboundedReceiver<String>>,
    /// Optional callback-based handler applied to messages by
    /// `receive_and_process_message`.
    message_handler: Option<MessageHandler>,
}
43
44impl DeribitWebSocketClient {
45 pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
47 let connection = Arc::new(Mutex::new(WebSocketConnection::new(config.ws_url.clone())));
48 let session = Arc::new(WebSocketSession::new(config.clone()));
49 let (tx, rx) = mpsc::unbounded_channel();
50
51 let config = Arc::new(config.clone());
52 Ok(Self {
53 config,
54 connection,
55 session,
56 request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
57 response_handler: Arc::new(ResponseHandler::new()),
58 notification_handler: Arc::new(NotificationHandler::new()),
59 subscription_manager: Arc::new(Mutex::new(SubscriptionManager::new())),
60 message_sender: Some(tx),
61 message_receiver: Some(rx),
62 message_handler: None,
63 })
64 }
65
66 pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
68 let config = WebSocketConfig::with_url(&ws_url)
69 .map_err(|e| WebSocketError::ConnectionFailed(format!("Invalid URL: {}", e)))?;
70 Self::new(&config)
71 }
72
73 pub fn new_testnet() -> Result<Self, WebSocketError> {
75 Self::new_with_url("wss://test.deribit.com/ws/api/v2".to_string())
76 }
77
78 pub fn new_production() -> Result<Self, WebSocketError> {
80 Self::new_with_url("wss://www.deribit.com/ws/api/v2".to_string())
81 }
82
83 pub async fn connect(&self) -> Result<(), WebSocketError> {
85 let mut connection = self.connection.lock().await;
86 connection.connect().await
87 }
88
89 pub async fn disconnect(&self) -> Result<(), WebSocketError> {
91 let mut connection = self.connection.lock().await;
92 connection.disconnect().await
93 }
94
95 pub async fn is_connected(&self) -> bool {
97 let connection = self.connection.lock().await;
98 connection.is_connected()
99 }
100
101 pub async fn authenticate(
119 &self,
120 client_id: &str,
121 client_secret: &str,
122 ) -> Result<crate::model::AuthResponse, WebSocketError> {
123 let request = {
124 let mut builder = self.request_builder.lock().await;
125 builder.build_auth_request(client_id, client_secret)
126 };
127
128 let response = self.send_request(request).await?;
129
130 match response.result {
131 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
132 WebSocketError::InvalidMessage(format!(
133 "Failed to parse authentication response: {}",
134 e
135 ))
136 }),
137 JsonRpcResult::Error { error } => {
138 Err(WebSocketError::ApiError(error.code, error.message))
139 }
140 }
141 }
142
143 pub async fn subscribe(
145 &self,
146 channels: Vec<String>,
147 ) -> Result<JsonRpcResponse, WebSocketError> {
148 let request = {
149 let mut builder = self.request_builder.lock().await;
150 builder.build_subscribe_request(channels.clone())
151 };
152
153 let mut sub_manager = self.subscription_manager.lock().await;
155 for channel in channels {
156 let channel_type = self.parse_channel_type(&channel);
157 let instrument = self.extract_instrument(&channel);
158 sub_manager.add_subscription(channel, channel_type, instrument);
159 }
160
161 self.send_request(request).await
162 }
163
164 pub async fn unsubscribe(
166 &self,
167 channels: Vec<String>,
168 ) -> Result<JsonRpcResponse, WebSocketError> {
169 let request = {
170 let mut builder = self.request_builder.lock().await;
171 builder.build_unsubscribe_request(channels.clone())
172 };
173
174 let mut sub_manager = self.subscription_manager.lock().await;
176 for channel in channels {
177 sub_manager.remove_subscription(&channel);
178 }
179
180 self.send_request(request).await
181 }
182
183 pub async fn public_unsubscribe_all(&self) -> Result<String, WebSocketError> {
196 let request = {
197 let mut builder = self.request_builder.lock().await;
198 builder.build_public_unsubscribe_all_request()
199 };
200
201 let response = self.send_request(request).await?;
202
203 let mut sub_manager = self.subscription_manager.lock().await;
205 sub_manager.clear();
206
207 match response.result {
208 JsonRpcResult::Success { result } => {
209 result.as_str().map(String::from).ok_or_else(|| {
210 WebSocketError::InvalidMessage(
211 "Expected string result from unsubscribe_all".to_string(),
212 )
213 })
214 }
215 JsonRpcResult::Error { error } => {
216 Err(WebSocketError::ApiError(error.code, error.message))
217 }
218 }
219 }
220
221 pub async fn private_unsubscribe_all(&self) -> Result<String, WebSocketError> {
234 let request = {
235 let mut builder = self.request_builder.lock().await;
236 builder.build_private_unsubscribe_all_request()
237 };
238
239 let response = self.send_request(request).await?;
240
241 let mut sub_manager = self.subscription_manager.lock().await;
243 sub_manager.clear();
244
245 match response.result {
246 JsonRpcResult::Success { result } => {
247 result.as_str().map(String::from).ok_or_else(|| {
248 WebSocketError::InvalidMessage(
249 "Expected string result from unsubscribe_all".to_string(),
250 )
251 })
252 }
253 JsonRpcResult::Error { error } => {
254 Err(WebSocketError::ApiError(error.code, error.message))
255 }
256 }
257 }
258
259 pub async fn send_request(
261 &self,
262 request: JsonRpcRequest,
263 ) -> Result<JsonRpcResponse, WebSocketError> {
264 let message = serde_json::to_string(&request).map_err(|e| {
265 WebSocketError::InvalidMessage(format!("Failed to serialize request: {}", e))
266 })?;
267
268 let mut connection = self.connection.lock().await;
269 connection.send(message).await?;
270
271 let response_text = connection.receive().await?;
273
274 let response: JsonRpcResponse = match serde_json::from_str(&response_text) {
276 Ok(resp) => resp,
277 Err(e) => {
278 if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&response_text)
280 && json_val.get("method").is_some()
281 && json_val.get("id").is_none()
282 {
283 return Ok(JsonRpcResponse {
285 jsonrpc: "2.0".to_string(),
286 id: serde_json::Value::Null,
287 result: crate::model::JsonRpcResult::Success { result: json_val },
288 });
289 }
290 return Err(WebSocketError::InvalidMessage(format!(
291 "Failed to parse response: {}",
292 e
293 )));
294 }
295 };
296
297 Ok(response)
298 }
299
300 pub async fn send_message(&self, message: String) -> Result<(), WebSocketError> {
302 let mut connection = self.connection.lock().await;
303 connection.send(message).await
304 }
305
306 pub async fn receive_message(&self) -> Result<String, WebSocketError> {
308 let mut connection = self.connection.lock().await;
309 connection.receive().await
310 }
311
312 pub async fn get_subscriptions(&self) -> Vec<String> {
314 let sub_manager = self.subscription_manager.lock().await;
315 sub_manager.get_all_channels()
316 }
317
318 pub async fn test_connection(&self) -> Result<crate::model::TestResponse, WebSocketError> {
330 let request = {
331 let mut builder = self.request_builder.lock().await;
332 builder.build_test_request()
333 };
334
335 let response = self.send_request(request).await?;
336
337 match response.result {
338 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
339 WebSocketError::InvalidMessage(format!("Failed to parse test response: {}", e))
340 }),
341 JsonRpcResult::Error { error } => {
342 Err(WebSocketError::ApiError(error.code, error.message))
343 }
344 }
345 }
346
347 pub async fn get_time(&self) -> Result<u64, WebSocketError> {
359 let request = {
360 let mut builder = self.request_builder.lock().await;
361 builder.build_get_time_request()
362 };
363
364 let response = self.send_request(request).await?;
365
366 match response.result {
367 JsonRpcResult::Success { result } => result.as_u64().ok_or_else(|| {
368 WebSocketError::InvalidMessage(
369 "Expected u64 timestamp in get_time response".to_string(),
370 )
371 }),
372 JsonRpcResult::Error { error } => {
373 Err(WebSocketError::ApiError(error.code, error.message))
374 }
375 }
376 }
377
378 pub async fn set_heartbeat(&self, interval: u64) -> Result<String, WebSocketError> {
396 let request = {
397 let mut builder = self.request_builder.lock().await;
398 builder.build_set_heartbeat_request(interval)
399 };
400
401 let response = self.send_request(request).await?;
402
403 match response.result {
404 JsonRpcResult::Success { result } => {
405 result.as_str().map(String::from).ok_or_else(|| {
406 WebSocketError::InvalidMessage(
407 "Expected string result from set_heartbeat".to_string(),
408 )
409 })
410 }
411 JsonRpcResult::Error { error } => {
412 Err(WebSocketError::ApiError(error.code, error.message))
413 }
414 }
415 }
416
417 pub async fn disable_heartbeat(&self) -> Result<String, WebSocketError> {
429 let request = {
430 let mut builder = self.request_builder.lock().await;
431 builder.build_disable_heartbeat_request()
432 };
433
434 let response = self.send_request(request).await?;
435
436 match response.result {
437 JsonRpcResult::Success { result } => {
438 result.as_str().map(String::from).ok_or_else(|| {
439 WebSocketError::InvalidMessage(
440 "Expected string result from disable_heartbeat".to_string(),
441 )
442 })
443 }
444 JsonRpcResult::Error { error } => {
445 Err(WebSocketError::ApiError(error.code, error.message))
446 }
447 }
448 }
449
450 pub async fn hello(
468 &self,
469 client_name: &str,
470 client_version: &str,
471 ) -> Result<crate::model::HelloResponse, WebSocketError> {
472 let request = {
473 let mut builder = self.request_builder.lock().await;
474 builder.build_hello_request(client_name, client_version)
475 };
476
477 let response = self.send_request(request).await?;
478
479 match response.result {
480 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
481 WebSocketError::InvalidMessage(format!("Failed to parse hello response: {}", e))
482 }),
483 JsonRpcResult::Error { error } => {
484 Err(WebSocketError::ApiError(error.code, error.message))
485 }
486 }
487 }
488
489 pub async fn enable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
503 let request = {
504 let mut builder = self.request_builder.lock().await;
505 builder.build_enable_cancel_on_disconnect_request()
506 };
507
508 let response = self.send_request(request).await?;
509
510 match response.result {
511 JsonRpcResult::Success { result } => {
512 result.as_str().map(String::from).ok_or_else(|| {
513 WebSocketError::InvalidMessage(
514 "Expected string result from enable_cancel_on_disconnect".to_string(),
515 )
516 })
517 }
518 JsonRpcResult::Error { error } => {
519 Err(WebSocketError::ApiError(error.code, error.message))
520 }
521 }
522 }
523
524 pub async fn disable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
537 let request = {
538 let mut builder = self.request_builder.lock().await;
539 builder.build_disable_cancel_on_disconnect_request()
540 };
541
542 let response = self.send_request(request).await?;
543
544 match response.result {
545 JsonRpcResult::Success { result } => {
546 result.as_str().map(String::from).ok_or_else(|| {
547 WebSocketError::InvalidMessage(
548 "Expected string result from disable_cancel_on_disconnect".to_string(),
549 )
550 })
551 }
552 JsonRpcResult::Error { error } => {
553 Err(WebSocketError::ApiError(error.code, error.message))
554 }
555 }
556 }
557
558 pub async fn get_cancel_on_disconnect(&self) -> Result<bool, WebSocketError> {
570 let request = {
571 let mut builder = self.request_builder.lock().await;
572 builder.build_get_cancel_on_disconnect_request()
573 };
574
575 let response = self.send_request(request).await?;
576
577 match response.result {
578 JsonRpcResult::Success { result } => {
579 result
581 .get("enabled")
582 .and_then(|v| v.as_bool())
583 .ok_or_else(|| {
584 WebSocketError::InvalidMessage(
585 "Expected 'enabled' boolean in get_cancel_on_disconnect response"
586 .to_string(),
587 )
588 })
589 }
590 JsonRpcResult::Error { error } => {
591 Err(WebSocketError::ApiError(error.code, error.message))
592 }
593 }
594 }
595
596 pub async fn mass_quote(
598 &self,
599 request: MassQuoteRequest,
600 ) -> Result<MassQuoteResult, WebSocketError> {
601 request.validate().map_err(WebSocketError::InvalidMessage)?;
603
604 let json_request = {
605 let mut builder = self.request_builder.lock().await;
606 builder.build_mass_quote_request(request)
607 };
608
609 let response = self.send_request(json_request).await?;
610
611 match response.result {
613 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
614 WebSocketError::InvalidMessage(format!(
615 "Failed to parse mass quote response: {}",
616 e
617 ))
618 }),
619 JsonRpcResult::Error { error } => {
620 Err(WebSocketError::ApiError(error.code, error.message))
621 }
622 }
623 }
624
625 pub async fn cancel_quotes(
627 &self,
628 request: CancelQuotesRequest,
629 ) -> Result<CancelQuotesResponse, WebSocketError> {
630 let json_request = {
631 let mut builder = self.request_builder.lock().await;
632 builder.build_cancel_quotes_request(request)
633 };
634
635 let response = self.send_request(json_request).await?;
636
637 match response.result {
639 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
640 WebSocketError::InvalidMessage(format!(
641 "Failed to parse cancel quotes response: {}",
642 e
643 ))
644 }),
645 JsonRpcResult::Error { error } => {
646 Err(WebSocketError::ApiError(error.code, error.message))
647 }
648 }
649 }
650
651 pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
653 let json_request = {
654 let mut builder = self.request_builder.lock().await;
655 builder.build_set_mmp_config_request(config)
656 };
657
658 let response = self.send_request(json_request).await?;
659
660 match response.result {
661 JsonRpcResult::Success { .. } => Ok(()),
662 JsonRpcResult::Error { error } => {
663 Err(WebSocketError::ApiError(error.code, error.message))
664 }
665 }
666 }
667
668 pub async fn get_mmp_config(
670 &self,
671 mmp_group: Option<String>,
672 ) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
673 let json_request = {
674 let mut builder = self.request_builder.lock().await;
675 builder.build_get_mmp_config_request(mmp_group)
676 };
677
678 let response = self.send_request(json_request).await?;
679
680 match response.result {
681 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
682 WebSocketError::InvalidMessage(format!(
683 "Failed to parse MMP config response: {}",
684 e
685 ))
686 }),
687 JsonRpcResult::Error { error } => {
688 Err(WebSocketError::ApiError(error.code, error.message))
689 }
690 }
691 }
692
693 pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
695 let json_request = {
696 let mut builder = self.request_builder.lock().await;
697 builder.build_reset_mmp_request(mmp_group)
698 };
699
700 let response = self.send_request(json_request).await?;
701
702 match response.result {
703 JsonRpcResult::Success { .. } => Ok(()),
704 JsonRpcResult::Error { error } => {
705 Err(WebSocketError::ApiError(error.code, error.message))
706 }
707 }
708 }
709
710 pub async fn get_open_orders(
712 &self,
713 currency: Option<String>,
714 kind: Option<String>,
715 type_filter: Option<String>,
716 ) -> Result<Vec<QuoteInfo>, WebSocketError> {
717 let json_request = {
718 let mut builder = self.request_builder.lock().await;
719 builder.build_get_open_orders_request(currency, kind, type_filter)
720 };
721
722 let response = self.send_request(json_request).await?;
723
724 match response.result {
725 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
726 WebSocketError::InvalidMessage(format!(
727 "Failed to parse open orders response: {}",
728 e
729 ))
730 }),
731 JsonRpcResult::Error { error } => {
732 Err(WebSocketError::ApiError(error.code, error.message))
733 }
734 }
735 }
736
737 pub async fn buy(
747 &self,
748 request: crate::model::trading::OrderRequest,
749 ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
750 let json_request = {
751 let mut builder = self.request_builder.lock().await;
752 builder.build_buy_request(&request)
753 };
754
755 let response = self.send_request(json_request).await?;
756
757 match response.result {
758 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
759 WebSocketError::InvalidMessage(format!("Failed to parse buy response: {}", e))
760 }),
761 JsonRpcResult::Error { error } => {
762 Err(WebSocketError::ApiError(error.code, error.message))
763 }
764 }
765 }
766
767 pub async fn sell(
777 &self,
778 request: crate::model::trading::OrderRequest,
779 ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
780 let json_request = {
781 let mut builder = self.request_builder.lock().await;
782 builder.build_sell_request(&request)
783 };
784
785 let response = self.send_request(json_request).await?;
786
787 match response.result {
788 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
789 WebSocketError::InvalidMessage(format!("Failed to parse sell response: {}", e))
790 }),
791 JsonRpcResult::Error { error } => {
792 Err(WebSocketError::ApiError(error.code, error.message))
793 }
794 }
795 }
796
797 pub async fn cancel(
807 &self,
808 order_id: &str,
809 ) -> Result<crate::model::trading::OrderInfo, WebSocketError> {
810 let json_request = {
811 let mut builder = self.request_builder.lock().await;
812 builder.build_cancel_request(order_id)
813 };
814
815 let response = self.send_request(json_request).await?;
816
817 match response.result {
818 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
819 WebSocketError::InvalidMessage(format!("Failed to parse cancel response: {}", e))
820 }),
821 JsonRpcResult::Error { error } => {
822 Err(WebSocketError::ApiError(error.code, error.message))
823 }
824 }
825 }
826
827 pub async fn cancel_all(&self) -> Result<u32, WebSocketError> {
833 let json_request = {
834 let mut builder = self.request_builder.lock().await;
835 builder.build_cancel_all_request()
836 };
837
838 let response = self.send_request(json_request).await?;
839
840 match response.result {
841 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
842 WebSocketError::InvalidMessage(format!(
843 "Failed to parse cancel_all response: {}",
844 e
845 ))
846 }),
847 JsonRpcResult::Error { error } => {
848 Err(WebSocketError::ApiError(error.code, error.message))
849 }
850 }
851 }
852
853 pub async fn cancel_all_by_currency(&self, currency: &str) -> Result<u32, WebSocketError> {
863 let json_request = {
864 let mut builder = self.request_builder.lock().await;
865 builder.build_cancel_all_by_currency_request(currency)
866 };
867
868 let response = self.send_request(json_request).await?;
869
870 match response.result {
871 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
872 WebSocketError::InvalidMessage(format!(
873 "Failed to parse cancel_all_by_currency response: {}",
874 e
875 ))
876 }),
877 JsonRpcResult::Error { error } => {
878 Err(WebSocketError::ApiError(error.code, error.message))
879 }
880 }
881 }
882
883 pub async fn cancel_all_by_instrument(
893 &self,
894 instrument_name: &str,
895 ) -> Result<u32, WebSocketError> {
896 let json_request = {
897 let mut builder = self.request_builder.lock().await;
898 builder.build_cancel_all_by_instrument_request(instrument_name)
899 };
900
901 let response = self.send_request(json_request).await?;
902
903 match response.result {
904 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
905 WebSocketError::InvalidMessage(format!(
906 "Failed to parse cancel_all_by_instrument response: {}",
907 e
908 ))
909 }),
910 JsonRpcResult::Error { error } => {
911 Err(WebSocketError::ApiError(error.code, error.message))
912 }
913 }
914 }
915
916 pub async fn edit(
926 &self,
927 request: crate::model::trading::EditOrderRequest,
928 ) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
929 let json_request = {
930 let mut builder = self.request_builder.lock().await;
931 builder.build_edit_request(&request)
932 };
933
934 let response = self.send_request(json_request).await?;
935
936 match response.result {
937 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
938 WebSocketError::InvalidMessage(format!("Failed to parse edit response: {}", e))
939 }),
940 JsonRpcResult::Error { error } => {
941 Err(WebSocketError::ApiError(error.code, error.message))
942 }
943 }
944 }
945
946 pub async fn get_positions(
965 &self,
966 currency: Option<&str>,
967 kind: Option<&str>,
968 ) -> Result<Vec<crate::model::Position>, WebSocketError> {
969 let json_request = {
970 let mut builder = self.request_builder.lock().await;
971 builder.build_get_positions_request(currency, kind)
972 };
973
974 let response = self.send_request(json_request).await?;
975
976 match response.result {
977 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
978 WebSocketError::InvalidMessage(format!("Failed to parse positions response: {}", e))
979 }),
980 JsonRpcResult::Error { error } => {
981 Err(WebSocketError::ApiError(error.code, error.message))
982 }
983 }
984 }
985
986 pub async fn get_account_summary(
1003 &self,
1004 currency: &str,
1005 extended: Option<bool>,
1006 ) -> Result<crate::model::AccountSummary, WebSocketError> {
1007 let json_request = {
1008 let mut builder = self.request_builder.lock().await;
1009 builder.build_get_account_summary_request(currency, extended)
1010 };
1011
1012 let response = self.send_request(json_request).await?;
1013
1014 match response.result {
1015 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1016 WebSocketError::InvalidMessage(format!(
1017 "Failed to parse account summary response: {}",
1018 e
1019 ))
1020 }),
1021 JsonRpcResult::Error { error } => {
1022 Err(WebSocketError::ApiError(error.code, error.message))
1023 }
1024 }
1025 }
1026
1027 pub async fn get_order_state(
1043 &self,
1044 order_id: &str,
1045 ) -> Result<crate::model::OrderInfo, WebSocketError> {
1046 let json_request = {
1047 let mut builder = self.request_builder.lock().await;
1048 builder.build_get_order_state_request(order_id)
1049 };
1050
1051 let response = self.send_request(json_request).await?;
1052
1053 match response.result {
1054 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1055 WebSocketError::InvalidMessage(format!(
1056 "Failed to parse order state response: {}",
1057 e
1058 ))
1059 }),
1060 JsonRpcResult::Error { error } => {
1061 Err(WebSocketError::ApiError(error.code, error.message))
1062 }
1063 }
1064 }
1065
1066 pub async fn get_order_history_by_currency(
1084 &self,
1085 currency: &str,
1086 kind: Option<&str>,
1087 count: Option<u32>,
1088 ) -> Result<Vec<crate::model::OrderInfo>, WebSocketError> {
1089 let json_request = {
1090 let mut builder = self.request_builder.lock().await;
1091 builder.build_get_order_history_by_currency_request(currency, kind, count)
1092 };
1093
1094 let response = self.send_request(json_request).await?;
1095
1096 match response.result {
1097 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1098 WebSocketError::InvalidMessage(format!(
1099 "Failed to parse order history response: {}",
1100 e
1101 ))
1102 }),
1103 JsonRpcResult::Error { error } => {
1104 Err(WebSocketError::ApiError(error.code, error.message))
1105 }
1106 }
1107 }
1108
1109 pub async fn close_position(
1129 &self,
1130 instrument_name: &str,
1131 order_type: &str,
1132 price: Option<f64>,
1133 ) -> Result<crate::model::ClosePositionResponse, WebSocketError> {
1134 let json_request = {
1135 let mut builder = self.request_builder.lock().await;
1136 builder.build_close_position_request(instrument_name, order_type, price)
1137 };
1138
1139 let response = self.send_request(json_request).await?;
1140
1141 match response.result {
1142 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1143 WebSocketError::InvalidMessage(format!(
1144 "Failed to parse close position response: {}",
1145 e
1146 ))
1147 }),
1148 JsonRpcResult::Error { error } => {
1149 Err(WebSocketError::ApiError(error.code, error.message))
1150 }
1151 }
1152 }
1153
1154 pub async fn move_positions(
1173 &self,
1174 currency: &str,
1175 source_uid: u64,
1176 target_uid: u64,
1177 trades: &[crate::model::MovePositionTrade],
1178 ) -> Result<Vec<crate::model::MovePositionResult>, WebSocketError> {
1179 let json_request = {
1180 let mut builder = self.request_builder.lock().await;
1181 builder.build_move_positions_request(currency, source_uid, target_uid, trades)
1182 };
1183
1184 let response = self.send_request(json_request).await?;
1185
1186 match response.result {
1187 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
1188 WebSocketError::InvalidMessage(format!(
1189 "Failed to parse move positions response: {}",
1190 e
1191 ))
1192 }),
1193 JsonRpcResult::Error { error } => {
1194 Err(WebSocketError::ApiError(error.code, error.message))
1195 }
1196 }
1197 }
1198
1199 pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
1203 where
1204 F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
1205 E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
1206 {
1207 self.message_handler = Some(MessageHandler::new(message_callback, error_callback));
1208 }
1209
1210 pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
1212 self.message_handler = Some(handler);
1213 }
1214
1215 pub fn clear_message_handler(&mut self) {
1217 self.message_handler = None;
1218 }
1219
1220 pub fn has_message_handler(&self) -> bool {
1222 self.message_handler.is_some()
1223 }
1224
1225 pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
1231 let message = self.receive_message().await?;
1232
1233 if let Some(handler) = &self.message_handler {
1234 handler.handle_message(&message);
1235 }
1236
1237 Ok(())
1238 }
1239
1240 pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
1244 if self.message_handler.is_none() {
1245 return Err(WebSocketError::InvalidMessage(
1246 "No message handler set. Use set_message_handler() first.".to_string(),
1247 ));
1248 }
1249
1250 loop {
1251 match self.receive_and_process_message().await {
1252 Ok(()) => {
1253 }
1255 Err(WebSocketError::ConnectionClosed) => {
1256 break;
1258 }
1259 Err(e) => {
1260 return Err(e);
1262 }
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268
1269 fn parse_channel_type(&self, channel: &str) -> SubscriptionChannel {
1276 SubscriptionChannel::from_string(channel)
1277 }
1278
1279 fn extract_instrument(&self, channel: &str) -> Option<String> {
1280 let parts: Vec<&str> = channel.split('.').collect();
1281 match parts.as_slice() {
1282 ["ticker", instrument] | ["ticker", instrument, _] => Some(instrument.to_string()),
1283 ["book", instrument, ..] => Some(instrument.to_string()),
1284 ["trades", instrument, ..] => Some(instrument.to_string()),
1285 ["chart", "trades", instrument, _] => Some(instrument.to_string()),
1286 ["user", "changes", instrument, _] => Some(instrument.to_string()),
1287 ["estimated_expiration_price", instrument] => Some(instrument.to_string()),
1288 ["markprice", "options", instrument] => Some(instrument.to_string()),
1289 ["perpetual", instrument, _] => Some(instrument.to_string()),
1290 ["quote", instrument] => Some(instrument.to_string()),
1291 ["incremental_ticker", instrument] => Some(instrument.to_string()),
1292 ["deribit_price_index", index_name]
1293 | ["deribit_price_ranking", index_name]
1294 | ["deribit_price_statistics", index_name]
1295 | ["deribit_volatility_index", index_name] => Some(index_name.to_string()),
1296 ["instrument", "state", _kind, currency] => Some(currency.to_string()),
1297 ["block_rfq", "trades", currency] => Some(currency.to_string()),
1298 ["block_trade_confirmations", currency] => Some(currency.to_string()),
1299 ["user", "mmp_trigger", index_name] => Some(index_name.to_string()),
1300 ["platform_state"]
1301 | ["platform_state", "public_methods_state"]
1302 | ["block_trade_confirmations"]
1303 | ["user", "access_log"]
1304 | ["user", "lock"] => None,
1305 _ => None,
1306 }
1307 }
1308}
1309
1310impl Default for DeribitWebSocketClient {
1311 fn default() -> Self {
1312 let config = WebSocketConfig::default();
1313 Self::new(&config).unwrap()
1314 }
1315}