#![allow(unused_imports)]
use async_trait::async_trait;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use crate::common::{
models::ParamBuildError,
utils::replace_websocket_streams_placeholders,
websocket::{WebsocketBase, WebsocketStream, WebsocketStreams, create_stream_handler},
};
use crate::models::StreamId;
use crate::spot::websocket_streams::models;
#[async_trait]
pub trait WebSocketStreamsApi: Send + Sync {
async fn agg_trade(
&self,
params: AggTradeParams,
) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>>;
async fn all_market_rolling_window_ticker(
&self,
params: AllMarketRollingWindowTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>;
async fn all_mini_ticker(
&self,
params: AllMiniTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>>;
async fn avg_price(
&self,
params: AvgPriceParams,
) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>>;
async fn book_ticker(
&self,
params: BookTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>>;
async fn diff_book_depth(
&self,
params: DiffBookDepthParams,
) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>>;
async fn kline(
&self,
params: KlineParams,
) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>>;
async fn kline_offset(
&self,
params: KlineOffsetParams,
) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>>;
async fn mini_ticker(
&self,
params: MiniTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>>;
async fn partial_book_depth(
&self,
params: PartialBookDepthParams,
) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>>;
async fn reference_price(
&self,
params: ReferencePriceParams,
) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>>;
async fn rolling_window_ticker(
&self,
params: RollingWindowTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>>;
async fn ticker(
&self,
params: TickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>>;
async fn trade(
&self,
params: TradeParams,
) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>>;
}
pub struct WebSocketStreamsApiClient {
websocket_streams_base: Arc<WebsocketStreams>,
}
impl WebSocketStreamsApiClient {
pub fn new(websocket_streams_base: Arc<WebsocketStreams>) -> Self {
Self {
websocket_streams_base,
}
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AllMarketRollingWindowTickerWindowSizeEnum {
#[serde(rename = "1h")]
WindowSize1h,
#[serde(rename = "4h")]
WindowSize4h,
#[serde(rename = "1d")]
WindowSize1d,
}
impl AllMarketRollingWindowTickerWindowSizeEnum {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::WindowSize1h => "1h",
Self::WindowSize4h => "4h",
Self::WindowSize1d => "1d",
}
}
}
impl std::str::FromStr for AllMarketRollingWindowTickerWindowSizeEnum {
type Err = Box<dyn std::error::Error + Send + Sync>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1h" => Ok(Self::WindowSize1h),
"4h" => Ok(Self::WindowSize4h),
"1d" => Ok(Self::WindowSize1d),
other => Err(format!(
"invalid AllMarketRollingWindowTickerWindowSizeEnum: {}",
other
)
.into()),
}
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KlineIntervalEnum {
#[serde(rename = "1s")]
Interval1s,
#[serde(rename = "1m")]
Interval1m,
#[serde(rename = "3m")]
Interval3m,
#[serde(rename = "5m")]
Interval5m,
#[serde(rename = "15m")]
Interval15m,
#[serde(rename = "30m")]
Interval30m,
#[serde(rename = "1h")]
Interval1h,
#[serde(rename = "2h")]
Interval2h,
#[serde(rename = "4h")]
Interval4h,
#[serde(rename = "6h")]
Interval6h,
#[serde(rename = "8h")]
Interval8h,
#[serde(rename = "12h")]
Interval12h,
#[serde(rename = "1d")]
Interval1d,
#[serde(rename = "3d")]
Interval3d,
#[serde(rename = "1w")]
Interval1w,
#[serde(rename = "1M")]
Interval1M,
}
impl KlineIntervalEnum {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::Interval1s => "1s",
Self::Interval1m => "1m",
Self::Interval3m => "3m",
Self::Interval5m => "5m",
Self::Interval15m => "15m",
Self::Interval30m => "30m",
Self::Interval1h => "1h",
Self::Interval2h => "2h",
Self::Interval4h => "4h",
Self::Interval6h => "6h",
Self::Interval8h => "8h",
Self::Interval12h => "12h",
Self::Interval1d => "1d",
Self::Interval3d => "3d",
Self::Interval1w => "1w",
Self::Interval1M => "1M",
}
}
}
impl std::str::FromStr for KlineIntervalEnum {
type Err = Box<dyn std::error::Error + Send + Sync>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1s" => Ok(Self::Interval1s),
"1m" => Ok(Self::Interval1m),
"3m" => Ok(Self::Interval3m),
"5m" => Ok(Self::Interval5m),
"15m" => Ok(Self::Interval15m),
"30m" => Ok(Self::Interval30m),
"1h" => Ok(Self::Interval1h),
"2h" => Ok(Self::Interval2h),
"4h" => Ok(Self::Interval4h),
"6h" => Ok(Self::Interval6h),
"8h" => Ok(Self::Interval8h),
"12h" => Ok(Self::Interval12h),
"1d" => Ok(Self::Interval1d),
"3d" => Ok(Self::Interval3d),
"1w" => Ok(Self::Interval1w),
"1M" => Ok(Self::Interval1M),
other => Err(format!("invalid KlineIntervalEnum: {}", other).into()),
}
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KlineOffsetIntervalEnum {
#[serde(rename = "1s")]
Interval1s,
#[serde(rename = "1m")]
Interval1m,
#[serde(rename = "3m")]
Interval3m,
#[serde(rename = "5m")]
Interval5m,
#[serde(rename = "15m")]
Interval15m,
#[serde(rename = "30m")]
Interval30m,
#[serde(rename = "1h")]
Interval1h,
#[serde(rename = "2h")]
Interval2h,
#[serde(rename = "4h")]
Interval4h,
#[serde(rename = "6h")]
Interval6h,
#[serde(rename = "8h")]
Interval8h,
#[serde(rename = "12h")]
Interval12h,
#[serde(rename = "1d")]
Interval1d,
#[serde(rename = "3d")]
Interval3d,
#[serde(rename = "1w")]
Interval1w,
#[serde(rename = "1M")]
Interval1M,
}
impl KlineOffsetIntervalEnum {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::Interval1s => "1s",
Self::Interval1m => "1m",
Self::Interval3m => "3m",
Self::Interval5m => "5m",
Self::Interval15m => "15m",
Self::Interval30m => "30m",
Self::Interval1h => "1h",
Self::Interval2h => "2h",
Self::Interval4h => "4h",
Self::Interval6h => "6h",
Self::Interval8h => "8h",
Self::Interval12h => "12h",
Self::Interval1d => "1d",
Self::Interval3d => "3d",
Self::Interval1w => "1w",
Self::Interval1M => "1M",
}
}
}
impl std::str::FromStr for KlineOffsetIntervalEnum {
type Err = Box<dyn std::error::Error + Send + Sync>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1s" => Ok(Self::Interval1s),
"1m" => Ok(Self::Interval1m),
"3m" => Ok(Self::Interval3m),
"5m" => Ok(Self::Interval5m),
"15m" => Ok(Self::Interval15m),
"30m" => Ok(Self::Interval30m),
"1h" => Ok(Self::Interval1h),
"2h" => Ok(Self::Interval2h),
"4h" => Ok(Self::Interval4h),
"6h" => Ok(Self::Interval6h),
"8h" => Ok(Self::Interval8h),
"12h" => Ok(Self::Interval12h),
"1d" => Ok(Self::Interval1d),
"3d" => Ok(Self::Interval3d),
"1w" => Ok(Self::Interval1w),
"1M" => Ok(Self::Interval1M),
other => Err(format!("invalid KlineOffsetIntervalEnum: {}", other).into()),
}
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartialBookDepthLevelsEnum {
#[serde(rename = "5")]
Levels5,
#[serde(rename = "10")]
Levels10,
#[serde(rename = "20")]
Levels20,
}
impl PartialBookDepthLevelsEnum {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::Levels5 => "5",
Self::Levels10 => "10",
Self::Levels20 => "20",
}
}
}
impl std::str::FromStr for PartialBookDepthLevelsEnum {
type Err = Box<dyn std::error::Error + Send + Sync>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"5" => Ok(Self::Levels5),
"10" => Ok(Self::Levels10),
"20" => Ok(Self::Levels20),
other => Err(format!("invalid PartialBookDepthLevelsEnum: {}", other).into()),
}
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RollingWindowTickerWindowSizeEnum {
#[serde(rename = "1h")]
WindowSize1h,
#[serde(rename = "4h")]
WindowSize4h,
#[serde(rename = "1d")]
WindowSize1d,
}
impl RollingWindowTickerWindowSizeEnum {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::WindowSize1h => "1h",
Self::WindowSize4h => "4h",
Self::WindowSize1d => "1d",
}
}
}
impl std::str::FromStr for RollingWindowTickerWindowSizeEnum {
type Err = Box<dyn std::error::Error + Send + Sync>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1h" => Ok(Self::WindowSize1h),
"4h" => Ok(Self::WindowSize4h),
"1d" => Ok(Self::WindowSize1d),
other => Err(format!("invalid RollingWindowTickerWindowSizeEnum: {}", other).into()),
}
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AggTradeParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl AggTradeParams {
#[must_use]
pub fn builder(symbol: String) -> AggTradeParamsBuilder {
AggTradeParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllMarketRollingWindowTickerParams {
#[builder(setter(into))]
pub window_size: AllMarketRollingWindowTickerWindowSizeEnum,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl AllMarketRollingWindowTickerParams {
#[must_use]
pub fn builder(
window_size: AllMarketRollingWindowTickerWindowSizeEnum,
) -> AllMarketRollingWindowTickerParamsBuilder {
AllMarketRollingWindowTickerParamsBuilder::default().window_size(window_size)
}
}
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllMiniTickerParams {
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl AllMiniTickerParams {
#[must_use]
pub fn builder() -> AllMiniTickerParamsBuilder {
AllMiniTickerParamsBuilder::default()
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AvgPriceParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl AvgPriceParams {
#[must_use]
pub fn builder(symbol: String) -> AvgPriceParamsBuilder {
AvgPriceParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct BookTickerParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl BookTickerParams {
#[must_use]
pub fn builder(symbol: String) -> BookTickerParamsBuilder {
BookTickerParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct DiffBookDepthParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
#[builder(setter(into), default)]
pub update_speed: Option<String>,
}
impl DiffBookDepthParams {
#[must_use]
pub fn builder(symbol: String) -> DiffBookDepthParamsBuilder {
DiffBookDepthParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KlineParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into))]
pub interval: KlineIntervalEnum,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl KlineParams {
#[must_use]
pub fn builder(symbol: String, interval: KlineIntervalEnum) -> KlineParamsBuilder {
KlineParamsBuilder::default()
.symbol(symbol)
.interval(interval)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KlineOffsetParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into))]
pub interval: KlineOffsetIntervalEnum,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl KlineOffsetParams {
#[must_use]
pub fn builder(symbol: String, interval: KlineOffsetIntervalEnum) -> KlineOffsetParamsBuilder {
KlineOffsetParamsBuilder::default()
.symbol(symbol)
.interval(interval)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct MiniTickerParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl MiniTickerParams {
#[must_use]
pub fn builder(symbol: String) -> MiniTickerParamsBuilder {
MiniTickerParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct PartialBookDepthParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into))]
pub levels: PartialBookDepthLevelsEnum,
#[builder(setter(into), default)]
pub id: Option<String>,
#[builder(setter(into), default)]
pub update_speed: Option<String>,
}
impl PartialBookDepthParams {
#[must_use]
pub fn builder(
symbol: String,
levels: PartialBookDepthLevelsEnum,
) -> PartialBookDepthParamsBuilder {
PartialBookDepthParamsBuilder::default()
.symbol(symbol)
.levels(levels)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct ReferencePriceParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl ReferencePriceParams {
#[must_use]
pub fn builder(symbol: String) -> ReferencePriceParamsBuilder {
ReferencePriceParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct RollingWindowTickerParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into))]
pub window_size: RollingWindowTickerWindowSizeEnum,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl RollingWindowTickerParams {
#[must_use]
pub fn builder(
symbol: String,
window_size: RollingWindowTickerWindowSizeEnum,
) -> RollingWindowTickerParamsBuilder {
RollingWindowTickerParamsBuilder::default()
.symbol(symbol)
.window_size(window_size)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TickerParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl TickerParams {
#[must_use]
pub fn builder(symbol: String) -> TickerParamsBuilder {
TickerParamsBuilder::default().symbol(symbol)
}
}
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TradeParams {
#[builder(setter(into))]
pub symbol: String,
#[builder(setter(into), default)]
pub id: Option<String>,
}
impl TradeParams {
#[must_use]
pub fn builder(symbol: String) -> TradeParamsBuilder {
TradeParamsBuilder::default().symbol(symbol)
}
}
#[async_trait]
impl WebSocketStreamsApi for WebSocketStreamsApiClient {
async fn agg_trade(
&self,
params: AggTradeParams,
) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>> {
let AggTradeParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
Ok(create_stream_handler::<models::AggTradeResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn all_market_rolling_window_ticker(
&self,
params: AllMarketRollingWindowTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>
{
let AllMarketRollingWindowTickerParams { window_size, id } = params;
let pairs: &[(&str, Option<String>)] = &[
("windowSize", Some(window_size.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
Ok(
create_stream_handler::<Vec<models::AllMarketRollingWindowTickerResponseInner>>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await,
)
}
async fn all_mini_ticker(
&self,
params: AllMiniTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>> {
let AllMiniTickerParams { id } = params;
let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
Ok(
create_stream_handler::<Vec<models::AllMiniTickerResponseInner>>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await,
)
}
async fn avg_price(
&self,
params: AvgPriceParams,
) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>> {
let AvgPriceParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
Ok(create_stream_handler::<models::AvgPriceResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn book_ticker(
&self,
params: BookTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>> {
let BookTickerParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
Ok(create_stream_handler::<models::BookTickerResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn diff_book_depth(
&self,
params: DiffBookDepthParams,
) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>> {
let DiffBookDepthParams {
symbol,
id,
update_speed,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
Ok(create_stream_handler::<models::DiffBookDepthResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn kline(
&self,
params: KlineParams,
) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>> {
let KlineParams {
symbol,
interval,
id,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("interval", Some(interval.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
Ok(create_stream_handler::<models::KlineResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn kline_offset(
&self,
params: KlineOffsetParams,
) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>> {
let KlineOffsetParams {
symbol,
interval,
id,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("interval", Some(interval.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream =
replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
Ok(create_stream_handler::<models::KlineOffsetResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn mini_ticker(
&self,
params: MiniTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>> {
let MiniTickerParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
Ok(create_stream_handler::<models::MiniTickerResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn partial_book_depth(
&self,
params: PartialBookDepthParams,
) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>> {
let PartialBookDepthParams {
symbol,
levels,
id,
update_speed,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("levels", Some(levels.as_str().to_string())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream =
replace_websocket_streams_placeholders("/<symbol>@depth<levels>@<updateSpeed>", &vars);
Ok(create_stream_handler::<models::PartialBookDepthResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn reference_price(
&self,
params: ReferencePriceParams,
) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>> {
let ReferencePriceParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
Ok(create_stream_handler::<models::ReferencePriceResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn rolling_window_ticker(
&self,
params: RollingWindowTickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>> {
let RollingWindowTickerParams {
symbol,
window_size,
id,
} = params;
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("windowSize", Some(window_size.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
Ok(
create_stream_handler::<models::RollingWindowTickerResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await,
)
}
async fn ticker(
&self,
params: TickerParams,
) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>> {
let TickerParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
Ok(create_stream_handler::<models::TickerResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
async fn trade(
&self,
params: TradeParams,
) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>> {
let TradeParams { symbol, id } = params;
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
Ok(create_stream_handler::<models::TradeResponse>(
WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
stream,
id_opt.map(|s| {
if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
if let Ok(n) = s.parse::<u32>() {
return StreamId::Number(n);
}
}
StreamId::Str(s)
}),
None,
)
.await)
}
}
#[cfg(all(test, feature = "spot"))]
mod tests {
use super::*;
use crate::TOKIO_SHARED_RT;
use crate::{
common::websocket::{WebsocketConnection, WebsocketHandler},
config::ConfigurationWebsocketStreams,
};
use serde_json::json;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::yield_now;
async fn make_streams_base() -> (Arc<WebsocketStreams>, Arc<WebsocketConnection>) {
let conn = WebsocketConnection::new("test");
let config = ConfigurationWebsocketStreams::builder()
.build()
.expect("Failed to build configuration");
let streams_base = WebsocketStreams::new(config, vec![conn.clone()], vec![]);
conn.set_handler(streams_base.clone() as Arc<dyn WebsocketHandler>)
.await;
(streams_base, conn)
}
#[test]
fn agg_trade_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AggTradeParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let AggTradeParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
let ws_stream = api
.agg_trade(params)
.await
.expect("agg_trade should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn agg_trade_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let AggTradeParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
let ws_stream = api.agg_trade(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::AggTradeResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn agg_trade_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let AggTradeParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
let ws_stream = api.agg_trade(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::AggTradeResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn all_market_rolling_window_ticker_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AllMarketRollingWindowTickerParams::builder(
AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,
)
.id(Some(id.clone()))
.build()
.unwrap();
let AllMarketRollingWindowTickerParams { window_size, id } = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("windowSize", Some(window_size.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
let ws_stream = api
.all_market_rolling_window_ticker(params)
.await
.expect("all_market_rolling_window_ticker should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn all_market_rolling_window_ticker_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
let AllMarketRollingWindowTickerParams {
window_size,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("windowSize",
Some(window_size.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn all_market_rolling_window_ticker_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
let AllMarketRollingWindowTickerParams {
window_size,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("windowSize",
Some(window_size.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn all_mini_ticker_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AllMiniTickerParams::builder()
.id(Some(id.clone()))
.build()
.unwrap();
let AllMiniTickerParams { id } = params.clone();
let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
let ws_stream = api
.all_mini_ticker(params)
.await
.expect("all_mini_ticker should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn all_mini_ticker_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();
let AllMiniTickerParams {
id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
let ws_stream = api.all_mini_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn all_mini_ticker_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();
let AllMiniTickerParams {
id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
let ws_stream = api.all_mini_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn avg_price_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AvgPriceParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let AvgPriceParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
let ws_stream = api
.avg_price(params)
.await
.expect("avg_price should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn avg_price_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let AvgPriceParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
let ws_stream = api.avg_price(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn avg_price_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let AvgPriceParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
let ws_stream = api.avg_price(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn book_ticker_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = BookTickerParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let BookTickerParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
let ws_stream = api
.book_ticker(params)
.await
.expect("book_ticker should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn book_ticker_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let BookTickerParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
let ws_stream = api.book_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::BookTickerResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn book_ticker_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let BookTickerParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
let ws_stream = api.book_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::BookTickerResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn diff_book_depth_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = DiffBookDepthParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let DiffBookDepthParams {
symbol,
id,
update_speed,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream =
replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
let ws_stream = api
.diff_book_depth(params)
.await
.expect("diff_book_depth should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn diff_book_depth_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = DiffBookDepthParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let DiffBookDepthParams {
symbol,id,update_speed,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
("updateSpeed",
update_speed.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
let ws_stream = api.diff_book_depth(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn diff_book_depth_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = DiffBookDepthParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let DiffBookDepthParams {
symbol,id,update_speed,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
("updateSpeed",
update_speed.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
let ws_stream = api.diff_book_depth(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn kline_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = KlineParams::builder("bnbusdt".to_string(), KlineIntervalEnum::Interval1s)
.id(Some(id.clone()))
.build()
.unwrap();
let KlineParams {
symbol,
interval,
id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("interval", Some(interval.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream =
replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
let ws_stream = api
.kline(params)
.await
.expect("kline should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn kline_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = KlineParams::builder("bnbusdt".to_string(),KlineIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
let KlineParams {
symbol,interval,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("interval",
Some(interval.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
let ws_stream = api.kline(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::KlineResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn kline_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = KlineParams::builder("bnbusdt".to_string(),KlineIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
let KlineParams {
symbol,interval,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("interval",
Some(interval.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
let ws_stream = api.kline(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::KlineResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn kline_offset_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = KlineOffsetParams::builder(
"bnbusdt".to_string(),
KlineOffsetIntervalEnum::Interval1s,
)
.id(Some(id.clone()))
.build()
.unwrap();
let KlineOffsetParams {
symbol,
interval,
id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("interval", Some(interval.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream =
replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
let ws_stream = api
.kline_offset(params)
.await
.expect("kline_offset should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn kline_offset_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = KlineOffsetParams::builder("bnbusdt".to_string(),KlineOffsetIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
let KlineOffsetParams {
symbol,interval,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("interval",
Some(interval.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
let ws_stream = api.kline_offset(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn kline_offset_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = KlineOffsetParams::builder("bnbusdt".to_string(),KlineOffsetIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
let KlineOffsetParams {
symbol,interval,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("interval",
Some(interval.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
let ws_stream = api.kline_offset(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn mini_ticker_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = MiniTickerParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let MiniTickerParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
let ws_stream = api
.mini_ticker(params)
.await
.expect("mini_ticker should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn mini_ticker_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = MiniTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let MiniTickerParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
let ws_stream = api.mini_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn mini_ticker_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = MiniTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let MiniTickerParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
let ws_stream = api.mini_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn partial_book_depth_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = PartialBookDepthParams::builder(
"bnbusdt".to_string(),
PartialBookDepthLevelsEnum::Levels5,
)
.id(Some(id.clone()))
.build()
.unwrap();
let PartialBookDepthParams {
symbol,
levels,
id,
update_speed,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("levels", Some(levels.as_str().to_string())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders(
"/<symbol>@depth<levels>@<updateSpeed>",
&vars,
);
let ws_stream = api
.partial_book_depth(params)
.await
.expect("partial_book_depth should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn partial_book_depth_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = PartialBookDepthParams::builder(
"bnbusdt".to_string(),
PartialBookDepthLevelsEnum::Levels5,
)
.id(Some(id.clone()))
.build()
.unwrap();
let PartialBookDepthParams {
symbol,
levels,
id,
update_speed,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("levels", Some(levels.as_str().to_string())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders(
"/<symbol>@depth<levels>@<updateSpeed>",
&vars,
);
let ws_stream = api.partial_book_depth(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(
r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
)
.unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(
called.load(Ordering::SeqCst),
"expected our callback to have been invoked"
);
});
}
#[test]
fn partial_book_depth_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = PartialBookDepthParams::builder(
"bnbusdt".to_string(),
PartialBookDepthLevelsEnum::Levels5,
)
.id(Some(id.clone()))
.build()
.unwrap();
let PartialBookDepthParams {
symbol,
levels,
id,
update_speed,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("levels", Some(levels.as_str().to_string())),
("id", id.clone()),
("updateSpeed", update_speed.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders(
"/<symbol>@depth<levels>@<updateSpeed>",
&vars,
);
let ws_stream = api.partial_book_depth(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(
streams_base.is_subscribed(&stream).await,
"should be subscribed before unsubscribe"
);
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(
r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
)
.unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(
!called.load(Ordering::SeqCst),
"callback should not be invoked after unsubscribe"
);
});
}
#[test]
fn reference_price_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = ReferencePriceParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let ReferencePriceParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
let ws_stream = api
.reference_price(params)
.await
.expect("reference_price should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn reference_price_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = ReferencePriceParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let ReferencePriceParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
let ws_stream = api.reference_price(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::ReferencePriceResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(
r#"{"e":"referencePrice","s":"BAZUSD","r":"1.00","t":1770313263917}"#,
)
.unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(
called.load(Ordering::SeqCst),
"expected our callback to have been invoked"
);
});
}
#[test]
fn reference_price_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = ReferencePriceParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let ReferencePriceParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
let ws_stream = api.reference_price(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::ReferencePriceResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(
streams_base.is_subscribed(&stream).await,
"should be subscribed before unsubscribe"
);
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(
r#"{"e":"referencePrice","s":"BAZUSD","r":"1.00","t":1770313263917}"#,
)
.unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(
!called.load(Ordering::SeqCst),
"callback should not be invoked after unsubscribe"
);
});
}
#[test]
fn rolling_window_ticker_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = RollingWindowTickerParams::builder(
"bnbusdt".to_string(),
RollingWindowTickerWindowSizeEnum::WindowSize1h,
)
.id(Some(id.clone()))
.build()
.unwrap();
let RollingWindowTickerParams {
symbol,
window_size,
id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol", Some(symbol.clone())),
("windowSize", Some(window_size.as_str().to_string())),
("id", id.clone()),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream =
replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
let ws_stream = api
.rolling_window_ticker(params)
.await
.expect("rolling_window_ticker should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn rolling_window_ticker_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = RollingWindowTickerParams::builder("bnbusdt".to_string(),RollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
let RollingWindowTickerParams {
symbol,window_size,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("windowSize",
Some(window_size.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
let ws_stream = api.rolling_window_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn rolling_window_ticker_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = RollingWindowTickerParams::builder("bnbusdt".to_string(),RollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
let RollingWindowTickerParams {
symbol,window_size,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("windowSize",
Some(window_size.as_str().to_string())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
let ws_stream = api.rolling_window_ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn ticker_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = TickerParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let TickerParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
let ws_stream = api
.ticker(params)
.await
.expect("ticker should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn ticker_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = TickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let TickerParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
let ws_stream = api.ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::TickerResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn ticker_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = TickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let TickerParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
let ws_stream = api.ticker(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::TickerResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
#[test]
fn trade_should_execute_successfully() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, _) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = TradeParams::builder("bnbusdt".to_string())
.id(Some(id.clone()))
.build()
.unwrap();
let TradeParams { symbol, id } = params.clone();
let pairs: &[(&str, Option<String>)] =
&[("symbol", Some(symbol.clone())), ("id", id.clone())];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
let ws_stream = api
.trade(params)
.await
.expect("trade should return a WebsocketStream");
assert!(
streams_base.is_subscribed(&stream).await,
"expected stream '{stream}' to be subscribed"
);
assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
});
}
#[test]
fn trade_should_handle_incoming_message() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = TradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let TradeParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
let ws_stream = api.trade(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_with_message = called.clone();
ws_stream.on_message(move |_payload: models::TradeResponse| {
called_with_message.store(true, Ordering::SeqCst);
});
let payload: Value = serde_json::from_str(r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
});
}
#[test]
fn trade_should_not_fire_after_unsubscribe() {
TOKIO_SHARED_RT.block_on(async {
let (streams_base, conn) = make_streams_base().await;
let api = WebSocketStreamsApiClient::new(streams_base.clone());
let id = "test-id-123".to_string();
let params = TradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
let TradeParams {
symbol,id,
} = params.clone();
let pairs: &[(&str, Option<String>)] = &[
("symbol",
Some(symbol.clone())
),
("id",
id.clone()
),
];
let vars: HashMap<_, _> = pairs
.iter()
.filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
.collect();
let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
let ws_stream = api.trade(params).await.unwrap();
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
ws_stream.on_message(move |_payload: models::TradeResponse| {
called_clone.store(true, Ordering::SeqCst);
});
assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
ws_stream.unsubscribe().await;
let payload: Value = serde_json::from_str(r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#).unwrap();
let msg = json!({
"stream": stream,
"data": payload,
});
streams_base.on_message(msg.to_string(), conn.clone()).await;
yield_now().await;
assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
});
}
}