#![allow(unused_imports)]
use anyhow::Context;
use async_trait::async_trait;
use derive_builder::Builder;
use rust_decimal::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::BTreeMap, sync::Arc};
use crate::common::{
errors::WebsocketError,
models::{ParamBuildError, WebsocketApiResponse},
utils::remove_empty_value,
websocket::{WebsocketApi, WebsocketMessageSendOptions},
};
use crate::spot::websocket_api::models;
#[async_trait]
pub trait UserDataStreamApi: Send + Sync {
async fn session_subscriptions(
&self,
params: SessionSubscriptionsParams,
) -> anyhow::Result<WebsocketApiResponse<Vec<models::SessionSubscriptionsResponseResultInner>>>;
async fn user_data_stream_subscribe(
&self,
params: UserDataStreamSubscribeParams,
) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>;
async fn user_data_stream_subscribe_signature(
&self,
params: UserDataStreamSubscribeSignatureParams,
) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>;
async fn user_data_stream_unsubscribe(
&self,
params: UserDataStreamUnsubscribeParams,
) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>>;
}
#[derive(Clone)]
pub struct UserDataStreamApiClient {
websocket_api_base: Arc<WebsocketApi>,
}
impl UserDataStreamApiClient {
pub fn new(websocket_api_base: Arc<WebsocketApi>) -> Self {
Self { websocket_api_base }
}
}
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct SessionSubscriptionsParams {
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl SessionSubscriptionsParams {
#[must_use]
pub fn builder() -> SessionSubscriptionsParamsBuilder {
SessionSubscriptionsParamsBuilder::default()
}
}
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct UserDataStreamSubscribeParams {
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl UserDataStreamSubscribeParams {
#[must_use]
pub fn builder() -> UserDataStreamSubscribeParamsBuilder {
UserDataStreamSubscribeParamsBuilder::default()
}
}
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct UserDataStreamSubscribeSignatureParams {
#[builder(setter(into), default)]
pub id: Option<String>,
#[builder(setter(into), default)]
pub recv_window: Option<rust_decimal::Decimal>,
}
impl UserDataStreamSubscribeSignatureParams {
#[must_use]
pub fn builder() -> UserDataStreamSubscribeSignatureParamsBuilder {
UserDataStreamSubscribeSignatureParamsBuilder::default()
}
}
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct UserDataStreamUnsubscribeParams {
#[builder(setter(into), default)]
pub id: Option<String>,
#[builder(setter(into), default)]
pub subscription_id: Option<i32>,
}
impl UserDataStreamUnsubscribeParams {
#[must_use]
pub fn builder() -> UserDataStreamUnsubscribeParamsBuilder {
UserDataStreamUnsubscribeParamsBuilder::default()
}
}
#[async_trait]
impl UserDataStreamApi for UserDataStreamApiClient {
async fn session_subscriptions(
&self,
params: SessionSubscriptionsParams,
) -> anyhow::Result<WebsocketApiResponse<Vec<models::SessionSubscriptionsResponseResultInner>>>
{
let SessionSubscriptionsParams { id } = params;
let mut payload: BTreeMap<String, Value> = BTreeMap::new();
if let Some(value) = id {
payload.insert("id".to_string(), serde_json::json!(value));
}
let payload = remove_empty_value(payload);
self.websocket_api_base
.send_message::<Vec<models::SessionSubscriptionsResponseResultInner>>(
"/session.subscriptions".trim_start_matches('/'),
payload,
WebsocketMessageSendOptions::new(),
)
.await
.map_err(anyhow::Error::from)?
.into_iter()
.next()
.ok_or(WebsocketError::NoResponse)
.map_err(anyhow::Error::from)
}
async fn user_data_stream_subscribe(
&self,
params: UserDataStreamSubscribeParams,
) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>
{
let UserDataStreamSubscribeParams { id } = params;
let mut payload: BTreeMap<String, Value> = BTreeMap::new();
if let Some(value) = id {
payload.insert("id".to_string(), serde_json::json!(value));
}
let payload = remove_empty_value(payload);
self.websocket_api_base
.send_message::<Box<models::UserDataStreamSubscribeResponseResult>>(
"/userDataStream.subscribe".trim_start_matches('/'),
payload,
WebsocketMessageSendOptions::new(),
)
.await
.map_err(anyhow::Error::from)?
.into_iter()
.next()
.ok_or(WebsocketError::NoResponse)
.map_err(anyhow::Error::from)
}
async fn user_data_stream_subscribe_signature(
&self,
params: UserDataStreamSubscribeSignatureParams,
) -> anyhow::Result<WebsocketApiResponse<Box<models::UserDataStreamSubscribeResponseResult>>>
{
let UserDataStreamSubscribeSignatureParams { id, recv_window } = params;
let mut payload: BTreeMap<String, Value> = BTreeMap::new();
if let Some(value) = id {
payload.insert("id".to_string(), serde_json::json!(value));
}
if let Some(value) = recv_window {
payload.insert("recvWindow".to_string(), serde_json::json!(value));
}
let payload = remove_empty_value(payload);
self.websocket_api_base
.send_message::<Box<models::UserDataStreamSubscribeResponseResult>>(
"/userDataStream.subscribe.signature".trim_start_matches('/'),
payload,
WebsocketMessageSendOptions::new().signed(),
)
.await
.map_err(anyhow::Error::from)?
.into_iter()
.next()
.ok_or(WebsocketError::NoResponse)
.map_err(anyhow::Error::from)
}
async fn user_data_stream_unsubscribe(
&self,
params: UserDataStreamUnsubscribeParams,
) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>> {
let UserDataStreamUnsubscribeParams {
id,
subscription_id,
} = params;
let mut payload: BTreeMap<String, Value> = BTreeMap::new();
if let Some(value) = id {
payload.insert("id".to_string(), serde_json::json!(value));
}
if let Some(value) = subscription_id {
payload.insert("subscriptionId".to_string(), serde_json::json!(value));
}
let payload = remove_empty_value(payload);
self.websocket_api_base
.send_message::<serde_json::Value>(
"/userDataStream.unsubscribe".trim_start_matches('/'),
payload,
WebsocketMessageSendOptions::new(),
)
.await
.map_err(anyhow::Error::from)?
.into_iter()
.next()
.ok_or(WebsocketError::NoResponse)
.map_err(anyhow::Error::from)
}
}
#[cfg(all(test, feature = "spot"))]
mod tests {
use super::*;
use crate::TOKIO_SHARED_RT;
use crate::common::websocket::{WebsocketApi, WebsocketConnection, WebsocketHandler};
use crate::config::ConfigurationWebsocketApi;
use crate::errors::WebsocketError;
use crate::models::WebsocketApiRateLimit;
use serde_json::{Value, json};
use tokio::spawn;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::time::{Duration, timeout};
use tokio_tungstenite::tungstenite::Message;
async fn setup() -> (
Arc<WebsocketApi>,
Arc<WebsocketConnection>,
UnboundedReceiver<Message>,
) {
let conn = WebsocketConnection::new("test-conn");
let (tx, rx) = unbounded_channel::<Message>();
{
let mut conn_state = conn.state.lock().await;
conn_state.ws_write_tx = Some(tx);
}
let config = ConfigurationWebsocketApi::builder()
.api_key("key")
.api_secret("secret")
.build()
.expect("Failed to build configuration");
let ws_api = WebsocketApi::new(config, vec![conn.clone()]);
conn.set_handler(ws_api.clone() as Arc<dyn WebsocketHandler>)
.await;
ws_api.clone().connect().await.unwrap();
(ws_api, conn, rx)
}
#[test]
fn session_subscriptions_success() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = SessionSubscriptionsParams::builder().build().unwrap();
client.session_subscriptions(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.expect("send should occur").expect("channel closed");
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap();
assert_eq!(v["method"], "/session.subscriptions".trim_start_matches('/'));
let mut resp_json: Value = serde_json::from_str(r#"{"id":"d3df5a22-88ea-4fe0-9f4e-0fcea5d418b7","status":200,"result":[{"subscriptionId":1},{"subscriptionId":0}]}"#).unwrap();
resp_json["id"] = id.into();
let raw_data = resp_json.get("result").or_else(|| resp_json.get("response")).expect("no response in JSON");
let expected_data: Vec<models::SessionSubscriptionsResponseResultInner> = serde_json::from_value(raw_data.clone()).expect("should parse raw response");
let empty_array = Value::Array(vec![]);
let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
match raw_rate_limits.as_array() {
Some(arr) if arr.is_empty() => None,
Some(_) => Some(serde_json::from_value(raw_rate_limits.clone()).expect("should parse rateLimits array")),
None => None,
};
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let response = timeout(Duration::from_secs(1), handle).await.expect("task done").expect("no panic").expect("no error");
let response_rate_limits = response.rate_limits.clone();
let response_data = response.data().expect("deserialize data");
assert_eq!(response_rate_limits, expected_rate_limits);
assert_eq!(response_data, expected_data);
});
}
#[test]
fn session_subscriptions_error_response() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = tokio::spawn(async move {
let params = SessionSubscriptionsParams::builder().build().unwrap();
client.session_subscriptions(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap().to_string();
let resp_json = json!({
"id": id,
"status": 400,
"error": {
"code": -2010,
"msg": "Account has insufficient balance for requested action.",
},
"rateLimits": [
{
"rateLimitType": "ORDERS",
"interval": "SECOND",
"intervalNum": 10,
"limit": 50,
"count": 13
},
],
});
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let join = timeout(Duration::from_secs(1), handle).await.unwrap();
match join {
Ok(Err(e)) => {
let msg = e.to_string();
assert!(
msg.contains("Serverāside response error (code -2010): Account has insufficient balance for requested action."),
"Expected error msg to contain server error, got: {msg}"
);
}
Ok(Ok(_)) => panic!("Expected error"),
Err(_) => panic!("Task panicked"),
}
});
}
#[test]
fn session_subscriptions_request_timeout() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, _conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = SessionSubscriptionsParams::builder().build().unwrap();
client.session_subscriptions(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("send should occur")
.expect("channel closed");
let Message::Text(text) = sent else {
panic!("expected Message Text")
};
let _: Value = serde_json::from_str(&text).unwrap();
let result = handle.await.expect("task completed");
match result {
Err(e) => {
if let Some(inner) = e.downcast_ref::<WebsocketError>() {
assert!(matches!(inner, WebsocketError::Timeout));
} else {
panic!("Unexpected error type: {:?}", e);
}
}
Ok(_) => panic!("Expected timeout error"),
}
});
}
#[test]
fn user_data_stream_subscribe_success() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = UserDataStreamSubscribeParams::builder().build().unwrap();
client.user_data_stream_subscribe(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.expect("send should occur").expect("channel closed");
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap();
assert_eq!(v["method"], "/userDataStream.subscribe".trim_start_matches('/'));
let mut resp_json: Value = serde_json::from_str(r#"{"id":"d3df8a21-98ea-4fe0-8f4e-0fcea5d418b7","status":200,"result":{"subscriptionId":0}}"#).unwrap();
resp_json["id"] = id.into();
let raw_data = resp_json.get("result").or_else(|| resp_json.get("response")).expect("no response in JSON");
let expected_data: Box<models::UserDataStreamSubscribeResponseResult> = serde_json::from_value(raw_data.clone()).expect("should parse raw response");
let empty_array = Value::Array(vec![]);
let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
match raw_rate_limits.as_array() {
Some(arr) if arr.is_empty() => None,
Some(_) => Some(serde_json::from_value(raw_rate_limits.clone()).expect("should parse rateLimits array")),
None => None,
};
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let response = timeout(Duration::from_secs(1), handle).await.expect("task done").expect("no panic").expect("no error");
let response_rate_limits = response.rate_limits.clone();
let response_data = response.data().expect("deserialize data");
assert_eq!(response_rate_limits, expected_rate_limits);
assert_eq!(response_data, expected_data);
});
}
#[test]
fn user_data_stream_subscribe_error_response() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = tokio::spawn(async move {
let params = UserDataStreamSubscribeParams::builder().build().unwrap();
client.user_data_stream_subscribe(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap().to_string();
let resp_json = json!({
"id": id,
"status": 400,
"error": {
"code": -2010,
"msg": "Account has insufficient balance for requested action.",
},
"rateLimits": [
{
"rateLimitType": "ORDERS",
"interval": "SECOND",
"intervalNum": 10,
"limit": 50,
"count": 13
},
],
});
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let join = timeout(Duration::from_secs(1), handle).await.unwrap();
match join {
Ok(Err(e)) => {
let msg = e.to_string();
assert!(
msg.contains("Serverāside response error (code -2010): Account has insufficient balance for requested action."),
"Expected error msg to contain server error, got: {msg}"
);
}
Ok(Ok(_)) => panic!("Expected error"),
Err(_) => panic!("Task panicked"),
}
});
}
#[test]
fn user_data_stream_subscribe_request_timeout() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, _conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = UserDataStreamSubscribeParams::builder().build().unwrap();
client.user_data_stream_subscribe(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("send should occur")
.expect("channel closed");
let Message::Text(text) = sent else {
panic!("expected Message Text")
};
let _: Value = serde_json::from_str(&text).unwrap();
let result = handle.await.expect("task completed");
match result {
Err(e) => {
if let Some(inner) = e.downcast_ref::<WebsocketError>() {
assert!(matches!(inner, WebsocketError::Timeout));
} else {
panic!("Unexpected error type: {:?}", e);
}
}
Ok(_) => panic!("Expected timeout error"),
}
});
}
#[test]
fn user_data_stream_subscribe_signature_success() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = UserDataStreamSubscribeSignatureParams::builder().build().unwrap();
client.user_data_stream_subscribe_signature(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.expect("send should occur").expect("channel closed");
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap();
assert_eq!(v["method"], "/userDataStream.subscribe.signature".trim_start_matches('/'));
let mut resp_json: Value = serde_json::from_str(r#"{"id":"d3df8a22-98ea-4fe0-9f4e-0fcea5d418b7","status":200,"result":{"subscriptionId":0}}"#).unwrap();
resp_json["id"] = id.into();
let raw_data = resp_json.get("result").or_else(|| resp_json.get("response")).expect("no response in JSON");
let expected_data: Box<models::UserDataStreamSubscribeResponseResult> = serde_json::from_value(raw_data.clone()).expect("should parse raw response");
let empty_array = Value::Array(vec![]);
let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
match raw_rate_limits.as_array() {
Some(arr) if arr.is_empty() => None,
Some(_) => Some(serde_json::from_value(raw_rate_limits.clone()).expect("should parse rateLimits array")),
None => None,
};
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let response = timeout(Duration::from_secs(1), handle).await.expect("task done").expect("no panic").expect("no error");
let response_rate_limits = response.rate_limits.clone();
let response_data = response.data().expect("deserialize data");
assert_eq!(response_rate_limits, expected_rate_limits);
assert_eq!(response_data, expected_data);
});
}
#[test]
fn user_data_stream_subscribe_signature_error_response() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = tokio::spawn(async move {
let params = UserDataStreamSubscribeSignatureParams::builder().build().unwrap();
client.user_data_stream_subscribe_signature(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap().to_string();
let resp_json = json!({
"id": id,
"status": 400,
"error": {
"code": -2010,
"msg": "Account has insufficient balance for requested action.",
},
"rateLimits": [
{
"rateLimitType": "ORDERS",
"interval": "SECOND",
"intervalNum": 10,
"limit": 50,
"count": 13
},
],
});
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let join = timeout(Duration::from_secs(1), handle).await.unwrap();
match join {
Ok(Err(e)) => {
let msg = e.to_string();
assert!(
msg.contains("Serverāside response error (code -2010): Account has insufficient balance for requested action."),
"Expected error msg to contain server error, got: {msg}"
);
}
Ok(Ok(_)) => panic!("Expected error"),
Err(_) => panic!("Task panicked"),
}
});
}
#[test]
fn user_data_stream_subscribe_signature_request_timeout() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, _conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = UserDataStreamSubscribeSignatureParams::builder()
.build()
.unwrap();
client.user_data_stream_subscribe_signature(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("send should occur")
.expect("channel closed");
let Message::Text(text) = sent else {
panic!("expected Message Text")
};
let _: Value = serde_json::from_str(&text).unwrap();
let result = handle.await.expect("task completed");
match result {
Err(e) => {
if let Some(inner) = e.downcast_ref::<WebsocketError>() {
assert!(matches!(inner, WebsocketError::Timeout));
} else {
panic!("Unexpected error type: {:?}", e);
}
}
Ok(_) => panic!("Expected timeout error"),
}
});
}
#[test]
fn user_data_stream_unsubscribe_success() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
client.user_data_stream_unsubscribe(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("send should occur")
.expect("channel closed");
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap();
assert_eq!(
v["method"],
"/userDataStream.unsubscribe".trim_start_matches('/')
);
let mut resp_json: Value = serde_json::from_str(
r#"{"id":"d3df8a21-98ea-4fe0-8f4e-0fcea5d418b7","status":200,"result":{}}"#,
)
.unwrap();
resp_json["id"] = id.into();
let raw_data = resp_json
.get("result")
.or_else(|| resp_json.get("response"))
.expect("no response in JSON");
let expected_data: serde_json::Value =
serde_json::from_value(raw_data.clone()).expect("should parse raw response");
let empty_array = Value::Array(vec![]);
let raw_rate_limits = resp_json.get("rateLimits").unwrap_or(&empty_array);
let expected_rate_limits: Option<Vec<WebsocketApiRateLimit>> =
match raw_rate_limits.as_array() {
Some(arr) if arr.is_empty() => None,
Some(_) => Some(
serde_json::from_value(raw_rate_limits.clone())
.expect("should parse rateLimits array"),
),
None => None,
};
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let response = timeout(Duration::from_secs(1), handle)
.await
.expect("task done")
.expect("no panic")
.expect("no error");
let response_rate_limits = response.rate_limits.clone();
let response_data = response.data().expect("deserialize data");
assert_eq!(response_rate_limits, expected_rate_limits);
assert_eq!(response_data, expected_data);
});
}
#[test]
fn user_data_stream_unsubscribe_error_response() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = tokio::spawn(async move {
let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
client.user_data_stream_unsubscribe(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv()).await.unwrap().unwrap();
let Message::Text(text) = sent else { panic!() };
let v: Value = serde_json::from_str(&text).unwrap();
let id = v["id"].as_str().unwrap().to_string();
let resp_json = json!({
"id": id,
"status": 400,
"error": {
"code": -2010,
"msg": "Account has insufficient balance for requested action.",
},
"rateLimits": [
{
"rateLimitType": "ORDERS",
"interval": "SECOND",
"intervalNum": 10,
"limit": 50,
"count": 13
},
],
});
WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;
let join = timeout(Duration::from_secs(1), handle).await.unwrap();
match join {
Ok(Err(e)) => {
let msg = e.to_string();
assert!(
msg.contains("Serverāside response error (code -2010): Account has insufficient balance for requested action."),
"Expected error msg to contain server error, got: {msg}"
);
}
Ok(Ok(_)) => panic!("Expected error"),
Err(_) => panic!("Task panicked"),
}
});
}
#[test]
fn user_data_stream_unsubscribe_request_timeout() {
TOKIO_SHARED_RT.block_on(async {
let (ws_api, _conn, mut rx) = setup().await;
let client = UserDataStreamApiClient::new(ws_api.clone());
let handle = spawn(async move {
let params = UserDataStreamUnsubscribeParams::builder().build().unwrap();
client.user_data_stream_unsubscribe(params).await
});
let sent = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("send should occur")
.expect("channel closed");
let Message::Text(text) = sent else {
panic!("expected Message Text")
};
let _: Value = serde_json::from_str(&text).unwrap();
let result = handle.await.expect("task completed");
match result {
Err(e) => {
if let Some(inner) = e.downcast_ref::<WebsocketError>() {
assert!(matches!(inner, WebsocketError::Timeout));
} else {
panic!("Unexpected error type: {:?}", e);
}
}
Ok(_) => panic!("Expected timeout error"),
}
});
}
}