#![allow(unused_imports)]
use async_trait::async_trait;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use crate::common::{
models::ParamBuildError,
utils::replace_websocket_streams_placeholders,
websocket::{WebsocketBase, WebsocketStream, WebsocketStreams, create_stream_handler},
};
use crate::derivatives_trading_usds_futures::websocket_streams::models;
use crate::models::StreamId;
/// Entry points for the public book-ticker and order-book depth WebSocket
/// streams.
///
/// Every method consumes its parameter struct and resolves to a shared
/// [`WebsocketStream`] typed with the matching response model.
#[async_trait]
pub trait PublicApi: Send + Sync {
/// Opens the all-book-tickers stream; [`PublicApiClient`] uses the stream
/// template `/!bookTicker`.
async fn all_book_tickers_stream(
&self,
params: AllBookTickersStreamParams,
) -> anyhow::Result<Arc<WebsocketStream<models::AllBookTickersStreamResponse>>>;
/// Opens the diff book-depth stream for one symbol; [`PublicApiClient`] uses
/// the stream template `/<symbol>@depth@<updateSpeed>`.
async fn diff_book_depth_streams(
&self,
params: DiffBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthStreamsResponse>>>;
/// Opens the book-ticker stream for one symbol; [`PublicApiClient`] uses the
/// stream template `/<symbol>@bookTicker`.
async fn individual_symbol_book_ticker_streams(
&self,
params: IndividualSymbolBookTickerStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::IndividualSymbolBookTickerStreamsResponse>>>;
/// Opens the partial book-depth stream for one symbol and level count;
/// [`PublicApiClient`] uses the stream template
/// `/<symbol>@depth<levels>@<updateSpeed>`.
async fn partial_book_depth_streams(
&self,
params: PartialBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthStreamsResponse>>>;
/// Opens the RPI diff book-depth stream for one symbol; [`PublicApiClient`]
/// uses the stream template `/<symbol>@rpiDepth@500ms`.
async fn rpi_diff_book_depth_streams(
&self,
params: RpiDiffBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::RpiDiffBookDepthStreamsResponse>>>;
}
/// [`PublicApi`] implementation that creates its stream handlers on a shared
/// [`WebsocketStreams`] instance.
pub struct PublicApiClient {
// Shared streams base; a clone of this `Arc` is handed to every stream handler.
websocket_streams_base: Arc<WebsocketStreams>,
}
impl PublicApiClient {
pub fn new(websocket_streams_base: Arc<WebsocketStreams>) -> Self {
Self {
websocket_streams_base,
}
}
}
/// Parameters for [`PublicApi::all_book_tickers_stream`].
///
/// The derived builder uses the owned pattern and reports build failures as
/// [`ParamBuildError`].
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllBookTickersStreamParams {
/// Optional stream identifier. `PublicApiClient` turns an all-digit value
/// that fits in `u32` into `StreamId::Number` and anything else into
/// `StreamId::Str`.
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl AllBookTickersStreamParams {
    /// Returns an empty builder; this stream has no required parameters.
    #[must_use]
    pub fn builder() -> AllBookTickersStreamParamsBuilder {
        let empty = AllBookTickersStreamParamsBuilder::default();
        empty
    }
}
/// Parameters for [`PublicApi::diff_book_depth_streams`].
///
/// The derived builder uses the owned pattern and reports build failures as
/// [`ParamBuildError`].
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct DiffBookDepthStreamsParams {
/// Required. Substituted for `<symbol>` in the stream template.
#[builder(setter(into))]
pub symbol: String,
/// Optional stream identifier. `PublicApiClient` turns an all-digit value
/// that fits in `u32` into `StreamId::Number` and anything else into
/// `StreamId::Str`.
#[builder(setter(into), default)]
pub id: Option<String>,
/// Optional. Substituted for `<updateSpeed>` in the stream template when set.
#[builder(setter(into), default)]
pub update_speed: Option<String>,
}
impl DiffBookDepthStreamsParams {
    /// Returns a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> DiffBookDepthStreamsParamsBuilder {
        let seeded = DiffBookDepthStreamsParamsBuilder::default();
        seeded.symbol(symbol)
    }
}
/// Parameters for [`PublicApi::individual_symbol_book_ticker_streams`].
///
/// The derived builder uses the owned pattern and reports build failures as
/// [`ParamBuildError`].
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct IndividualSymbolBookTickerStreamsParams {
/// Required. Substituted for `<symbol>` in the stream template.
#[builder(setter(into))]
pub symbol: String,
/// Optional stream identifier. `PublicApiClient` turns an all-digit value
/// that fits in `u32` into `StreamId::Number` and anything else into
/// `StreamId::Str`.
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl IndividualSymbolBookTickerStreamsParams {
    /// Returns a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> IndividualSymbolBookTickerStreamsParamsBuilder {
        let seeded = IndividualSymbolBookTickerStreamsParamsBuilder::default();
        seeded.symbol(symbol)
    }
}
/// Parameters for [`PublicApi::partial_book_depth_streams`].
///
/// The derived builder uses the owned pattern and reports build failures as
/// [`ParamBuildError`].
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct PartialBookDepthStreamsParams {
/// Required. Substituted for `<symbol>` in the stream template.
#[builder(setter(into))]
pub symbol: String,
/// Required. Rendered with `to_string()` and substituted for `<levels>` in
/// the stream template.
#[builder(setter(into))]
pub levels: i64,
/// Optional stream identifier. `PublicApiClient` turns an all-digit value
/// that fits in `u32` into `StreamId::Number` and anything else into
/// `StreamId::Str`.
#[builder(setter(into), default)]
pub id: Option<String>,
/// Optional. Substituted for `<updateSpeed>` in the stream template when set.
#[builder(setter(into), default)]
pub update_speed: Option<String>,
}
impl PartialBookDepthStreamsParams {
    /// Returns a builder with the required `symbol` and `levels` already set.
    #[must_use]
    pub fn builder(symbol: String, levels: i64) -> PartialBookDepthStreamsParamsBuilder {
        let with_symbol = PartialBookDepthStreamsParamsBuilder::default().symbol(symbol);
        with_symbol.levels(levels)
    }
}
/// Parameters for [`PublicApi::rpi_diff_book_depth_streams`].
///
/// The derived builder uses the owned pattern and reports build failures as
/// [`ParamBuildError`].
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct RpiDiffBookDepthStreamsParams {
/// Required. Substituted for `<symbol>` in the stream template.
#[builder(setter(into))]
pub symbol: String,
/// Optional stream identifier. `PublicApiClient` turns an all-digit value
/// that fits in `u32` into `StreamId::Number` and anything else into
/// `StreamId::Str`.
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl RpiDiffBookDepthStreamsParams {
    /// Returns a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> RpiDiffBookDepthStreamsParamsBuilder {
        let seeded = RpiDiffBookDepthStreamsParamsBuilder::default();
        seeded.symbol(symbol)
    }
}
#[async_trait]
impl PublicApi for PublicApiClient {
async fn all_book_tickers_stream(
&self,
params: AllBookTickersStreamParams,
) -> anyhow::Result<Arc<WebsocketStream<models::AllBookTickersStreamResponse>>> {
let AllBookTickersStreamParams { id } = params;
let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/!bookTicker", &vars);
Ok(
create_stream_handler::<models::AllBookTickersStreamResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
Some("public".to_string()),
)
.await,
)
}
async fn diff_book_depth_streams(
&self,
params: DiffBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthStreamsResponse>>> {
let DiffBookDepthStreamsParams {
symbol,
id,
update_speed,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
Ok(
create_stream_handler::<models::DiffBookDepthStreamsResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
Some("public".to_string()),
)
.await,
)
}
async fn individual_symbol_book_ticker_streams(
&self,
params: IndividualSymbolBookTickerStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::IndividualSymbolBookTickerStreamsResponse>>>
{
let IndividualSymbolBookTickerStreamsParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
Ok(
create_stream_handler::<models::IndividualSymbolBookTickerStreamsResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
Some("public".to_string()),
)
.await,
)
}
async fn partial_book_depth_streams(
&self,
params: PartialBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthStreamsResponse>>> {
let PartialBookDepthStreamsParams {
symbol,
levels,
id,
update_speed,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("levels", Some(levels.to_string())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream =
replace_websocket_streams_placeholders("/<symbol>@depth<levels>@<updateSpeed>", &vars);
Ok(
create_stream_handler::<models::PartialBookDepthStreamsResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
Some("public".to_string()),
)
.await,
)
}
async fn rpi_diff_book_depth_streams(
&self,
params: RpiDiffBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::RpiDiffBookDepthStreamsResponse>>> {
let RpiDiffBookDepthStreamsParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@rpiDepth@500ms", &vars);
Ok(
create_stream_handler::<models::RpiDiffBookDepthStreamsResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
Some("public".to_string()),
)
.await,
)
}
}
#[cfg(all(test, feature = "derivatives_trading_usds_futures"))]
mod tests {
    use super::*;
    use crate::TOKIO_SHARED_RT;
    use crate::{
        common::websocket::{WebsocketConnection, WebsocketHandler},
        config::ConfigurationWebsocketStreams,
    };
    use serde_json::json;
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::task::yield_now;

    /// Stream identifier used by every test; not numeric, so it must surface as
    /// `StreamId::Str`.
    const TEST_ID: &str = "test-id-123";
    /// Symbol used by every symbol-scoped test.
    const TEST_SYMBOL: &str = "btcusdt";

    /// Sample `bookTicker` event shared by both book-ticker streams.
    const BOOK_TICKER_PAYLOAD: &str = r#"{"e":"bookTicker","u":400900217,"E":1568014460893,"T":1568014460891,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#;
    /// Sample `depthUpdate` event shared by the diff and RPI diff depth streams.
    const DIFF_DEPTH_PAYLOAD: &str = r#"{"e":"depthUpdate","E":123456789,"T":123456788,"s":"BTCUSDT","U":157,"u":160,"pu":149,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#;
    /// Sample `depthUpdate` event for the partial depth stream.
    const PARTIAL_DEPTH_PAYLOAD: &str = r#"{"e":"depthUpdate","E":1571889248277,"T":1571889248276,"s":"BTCUSDT","U":390497796,"u":390497878,"pu":390497794,"b":[["7403.89","0.002"],["7403.90","3.906"],["7404.00","1.428"],["7404.85","5.239"],["7405.43","2.562"]],"a":[["7405.96","3.340"],["7406.63","4.525"],["7407.08","2.475"],["7407.15","4.800"],["7407.20","0.175"]]}"#;

    /// Builds a `WebsocketStreams` base around a single test connection whose
    /// URL path is `"public"`, and installs the base as that connection's handler.
    async fn make_streams_base() -> (Arc<WebsocketStreams>, Arc<WebsocketConnection>) {
        let conn = WebsocketConnection::new("test");
        let config = ConfigurationWebsocketStreams::builder()
            .build()
            .expect("Failed to build configuration");
        let streams_base =
            WebsocketStreams::new(config, vec![conn.clone()], vec!["public".to_string()]);
        {
            let mut st = conn.state.lock().await;
            st.url_path = Some("public".to_string());
        }
        conn.set_handler(streams_base.clone() as Arc<dyn WebsocketHandler>)
            .await;
        (streams_base, conn)
    }

    /// Computes the stream name the client is expected to use for `template`,
    /// skipping every placeholder whose value is `None`.
    fn expected_stream(template: &str, pairs: &[(&'static str, Option<String>)]) -> String {
        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|(name, value)| value.clone().map(|value| (*name, value)))
            .collect();
        replace_websocket_streams_placeholders(template, &vars)
    }

    /// Feeds `raw_payload` to the streams base as a combined-stream message for
    /// `stream`, then yields once so spawned callbacks get a chance to run.
    async fn deliver(
        streams_base: Arc<WebsocketStreams>,
        conn: Arc<WebsocketConnection>,
        stream: &str,
        raw_payload: &str,
    ) {
        let payload: Value = serde_json::from_str(raw_payload).unwrap();
        let msg = json!({
            "stream": stream,
            "data": payload,
        });
        streams_base.on_message(msg.to_string(), conn.clone()).await;
        yield_now().await;
    }

    /// Parameters plus expected stream name for `all_book_tickers_stream`.
    fn all_book_tickers_case() -> (AllBookTickersStreamParams, String) {
        let params = AllBookTickersStreamParams::builder()
            .id(Some(TEST_ID.to_string()))
            .build()
            .unwrap();
        let stream = expected_stream("/!bookTicker", &[("id", params.id.clone())]);
        (params, stream)
    }

    /// Parameters plus expected stream name for `diff_book_depth_streams`.
    fn diff_book_depth_case() -> (DiffBookDepthStreamsParams, String) {
        let params = DiffBookDepthStreamsParams::builder(TEST_SYMBOL.to_string())
            .id(Some(TEST_ID.to_string()))
            .build()
            .unwrap();
        let stream = expected_stream(
            "/<symbol>@depth@<updateSpeed>",
            &[
                ("symbol", Some(params.symbol.clone())),
                ("id", params.id.clone()),
                ("updateSpeed", params.update_speed.clone()),
            ],
        );
        (params, stream)
    }

    /// Parameters plus expected stream name for
    /// `individual_symbol_book_ticker_streams`.
    fn individual_symbol_book_ticker_case() -> (IndividualSymbolBookTickerStreamsParams, String) {
        let params = IndividualSymbolBookTickerStreamsParams::builder(TEST_SYMBOL.to_string())
            .id(Some(TEST_ID.to_string()))
            .build()
            .unwrap();
        let stream = expected_stream(
            "/<symbol>@bookTicker",
            &[
                ("symbol", Some(params.symbol.clone())),
                ("id", params.id.clone()),
            ],
        );
        (params, stream)
    }

    /// Parameters plus expected stream name for `partial_book_depth_streams`.
    fn partial_book_depth_case() -> (PartialBookDepthStreamsParams, String) {
        let params = PartialBookDepthStreamsParams::builder(TEST_SYMBOL.to_string(), 10)
            .id(Some(TEST_ID.to_string()))
            .build()
            .unwrap();
        let stream = expected_stream(
            "/<symbol>@depth<levels>@<updateSpeed>",
            &[
                ("symbol", Some(params.symbol.clone())),
                ("levels", Some(params.levels.to_string())),
                ("id", params.id.clone()),
                ("updateSpeed", params.update_speed.clone()),
            ],
        );
        (params, stream)
    }

    /// Parameters plus expected stream name for `rpi_diff_book_depth_streams`.
    fn rpi_diff_book_depth_case() -> (RpiDiffBookDepthStreamsParams, String) {
        let params = RpiDiffBookDepthStreamsParams::builder(TEST_SYMBOL.to_string())
            .id(Some(TEST_ID.to_string()))
            .build()
            .unwrap();
        let stream = expected_stream(
            "/<symbol>@rpiDepth@500ms",
            &[
                ("symbol", Some(params.symbol.clone())),
                ("id", params.id.clone()),
            ],
        );
        (params, stream)
    }

    #[test]
    fn all_book_tickers_stream_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = all_book_tickers_case();
            let ws_stream = api
                .all_book_tickers_stream(params)
                .await
                .expect("all_book_tickers_stream should return a WebsocketStream");
            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id, Some(StreamId::Str(TEST_ID.to_string())));
        });
    }

    #[test]
    fn all_book_tickers_stream_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = all_book_tickers_case();
            let ws_stream = api.all_book_tickers_stream(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::AllBookTickersStreamResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });
            deliver(streams_base, conn, &stream, BOOK_TICKER_PAYLOAD).await;
            assert!(
                called.load(Ordering::SeqCst),
                "expected our callback to have been invoked"
            );
        });
    }

    #[test]
    fn all_book_tickers_stream_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = all_book_tickers_case();
            let ws_stream = api.all_book_tickers_stream(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::AllBookTickersStreamResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });
            assert!(
                streams_base.is_subscribed(&stream).await,
                "should be subscribed before unsubscribe"
            );
            ws_stream.unsubscribe().await;
            deliver(streams_base, conn, &stream, BOOK_TICKER_PAYLOAD).await;
            assert!(
                !called.load(Ordering::SeqCst),
                "callback should not be invoked after unsubscribe"
            );
        });
    }

    #[test]
    fn diff_book_depth_streams_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = diff_book_depth_case();
            let ws_stream = api
                .diff_book_depth_streams(params)
                .await
                .expect("diff_book_depth_streams should return a WebsocketStream");
            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id, Some(StreamId::Str(TEST_ID.to_string())));
        });
    }

    #[test]
    fn diff_book_depth_streams_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = diff_book_depth_case();
            let ws_stream = api.diff_book_depth_streams(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::DiffBookDepthStreamsResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });
            deliver(streams_base, conn, &stream, DIFF_DEPTH_PAYLOAD).await;
            assert!(
                called.load(Ordering::SeqCst),
                "expected our callback to have been invoked"
            );
        });
    }

    #[test]
    fn diff_book_depth_streams_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = diff_book_depth_case();
            let ws_stream = api.diff_book_depth_streams(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::DiffBookDepthStreamsResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });
            assert!(
                streams_base.is_subscribed(&stream).await,
                "should be subscribed before unsubscribe"
            );
            ws_stream.unsubscribe().await;
            deliver(streams_base, conn, &stream, DIFF_DEPTH_PAYLOAD).await;
            assert!(
                !called.load(Ordering::SeqCst),
                "callback should not be invoked after unsubscribe"
            );
        });
    }

    #[test]
    fn individual_symbol_book_ticker_streams_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = individual_symbol_book_ticker_case();
            let ws_stream = api
                .individual_symbol_book_ticker_streams(params)
                .await
                .expect("individual_symbol_book_ticker_streams should return a WebsocketStream");
            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id, Some(StreamId::Str(TEST_ID.to_string())));
        });
    }

    #[test]
    fn individual_symbol_book_ticker_streams_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = individual_symbol_book_ticker_case();
            let ws_stream = api
                .individual_symbol_book_ticker_streams(params)
                .await
                .unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(
                move |_payload: models::IndividualSymbolBookTickerStreamsResponse| {
                    called_with_message.store(true, Ordering::SeqCst);
                },
            );
            deliver(streams_base, conn, &stream, BOOK_TICKER_PAYLOAD).await;
            assert!(
                called.load(Ordering::SeqCst),
                "expected our callback to have been invoked"
            );
        });
    }

    #[test]
    fn individual_symbol_book_ticker_streams_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = individual_symbol_book_ticker_case();
            let ws_stream = api
                .individual_symbol_book_ticker_streams(params)
                .await
                .unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(
                move |_payload: models::IndividualSymbolBookTickerStreamsResponse| {
                    called_clone.store(true, Ordering::SeqCst);
                },
            );
            assert!(
                streams_base.is_subscribed(&stream).await,
                "should be subscribed before unsubscribe"
            );
            ws_stream.unsubscribe().await;
            deliver(streams_base, conn, &stream, BOOK_TICKER_PAYLOAD).await;
            assert!(
                !called.load(Ordering::SeqCst),
                "callback should not be invoked after unsubscribe"
            );
        });
    }

    #[test]
    fn partial_book_depth_streams_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = partial_book_depth_case();
            let ws_stream = api
                .partial_book_depth_streams(params)
                .await
                .expect("partial_book_depth_streams should return a WebsocketStream");
            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id, Some(StreamId::Str(TEST_ID.to_string())));
        });
    }

    #[test]
    fn partial_book_depth_streams_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = partial_book_depth_case();
            let ws_stream = api.partial_book_depth_streams(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::PartialBookDepthStreamsResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });
            deliver(streams_base, conn, &stream, PARTIAL_DEPTH_PAYLOAD).await;
            assert!(
                called.load(Ordering::SeqCst),
                "expected our callback to have been invoked"
            );
        });
    }

    #[test]
    fn partial_book_depth_streams_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = partial_book_depth_case();
            let ws_stream = api.partial_book_depth_streams(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::PartialBookDepthStreamsResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });
            assert!(
                streams_base.is_subscribed(&stream).await,
                "should be subscribed before unsubscribe"
            );
            ws_stream.unsubscribe().await;
            deliver(streams_base, conn, &stream, PARTIAL_DEPTH_PAYLOAD).await;
            assert!(
                !called.load(Ordering::SeqCst),
                "callback should not be invoked after unsubscribe"
            );
        });
    }

    #[test]
    fn rpi_diff_book_depth_streams_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = rpi_diff_book_depth_case();
            let ws_stream = api
                .rpi_diff_book_depth_streams(params)
                .await
                .expect("rpi_diff_book_depth_streams should return a WebsocketStream");
            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id, Some(StreamId::Str(TEST_ID.to_string())));
        });
    }

    #[test]
    fn rpi_diff_book_depth_streams_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = rpi_diff_book_depth_case();
            let ws_stream = api.rpi_diff_book_depth_streams(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::RpiDiffBookDepthStreamsResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });
            deliver(streams_base, conn, &stream, DIFF_DEPTH_PAYLOAD).await;
            assert!(
                called.load(Ordering::SeqCst),
                "expected our callback to have been invoked"
            );
        });
    }

    #[test]
    fn rpi_diff_book_depth_streams_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = PublicApiClient::new(streams_base.clone());
            let (params, stream) = rpi_diff_book_depth_case();
            let ws_stream = api.rpi_diff_book_depth_streams(params).await.unwrap();
            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::RpiDiffBookDepthStreamsResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });
            assert!(
                streams_base.is_subscribed(&stream).await,
                "should be subscribed before unsubscribe"
            );
            ws_stream.unsubscribe().await;
            deliver(streams_base, conn, &stream, DIFF_DEPTH_PAYLOAD).await;
            assert!(
                !called.load(Ordering::SeqCst),
                "callback should not be invoked after unsubscribe"
            );
        });
    }
}