1#![allow(unused_imports)]
20use anyhow::Context;
21use async_trait::async_trait;
22use derive_builder::Builder;
23use rust_decimal::prelude::*;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::{collections::BTreeMap, sync::Arc};
27
28use crate::common::{
29 errors::WebsocketError,
30 models::{ParamBuildError, WebsocketApiResponse},
31 utils::remove_empty_value,
32 websocket::{WebsocketApi, WebsocketMessageSendOptions},
33};
34use crate::spot::websocket_api::models;
35
36#[async_trait]
37pub trait UserDataStreamApi: Send + Sync {
38 async fn session_subscriptions(
39 &self,
40 params: SessionSubscriptionsParams,
41 ) -> anyhow::Result<WebsocketApiResponse<Vec<models::SessionSubscriptionsResponseResultInner>>>;
42 async fn user_data_stream_subscribe(
43 &self,
44 params: UserDataStreamSubscribeParams,
45 ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>;
46 async fn user_data_stream_subscribe_signature(
47 &self,
48 params: UserDataStreamSubscribeSignatureParams,
49 ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>;
50 async fn user_data_stream_unsubscribe(
51 &self,
52 params: UserDataStreamUnsubscribeParams,
53 ) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>>;
54}
55
56#[derive(Clone)]
57pub struct UserDataStreamApiClient {
58 websocket_api_base: Arc<WebsocketApi>,
59}
60
61impl UserDataStreamApiClient {
62 pub fn new(websocket_api_base: Arc<WebsocketApi>) -> Self {
63 Self { websocket_api_base }
64 }
65}
66
67#[derive(Clone, Debug, Builder, Default)]
72#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
73pub struct SessionSubscriptionsParams {
74 #[builder(setter(into), default)]
78 pub id: Option<String>,
79}
80
81impl SessionSubscriptionsParams {
82 #[must_use]
85 pub fn builder() -> SessionSubscriptionsParamsBuilder {
86 SessionSubscriptionsParamsBuilder::default()
87 }
88}
89#[derive(Clone, Debug, Builder, Default)]
94#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
95pub struct UserDataStreamSubscribeParams {
96 #[builder(setter(into), default)]
100 pub id: Option<String>,
101}
102
103impl UserDataStreamSubscribeParams {
104 #[must_use]
107 pub fn builder() -> UserDataStreamSubscribeParamsBuilder {
108 UserDataStreamSubscribeParamsBuilder::default()
109 }
110}
111#[derive(Clone, Debug, Builder, Default)]
116#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
117pub struct UserDataStreamSubscribeSignatureParams {
118 #[builder(setter(into), default)]
122 pub id: Option<String>,
123 #[builder(setter(into), default)]
127 pub recv_window: Option<rust_decimal::Decimal>,
128}
129
130impl UserDataStreamSubscribeSignatureParams {
131 #[must_use]
134 pub fn builder() -> UserDataStreamSubscribeSignatureParamsBuilder {
135 UserDataStreamSubscribeSignatureParamsBuilder::default()
136 }
137}
138#[derive(Clone, Debug, Builder, Default)]
143#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
144pub struct UserDataStreamUnsubscribeParams {
145 #[builder(setter(into), default)]
149 pub id: Option<String>,
150 #[builder(setter(into), default)]
154 pub subscription_id: Option<i32>,
155}
156
157impl UserDataStreamUnsubscribeParams {
158 #[must_use]
161 pub fn builder() -> UserDataStreamUnsubscribeParamsBuilder {
162 UserDataStreamUnsubscribeParamsBuilder::default()
163 }
164}
165
166#[async_trait]
167impl UserDataStreamApi for UserDataStreamApiClient {
168 async fn session_subscriptions(
169 &self,
170 params: SessionSubscriptionsParams,
171 ) -> anyhow::Result<WebsocketApiResponse<Vec<models::SessionSubscriptionsResponseResultInner>>>
172 {
173 let SessionSubscriptionsParams { id } = params;
174
175 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
176 if let Some(value) = id {
177 payload.insert("id".to_string(), serde_json::json!(value));
178 }
179 let payload = remove_empty_value(payload);
180
181 self.websocket_api_base
182 .send_message::<Vec<models::SessionSubscriptionsResponseResultInner>>(
183 "/session.subscriptions".trim_start_matches('/'),
184 payload,
185 WebsocketMessageSendOptions::new(),
186 )
187 .await
188 .map_err(anyhow::Error::from)?
189 .into_iter()
190 .next()
191 .ok_or(WebsocketError::NoResponse)
192 .map_err(anyhow::Error::from)
193 }
194
195 async fn user_data_stream_subscribe(
196 &self,
197 params: UserDataStreamSubscribeParams,
198 ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>
199 {
200 let UserDataStreamSubscribeParams { id } = params;
201
202 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
203 if let Some(value) = id {
204 payload.insert("id".to_string(), serde_json::json!(value));
205 }
206 let payload = remove_empty_value(payload);
207
208 self.websocket_api_base
209 .send_message::<Box<models::UserDataStreamSubscribeResponseResult>>(
210 "/userDataStream.subscribe".trim_start_matches('/'),
211 payload,
212 WebsocketMessageSendOptions::new(),
213 )
214 .await
215 .map_err(anyhow::Error::from)?
216 .into_iter()
217 .next()
218 .ok_or(WebsocketError::NoResponse)
219 .map_err(anyhow::Error::from)
220 }
221
222 async fn user_data_stream_subscribe_signature(
223 &self,
224 params: UserDataStreamSubscribeSignatureParams,
225 ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>
226 {
227 let UserDataStreamSubscribeSignatureParams { id, recv_window } = params;
228
229 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
230 if let Some(value) = id {
231 payload.insert("id".to_string(), serde_json::json!(value));
232 }
233 if let Some(value) = recv_window {
234 payload.insert("recvWindow".to_string(), serde_json::json!(value));
235 }
236 let payload = remove_empty_value(payload);
237
238 self.websocket_api_base
239 .send_message::<Box<models::UserDataStreamSubscribeResponseResult>>(
240 "/userDataStream.subscribe.signature".trim_start_matches('/'),
241 payload,
242 WebsocketMessageSendOptions::new().signed(),
243 )
244 .await
245 .map_err(anyhow::Error::from)?
246 .into_iter()
247 .next()
248 .ok_or(WebsocketError::NoResponse)
249 .map_err(anyhow::Error::from)
250 }
251
252 async fn user_data_stream_unsubscribe(
253 &self,
254 params: UserDataStreamUnsubscribeParams,
255 ) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>> {
256 let UserDataStreamUnsubscribeParams {
257 id,
258 subscription_id,
259 } = params;
260
261 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
262 if let Some(value) = id {
263 payload.insert("id".to_string(), serde_json::json!(value));
264 }
265 if let Some(value) = subscription_id {
266 payload.insert("subscriptionId".to_string(), serde_json::json!(value));
267 }
268 let payload = remove_empty_value(payload);
269
270 self.websocket_api_base
271 .send_message::<serde_json::Value>(
272 "/userDataStream.unsubscribe".trim_start_matches('/'),
273 payload,
274 WebsocketMessageSendOptions::new(),
275 )
276 .await
277 .map_err(anyhow::Error::from)?
278 .into_iter()
279 .next()
280 .ok_or(WebsocketError::NoResponse)
281 .map_err(anyhow::Error::from)
282 }
283}
284
#[cfg(all(test, feature = "spot"))]
mod tests {
    use super::*;
    use crate::TOKIO_SHARED_RT;
    use crate::common::websocket::{WebsocketApi, WebsocketConnection, WebsocketHandler};
    use crate::config::ConfigurationWebsocketApi;
    use crate::errors::WebsocketError;
    use crate::models::WebsocketApiRateLimit;
    use serde_json::{Value, json};
    use tokio::spawn;
    use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
    use tokio::time::{Duration, timeout};
    use tokio_tungstenite::tungstenite::Message;

    /// Longest a test waits for the client to emit a request, or for the
    /// spawned call to finish once a response has been injected.
    const STEP_TIMEOUT: Duration = Duration::from_secs(1);

    /// Builds a `WebsocketApi` wired to a single in-memory connection.
    ///
    /// Returns the API handle, the connection, and the receiving end of the
    /// channel installed as the connection's write side, so a test can read
    /// every message the client sends.
    async fn setup() -> (
        Arc<WebsocketApi>,
        Arc<WebsocketConnection>,
        UnboundedReceiver<Message>,
    ) {
        let conn = WebsocketConnection::new("test-conn");
        let (tx, rx) = unbounded_channel::<Message>();
        {
            // Scoped so the state lock is released before the API is built.
            let mut conn_state = conn.state.lock().await;
            conn_state.ws_write_tx = Some(tx);
        }

        let config = ConfigurationWebsocketApi::builder()
            .api_key("key")
            .api_secret("secret")
            .build()
            .expect("Failed to build configuration");
        let ws_api = WebsocketApi::new(config, vec![conn.clone()]);
        conn.set_handler(ws_api.clone() as Arc<dyn WebsocketHandler>)
            .await;
        ws_api.clone().connect().await.unwrap();

        (ws_api, conn, rx)
    }

    /// Waits for the next message written by the client and parses it as JSON.
    ///
    /// Panics if nothing is sent within `STEP_TIMEOUT`, if the channel is
    /// closed, if the message is not a text frame, or if it is not valid JSON.
    async fn next_request(rx: &mut UnboundedReceiver<Message>) -> Value {
        let sent = timeout(STEP_TIMEOUT, rx.recv())
            .await
            .expect("send should occur")
            .expect("channel closed");
        let Message::Text(text) = sent else {
            panic!("expected Message Text")
        };
        serde_json::from_str(&text).expect("request should be valid JSON")
    }

    /// Parses `template` as JSON and overwrites its `id` with `id`, so the
    /// canned response is matched to the request the client actually sent.
    fn response_with_id(template: &str, id: &str) -> Value {
        let mut response: Value =
            serde_json::from_str(template).expect("response template should be valid JSON");
        response["id"] = id.into();
        response
    }

    /// Returns the payload of a canned response: `result`, else `response`.
    fn expected_result(response: &Value) -> &Value {
        response
            .get("result")
            .or_else(|| response.get("response"))
            .expect("no response in JSON")
    }

    /// Computes the rate limits a test expects for `response`: `None` when
    /// `rateLimits` is missing, not an array, or empty; otherwise the parsed array.
    fn parse_rate_limits(response: &Value) -> Option<Vec<WebsocketApiRateLimit>> {
        match response.get("rateLimits").and_then(Value::as_array) {
            Some(limits) if !limits.is_empty() => Some(
                serde_json::from_value(Value::Array(limits.clone()))
                    .expect("should parse rateLimits array"),
            ),
            _ => None,
        }
    }

    /// Builds the status-400 error response shared by every `*_error_response` test.
    fn server_error_response(id: &str) -> Value {
        json!({
            "id": id,
            "status": 400,
            "error": {
                "code": -2010,
                "msg": "Account has insufficient balance for requested action."
            },
            "rateLimits": [
                {
                    "rateLimitType": "ORDERS",
                    "interval": "SECOND",
                    "intervalNum": 10,
                    "limit": 50,
                    "count": 13
                }
            ]
        })
    }

    /// Asserts that a joined client call failed with the error produced for
    /// [`server_error_response`].
    fn assert_server_error<T>(join: Result<anyhow::Result<T>, tokio::task::JoinError>) {
        match join {
            Ok(Err(e)) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("Server‐side response error (code -2010): Account has insufficient balance for requested action."),
                    "Expected error msg to contain server error, got: {msg}"
                );
            }
            Ok(Ok(_)) => panic!("Expected error"),
            Err(_) => panic!("Task panicked"),
        }
    }

    /// Asserts that a client call failed with `WebsocketError::Timeout`.
    fn assert_timeout_error<T>(result: anyhow::Result<T>) {
        match result {
            Err(e) => match e.downcast_ref::<WebsocketError>() {
                Some(inner) => assert!(matches!(inner, WebsocketError::Timeout)),
                None => panic!("Unexpected error type: {:?}", e),
            },
            Ok(_) => panic!("Expected timeout error"),
        }
    }

    #[test]
    fn session_subscriptions_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = SessionSubscriptionsParams::builder().build().unwrap();
                client.session_subscriptions(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");
            assert_eq!(request["method"], "session.subscriptions");

            let resp_json = response_with_id(
                r#"{"id":"d3df5a22-88ea-4fe0-9f4e-0fcea5d418b7","status":200,"result":[{"subscriptionId":1},{"subscriptionId":0}]}"#,
                id,
            );
            let expected_data: Vec<models::SessionSubscriptionsResponseResultInner> =
                serde_json::from_value(expected_result(&resp_json).clone())
                    .expect("should parse raw response");
            let expected_rate_limits = parse_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn session_subscriptions_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = SessionSubscriptionsParams::builder().build().unwrap();
                client.session_subscriptions(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");

            let resp_json = server_error_response(id);
            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let join = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task should finish after the error response");
            assert_server_error(join);
        });
    }

    #[test]
    fn session_subscriptions_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, _conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = SessionSubscriptionsParams::builder().build().unwrap();
                client.session_subscriptions(params).await
            });

            // The request is consumed but never answered; the call is then
            // expected to fail with `WebsocketError::Timeout`.
            let _request = next_request(&mut rx).await;

            let result = handle.await.expect("task completed");
            assert_timeout_error(result);
        });
    }

    #[test]
    fn user_data_stream_subscribe_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamSubscribeParams::builder().build().unwrap();
                client.user_data_stream_subscribe(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");
            assert_eq!(request["method"], "userDataStream.subscribe");

            let resp_json = response_with_id(
                r#"{"id":"d3df8a21-98ea-4fe0-8f4e-0fcea5d418b7","status":200,"result":{"subscriptionId":0}}"#,
                id,
            );
            let expected_data: Box<models::UserDataStreamSubscribeResponseResult> =
                serde_json::from_value(expected_result(&resp_json).clone())
                    .expect("should parse raw response");
            let expected_rate_limits = parse_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn user_data_stream_subscribe_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamSubscribeParams::builder().build().unwrap();
                client.user_data_stream_subscribe(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");

            let resp_json = server_error_response(id);
            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let join = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task should finish after the error response");
            assert_server_error(join);
        });
    }

    #[test]
    fn user_data_stream_subscribe_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, _conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamSubscribeParams::builder().build().unwrap();
                client.user_data_stream_subscribe(params).await
            });

            // The request is consumed but never answered; the call is then
            // expected to fail with `WebsocketError::Timeout`.
            let _request = next_request(&mut rx).await;

            let result = handle.await.expect("task completed");
            assert_timeout_error(result);
        });
    }

    #[test]
    fn user_data_stream_subscribe_signature_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamSubscribeSignatureParams::builder()
                    .build()
                    .unwrap();
                client.user_data_stream_subscribe_signature(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");
            assert_eq!(request["method"], "userDataStream.subscribe.signature");

            let resp_json = response_with_id(
                r#"{"id":"d3df8a22-98ea-4fe0-9f4e-0fcea5d418b7","status":200,"result":{"subscriptionId":0}}"#,
                id,
            );
            let expected_data: Box<models::UserDataStreamSubscribeResponseResult> =
                serde_json::from_value(expected_result(&resp_json).clone())
                    .expect("should parse raw response");
            let expected_rate_limits = parse_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn user_data_stream_subscribe_signature_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamSubscribeSignatureParams::builder()
                    .build()
                    .unwrap();
                client.user_data_stream_subscribe_signature(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");

            let resp_json = server_error_response(id);
            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let join = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task should finish after the error response");
            assert_server_error(join);
        });
    }

    #[test]
    fn user_data_stream_subscribe_signature_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, _conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamSubscribeSignatureParams::builder()
                    .build()
                    .unwrap();
                client.user_data_stream_subscribe_signature(params).await
            });

            // The request is consumed but never answered; the call is then
            // expected to fail with `WebsocketError::Timeout`.
            let _request = next_request(&mut rx).await;

            let result = handle.await.expect("task completed");
            assert_timeout_error(result);
        });
    }

    #[test]
    fn user_data_stream_unsubscribe_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
                client.user_data_stream_unsubscribe(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");
            assert_eq!(request["method"], "userDataStream.unsubscribe");

            let resp_json = response_with_id(
                r#"{"id":"d3df8a21-98ea-4fe0-8f4e-0fcea5d418b7","status":200,"result":{}}"#,
                id,
            );
            let expected_data: serde_json::Value =
                serde_json::from_value(expected_result(&resp_json).clone())
                    .expect("should parse raw response");
            let expected_rate_limits = parse_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn user_data_stream_unsubscribe_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
                client.user_data_stream_unsubscribe(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request["id"].as_str().expect("request id should be a string");

            let resp_json = server_error_response(id);
            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let join = timeout(STEP_TIMEOUT, handle)
                .await
                .expect("task should finish after the error response");
            assert_server_error(join);
        });
    }

    #[test]
    fn user_data_stream_unsubscribe_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, _conn, mut rx) = setup().await;
            let client = UserDataStreamApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
                client.user_data_stream_unsubscribe(params).await
            });

            // The request is consumed but never answered; the call is then
            // expected to fail with `WebsocketError::Timeout`.
            let _request = next_request(&mut rx).await;

            let result = handle.await.expect("task completed");
            assert_timeout_error(result);
        });
    }
}