#![allow(unused_imports)]
use async_trait::async_trait;
use derive_builder::Builder;
use reqwest;
use rust_decimal::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::BTreeMap;
use crate::common::{
config::ConfigurationRestApi,
models::{ParamBuildError, RestApiResponse},
utils::send_request,
};
use crate::margin_trading::rest_api::models;
/// Gates whether the configured `time_unit` is forwarded to `send_request`.
///
/// It is `false` here, so every request built in this file passes `None`
/// as the time-unit argument.
const HAS_TIME_UNIT: bool = false;
/// REST operations for starting, keeping alive and closing margin and
/// isolated-margin user data streams.
///
/// Implementors must be `Send + Sync`. Every method is asynchronous and
/// returns `anyhow::Result`; the bundled [`TradeDataStreamApiClient`]
/// propagates whatever `send_request` returns.
#[async_trait]
pub trait TradeDataStreamApi: Send + Sync {
/// Closes an isolated-margin user data stream.
///
/// The bundled client issues `DELETE /sapi/v1/userDataStream/isolated`
/// with the `symbol` and `listenkey` query parameters.
///
/// # Errors
/// Returns an error when the implementation cannot produce a response.
async fn close_isolated_margin_user_data_stream(
&self,
params: CloseIsolatedMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>>;
/// Closes a margin user data stream.
///
/// The bundled client issues `DELETE /sapi/v1/userDataStream` with the
/// `listenkey` query parameter.
///
/// # Errors
/// Returns an error when the implementation cannot produce a response.
async fn close_margin_user_data_stream(
&self,
params: CloseMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>>;
/// Keeps an isolated-margin user data stream alive.
///
/// The bundled client issues `PUT /sapi/v1/userDataStream/isolated` with
/// the `symbol` and `listenKey` query parameters.
///
/// # Errors
/// Returns an error when the implementation cannot produce a response.
async fn keepalive_isolated_margin_user_data_stream(
&self,
params: KeepaliveIsolatedMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>>;
/// Keeps a margin user data stream alive.
///
/// The bundled client issues `PUT /sapi/v1/userDataStream` with the
/// `listenKey` query parameter.
///
/// # Errors
/// Returns an error when the implementation cannot produce a response.
async fn keepalive_margin_user_data_stream(
&self,
params: KeepaliveMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>>;
/// Starts an isolated-margin user data stream.
///
/// The bundled client issues `POST /sapi/v1/userDataStream/isolated` with
/// the `symbol` query parameter and decodes the body as
/// `models::StartIsolatedMarginUserDataStreamResponse`.
///
/// # Errors
/// Returns an error when the implementation cannot produce a response.
async fn start_isolated_margin_user_data_stream(
&self,
params: StartIsolatedMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<models::StartIsolatedMarginUserDataStreamResponse>>;
/// Starts a margin user data stream.
///
/// The bundled client issues `POST /sapi/v1/userDataStream` without query
/// parameters and decodes the body as
/// `models::StartMarginUserDataStreamResponse`.
///
/// # Errors
/// Returns an error when the implementation cannot produce a response.
async fn start_margin_user_data_stream(
&self,
) -> anyhow::Result<RestApiResponse<models::StartMarginUserDataStreamResponse>>;
}
/// [`TradeDataStreamApi`] implementation that sends every request through
/// `send_request` using the REST configuration it was constructed with.
#[derive(Debug, Clone)]
pub struct TradeDataStreamApiClient {
// Handed by reference to `send_request` on every call; also the source of
// `time_unit`.
configuration: ConfigurationRestApi,
}
impl TradeDataStreamApiClient {
pub fn new(configuration: ConfigurationRestApi) -> Self {
Self { configuration }
}
}
/// Request parameters for
/// [`TradeDataStreamApi::close_isolated_margin_user_data_stream`].
///
/// Both fields are required; [`Self::builder`] takes them up front.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct CloseIsolatedMarginUserDataStreamParams {
/// Sent by the bundled client as the `symbol` query parameter.
#[builder(setter(into))]
pub symbol: String,
/// Sent by the bundled client as the `listenkey` query parameter.
#[builder(setter(into))]
pub listenkey: String,
}
impl CloseIsolatedMarginUserDataStreamParams {
    /// Returns a builder that already carries both required fields.
    ///
    /// Call `.build()` on the result to obtain the parameter struct.
    #[must_use]
    pub fn builder(
        symbol: String,
        listenkey: String,
    ) -> CloseIsolatedMarginUserDataStreamParamsBuilder {
        let seeded = CloseIsolatedMarginUserDataStreamParamsBuilder::default();
        let seeded = seeded.symbol(symbol);
        seeded.listenkey(listenkey)
    }
}
/// Request parameters for
/// [`TradeDataStreamApi::close_margin_user_data_stream`].
///
/// The single field is required; [`Self::builder`] takes it up front.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct CloseMarginUserDataStreamParams {
/// Sent by the bundled client as the `listenkey` query parameter.
#[builder(setter(into))]
pub listenkey: String,
}
impl CloseMarginUserDataStreamParams {
    /// Returns a builder that already carries the required `listenkey`.
    ///
    /// Call `.build()` on the result to obtain the parameter struct.
    #[must_use]
    pub fn builder(listenkey: String) -> CloseMarginUserDataStreamParamsBuilder {
        let seeded = CloseMarginUserDataStreamParamsBuilder::default();
        seeded.listenkey(listenkey)
    }
}
/// Request parameters for
/// [`TradeDataStreamApi::keepalive_isolated_margin_user_data_stream`].
///
/// Both fields are required; [`Self::builder`] takes them up front.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KeepaliveIsolatedMarginUserDataStreamParams {
/// Sent by the bundled client as the `symbol` query parameter.
#[builder(setter(into))]
pub symbol: String,
/// Sent by the bundled client as the `listenKey` query parameter.
#[builder(setter(into))]
pub listen_key: String,
}
impl KeepaliveIsolatedMarginUserDataStreamParams {
    /// Returns a builder that already carries both required fields.
    ///
    /// Call `.build()` on the result to obtain the parameter struct.
    #[must_use]
    pub fn builder(
        symbol: String,
        listen_key: String,
    ) -> KeepaliveIsolatedMarginUserDataStreamParamsBuilder {
        let seeded = KeepaliveIsolatedMarginUserDataStreamParamsBuilder::default();
        let seeded = seeded.symbol(symbol);
        seeded.listen_key(listen_key)
    }
}
/// Request parameters for
/// [`TradeDataStreamApi::keepalive_margin_user_data_stream`].
///
/// The single field is required; [`Self::builder`] takes it up front.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KeepaliveMarginUserDataStreamParams {
/// Sent by the bundled client as the `listenKey` query parameter.
#[builder(setter(into))]
pub listen_key: String,
}
impl KeepaliveMarginUserDataStreamParams {
    /// Returns a builder that already carries the required `listen_key`.
    ///
    /// Call `.build()` on the result to obtain the parameter struct.
    #[must_use]
    pub fn builder(listen_key: String) -> KeepaliveMarginUserDataStreamParamsBuilder {
        let seeded = KeepaliveMarginUserDataStreamParamsBuilder::default();
        seeded.listen_key(listen_key)
    }
}
/// Request parameters for
/// [`TradeDataStreamApi::start_isolated_margin_user_data_stream`].
///
/// The single field is required; [`Self::builder`] takes it up front.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct StartIsolatedMarginUserDataStreamParams {
/// Sent by the bundled client as the `symbol` query parameter.
#[builder(setter(into))]
pub symbol: String,
}
impl StartIsolatedMarginUserDataStreamParams {
    /// Returns a builder that already carries the required `symbol`.
    ///
    /// Call `.build()` on the result to obtain the parameter struct.
    #[must_use]
    pub fn builder(symbol: String) -> StartIsolatedMarginUserDataStreamParamsBuilder {
        let seeded = StartIsolatedMarginUserDataStreamParamsBuilder::default();
        seeded.symbol(symbol)
    }
}
#[async_trait]
impl TradeDataStreamApi for TradeDataStreamApiClient {
async fn close_isolated_margin_user_data_stream(
&self,
params: CloseIsolatedMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>> {
let CloseIsolatedMarginUserDataStreamParams { symbol, listenkey } = params;
let mut query_params = BTreeMap::new();
query_params.insert("symbol".to_string(), json!(symbol));
query_params.insert("listenkey".to_string(), json!(listenkey));
send_request::<Value>(
&self.configuration,
"/sapi/v1/userDataStream/isolated",
reqwest::Method::DELETE,
query_params,
if HAS_TIME_UNIT {
self.configuration.time_unit
} else {
None
},
false,
)
.await
}
async fn close_margin_user_data_stream(
&self,
params: CloseMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>> {
let CloseMarginUserDataStreamParams { listenkey } = params;
let mut query_params = BTreeMap::new();
query_params.insert("listenkey".to_string(), json!(listenkey));
send_request::<Value>(
&self.configuration,
"/sapi/v1/userDataStream",
reqwest::Method::DELETE,
query_params,
if HAS_TIME_UNIT {
self.configuration.time_unit
} else {
None
},
false,
)
.await
}
async fn keepalive_isolated_margin_user_data_stream(
&self,
params: KeepaliveIsolatedMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>> {
let KeepaliveIsolatedMarginUserDataStreamParams { symbol, listen_key } = params;
let mut query_params = BTreeMap::new();
query_params.insert("symbol".to_string(), json!(symbol));
query_params.insert("listenKey".to_string(), json!(listen_key));
send_request::<Value>(
&self.configuration,
"/sapi/v1/userDataStream/isolated",
reqwest::Method::PUT,
query_params,
if HAS_TIME_UNIT {
self.configuration.time_unit
} else {
None
},
false,
)
.await
}
async fn keepalive_margin_user_data_stream(
&self,
params: KeepaliveMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<Value>> {
let KeepaliveMarginUserDataStreamParams { listen_key } = params;
let mut query_params = BTreeMap::new();
query_params.insert("listenKey".to_string(), json!(listen_key));
send_request::<Value>(
&self.configuration,
"/sapi/v1/userDataStream",
reqwest::Method::PUT,
query_params,
if HAS_TIME_UNIT {
self.configuration.time_unit
} else {
None
},
false,
)
.await
}
async fn start_isolated_margin_user_data_stream(
&self,
params: StartIsolatedMarginUserDataStreamParams,
) -> anyhow::Result<RestApiResponse<models::StartIsolatedMarginUserDataStreamResponse>> {
let StartIsolatedMarginUserDataStreamParams { symbol } = params;
let mut query_params = BTreeMap::new();
query_params.insert("symbol".to_string(), json!(symbol));
send_request::<models::StartIsolatedMarginUserDataStreamResponse>(
&self.configuration,
"/sapi/v1/userDataStream/isolated",
reqwest::Method::POST,
query_params,
if HAS_TIME_UNIT {
self.configuration.time_unit
} else {
None
},
false,
)
.await
}
async fn start_margin_user_data_stream(
&self,
) -> anyhow::Result<RestApiResponse<models::StartMarginUserDataStreamResponse>> {
let query_params = BTreeMap::new();
send_request::<models::StartMarginUserDataStreamResponse>(
&self.configuration,
"/sapi/v1/userDataStream",
reqwest::Method::POST,
query_params,
if HAS_TIME_UNIT {
self.configuration.time_unit
} else {
None
},
false,
)
.await
}
}
#[cfg(all(test, feature = "margin_trading"))]
mod tests {
    //! Exercises the `TradeDataStreamApi` surface against an in-memory mock:
    //! each endpoint gets a "required params", an "optional params" and a
    //! forced-error test. None of the endpoints has optional parameters, so
    //! the first two variants build identical requests.

    use super::*;
    use crate::TOKIO_SHARED_RT;
    use crate::{errors::ConnectorError, models::DataFuture, models::RestApiRateLimit};
    use async_trait::async_trait;
    use std::collections::HashMap;

    /// Canned body used for both "start stream" endpoints.
    const LISTEN_KEY_BODY: &str =
        r#"{"listenKey":"T3ee22BIYuWqmvne0HNq2A2WsFlEtLhvWCtItw6ffhhdmjifQ2tRbuKkTHhr"}"#;

    /// Test-side mirror of `RestApiResponse` that can be built field by field.
    struct DummyRestApiResponse<T> {
        inner: Box<dyn FnOnce() -> DataFuture<Result<T, ConnectorError>> + Send + Sync>,
        status: u16,
        headers: HashMap<String, String>,
        rate_limits: Option<Vec<RestApiRateLimit>>,
    }

    impl<T> From<DummyRestApiResponse<T>> for RestApiResponse<T> {
        fn from(dummy: DummyRestApiResponse<T>) -> Self {
            let DummyRestApiResponse {
                inner,
                status,
                headers,
                rate_limits,
            } = dummy;
            Self {
                data_fn: inner,
                status,
                headers,
                rate_limits,
            }
        }
    }

    /// Wraps `data` in a 200 response with no headers and no rate limits.
    fn canned_response<T>(data: T) -> RestApiResponse<T>
    where
        T: Send + Sync + 'static,
    {
        DummyRestApiResponse {
            inner: Box::new(move || Box::pin(async move { Ok(data) })),
            status: 200,
            headers: HashMap::new(),
            rate_limits: None,
        }
        .into()
    }

    /// Decodes [`LISTEN_KEY_BODY`] via `serde_json::Value` into `T`, panicking
    /// with `failure_note` if the second step fails.
    fn parse_listen_key_body<T>(failure_note: &str) -> T
    where
        T: serde::de::DeserializeOwned,
    {
        let resp_json: Value = serde_json::from_str(LISTEN_KEY_BODY).unwrap();
        serde_json::from_value(resp_json).expect(failure_note)
    }

    /// Asserts that `outcome` is the error produced by a forced-error mock.
    fn assert_client_error<T>(outcome: anyhow::Result<T>) {
        let Err(err) = outcome else {
            panic!("Expected an error")
        };
        assert_eq!(err.to_string(), "Connector client error: ResponseError");
    }

    fn close_isolated_params() -> CloseIsolatedMarginUserDataStreamParams {
        CloseIsolatedMarginUserDataStreamParams::builder(
            "symbol_example".to_string(),
            "listenkey_example".to_string(),
        )
        .build()
        .unwrap()
    }

    fn close_params() -> CloseMarginUserDataStreamParams {
        CloseMarginUserDataStreamParams::builder("listenkey_example".to_string())
            .build()
            .unwrap()
    }

    fn keepalive_isolated_params() -> KeepaliveIsolatedMarginUserDataStreamParams {
        KeepaliveIsolatedMarginUserDataStreamParams::builder(
            "symbol_example".to_string(),
            "listen_key_example".to_string(),
        )
        .build()
        .unwrap()
    }

    fn keepalive_params() -> KeepaliveMarginUserDataStreamParams {
        KeepaliveMarginUserDataStreamParams::builder("listen_key_example".to_string())
            .build()
            .unwrap()
    }

    fn start_isolated_params() -> StartIsolatedMarginUserDataStreamParams {
        StartIsolatedMarginUserDataStreamParams::builder("symbol_example".to_string())
            .build()
            .unwrap()
    }

    /// Mock API that either fails every call or answers with canned data.
    struct MockTradeDataStreamApiClient {
        force_error: bool,
    }

    impl MockTradeDataStreamApiClient {
        /// Returns the forced client error when `force_error` is set.
        fn fail_if_forced(&self) -> anyhow::Result<()> {
            if self.force_error {
                Err(ConnectorError::ConnectorClientError("ResponseError".to_string()).into())
            } else {
                Ok(())
            }
        }
    }

    #[async_trait]
    impl TradeDataStreamApi for MockTradeDataStreamApiClient {
        async fn close_isolated_margin_user_data_stream(
            &self,
            _params: CloseIsolatedMarginUserDataStreamParams,
        ) -> anyhow::Result<RestApiResponse<Value>> {
            self.fail_if_forced()?;
            Ok(canned_response(Value::Null))
        }

        async fn close_margin_user_data_stream(
            &self,
            _params: CloseMarginUserDataStreamParams,
        ) -> anyhow::Result<RestApiResponse<Value>> {
            self.fail_if_forced()?;
            Ok(canned_response(Value::Null))
        }

        async fn keepalive_isolated_margin_user_data_stream(
            &self,
            _params: KeepaliveIsolatedMarginUserDataStreamParams,
        ) -> anyhow::Result<RestApiResponse<Value>> {
            self.fail_if_forced()?;
            Ok(canned_response(Value::Null))
        }

        async fn keepalive_margin_user_data_stream(
            &self,
            _params: KeepaliveMarginUserDataStreamParams,
        ) -> anyhow::Result<RestApiResponse<Value>> {
            self.fail_if_forced()?;
            Ok(canned_response(Value::Null))
        }

        async fn start_isolated_margin_user_data_stream(
            &self,
            _params: StartIsolatedMarginUserDataStreamParams,
        ) -> anyhow::Result<RestApiResponse<models::StartIsolatedMarginUserDataStreamResponse>>
        {
            self.fail_if_forced()?;
            let body: models::StartIsolatedMarginUserDataStreamResponse = parse_listen_key_body(
                "should parse into models::StartIsolatedMarginUserDataStreamResponse",
            );
            Ok(canned_response(body))
        }

        async fn start_margin_user_data_stream(
            &self,
        ) -> anyhow::Result<RestApiResponse<models::StartMarginUserDataStreamResponse>> {
            self.fail_if_forced()?;
            let body: models::StartMarginUserDataStreamResponse =
                parse_listen_key_body("should parse into models::StartMarginUserDataStreamResponse");
            Ok(canned_response(body))
        }
    }

    #[test]
    fn close_isolated_margin_user_data_stream_required_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .close_isolated_margin_user_data_stream(close_isolated_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn close_isolated_margin_user_data_stream_optional_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .close_isolated_margin_user_data_stream(close_isolated_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn close_isolated_margin_user_data_stream_response_error() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: true };
            let outcome = client
                .close_isolated_margin_user_data_stream(close_isolated_params())
                .await;
            assert_client_error(outcome);
        });
    }

    #[test]
    fn close_margin_user_data_stream_required_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .close_margin_user_data_stream(close_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn close_margin_user_data_stream_optional_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .close_margin_user_data_stream(close_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn close_margin_user_data_stream_response_error() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: true };
            let outcome = client.close_margin_user_data_stream(close_params()).await;
            assert_client_error(outcome);
        });
    }

    #[test]
    fn keepalive_isolated_margin_user_data_stream_required_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .keepalive_isolated_margin_user_data_stream(keepalive_isolated_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn keepalive_isolated_margin_user_data_stream_optional_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .keepalive_isolated_margin_user_data_stream(keepalive_isolated_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn keepalive_isolated_margin_user_data_stream_response_error() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: true };
            let outcome = client
                .keepalive_isolated_margin_user_data_stream(keepalive_isolated_params())
                .await;
            assert_client_error(outcome);
        });
    }

    #[test]
    fn keepalive_margin_user_data_stream_required_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .keepalive_margin_user_data_stream(keepalive_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn keepalive_margin_user_data_stream_optional_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let resp = client
                .keepalive_margin_user_data_stream(keepalive_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), Value::Null);
        });
    }

    #[test]
    fn keepalive_margin_user_data_stream_response_error() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: true };
            let outcome = client
                .keepalive_margin_user_data_stream(keepalive_params())
                .await;
            assert_client_error(outcome);
        });
    }

    #[test]
    fn start_isolated_margin_user_data_stream_required_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let expected_response: models::StartIsolatedMarginUserDataStreamResponse =
                parse_listen_key_body(
                    "should parse into models::StartIsolatedMarginUserDataStreamResponse",
                );
            let resp = client
                .start_isolated_margin_user_data_stream(start_isolated_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), expected_response);
        });
    }

    #[test]
    fn start_isolated_margin_user_data_stream_optional_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let expected_response: models::StartIsolatedMarginUserDataStreamResponse =
                parse_listen_key_body(
                    "should parse into models::StartIsolatedMarginUserDataStreamResponse",
                );
            let resp = client
                .start_isolated_margin_user_data_stream(start_isolated_params())
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), expected_response);
        });
    }

    #[test]
    fn start_isolated_margin_user_data_stream_response_error() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: true };
            let outcome = client
                .start_isolated_margin_user_data_stream(start_isolated_params())
                .await;
            assert_client_error(outcome);
        });
    }

    #[test]
    fn start_margin_user_data_stream_required_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let expected_response: models::StartMarginUserDataStreamResponse =
                parse_listen_key_body("should parse into models::StartMarginUserDataStreamResponse");
            let resp = client
                .start_margin_user_data_stream()
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), expected_response);
        });
    }

    #[test]
    fn start_margin_user_data_stream_optional_params_success() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: false };
            let expected_response: models::StartMarginUserDataStreamResponse =
                parse_listen_key_body("should parse into models::StartMarginUserDataStreamResponse");
            let resp = client
                .start_margin_user_data_stream()
                .await
                .expect("Expected a response");
            assert_eq!(resp.data().await.unwrap(), expected_response);
        });
    }

    #[test]
    fn start_margin_user_data_stream_response_error() {
        TOKIO_SHARED_RT.block_on(async {
            let client = MockTradeDataStreamApiClient { force_error: true };
            let outcome = client.start_margin_user_data_stream().await;
            assert_client_error(outcome);
        });
    }
}