Skip to main content

binance_sdk/spot/websocket_api/apis/
user_data_stream_api.rs

1/*
2 * Binance Spot WebSocket API
3 *
4 * OpenAPI Specifications for the Binance Spot WebSocket API
5 *
6 * API documents:
7 * - [Github web-socket-api documentation file](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-api.md)
8 * - [General API information for web-socket-api on website](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-api/general-api-information)
9 *
10 *
11 * The version of the OpenAPI document: 1.0.0
12 *
13 *
14 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
15 * https://openapi-generator.tech
16 * Do not edit the class manually.
17 */
18
19#![allow(unused_imports)]
20use anyhow::Context;
21use async_trait::async_trait;
22use derive_builder::Builder;
23use rust_decimal::prelude::*;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::{collections::BTreeMap, sync::Arc};
27
28use crate::common::{
29    errors::WebsocketError,
30    models::{ParamBuildError, WebsocketApiResponse},
31    utils::remove_empty_value,
32    websocket::{WebsocketApi, WebsocketMessageSendOptions},
33};
34use crate::spot::websocket_api::models;
35
/// User data stream operations of the Binance Spot WebSocket API.
///
/// Each method takes its own parameter struct and resolves to a
/// [`WebsocketApiResponse`] wrapping the operation's result type, or to an
/// [`anyhow::Error`] on failure.
#[async_trait]
pub trait UserDataStreamApi: Send + Sync {
    /// Performs the `session.subscriptions` request described by `params`.
    async fn session_subscriptions(
        &self,
        params: SessionSubscriptionsParams,
    ) -> anyhow::Result<WebsocketApiResponse<Vec<models::SessionSubscriptionsResponseResultInner>>>;
    /// Performs the `userDataStream.subscribe` request described by `params`.
    async fn user_data_stream_subscribe(
        &self,
        params: UserDataStreamSubscribeParams,
    ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>;
    /// Performs the `userDataStream.subscribe.signature` request described by `params`.
    async fn user_data_stream_subscribe_signature(
        &self,
        params: UserDataStreamSubscribeSignatureParams,
    ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>;
    /// Performs the `userDataStream.unsubscribe` request described by `params`.
    ///
    /// The result is exposed as an untyped JSON value.
    async fn user_data_stream_unsubscribe(
        &self,
        params: UserDataStreamUnsubscribeParams,
    ) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>>;
}
55
/// [`UserDataStreamApi`] implementation that sends its requests through a
/// shared [`WebsocketApi`].
///
/// Cloning the client only clones the `Arc`, so clones share the same
/// underlying `WebsocketApi`.
#[derive(Clone)]
pub struct UserDataStreamApiClient {
    /// Shared WebSocket API handle used to send every request.
    websocket_api_base: Arc<WebsocketApi>,
}
60
61impl UserDataStreamApiClient {
62    pub fn new(websocket_api_base: Arc<WebsocketApi>) -> Self {
63        Self { websocket_api_base }
64    }
65}
66
/// Request parameters for the [`UserDataStreamApi::session_subscriptions`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`UserDataStreamApi::session_subscriptions`]. Create one with
/// [`SessionSubscriptionsParams::builder`].
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct SessionSubscriptionsParams {
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional**.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
80
impl SessionSubscriptionsParams {
    /// Create a builder for the parameters of
    /// [`UserDataStreamApi::session_subscriptions`].
    #[must_use]
    pub fn builder() -> SessionSubscriptionsParamsBuilder {
        SessionSubscriptionsParamsBuilder::default()
    }
}
/// Request parameters for the [`UserDataStreamApi::user_data_stream_subscribe`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`UserDataStreamApi::user_data_stream_subscribe`]. Create one with
/// [`UserDataStreamSubscribeParams::builder`].
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct UserDataStreamSubscribeParams {
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional**.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
102
impl UserDataStreamSubscribeParams {
    /// Create a builder for the parameters of
    /// [`UserDataStreamApi::user_data_stream_subscribe`].
    #[must_use]
    pub fn builder() -> UserDataStreamSubscribeParamsBuilder {
        UserDataStreamSubscribeParamsBuilder::default()
    }
}
/// Request parameters for the [`UserDataStreamApi::user_data_stream_subscribe_signature`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`UserDataStreamApi::user_data_stream_subscribe_signature`]. Create one with
/// [`UserDataStreamSubscribeSignatureParams::builder`].
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct UserDataStreamSubscribeSignatureParams {
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional**.
    #[builder(setter(into), default)]
    pub id: Option<String>,
    /// The value cannot be greater than `60000`. <br> Supports up to three decimal places of precision (e.g., 6000.346) so that microseconds may be specified.
    ///
    /// This field is **optional**.
    #[builder(setter(into), default)]
    pub recv_window: Option<rust_decimal::Decimal>,
}
129
impl UserDataStreamSubscribeSignatureParams {
    /// Create a builder for the parameters of
    /// [`UserDataStreamApi::user_data_stream_subscribe_signature`].
    #[must_use]
    pub fn builder() -> UserDataStreamSubscribeSignatureParamsBuilder {
        UserDataStreamSubscribeSignatureParamsBuilder::default()
    }
}
/// Request parameters for the [`UserDataStreamApi::user_data_stream_unsubscribe`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`UserDataStreamApi::user_data_stream_unsubscribe`]. Create one with
/// [`UserDataStreamUnsubscribeParams::builder`].
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct UserDataStreamUnsubscribeParams {
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional**.
    #[builder(setter(into), default)]
    pub id: Option<String>,
    /// When called with no parameter, this will close all subscriptions. <br>When called with the `subscriptionId` parameter, this will attempt to close the subscription with that subscription id, if it exists.
    ///
    /// This field is **optional**.
    #[builder(setter(into), default)]
    pub subscription_id: Option<i32>,
}
156
impl UserDataStreamUnsubscribeParams {
    /// Create a builder for the parameters of
    /// [`UserDataStreamApi::user_data_stream_unsubscribe`].
    #[must_use]
    pub fn builder() -> UserDataStreamUnsubscribeParamsBuilder {
        UserDataStreamUnsubscribeParamsBuilder::default()
    }
}
165
166#[async_trait]
167impl UserDataStreamApi for UserDataStreamApiClient {
168    async fn session_subscriptions(
169        &self,
170        params: SessionSubscriptionsParams,
171    ) -> anyhow::Result<WebsocketApiResponse<Vec<models::SessionSubscriptionsResponseResultInner>>>
172    {
173        let SessionSubscriptionsParams { id } = params;
174
175        let mut payload: BTreeMap<String, Value> = BTreeMap::new();
176        if let Some(value) = id {
177            payload.insert("id".to_string(), serde_json::json!(value));
178        }
179        let payload = remove_empty_value(payload);
180
181        self.websocket_api_base
182            .send_message::<Vec<models::SessionSubscriptionsResponseResultInner>>(
183                "/session.subscriptions".trim_start_matches('/'),
184                payload,
185                WebsocketMessageSendOptions::new(),
186            )
187            .await
188            .map_err(anyhow::Error::from)?
189            .into_iter()
190            .next()
191            .ok_or(WebsocketError::NoResponse)
192            .map_err(anyhow::Error::from)
193    }
194
195    async fn user_data_stream_subscribe(
196        &self,
197        params: UserDataStreamSubscribeParams,
198    ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>
199    {
200        let UserDataStreamSubscribeParams { id } = params;
201
202        let mut payload: BTreeMap<String, Value> = BTreeMap::new();
203        if let Some(value) = id {
204            payload.insert("id".to_string(), serde_json::json!(value));
205        }
206        let payload = remove_empty_value(payload);
207
208        self.websocket_api_base
209            .send_message::<Box<models::UserDataStreamSubscribeResponseResult>>(
210                "/userDataStream.subscribe".trim_start_matches('/'),
211                payload,
212                WebsocketMessageSendOptions::new(),
213            )
214            .await
215            .map_err(anyhow::Error::from)?
216            .into_iter()
217            .next()
218            .ok_or(WebsocketError::NoResponse)
219            .map_err(anyhow::Error::from)
220    }
221
222    async fn user_data_stream_subscribe_signature(
223        &self,
224        params: UserDataStreamSubscribeSignatureParams,
225    ) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>
226    {
227        let UserDataStreamSubscribeSignatureParams { id, recv_window } = params;
228
229        let mut payload: BTreeMap<String, Value> = BTreeMap::new();
230        if let Some(value) = id {
231            payload.insert("id".to_string(), serde_json::json!(value));
232        }
233        if let Some(value) = recv_window {
234            payload.insert("recvWindow".to_string(), serde_json::json!(value));
235        }
236        let payload = remove_empty_value(payload);
237
238        self.websocket_api_base
239            .send_message::<Box<models::UserDataStreamSubscribeResponseResult>>(
240                "/userDataStream.subscribe.signature".trim_start_matches('/'),
241                payload,
242                WebsocketMessageSendOptions::new().signed(),
243            )
244            .await
245            .map_err(anyhow::Error::from)?
246            .into_iter()
247            .next()
248            .ok_or(WebsocketError::NoResponse)
249            .map_err(anyhow::Error::from)
250    }
251
252    async fn user_data_stream_unsubscribe(
253        &self,
254        params: UserDataStreamUnsubscribeParams,
255    ) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>> {
256        let UserDataStreamUnsubscribeParams {
257            id,
258            subscription_id,
259        } = params;
260
261        let mut payload: BTreeMap<String, Value> = BTreeMap::new();
262        if let Some(value) = id {
263            payload.insert("id".to_string(), serde_json::json!(value));
264        }
265        if let Some(value) = subscription_id {
266            payload.insert("subscriptionId".to_string(), serde_json::json!(value));
267        }
268        let payload = remove_empty_value(payload);
269
270        self.websocket_api_base
271            .send_message::<serde_json::Value>(
272                "/userDataStream.unsubscribe".trim_start_matches('/'),
273                payload,
274                WebsocketMessageSendOptions::new(),
275            )
276            .await
277            .map_err(anyhow::Error::from)?
278            .into_iter()
279            .next()
280            .ok_or(WebsocketError::NoResponse)
281            .map_err(anyhow::Error::from)
282    }
283}
284
285#[cfg(all(test, feature = "spot"))]
286mod tests {
287    use super::*;
288    use crate::TOKIO_SHARED_RT;
289    use crate::common::websocket::{WebsocketApi, WebsocketConnection, WebsocketHandler};
290    use crate::config::ConfigurationWebsocketApi;
291    use crate::errors::WebsocketError;
292    use crate::models::WebsocketApiRateLimit;
293    use serde_json::{Value, json};
294    use tokio::spawn;
295    use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
296    use tokio::time::{Duration, timeout};
297    use tokio_tungstenite::tungstenite::Message;
298
299    async fn setup() -> (
300        Arc<WebsocketApi>,
301        Arc<WebsocketConnection>,
302        UnboundedReceiver<Message>,
303    ) {
304        let conn = WebsocketConnection::new("test-conn");
305        let (tx, rx) = unbounded_channel::<Message>();
306        {
307            let mut conn_state = conn.state.lock().await;
308            conn_state.ws_write_tx = Some(tx);
309        }
310
311        let config = ConfigurationWebsocketApi::builder()
312            .api_key("key")
313            .api_secret("secret")
314            .build()
315            .expect("Failed to build configuration");
316        let ws_api = WebsocketApi::new(config, vec![conn.clone()]);
317        conn.set_handler(ws_api.clone() as Arc<dyn WebsocketHandler>)
318            .await;
319        ws_api.clone().connect().await.unwrap();
320
321        (ws_api, conn, rx)
322    }
323
324    #[test]
325    fn session_subscriptions_success() {
326        TOKIO_SHARED_RT.block_on(async {
327            let (ws_api, conn, mut rx) = setup().await;
328            let client = UserDataStreamApiClient::new(ws_api.clone());
329
330            let handle = spawn(async move {
331                let params = SessionSubscriptionsParams::builder().build().unwrap();
332                client.session_subscriptions(params).await
333            });
334
335            let sent = timeout(Duration::from_secs(1), rx.recv()).await.expect("send should occur").expect("channel closed");
336            let Message::Text(text) = sent else { panic!() };
337            let v: Value = serde_json::from_str(&text).unwrap();
338            let id = v["id"].as_str().unwrap();
339            assert_eq!(v["method"], "/session.subscriptions".trim_start_matches('/'));
340
341            let mut resp_json: Value = serde_json::from_str(r#"{"id":"d3df5a22-88ea-4fe0-9f4e-0fcea5d418b7","status":200,"result":[{"subscriptionId":1},{"subscriptionId":0}]}"#).unwrap();
342            resp_json["id"] = id.into();
343
344            let raw_data = resp_json.get("result").or_else(|| resp_json.get("response")).expect("no response in JSON");
345            let expected_data: Vec<models::SessionSubscriptionsResponseResultInner> = serde_json::from_value(raw_data.clone()).expect("should parse raw response");
346            let empty_array = Value::Array(vec![]);
347            let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
348            let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
349                match raw_rate_limits.as_array() {
350                    Some(arr) if arr.is_empty() => None,
351                    Some(_) => Some(serde_json::from_value(raw_rate_limits.clone()).expect("should parse rateLimits array")),
352                    None => None,
353                };
354
355            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
356
357            let response = timeout(Duration::from_secs(1), handle).await.expect("task done").expect("no panic").expect("no error");
358
359
360            let response_rate_limits = response.rate_limits.clone();
361            let response_data = response.data().expect("deserialize data");
362
363            assert_eq!(response_rate_limits, expected_rate_limits);
364            assert_eq!(response_data, expected_data);
365        });
366    }
367
368    #[test]
369    fn session_subscriptions_error_response() {
370        TOKIO_SHARED_RT.block_on(async {
371            let (ws_api, conn, mut rx) = setup().await;
372            let client = UserDataStreamApiClient::new(ws_api.clone());
373
374            let handle = tokio::spawn(async move {
375                let params = SessionSubscriptionsParams::builder().build().unwrap();
376                client.session_subscriptions(params).await
377            });
378
379            let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
380            let Message::Text(text) = sent else { panic!() };
381            let v: Value = serde_json::from_str(&text).unwrap();
382            let id = v["id"].as_str().unwrap().to_string();
383
384            let resp_json = json!({
385                "id": id,
386                "status": 400,
387                    "error": {
388                        "code": -2010,
389                        "msg": "Account has insufficient balance for requested action.",
390                    },
391                    "rateLimits": [
392                        {
393                            "rateLimitType": "ORDERS",
394                            "interval": "SECOND",
395                            "intervalNum": 10,
396                            "limit": 50,
397                            "count": 13
398                        },
399                    ],
400            });
401            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
402
403            let join = timeout(Duration::from_secs(1), handle).await.unwrap();
404            match join {
405                Ok(Err(e)) => {
406                    let msg = e.to_string();
407                    assert!(
408                        msg.contains("Server‐side response error (code -2010): Account has insufficient balance for requested action."),
409                        "Expected error msg to contain server error, got: {msg}"
410                    );
411                }
412                Ok(Ok(_)) => panic!("Expected error"),
413                Err(_) => panic!("Task panicked"),
414            }
415        });
416    }
417
418    #[test]
419    fn session_subscriptions_request_timeout() {
420        TOKIO_SHARED_RT.block_on(async {
421            let (ws_api, _conn, mut rx) = setup().await;
422            let client = UserDataStreamApiClient::new(ws_api.clone());
423
424            let handle = spawn(async move {
425                let params = SessionSubscriptionsParams::builder().build().unwrap();
426                client.session_subscriptions(params).await
427            });
428
429            let sent = timeout(Duration::from_secs(1), rx.recv())
430                .await
431                .expect("send should occur")
432                .expect("channel closed");
433            let Message::Text(text) = sent else {
434                panic!("expected Message Text")
435            };
436
437            let _: Value = serde_json::from_str(&text).unwrap();
438
439            let result = handle.await.expect("task completed");
440            match result {
441                Err(e) => {
442                    if let Some(inner) = e.downcast_ref::<WebsocketError>() {
443                        assert!(matches!(inner, WebsocketError::Timeout));
444                    } else {
445                        panic!("Unexpected error type: {:?}", e);
446                    }
447                }
448                Ok(_) => panic!("Expected timeout error"),
449            }
450        });
451    }
452
453    #[test]
454    fn user_data_stream_subscribe_success() {
455        TOKIO_SHARED_RT.block_on(async {
456            let (ws_api, conn, mut rx) = setup().await;
457            let client = UserDataStreamApiClient::new(ws_api.clone());
458
459            let handle = spawn(async move {
460                let params = UserDataStreamSubscribeParams::builder().build().unwrap();
461                client.user_data_stream_subscribe(params).await
462            });
463
464            let sent = timeout(Duration::from_secs(1), rx.recv()).await.expect("send should occur").expect("channel closed");
465            let Message::Text(text) = sent else { panic!() };
466            let v: Value = serde_json::from_str(&text).unwrap();
467            let id = v["id"].as_str().unwrap();
468            assert_eq!(v["method"], "/userDataStream.subscribe".trim_start_matches('/'));
469
470            let mut resp_json: Value = serde_json::from_str(r#"{"id":"d3df8a21-98ea-4fe0-8f4e-0fcea5d418b7","status":200,"result":{"subscriptionId":0}}"#).unwrap();
471            resp_json["id"] = id.into();
472
473            let raw_data = resp_json.get("result").or_else(|| resp_json.get("response")).expect("no response in JSON");
474            let expected_data: Box<models::UserDataStreamSubscribeResponseResult> = serde_json::from_value(raw_data.clone()).expect("should parse raw response");
475            let empty_array = Value::Array(vec![]);
476            let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
477            let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
478                match raw_rate_limits.as_array() {
479                    Some(arr) if arr.is_empty() => None,
480                    Some(_) => Some(serde_json::from_value(raw_rate_limits.clone()).expect("should parse rateLimits array")),
481                    None => None,
482                };
483
484            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
485
486            let response = timeout(Duration::from_secs(1), handle).await.expect("task done").expect("no panic").expect("no error");
487
488
489            let response_rate_limits = response.rate_limits.clone();
490            let response_data = response.data().expect("deserialize data");
491
492            assert_eq!(response_rate_limits, expected_rate_limits);
493            assert_eq!(response_data, expected_data);
494        });
495    }
496
497    #[test]
498    fn user_data_stream_subscribe_error_response() {
499        TOKIO_SHARED_RT.block_on(async {
500            let (ws_api, conn, mut rx) = setup().await;
501            let client = UserDataStreamApiClient::new(ws_api.clone());
502
503            let handle = tokio::spawn(async move {
504                let params = UserDataStreamSubscribeParams::builder().build().unwrap();
505                client.user_data_stream_subscribe(params).await
506            });
507
508            let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
509            let Message::Text(text) = sent else { panic!() };
510            let v: Value = serde_json::from_str(&text).unwrap();
511            let id = v["id"].as_str().unwrap().to_string();
512
513            let resp_json = json!({
514                "id": id,
515                "status": 400,
516                    "error": {
517                        "code": -2010,
518                        "msg": "Account has insufficient balance for requested action.",
519                    },
520                    "rateLimits": [
521                        {
522                            "rateLimitType": "ORDERS",
523                            "interval": "SECOND",
524                            "intervalNum": 10,
525                            "limit": 50,
526                            "count": 13
527                        },
528                    ],
529            });
530            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
531
532            let join = timeout(Duration::from_secs(1), handle).await.unwrap();
533            match join {
534                Ok(Err(e)) => {
535                    let msg = e.to_string();
536                    assert!(
537                        msg.contains("Server‐side response error (code -2010): Account has insufficient balance for requested action."),
538                        "Expected error msg to contain server error, got: {msg}"
539                    );
540                }
541                Ok(Ok(_)) => panic!("Expected error"),
542                Err(_) => panic!("Task panicked"),
543            }
544        });
545    }
546
547    #[test]
548    fn user_data_stream_subscribe_request_timeout() {
549        TOKIO_SHARED_RT.block_on(async {
550            let (ws_api, _conn, mut rx) = setup().await;
551            let client = UserDataStreamApiClient::new(ws_api.clone());
552
553            let handle = spawn(async move {
554                let params = UserDataStreamSubscribeParams::builder().build().unwrap();
555                client.user_data_stream_subscribe(params).await
556            });
557
558            let sent = timeout(Duration::from_secs(1), rx.recv())
559                .await
560                .expect("send should occur")
561                .expect("channel closed");
562            let Message::Text(text) = sent else {
563                panic!("expected Message Text")
564            };
565
566            let _: Value = serde_json::from_str(&text).unwrap();
567
568            let result = handle.await.expect("task completed");
569            match result {
570                Err(e) => {
571                    if let Some(inner) = e.downcast_ref::<WebsocketError>() {
572                        assert!(matches!(inner, WebsocketError::Timeout));
573                    } else {
574                        panic!("Unexpected error type: {:?}", e);
575                    }
576                }
577                Ok(_) => panic!("Expected timeout error"),
578            }
579        });
580    }
581
582    #[test]
583    fn user_data_stream_subscribe_signature_success() {
584        TOKIO_SHARED_RT.block_on(async {
585            let (ws_api, conn, mut rx) = setup().await;
586            let client = UserDataStreamApiClient::new(ws_api.clone());
587
588            let handle = spawn(async move {
589                let params = UserDataStreamSubscribeSignatureParams::builder().build().unwrap();
590                client.user_data_stream_subscribe_signature(params).await
591            });
592
593            let sent = timeout(Duration::from_secs(1), rx.recv()).await.expect("send should occur").expect("channel closed");
594            let Message::Text(text) = sent else { panic!() };
595            let v: Value = serde_json::from_str(&text).unwrap();
596            let id = v["id"].as_str().unwrap();
597            assert_eq!(v["method"], "/userDataStream.subscribe.signature".trim_start_matches('/'));
598
599            let mut resp_json: Value = serde_json::from_str(r#"{"id":"d3df8a22-98ea-4fe0-9f4e-0fcea5d418b7","status":200,"result":{"subscriptionId":0}}"#).unwrap();
600            resp_json["id"] = id.into();
601
602            let raw_data = resp_json.get("result").or_else(|| resp_json.get("response")).expect("no response in JSON");
603            let expected_data: Box<models::UserDataStreamSubscribeResponseResult> = serde_json::from_value(raw_data.clone()).expect("should parse raw response");
604            let empty_array = Value::Array(vec![]);
605            let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
606            let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
607                match raw_rate_limits.as_array() {
608                    Some(arr) if arr.is_empty() => None,
609                    Some(_) => Some(serde_json::from_value(raw_rate_limits.clone()).expect("should parse rateLimits array")),
610                    None => None,
611                };
612
613            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
614
615            let response = timeout(Duration::from_secs(1), handle).await.expect("task done").expect("no panic").expect("no error");
616
617
618            let response_rate_limits = response.rate_limits.clone();
619            let response_data = response.data().expect("deserialize data");
620
621            assert_eq!(response_rate_limits, expected_rate_limits);
622            assert_eq!(response_data, expected_data);
623        });
624    }
625
626    #[test]
627    fn user_data_stream_subscribe_signature_error_response() {
628        TOKIO_SHARED_RT.block_on(async {
629            let (ws_api, conn, mut rx) = setup().await;
630            let client = UserDataStreamApiClient::new(ws_api.clone());
631
632            let handle = tokio::spawn(async move {
633                let params = UserDataStreamSubscribeSignatureParams::builder().build().unwrap();
634                client.user_data_stream_subscribe_signature(params).await
635            });
636
637            let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
638            let Message::Text(text) = sent else { panic!() };
639            let v: Value = serde_json::from_str(&text).unwrap();
640            let id = v["id"].as_str().unwrap().to_string();
641
642            let resp_json = json!({
643                "id": id,
644                "status": 400,
645                    "error": {
646                        "code": -2010,
647                        "msg": "Account has insufficient balance for requested action.",
648                    },
649                    "rateLimits": [
650                        {
651                            "rateLimitType": "ORDERS",
652                            "interval": "SECOND",
653                            "intervalNum": 10,
654                            "limit": 50,
655                            "count": 13
656                        },
657                    ],
658            });
659            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
660
661            let join = timeout(Duration::from_secs(1), handle).await.unwrap();
662            match join {
663                Ok(Err(e)) => {
664                    let msg = e.to_string();
665                    assert!(
666                        msg.contains("Server‐side response error (code -2010): Account has insufficient balance for requested action."),
667                        "Expected error msg to contain server error, got: {msg}"
668                    );
669                }
670                Ok(Ok(_)) => panic!("Expected error"),
671                Err(_) => panic!("Task panicked"),
672            }
673        });
674    }
675
676    #[test]
677    fn user_data_stream_subscribe_signature_request_timeout() {
678        TOKIO_SHARED_RT.block_on(async {
679            let (ws_api, _conn, mut rx) = setup().await;
680            let client = UserDataStreamApiClient::new(ws_api.clone());
681
682            let handle = spawn(async move {
683                let params = UserDataStreamSubscribeSignatureParams::builder()
684                    .build()
685                    .unwrap();
686                client.user_data_stream_subscribe_signature(params).await
687            });
688
689            let sent = timeout(Duration::from_secs(1), rx.recv())
690                .await
691                .expect("send should occur")
692                .expect("channel closed");
693            let Message::Text(text) = sent else {
694                panic!("expected Message Text")
695            };
696
697            let _: Value = serde_json::from_str(&text).unwrap();
698
699            let result = handle.await.expect("task completed");
700            match result {
701                Err(e) => {
702                    if let Some(inner) = e.downcast_ref::<WebsocketError>() {
703                        assert!(matches!(inner, WebsocketError::Timeout));
704                    } else {
705                        panic!("Unexpected error type: {:?}", e);
706                    }
707                }
708                Ok(_) => panic!("Expected timeout error"),
709            }
710        });
711    }
712
713    #[test]
714    fn user_data_stream_unsubscribe_success() {
715        TOKIO_SHARED_RT.block_on(async {
716            let (ws_api, conn, mut rx) = setup().await;
717            let client = UserDataStreamApiClient::new(ws_api.clone());
718
719            let handle = spawn(async move {
720                let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
721                client.user_data_stream_unsubscribe(params).await
722            });
723
724            let sent = timeout(Duration::from_secs(1), rx.recv())
725                .await
726                .expect("send should occur")
727                .expect("channel closed");
728            let Message::Text(text) = sent else { panic!() };
729            let v: Value = serde_json::from_str(&text).unwrap();
730            let id = v["id"].as_str().unwrap();
731            assert_eq!(
732                v["method"],
733                "/userDataStream.unsubscribe".trim_start_matches('/')
734            );
735
736            let mut resp_json: Value = serde_json::from_str(
737                r#"{"id":"d3df8a21-98ea-4fe0-8f4e-0fcea5d418b7","status":200,"result":{}}"#,
738            )
739            .unwrap();
740            resp_json["id"] = id.into();
741
742            let raw_data = resp_json
743                .get("result")
744                .or_else(|| resp_json.get("response"))
745                .expect("no response in JSON");
746            let expected_data: serde_json::Value =
747                serde_json::from_value(raw_data.clone()).expect("should parse raw response");
748            let empty_array = Value::Array(vec![]);
749            let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
750            let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
751                match raw_rate_limits.as_array() {
752                    Some(arr) if arr.is_empty() => None,
753                    Some(_) => Some(
754                        serde_json::from_value(raw_rate_limits.clone())
755                            .expect("should parse rateLimits array"),
756                    ),
757                    None => None,
758                };
759
760            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
761
762            let response = timeout(Duration::from_secs(1), handle)
763                .await
764                .expect("task done")
765                .expect("no panic")
766                .expect("no error");
767
768            let response_rate_limits = response.rate_limits.clone();
769            let response_data = response.data().expect("deserialize data");
770
771            assert_eq!(response_rate_limits, expected_rate_limits);
772            assert_eq!(response_data, expected_data);
773        });
774    }
775
776    #[test]
777    fn user_data_stream_unsubscribe_error_response() {
778        TOKIO_SHARED_RT.block_on(async {
779            let (ws_api, conn, mut rx) = setup().await;
780            let client = UserDataStreamApiClient::new(ws_api.clone());
781
782            let handle = tokio::spawn(async move {
783                let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
784                client.user_data_stream_unsubscribe(params).await
785            });
786
787            let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
788            let Message::Text(text) = sent else { panic!() };
789            let v: Value = serde_json::from_str(&text).unwrap();
790            let id = v["id"].as_str().unwrap().to_string();
791
792            let resp_json = json!({
793                "id": id,
794                "status": 400,
795                    "error": {
796                        "code": -2010,
797                        "msg": "Account has insufficient balance for requested action.",
798                    },
799                    "rateLimits": [
800                        {
801                            "rateLimitType": "ORDERS",
802                            "interval": "SECOND",
803                            "intervalNum": 10,
804                            "limit": 50,
805                            "count": 13
806                        },
807                    ],
808            });
809            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
810
811            let join = timeout(Duration::from_secs(1), handle).await.unwrap();
812            match join {
813                Ok(Err(e)) => {
814                    let msg = e.to_string();
815                    assert!(
816                        msg.contains("Server‐side response error (code -2010): Account has insufficient balance for requested action."),
817                        "Expected error msg to contain server error, got: {msg}"
818                    );
819                }
820                Ok(Ok(_)) => panic!("Expected error"),
821                Err(_) => panic!("Task panicked"),
822            }
823        });
824    }
825
826    #[test]
827    fn user_data_stream_unsubscribe_request_timeout() {
828        TOKIO_SHARED_RT.block_on(async {
829            let (ws_api, _conn, mut rx) = setup().await;
830            let client = UserDataStreamApiClient::new(ws_api.clone());
831
832            let handle = spawn(async move {
833                let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
834                client.user_data_stream_unsubscribe(params).await
835            });
836
837            let sent = timeout(Duration::from_secs(1), rx.recv())
838                .await
839                .expect("send should occur")
840                .expect("channel closed");
841            let Message::Text(text) = sent else {
842                panic!("expected Message Text")
843            };
844
845            let _: Value = serde_json::from_str(&text).unwrap();
846
847            let result = handle.await.expect("task completed");
848            match result {
849                Err(e) => {
850                    if let Some(inner) = e.downcast_ref::<WebsocketError>() {
851                        assert!(matches!(inner, WebsocketError::Timeout));
852                    } else {
853                        panic!("Unexpected error type: {:?}", e);
854                    }
855                }
856                Ok(_) => panic!("Expected timeout error"),
857            }
858        });
859    }
860}