#![allow(unused_imports)]
use async_trait::async_trait;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use crate::common::{
models::ParamBuildError,
utils::replace_websocket_streams_placeholders,
websocket::{WebsocketBase, WebsocketStream, WebsocketStreams, create_stream_handler},
};
use crate::derivatives_trading_options::websocket_streams::models;
use crate::models::StreamId;
/// Subscription entry points of the derivatives trading options WebSocket
/// streams API.
///
/// Every method consumes a parameter struct and resolves to a shared
/// [`WebsocketStream`] typed with the response model of that stream.
#[async_trait]
pub trait PublicApi: Send + Sync {
/// Opens the diff book depth stream described by `params`.
async fn diff_book_depth_streams(
&self,
params: DiffBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthStreamsResponse>>>;
/// Opens the individual symbol book ticker stream described by `params`.
async fn individual_symbol_book_ticker_streams(
&self,
params: IndividualSymbolBookTickerStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::IndividualSymbolBookTickerStreamsResponse>>>;
/// Opens the partial book depth stream described by `params`.
async fn partial_book_depth_streams(
&self,
params: PartialBookDepthStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthStreamsResponse>>>;
/// Opens the 24-hour ticker stream described by `params`.
async fn ticker24_hour(
&self,
params: Ticker24HourParams,
) -> anyhow::Result<Arc<WebsocketStream<models::Ticker24HourResponse>>>;
/// Opens the trade stream described by `params`.
async fn trade_streams(
&self,
params: TradeStreamsParams,
) -> anyhow::Result<Arc<WebsocketStream<models::TradeStreamsResponse>>>;
}
/// [`PublicApi`] implementation backed by a shared [`WebsocketStreams`] base.
pub struct PublicApiClient {
// Shared streams base; each subscription receives its own `Arc` clone of it.
websocket_streams_base: Arc<WebsocketStreams>,
}
impl PublicApiClient {
pub fn new(websocket_streams_base: Arc<WebsocketStreams>) -> Self {
Self {
websocket_streams_base,
}
}
}
/// Parameters for [`PublicApi::diff_book_depth_streams`].
///
/// Build with [`DiffBookDepthStreamsParams::builder`]; optional fields default
/// to `None`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct DiffBookDepthStreamsParams {
/// Value substituted for `<symbol>` in the stream name. Required.
#[builder(setter(into))]
pub symbol: String,
/// Optional numeric identifier handed to the stream handler.
#[builder(setter(into), default)]
pub id: Option<u32>,
/// Optional value substituted for `<updateSpeed>` in the stream name.
#[builder(setter(into), default)]
pub update_speed: Option<String>,
}
impl DiffBookDepthStreamsParams {
    /// Starts a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> DiffBookDepthStreamsParamsBuilder {
        let empty = DiffBookDepthStreamsParamsBuilder::default();
        empty.symbol(symbol)
    }
}
/// Parameters for [`PublicApi::individual_symbol_book_ticker_streams`].
///
/// Build with [`IndividualSymbolBookTickerStreamsParams::builder`]; optional
/// fields default to `None`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct IndividualSymbolBookTickerStreamsParams {
/// Value substituted for `<symbol>` in the stream name. Required.
#[builder(setter(into))]
pub symbol: String,
/// Optional numeric identifier handed to the stream handler.
#[builder(setter(into), default)]
pub id: Option<u32>,
}
impl IndividualSymbolBookTickerStreamsParams {
    /// Starts a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> IndividualSymbolBookTickerStreamsParamsBuilder {
        let empty = IndividualSymbolBookTickerStreamsParamsBuilder::default();
        empty.symbol(symbol)
    }
}
/// Parameters for [`PublicApi::partial_book_depth_streams`].
///
/// Build with [`PartialBookDepthStreamsParams::builder`]; optional fields
/// default to `None`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct PartialBookDepthStreamsParams {
/// Value substituted for `<symbol>` in the stream name. Required.
#[builder(setter(into))]
pub symbol: String,
/// Value substituted for `<level>` in the stream name. Required.
#[builder(setter(into))]
pub level: String,
/// Optional numeric identifier handed to the stream handler.
#[builder(setter(into), default)]
pub id: Option<u32>,
/// Optional value substituted for `<updateSpeed>` in the stream name.
#[builder(setter(into), default)]
pub update_speed: Option<String>,
}
impl PartialBookDepthStreamsParams {
    /// Starts a builder with the required `symbol` and `level` already set.
    #[must_use]
    pub fn builder(symbol: String, level: String) -> PartialBookDepthStreamsParamsBuilder {
        let with_symbol = PartialBookDepthStreamsParamsBuilder::default().symbol(symbol);
        with_symbol.level(level)
    }
}
/// Parameters for [`PublicApi::ticker24_hour`].
///
/// Build with [`Ticker24HourParams::builder`]; optional fields default to
/// `None`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct Ticker24HourParams {
/// Value substituted for `<symbol>` in the stream name. Required.
#[builder(setter(into))]
pub symbol: String,
/// Optional numeric identifier handed to the stream handler.
#[builder(setter(into), default)]
pub id: Option<u32>,
}
impl Ticker24HourParams {
    /// Starts a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> Ticker24HourParamsBuilder {
        let empty = Ticker24HourParamsBuilder::default();
        empty.symbol(symbol)
    }
}
/// Parameters for [`PublicApi::trade_streams`].
///
/// Build with [`TradeStreamsParams::builder`]; optional fields default to
/// `None`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TradeStreamsParams {
/// Value substituted for `<symbol>` in the stream name. Required.
#[builder(setter(into))]
pub symbol: String,
/// Optional numeric identifier handed to the stream handler.
#[builder(setter(into), default)]
pub id: Option<u32>,
}
impl TradeStreamsParams {
    /// Starts a builder with the required `symbol` already set.
    #[must_use]
    pub fn builder(symbol: String) -> TradeStreamsParamsBuilder {
        let empty = TradeStreamsParamsBuilder::default();
        empty.symbol(symbol)
    }
}
#[async_trait]
impl PublicApi for PublicApiClient {
    /// Subscribes to the stream named by `/<symbol>@depth@<updateSpeed>`.
    ///
    /// `symbol`, `id` (stringified) and `update_speed` are offered as template
    /// variables when they are set; `id` is also handed to the stream handler
    /// as [`StreamId::Number`].
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn diff_book_depth_streams(
        &self,
        params: DiffBookDepthStreamsParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthStreamsResponse>>> {
        let DiffBookDepthStreamsParams {
            symbol,
            id,
            update_speed,
        } = params;
        // The fields are owned here, so they move into the map without cloning.
        let vars = collect_stream_vars([
            ("symbol", Some(symbol)),
            ("id", id.map(|v| v.to_string())),
            ("updateSpeed", update_speed),
        ]);
        let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
        Ok(
            create_stream_handler::<models::DiffBookDepthStreamsResponse>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                // A `u32` id is always numeric; no string round-trip is needed.
                id.map(StreamId::Number),
                Some("public".to_string()),
            )
            .await,
        )
    }

    /// Subscribes to the stream named by `/<symbol>@bookTicker`.
    ///
    /// `symbol` and `id` (stringified, when set) are offered as template
    /// variables; `id` is also handed to the stream handler as
    /// [`StreamId::Number`].
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn individual_symbol_book_ticker_streams(
        &self,
        params: IndividualSymbolBookTickerStreamsParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::IndividualSymbolBookTickerStreamsResponse>>>
    {
        let IndividualSymbolBookTickerStreamsParams { symbol, id } = params;
        let vars = collect_stream_vars([
            ("symbol", Some(symbol)),
            ("id", id.map(|v| v.to_string())),
        ]);
        let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
        Ok(
            create_stream_handler::<models::IndividualSymbolBookTickerStreamsResponse>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                id.map(StreamId::Number),
                Some("public".to_string()),
            )
            .await,
        )
    }

    /// Subscribes to the stream named by `/<symbol>@depth<level>@<updateSpeed>`.
    ///
    /// `symbol`, `level`, `id` (stringified) and `update_speed` are offered as
    /// template variables when they are set; `id` is also handed to the stream
    /// handler as [`StreamId::Number`].
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn partial_book_depth_streams(
        &self,
        params: PartialBookDepthStreamsParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthStreamsResponse>>> {
        let PartialBookDepthStreamsParams {
            symbol,
            level,
            id,
            update_speed,
        } = params;
        let vars = collect_stream_vars([
            ("symbol", Some(symbol)),
            ("level", Some(level)),
            ("id", id.map(|v| v.to_string())),
            ("updateSpeed", update_speed),
        ]);
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth<level>@<updateSpeed>", &vars);
        Ok(
            create_stream_handler::<models::PartialBookDepthStreamsResponse>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                id.map(StreamId::Number),
                Some("public".to_string()),
            )
            .await,
        )
    }

    /// Subscribes to the stream named by `/<symbol>@optionTicker`.
    ///
    /// `symbol` and `id` (stringified, when set) are offered as template
    /// variables; `id` is also handed to the stream handler as
    /// [`StreamId::Number`].
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn ticker24_hour(
        &self,
        params: Ticker24HourParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::Ticker24HourResponse>>> {
        let Ticker24HourParams { symbol, id } = params;
        let vars = collect_stream_vars([
            ("symbol", Some(symbol)),
            ("id", id.map(|v| v.to_string())),
        ]);
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTicker", &vars);
        Ok(create_stream_handler::<models::Ticker24HourResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id.map(StreamId::Number),
            Some("public".to_string()),
        )
        .await)
    }

    /// Subscribes to the stream named by `/<symbol>@optionTrade`.
    ///
    /// `symbol` and `id` (stringified, when set) are offered as template
    /// variables; `id` is also handed to the stream handler as
    /// [`StreamId::Number`].
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn trade_streams(
        &self,
        params: TradeStreamsParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeStreamsResponse>>> {
        let TradeStreamsParams { symbol, id } = params;
        let vars = collect_stream_vars([
            ("symbol", Some(symbol)),
            ("id", id.map(|v| v.to_string())),
        ]);
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTrade", &vars);
        Ok(create_stream_handler::<models::TradeStreamsResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id.map(StreamId::Number),
            Some("public".to_string()),
        )
        .await)
    }
}

/// Collects the template variables that are actually set, dropping every
/// `None` entry so unset optional parameters never reach the placeholder
/// substitution.
fn collect_stream_vars<const N: usize>(
    pairs: [(&'static str, Option<String>); N],
) -> HashMap<&'static str, String> {
    // `IntoIterator::into_iter` on the array yields owned tuples regardless of
    // the crate edition, so the values move into the map without cloning.
    IntoIterator::into_iter(pairs)
        .filter_map(|(key, value)| value.map(|value| (key, value)))
        .collect()
}
#[cfg(all(test, feature = "derivatives_trading_options"))]
mod tests {
use super::*;
use crate::TOKIO_SHARED_RT;
use crate::{
common::websocket::{WebsocketConnection, WebsocketHandler},
config::ConfigurationWebsocketStreams,
};
use serde_json::json;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::yield_now;
/// Builds a `WebsocketStreams` base wired to one test connection whose URL
/// path is `"public"`, and returns both.
async fn make_streams_base() -> (Arc<WebsocketStreams>, Arc<WebsocketConnection>) {
    let connection = WebsocketConnection::new("test");
    let configuration = ConfigurationWebsocketStreams::builder()
        .build()
        .expect("Failed to build configuration");
    let base = WebsocketStreams::new(
        configuration,
        vec![connection.clone()],
        vec![String::from("public")],
    );

    // The guard is a temporary, so the lock is released at the end of this
    // statement, before the handler is installed.
    connection.state.lock().await.url_path = Some(String::from("public"));

    let handler: Arc<dyn WebsocketHandler> = base.clone();
    connection.set_handler(handler).await;
    (base, connection)
}
/// Subscribing with an explicit id registers the expected stream name and
/// keeps the id as `StreamId::Number`.
#[test]
fn diff_book_depth_streams_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = DiffBookDepthStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the expected stream name from the same inputs the client sees.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        if let Some(speed) = request.update_speed.clone() {
            vars.insert("updateSpeed", speed);
        }
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

        let subscription = client
            .diff_book_depth_streams(request)
            .await
            .expect("diff_book_depth_streams should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(subscription.id, Some(StreamId::Number(123456u32)));
    });
}
/// A message addressed to the subscribed stream reaches the registered callback.
#[test]
fn diff_book_depth_streams_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = DiffBookDepthStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        if let Some(speed) = request.update_speed.clone() {
            vars.insert("updateSpeed", speed);
        }
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

        let subscription = client.diff_book_depth_streams(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::DiffBookDepthStreamsResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let data: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1762866729459,"T":1762866729358,"s":"BTC-251123-126000-C","U":465,"u":465,"pu":464,"b":[["1100.000","0.6000"]],"a":[["1300.000","0.6000"]]}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
/// After `unsubscribe`, a message for the stream no longer reaches the callback.
#[test]
fn diff_book_depth_streams_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = DiffBookDepthStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        if let Some(speed) = request.update_speed.clone() {
            vars.insert("updateSpeed", speed);
        }
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

        let subscription = client.diff_book_depth_streams(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::DiffBookDepthStreamsResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );
        subscription.unsubscribe().await;

        let data: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1762866729459,"T":1762866729358,"s":"BTC-251123-126000-C","U":465,"u":465,"pu":464,"b":[["1100.000","0.6000"]],"a":[["1300.000","0.6000"]]}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
/// Subscribing with an explicit id registers the expected stream name and
/// keeps the id as `StreamId::Number`.
#[test]
fn individual_symbol_book_ticker_streams_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = IndividualSymbolBookTickerStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the expected stream name from the same inputs the client sees.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);

        let subscription = client
            .individual_symbol_book_ticker_streams(request)
            .await
            .expect("individual_symbol_book_ticker_streams should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(subscription.id, Some(StreamId::Number(123456u32)));
    });
}
/// A message addressed to the subscribed stream reaches the registered callback.
#[test]
fn individual_symbol_book_ticker_streams_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = IndividualSymbolBookTickerStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);

        let subscription = client
            .individual_symbol_book_ticker_streams(request)
            .await
            .unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(
            move |_payload: models::IndividualSymbolBookTickerStreamsResponse| {
                fired_in_callback.store(true, Ordering::SeqCst);
            },
        );

        let data: Value = serde_json::from_str(r#"{"e":"bookTicker","u":2472,"s":"BTC-251226-110000-C","b":"5000.000","B":"0.2000","a":"5100.000","A":"0.1000","T":1763041762942,"E":1763041762942}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
/// After `unsubscribe`, a message for the stream no longer reaches the callback.
#[test]
fn individual_symbol_book_ticker_streams_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = IndividualSymbolBookTickerStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);

        let subscription = client
            .individual_symbol_book_ticker_streams(request)
            .await
            .unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(
            move |_payload: models::IndividualSymbolBookTickerStreamsResponse| {
                fired_in_callback.store(true, Ordering::SeqCst);
            },
        );

        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );
        subscription.unsubscribe().await;

        let data: Value = serde_json::from_str(r#"{"e":"bookTicker","u":2472,"s":"BTC-251226-110000-C","b":"5000.000","B":"0.2000","a":"5100.000","A":"0.1000","T":1763041762942,"E":1763041762942}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
/// Subscribing with an explicit id registers the expected stream name and
/// keeps the id as `StreamId::Number`.
#[test]
fn partial_book_depth_streams_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = PartialBookDepthStreamsParams::builder(
            "btcusdt".to_string(),
            "example_value".to_string(),
        )
        .id(Some(123456u32))
        .build()
        .unwrap();

        // Derive the expected stream name from the same inputs the client sees.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        vars.insert("level", request.level.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        if let Some(speed) = request.update_speed.clone() {
            vars.insert("updateSpeed", speed);
        }
        let stream = replace_websocket_streams_placeholders(
            "/<symbol>@depth<level>@<updateSpeed>",
            &vars,
        );

        let subscription = client
            .partial_book_depth_streams(request)
            .await
            .expect("partial_book_depth_streams should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(subscription.id, Some(StreamId::Number(123456u32)));
    });
}
/// A message addressed to the subscribed stream reaches the registered callback.
#[test]
fn partial_book_depth_streams_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = PartialBookDepthStreamsParams::builder(
            "btcusdt".to_string(),
            "example_value".to_string(),
        )
        .id(Some(123456u32))
        .build()
        .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        vars.insert("level", request.level.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        if let Some(speed) = request.update_speed.clone() {
            vars.insert("updateSpeed", speed);
        }
        let stream = replace_websocket_streams_placeholders(
            "/<symbol>@depth<level>@<updateSpeed>",
            &vars,
        );

        let subscription = client.partial_book_depth_streams(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::PartialBookDepthStreamsResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let data: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1762866729459,"T":1762866729358,"s":"BTC-251123-126000-C","U":465,"u":465,"pu":464,"b":[["1100.000","0.6000"]],"a":[["1300.000","0.6000"]]}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
/// After `unsubscribe`, a message for the stream no longer reaches the callback.
#[test]
fn partial_book_depth_streams_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = PartialBookDepthStreamsParams::builder(
            "btcusdt".to_string(),
            "example_value".to_string(),
        )
        .id(Some(123456u32))
        .build()
        .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        vars.insert("level", request.level.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        if let Some(speed) = request.update_speed.clone() {
            vars.insert("updateSpeed", speed);
        }
        let stream = replace_websocket_streams_placeholders(
            "/<symbol>@depth<level>@<updateSpeed>",
            &vars,
        );

        let subscription = client.partial_book_depth_streams(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::PartialBookDepthStreamsResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );
        subscription.unsubscribe().await;

        let data: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1762866729459,"T":1762866729358,"s":"BTC-251123-126000-C","U":465,"u":465,"pu":464,"b":[["1100.000","0.6000"]],"a":[["1300.000","0.6000"]]}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
/// Subscribing with an explicit id registers the expected stream name and
/// keeps the id as `StreamId::Number`.
#[test]
fn ticker24_hour_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = Ticker24HourParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the expected stream name from the same inputs the client sees.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTicker", &vars);

        let subscription = client
            .ticker24_hour(request)
            .await
            .expect("ticker24_hour should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(subscription.id, Some(StreamId::Number(123456u32)));
    });
}
/// A message addressed to the subscribed stream reaches the registered callback.
#[test]
fn ticker24_hour_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = Ticker24HourParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTicker", &vars);

        let subscription = client.ticker24_hour(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::Ticker24HourResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let data: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1764080707933,"s":"ETH-251226-3000-C","p":"0.0000","P":"0.00","w":"200.0000","c":"200.0000","Q":"1.0000","o":"200.0000","h":"200.0000","l":"200.0000","v":"9.0000","q":"1800.0000","O":1764051060000,"C":1764080707933,"F":1,"L":22,"n":9}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
/// After `unsubscribe`, a message for the stream no longer reaches the callback.
#[test]
fn ticker24_hour_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = Ticker24HourParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTicker", &vars);

        let subscription = client.ticker24_hour(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::Ticker24HourResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );
        subscription.unsubscribe().await;

        let data: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1764080707933,"s":"ETH-251226-3000-C","p":"0.0000","P":"0.00","w":"200.0000","c":"200.0000","Q":"1.0000","o":"200.0000","h":"200.0000","l":"200.0000","v":"9.0000","q":"1800.0000","O":1764051060000,"C":1764080707933,"F":1,"L":22,"n":9}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
/// Subscribing with an explicit id registers the expected stream name and
/// keeps the id as `StreamId::Number`.
#[test]
fn trade_streams_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = TradeStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the expected stream name from the same inputs the client sees.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTrade", &vars);

        let subscription = client
            .trade_streams(request)
            .await
            .expect("trade_streams should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(subscription.id, Some(StreamId::Number(123456u32)));
    });
}
/// A message addressed to the subscribed stream reaches the registered callback.
#[test]
fn trade_streams_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = TradeStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTrade", &vars);

        let subscription = client.trade_streams(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::TradeStreamsResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let data: Value = serde_json::from_str(r#"{"e":"trade","E":1762856064204,"T":1762856064203,"s":"BTC-251123-126000-C","t":4,"p":"1300.000","q":"0.1000","X":"MARKET","S":"BUY","m":false}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
/// After `unsubscribe`, a message for the stream no longer reaches the callback.
#[test]
fn trade_streams_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let client = PublicApiClient::new(Arc::clone(&streams_base));
        let request = TradeStreamsParams::builder("btcusdt".to_string())
            .id(Some(123456u32))
            .build()
            .unwrap();

        // Derive the stream name the incoming envelope must carry.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", request.symbol.clone());
        if let Some(id) = request.id {
            vars.insert("id", id.to_string());
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@optionTrade", &vars);

        let subscription = client.trade_streams(request).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        subscription.on_message(move |_payload: models::TradeStreamsResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );
        subscription.unsubscribe().await;

        let data: Value = serde_json::from_str(r#"{"e":"trade","E":1762856064204,"T":1762856064203,"s":"BTC-251123-126000-C","t":4,"p":"1300.000","q":"0.1000","X":"MARKET","S":"BUY","m":false}"#).unwrap();
        let envelope = json!({
            "stream": stream,
            "data": data,
        });
        streams_base
            .on_message(envelope.to_string(), conn.clone())
            .await;
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
}