1use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5
6use crate::model::SubscriptionChannel;
7use crate::{
8 callback::MessageHandler,
9 config::WebSocketConfig,
10 connection::WebSocketConnection,
11 error::WebSocketError,
12 message::{
13 notification::NotificationHandler, request::RequestBuilder, response::ResponseHandler,
14 },
15 model::{
16 quote::*,
17 subscription::SubscriptionManager,
18 ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
19 },
20 session::WebSocketSession,
21};
22
23#[derive(Debug)]
25pub struct DeribitWebSocketClient {
26 pub config: Arc<WebSocketConfig>,
28 connection: Arc<Mutex<WebSocketConnection>>,
29 pub session: Arc<WebSocketSession>,
31 request_builder: Arc<Mutex<RequestBuilder>>,
32 #[allow(dead_code)]
33 response_handler: Arc<ResponseHandler>,
34 #[allow(dead_code)]
35 notification_handler: Arc<NotificationHandler>,
36 subscription_manager: Arc<Mutex<SubscriptionManager>>,
37 #[allow(dead_code)]
38 message_sender: Option<mpsc::UnboundedSender<String>>,
39 #[allow(dead_code)]
40 message_receiver: Option<mpsc::UnboundedReceiver<String>>,
41 message_handler: Option<MessageHandler>,
42}
43
44impl DeribitWebSocketClient {
45 pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
47 let connection = Arc::new(Mutex::new(WebSocketConnection::new(config.ws_url.clone())));
48 let session = Arc::new(WebSocketSession::new(config.clone()));
49 let (tx, rx) = mpsc::unbounded_channel();
50
51 let config = Arc::new(config.clone());
52 Ok(Self {
53 config,
54 connection,
55 session,
56 request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
57 response_handler: Arc::new(ResponseHandler::new()),
58 notification_handler: Arc::new(NotificationHandler::new()),
59 subscription_manager: Arc::new(Mutex::new(SubscriptionManager::new())),
60 message_sender: Some(tx),
61 message_receiver: Some(rx),
62 message_handler: None,
63 })
64 }
65
66 pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
68 let config = WebSocketConfig::with_url(&ws_url)
69 .map_err(|e| WebSocketError::ConnectionFailed(format!("Invalid URL: {}", e)))?;
70 Self::new(&config)
71 }
72
73 pub fn new_testnet() -> Result<Self, WebSocketError> {
75 Self::new_with_url("wss://test.deribit.com/ws/api/v2".to_string())
76 }
77
78 pub fn new_production() -> Result<Self, WebSocketError> {
80 Self::new_with_url("wss://www.deribit.com/ws/api/v2".to_string())
81 }
82
83 pub async fn connect(&self) -> Result<(), WebSocketError> {
85 let mut connection = self.connection.lock().await;
86 connection.connect().await
87 }
88
89 pub async fn disconnect(&self) -> Result<(), WebSocketError> {
91 let mut connection = self.connection.lock().await;
92 connection.disconnect().await
93 }
94
95 pub async fn is_connected(&self) -> bool {
97 let connection = self.connection.lock().await;
98 connection.is_connected()
99 }
100
101 pub async fn authenticate(
103 &self,
104 client_id: &str,
105 client_secret: &str,
106 ) -> Result<JsonRpcResponse, WebSocketError> {
107 let request = {
108 let mut builder = self.request_builder.lock().await;
109 builder.build_auth_request(client_id, client_secret)
110 };
111
112 self.send_request(request).await
113 }
114
115 pub async fn subscribe(
117 &self,
118 channels: Vec<String>,
119 ) -> Result<JsonRpcResponse, WebSocketError> {
120 let request = {
121 let mut builder = self.request_builder.lock().await;
122 builder.build_subscribe_request(channels.clone())
123 };
124
125 let mut sub_manager = self.subscription_manager.lock().await;
127 for channel in channels {
128 let channel_type = self.parse_channel_type(&channel);
129 let instrument = self.extract_instrument(&channel);
130 sub_manager.add_subscription(channel, channel_type, instrument);
131 }
132
133 self.send_request(request).await
134 }
135
136 pub async fn unsubscribe(
138 &self,
139 channels: Vec<String>,
140 ) -> Result<JsonRpcResponse, WebSocketError> {
141 let request = {
142 let mut builder = self.request_builder.lock().await;
143 builder.build_unsubscribe_request(channels.clone())
144 };
145
146 let mut sub_manager = self.subscription_manager.lock().await;
148 for channel in channels {
149 sub_manager.remove_subscription(&channel);
150 }
151
152 self.send_request(request).await
153 }
154
155 pub async fn send_request(
157 &self,
158 request: JsonRpcRequest,
159 ) -> Result<JsonRpcResponse, WebSocketError> {
160 let message = serde_json::to_string(&request).map_err(|e| {
161 WebSocketError::InvalidMessage(format!("Failed to serialize request: {}", e))
162 })?;
163
164 let mut connection = self.connection.lock().await;
165 connection.send(message).await?;
166
167 let response_text = connection.receive().await?;
169
170 let response: JsonRpcResponse = match serde_json::from_str(&response_text) {
172 Ok(resp) => resp,
173 Err(e) => {
174 if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&response_text)
176 && json_val.get("method").is_some()
177 && json_val.get("id").is_none()
178 {
179 return Ok(JsonRpcResponse {
181 jsonrpc: "2.0".to_string(),
182 id: serde_json::Value::Null,
183 result: crate::model::JsonRpcResult::Success { result: json_val },
184 });
185 }
186 return Err(WebSocketError::InvalidMessage(format!(
187 "Failed to parse response: {}",
188 e
189 )));
190 }
191 };
192
193 Ok(response)
194 }
195
196 pub async fn send_message(&self, message: String) -> Result<(), WebSocketError> {
198 let mut connection = self.connection.lock().await;
199 connection.send(message).await
200 }
201
202 pub async fn receive_message(&self) -> Result<String, WebSocketError> {
204 let mut connection = self.connection.lock().await;
205 connection.receive().await
206 }
207
208 pub async fn get_subscriptions(&self) -> Vec<String> {
210 let sub_manager = self.subscription_manager.lock().await;
211 sub_manager.get_all_channels()
212 }
213
214 pub async fn test_connection(&self) -> Result<JsonRpcResponse, WebSocketError> {
216 let request = {
217 let mut builder = self.request_builder.lock().await;
218 builder.build_test_request()
219 };
220
221 self.send_request(request).await
222 }
223
224 pub async fn get_time(&self) -> Result<JsonRpcResponse, WebSocketError> {
226 let request = {
227 let mut builder = self.request_builder.lock().await;
228 builder.build_get_time_request()
229 };
230
231 self.send_request(request).await
232 }
233
234 pub async fn mass_quote(
236 &self,
237 request: MassQuoteRequest,
238 ) -> Result<MassQuoteResult, WebSocketError> {
239 request.validate().map_err(WebSocketError::InvalidMessage)?;
241
242 let json_request = {
243 let mut builder = self.request_builder.lock().await;
244 builder.build_mass_quote_request(request)
245 };
246
247 let response = self.send_request(json_request).await?;
248
249 match response.result {
251 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
252 WebSocketError::InvalidMessage(format!(
253 "Failed to parse mass quote response: {}",
254 e
255 ))
256 }),
257 JsonRpcResult::Error { error } => {
258 Err(WebSocketError::ApiError(error.code, error.message))
259 }
260 }
261 }
262
263 pub async fn cancel_quotes(
265 &self,
266 request: CancelQuotesRequest,
267 ) -> Result<CancelQuotesResponse, WebSocketError> {
268 let json_request = {
269 let mut builder = self.request_builder.lock().await;
270 builder.build_cancel_quotes_request(request)
271 };
272
273 let response = self.send_request(json_request).await?;
274
275 match response.result {
277 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
278 WebSocketError::InvalidMessage(format!(
279 "Failed to parse cancel quotes response: {}",
280 e
281 ))
282 }),
283 JsonRpcResult::Error { error } => {
284 Err(WebSocketError::ApiError(error.code, error.message))
285 }
286 }
287 }
288
289 pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
291 let json_request = {
292 let mut builder = self.request_builder.lock().await;
293 builder.build_set_mmp_config_request(config)
294 };
295
296 let response = self.send_request(json_request).await?;
297
298 match response.result {
299 JsonRpcResult::Success { .. } => Ok(()),
300 JsonRpcResult::Error { error } => {
301 Err(WebSocketError::ApiError(error.code, error.message))
302 }
303 }
304 }
305
306 pub async fn get_mmp_config(
308 &self,
309 mmp_group: Option<String>,
310 ) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
311 let json_request = {
312 let mut builder = self.request_builder.lock().await;
313 builder.build_get_mmp_config_request(mmp_group)
314 };
315
316 let response = self.send_request(json_request).await?;
317
318 match response.result {
319 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
320 WebSocketError::InvalidMessage(format!(
321 "Failed to parse MMP config response: {}",
322 e
323 ))
324 }),
325 JsonRpcResult::Error { error } => {
326 Err(WebSocketError::ApiError(error.code, error.message))
327 }
328 }
329 }
330
331 pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
333 let json_request = {
334 let mut builder = self.request_builder.lock().await;
335 builder.build_reset_mmp_request(mmp_group)
336 };
337
338 let response = self.send_request(json_request).await?;
339
340 match response.result {
341 JsonRpcResult::Success { .. } => Ok(()),
342 JsonRpcResult::Error { error } => {
343 Err(WebSocketError::ApiError(error.code, error.message))
344 }
345 }
346 }
347
348 pub async fn get_open_orders(
350 &self,
351 currency: Option<String>,
352 kind: Option<String>,
353 type_filter: Option<String>,
354 ) -> Result<Vec<QuoteInfo>, WebSocketError> {
355 let json_request = {
356 let mut builder = self.request_builder.lock().await;
357 builder.build_get_open_orders_request(currency, kind, type_filter)
358 };
359
360 let response = self.send_request(json_request).await?;
361
362 match response.result {
363 JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
364 WebSocketError::InvalidMessage(format!(
365 "Failed to parse open orders response: {}",
366 e
367 ))
368 }),
369 JsonRpcResult::Error { error } => {
370 Err(WebSocketError::ApiError(error.code, error.message))
371 }
372 }
373 }
374
375 pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
379 where
380 F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
381 E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
382 {
383 self.message_handler = Some(MessageHandler::new(message_callback, error_callback));
384 }
385
386 pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
388 self.message_handler = Some(handler);
389 }
390
391 pub fn clear_message_handler(&mut self) {
393 self.message_handler = None;
394 }
395
396 pub fn has_message_handler(&self) -> bool {
398 self.message_handler.is_some()
399 }
400
401 pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
407 let message = self.receive_message().await?;
408
409 if let Some(handler) = &self.message_handler {
410 handler.handle_message(&message);
411 }
412
413 Ok(())
414 }
415
416 pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
420 if self.message_handler.is_none() {
421 return Err(WebSocketError::InvalidMessage(
422 "No message handler set. Use set_message_handler() first.".to_string(),
423 ));
424 }
425
426 loop {
427 match self.receive_and_process_message().await {
428 Ok(()) => {
429 }
431 Err(WebSocketError::ConnectionClosed) => {
432 break;
434 }
435 Err(e) => {
436 return Err(e);
438 }
439 }
440 }
441
442 Ok(())
443 }
444
445 fn parse_channel_type(&self, channel: &str) -> SubscriptionChannel {
448 if channel.starts_with("ticker") {
449 SubscriptionChannel::Ticker(self.extract_instrument(channel).unwrap_or_default())
450 } else if channel.starts_with("book") {
451 SubscriptionChannel::OrderBook(self.extract_instrument(channel).unwrap_or_default())
452 } else if channel.starts_with("trades") {
453 SubscriptionChannel::Trades(self.extract_instrument(channel).unwrap_or_default())
454 } else if channel == "user.orders" {
455 SubscriptionChannel::UserOrders
456 } else if channel == "user.trades" {
457 SubscriptionChannel::UserTrades
458 } else {
459 SubscriptionChannel::Ticker(String::new()) }
461 }
462
463 fn extract_instrument(&self, channel: &str) -> Option<String> {
464 channel
465 .find('.')
466 .map(|dot_pos| channel[dot_pos + 1..].to_string())
467 }
468}
469
470impl Default for DeribitWebSocketClient {
471 fn default() -> Self {
472 let config = WebSocketConfig::default();
473 Self::new(&config).unwrap()
474 }
475}