1#![allow(unused_imports)]
20use async_trait::async_trait;
21use derive_builder::Builder;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use std::{collections::HashMap, sync::Arc};
25
26use crate::common::{
27 models::ParamBuildError,
28 utils::replace_websocket_streams_placeholders,
29 websocket::{WebsocketBase, WebsocketStream, WebsocketStreams, create_stream_handler},
30};
31use crate::models::StreamId;
32use crate::spot::websocket_streams::models;
33
/// Asynchronous interface to the spot WebSocket streams.
///
/// Every method takes a dedicated `*Params` value and returns a shared
/// `WebsocketStream` typed with the matching response model. The stream templates quoted
/// below are the ones used by `WebSocketStreamsApiClient` in this file.
#[async_trait]
pub trait WebSocketStreamsApi: Send + Sync {
    /// Stream built from the `/<symbol>@aggTrade` template.
    async fn agg_trade(
        &self,
        params: AggTradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>>;
    /// Stream built from the `/!ticker_<windowSize>@arr` template; messages are lists.
    async fn all_market_rolling_window_ticker(
        &self,
        params: AllMarketRollingWindowTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>;
    /// Stream built from the fixed `/!miniTicker@arr` name; messages are lists.
    async fn all_mini_ticker(
        &self,
        params: AllMiniTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>>;
    /// Stream built from the `/<symbol>@avgPrice` template.
    async fn avg_price(
        &self,
        params: AvgPriceParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>>;
    /// Stream built from the `/<symbol>@blockTrade` template.
    async fn block_trade(
        &self,
        params: BlockTradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::BlockTradeResponse>>>;
    /// Stream built from the `/<symbol>@bookTicker` template.
    async fn book_ticker(
        &self,
        params: BookTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>>;
    /// Stream built from the `/<symbol>@depth@<updateSpeed>` template.
    async fn diff_book_depth(
        &self,
        params: DiffBookDepthParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>>;
    /// Stream built from the `/<symbol>@kline_<interval>` template.
    async fn kline(
        &self,
        params: KlineParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>>;
    /// Stream built from the `/<symbol>@kline_<interval>@+08:00` template.
    async fn kline_offset(
        &self,
        params: KlineOffsetParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>>;
    /// Stream built from the `/<symbol>@miniTicker` template.
    async fn mini_ticker(
        &self,
        params: MiniTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>>;
    /// Stream built from the `/<symbol>@depth<levels>@<updateSpeed>` template.
    async fn partial_book_depth(
        &self,
        params: PartialBookDepthParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>>;
    /// Stream built from the `/<symbol>@referencePrice` template.
    async fn reference_price(
        &self,
        params: ReferencePriceParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>>;
    /// Stream built from the `/<symbol>@ticker_<windowSize>` template.
    async fn rolling_window_ticker(
        &self,
        params: RollingWindowTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>>;
    /// Stream built from the `/<symbol>@ticker` template.
    async fn ticker(
        &self,
        params: TickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>>;
    /// Stream built from the `/<symbol>@trade` template.
    async fn trade(
        &self,
        params: TradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>>;
}
97
/// [`WebSocketStreamsApi`] implementation backed by a shared `WebsocketStreams` handle.
pub struct WebSocketStreamsApiClient {
    /// Shared handle; each stream method clones this `Arc` and wraps it in
    /// `WebsocketBase::WebsocketStreams` before creating a stream handler.
    websocket_streams_base: Arc<WebsocketStreams>,
}
101
102impl WebSocketStreamsApiClient {
103 pub fn new(websocket_streams_base: Arc<WebsocketStreams>) -> Self {
104 Self {
105 websocket_streams_base,
106 }
107 }
108}
109
/// Window sizes accepted by the all-market rolling window ticker stream.
///
/// Each variant (de)serializes as the literal given in its `serde(rename)` attribute.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AllMarketRollingWindowTickerWindowSizeEnum {
    /// Wire value `"1h"`.
    #[serde(rename = "1h")]
    WindowSize1h,
    /// Wire value `"4h"`.
    #[serde(rename = "4h")]
    WindowSize4h,
    /// Wire value `"1d"`.
    #[serde(rename = "1d")]
    WindowSize1d,
}
120
121impl AllMarketRollingWindowTickerWindowSizeEnum {
122 #[must_use]
123 pub fn as_str(&self) -> &'static str {
124 match self {
125 Self::WindowSize1h => "1h",
126 Self::WindowSize4h => "4h",
127 Self::WindowSize1d => "1d",
128 }
129 }
130}
131
132impl std::str::FromStr for AllMarketRollingWindowTickerWindowSizeEnum {
133 type Err = Box<dyn std::error::Error + Send + Sync>;
134
135 fn from_str(s: &str) -> Result<Self, Self::Err> {
136 match s {
137 "1h" => Ok(Self::WindowSize1h),
138 "4h" => Ok(Self::WindowSize4h),
139 "1d" => Ok(Self::WindowSize1d),
140 other => Err(format!(
141 "invalid AllMarketRollingWindowTickerWindowSizeEnum: {}",
142 other
143 )
144 .into()),
145 }
146 }
147}
148
/// Intervals accepted by the kline stream.
///
/// Each variant (de)serializes as the literal given in its `serde(rename)` attribute.
/// Note that `"1m"` and `"1M"` are distinct, case-sensitive values.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KlineIntervalEnum {
    /// Wire value `"1s"`.
    #[serde(rename = "1s")]
    Interval1s,
    /// Wire value `"1m"`.
    #[serde(rename = "1m")]
    Interval1m,
    /// Wire value `"3m"`.
    #[serde(rename = "3m")]
    Interval3m,
    /// Wire value `"5m"`.
    #[serde(rename = "5m")]
    Interval5m,
    /// Wire value `"15m"`.
    #[serde(rename = "15m")]
    Interval15m,
    /// Wire value `"30m"`.
    #[serde(rename = "30m")]
    Interval30m,
    /// Wire value `"1h"`.
    #[serde(rename = "1h")]
    Interval1h,
    /// Wire value `"2h"`.
    #[serde(rename = "2h")]
    Interval2h,
    /// Wire value `"4h"`.
    #[serde(rename = "4h")]
    Interval4h,
    /// Wire value `"6h"`.
    #[serde(rename = "6h")]
    Interval6h,
    /// Wire value `"8h"`.
    #[serde(rename = "8h")]
    Interval8h,
    /// Wire value `"12h"`.
    #[serde(rename = "12h")]
    Interval12h,
    /// Wire value `"1d"`.
    #[serde(rename = "1d")]
    Interval1d,
    /// Wire value `"3d"`.
    #[serde(rename = "3d")]
    Interval3d,
    /// Wire value `"1w"`.
    #[serde(rename = "1w")]
    Interval1w,
    /// Wire value `"1M"` (capital `M`).
    #[serde(rename = "1M")]
    Interval1M,
}
185
186impl KlineIntervalEnum {
187 #[must_use]
188 pub fn as_str(&self) -> &'static str {
189 match self {
190 Self::Interval1s => "1s",
191 Self::Interval1m => "1m",
192 Self::Interval3m => "3m",
193 Self::Interval5m => "5m",
194 Self::Interval15m => "15m",
195 Self::Interval30m => "30m",
196 Self::Interval1h => "1h",
197 Self::Interval2h => "2h",
198 Self::Interval4h => "4h",
199 Self::Interval6h => "6h",
200 Self::Interval8h => "8h",
201 Self::Interval12h => "12h",
202 Self::Interval1d => "1d",
203 Self::Interval3d => "3d",
204 Self::Interval1w => "1w",
205 Self::Interval1M => "1M",
206 }
207 }
208}
209
210impl std::str::FromStr for KlineIntervalEnum {
211 type Err = Box<dyn std::error::Error + Send + Sync>;
212
213 fn from_str(s: &str) -> Result<Self, Self::Err> {
214 match s {
215 "1s" => Ok(Self::Interval1s),
216 "1m" => Ok(Self::Interval1m),
217 "3m" => Ok(Self::Interval3m),
218 "5m" => Ok(Self::Interval5m),
219 "15m" => Ok(Self::Interval15m),
220 "30m" => Ok(Self::Interval30m),
221 "1h" => Ok(Self::Interval1h),
222 "2h" => Ok(Self::Interval2h),
223 "4h" => Ok(Self::Interval4h),
224 "6h" => Ok(Self::Interval6h),
225 "8h" => Ok(Self::Interval8h),
226 "12h" => Ok(Self::Interval12h),
227 "1d" => Ok(Self::Interval1d),
228 "3d" => Ok(Self::Interval3d),
229 "1w" => Ok(Self::Interval1w),
230 "1M" => Ok(Self::Interval1M),
231 other => Err(format!("invalid KlineIntervalEnum: {}", other).into()),
232 }
233 }
234}
235
/// Intervals accepted by the kline-with-offset stream.
///
/// Each variant (de)serializes as the literal given in its `serde(rename)` attribute.
/// Note that `"1m"` and `"1M"` are distinct, case-sensitive values.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KlineOffsetIntervalEnum {
    /// Wire value `"1s"`.
    #[serde(rename = "1s")]
    Interval1s,
    /// Wire value `"1m"`.
    #[serde(rename = "1m")]
    Interval1m,
    /// Wire value `"3m"`.
    #[serde(rename = "3m")]
    Interval3m,
    /// Wire value `"5m"`.
    #[serde(rename = "5m")]
    Interval5m,
    /// Wire value `"15m"`.
    #[serde(rename = "15m")]
    Interval15m,
    /// Wire value `"30m"`.
    #[serde(rename = "30m")]
    Interval30m,
    /// Wire value `"1h"`.
    #[serde(rename = "1h")]
    Interval1h,
    /// Wire value `"2h"`.
    #[serde(rename = "2h")]
    Interval2h,
    /// Wire value `"4h"`.
    #[serde(rename = "4h")]
    Interval4h,
    /// Wire value `"6h"`.
    #[serde(rename = "6h")]
    Interval6h,
    /// Wire value `"8h"`.
    #[serde(rename = "8h")]
    Interval8h,
    /// Wire value `"12h"`.
    #[serde(rename = "12h")]
    Interval12h,
    /// Wire value `"1d"`.
    #[serde(rename = "1d")]
    Interval1d,
    /// Wire value `"3d"`.
    #[serde(rename = "3d")]
    Interval3d,
    /// Wire value `"1w"`.
    #[serde(rename = "1w")]
    Interval1w,
    /// Wire value `"1M"` (capital `M`).
    #[serde(rename = "1M")]
    Interval1M,
}
272
273impl KlineOffsetIntervalEnum {
274 #[must_use]
275 pub fn as_str(&self) -> &'static str {
276 match self {
277 Self::Interval1s => "1s",
278 Self::Interval1m => "1m",
279 Self::Interval3m => "3m",
280 Self::Interval5m => "5m",
281 Self::Interval15m => "15m",
282 Self::Interval30m => "30m",
283 Self::Interval1h => "1h",
284 Self::Interval2h => "2h",
285 Self::Interval4h => "4h",
286 Self::Interval6h => "6h",
287 Self::Interval8h => "8h",
288 Self::Interval12h => "12h",
289 Self::Interval1d => "1d",
290 Self::Interval3d => "3d",
291 Self::Interval1w => "1w",
292 Self::Interval1M => "1M",
293 }
294 }
295}
296
297impl std::str::FromStr for KlineOffsetIntervalEnum {
298 type Err = Box<dyn std::error::Error + Send + Sync>;
299
300 fn from_str(s: &str) -> Result<Self, Self::Err> {
301 match s {
302 "1s" => Ok(Self::Interval1s),
303 "1m" => Ok(Self::Interval1m),
304 "3m" => Ok(Self::Interval3m),
305 "5m" => Ok(Self::Interval5m),
306 "15m" => Ok(Self::Interval15m),
307 "30m" => Ok(Self::Interval30m),
308 "1h" => Ok(Self::Interval1h),
309 "2h" => Ok(Self::Interval2h),
310 "4h" => Ok(Self::Interval4h),
311 "6h" => Ok(Self::Interval6h),
312 "8h" => Ok(Self::Interval8h),
313 "12h" => Ok(Self::Interval12h),
314 "1d" => Ok(Self::Interval1d),
315 "3d" => Ok(Self::Interval3d),
316 "1w" => Ok(Self::Interval1w),
317 "1M" => Ok(Self::Interval1M),
318 other => Err(format!("invalid KlineOffsetIntervalEnum: {}", other).into()),
319 }
320 }
321}
322
/// Depth levels accepted by the partial book depth stream.
///
/// Each variant (de)serializes as the string literal given in its `serde(rename)` attribute.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartialBookDepthLevelsEnum {
    /// Wire value `"5"`.
    #[serde(rename = "5")]
    Levels5,
    /// Wire value `"10"`.
    #[serde(rename = "10")]
    Levels10,
    /// Wire value `"20"`.
    #[serde(rename = "20")]
    Levels20,
}
333
334impl PartialBookDepthLevelsEnum {
335 #[must_use]
336 pub fn as_str(&self) -> &'static str {
337 match self {
338 Self::Levels5 => "5",
339 Self::Levels10 => "10",
340 Self::Levels20 => "20",
341 }
342 }
343}
344
345impl std::str::FromStr for PartialBookDepthLevelsEnum {
346 type Err = Box<dyn std::error::Error + Send + Sync>;
347
348 fn from_str(s: &str) -> Result<Self, Self::Err> {
349 match s {
350 "5" => Ok(Self::Levels5),
351 "10" => Ok(Self::Levels10),
352 "20" => Ok(Self::Levels20),
353 other => Err(format!("invalid PartialBookDepthLevelsEnum: {}", other).into()),
354 }
355 }
356}
357
/// Window sizes accepted by the per-symbol rolling window ticker stream.
///
/// Each variant (de)serializes as the literal given in its `serde(rename)` attribute.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RollingWindowTickerWindowSizeEnum {
    /// Wire value `"1h"`.
    #[serde(rename = "1h")]
    WindowSize1h,
    /// Wire value `"4h"`.
    #[serde(rename = "4h")]
    WindowSize4h,
    /// Wire value `"1d"`.
    #[serde(rename = "1d")]
    WindowSize1d,
}
368
369impl RollingWindowTickerWindowSizeEnum {
370 #[must_use]
371 pub fn as_str(&self) -> &'static str {
372 match self {
373 Self::WindowSize1h => "1h",
374 Self::WindowSize4h => "4h",
375 Self::WindowSize1d => "1d",
376 }
377 }
378}
379
380impl std::str::FromStr for RollingWindowTickerWindowSizeEnum {
381 type Err = Box<dyn std::error::Error + Send + Sync>;
382
383 fn from_str(s: &str) -> Result<Self, Self::Err> {
384 match s {
385 "1h" => Ok(Self::WindowSize1h),
386 "4h" => Ok(Self::WindowSize4h),
387 "1d" => Ok(Self::WindowSize1d),
388 other => Err(format!("invalid RollingWindowTickerWindowSizeEnum: {}", other).into()),
389 }
390 }
391}
392
/// Parameters for [`WebSocketStreamsApi::agg_trade`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AggTradeParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
411
412impl AggTradeParams {
413 #[must_use]
420 pub fn builder(symbol: String) -> AggTradeParamsBuilder {
421 AggTradeParamsBuilder::default().symbol(symbol)
422 }
423}
/// Parameters for [`WebSocketStreamsApi::all_market_rolling_window_ticker`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllMarketRollingWindowTickerParams {
    /// Fills the `<windowSize>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub window_size: AllMarketRollingWindowTickerWindowSizeEnum,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
443
444impl AllMarketRollingWindowTickerParams {
445 #[must_use]
452 pub fn builder(
453 window_size: AllMarketRollingWindowTickerWindowSizeEnum,
454 ) -> AllMarketRollingWindowTickerParamsBuilder {
455 AllMarketRollingWindowTickerParamsBuilder::default().window_size(window_size)
456 }
457}
/// Parameters for [`WebSocketStreamsApi::all_mini_ticker`]; every field is optional.
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllMiniTickerParams {
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
471
472impl AllMiniTickerParams {
473 #[must_use]
476 pub fn builder() -> AllMiniTickerParamsBuilder {
477 AllMiniTickerParamsBuilder::default()
478 }
479}
/// Parameters for [`WebSocketStreamsApi::avg_price`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AvgPriceParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
498
499impl AvgPriceParams {
500 #[must_use]
507 pub fn builder(symbol: String) -> AvgPriceParamsBuilder {
508 AvgPriceParamsBuilder::default().symbol(symbol)
509 }
510}
/// Parameters for [`WebSocketStreamsApi::block_trade`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct BlockTradeParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
529
530impl BlockTradeParams {
531 #[must_use]
538 pub fn builder(symbol: String) -> BlockTradeParamsBuilder {
539 BlockTradeParamsBuilder::default().symbol(symbol)
540 }
541}
/// Parameters for [`WebSocketStreamsApi::book_ticker`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct BookTickerParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
560
561impl BookTickerParams {
562 #[must_use]
569 pub fn builder(symbol: String) -> BookTickerParamsBuilder {
570 BookTickerParamsBuilder::default().symbol(symbol)
571 }
572}
/// Parameters for [`WebSocketStreamsApi::diff_book_depth`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct DiffBookDepthParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
    /// Optional value for the `<updateSpeed>` placeholder; `None` unless set.
    #[builder(setter(into), default)]
    pub update_speed: Option<String>,
}
596
597impl DiffBookDepthParams {
598 #[must_use]
605 pub fn builder(symbol: String) -> DiffBookDepthParamsBuilder {
606 DiffBookDepthParamsBuilder::default().symbol(symbol)
607 }
608}
/// Parameters for [`WebSocketStreamsApi::kline`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KlineParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Fills the `<interval>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub interval: KlineIntervalEnum,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
633
634impl KlineParams {
635 #[must_use]
643 pub fn builder(symbol: String, interval: KlineIntervalEnum) -> KlineParamsBuilder {
644 KlineParamsBuilder::default()
645 .symbol(symbol)
646 .interval(interval)
647 }
648}
/// Parameters for [`WebSocketStreamsApi::kline_offset`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KlineOffsetParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Fills the `<interval>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub interval: KlineOffsetIntervalEnum,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
673
674impl KlineOffsetParams {
675 #[must_use]
683 pub fn builder(symbol: String, interval: KlineOffsetIntervalEnum) -> KlineOffsetParamsBuilder {
684 KlineOffsetParamsBuilder::default()
685 .symbol(symbol)
686 .interval(interval)
687 }
688}
/// Parameters for [`WebSocketStreamsApi::mini_ticker`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct MiniTickerParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
707
708impl MiniTickerParams {
709 #[must_use]
716 pub fn builder(symbol: String) -> MiniTickerParamsBuilder {
717 MiniTickerParamsBuilder::default().symbol(symbol)
718 }
719}
/// Parameters for [`WebSocketStreamsApi::partial_book_depth`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct PartialBookDepthParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Fills the `<levels>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub levels: PartialBookDepthLevelsEnum,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
    /// Optional value for the `<updateSpeed>` placeholder; `None` unless set.
    #[builder(setter(into), default)]
    pub update_speed: Option<String>,
}
749
750impl PartialBookDepthParams {
751 #[must_use]
759 pub fn builder(
760 symbol: String,
761 levels: PartialBookDepthLevelsEnum,
762 ) -> PartialBookDepthParamsBuilder {
763 PartialBookDepthParamsBuilder::default()
764 .symbol(symbol)
765 .levels(levels)
766 }
767}
/// Parameters for [`WebSocketStreamsApi::reference_price`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct ReferencePriceParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
786
787impl ReferencePriceParams {
788 #[must_use]
795 pub fn builder(symbol: String) -> ReferencePriceParamsBuilder {
796 ReferencePriceParamsBuilder::default().symbol(symbol)
797 }
798}
/// Parameters for [`WebSocketStreamsApi::rolling_window_ticker`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct RollingWindowTickerParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Fills the `<windowSize>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub window_size: RollingWindowTickerWindowSizeEnum,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
823
824impl RollingWindowTickerParams {
825 #[must_use]
833 pub fn builder(
834 symbol: String,
835 window_size: RollingWindowTickerWindowSizeEnum,
836 ) -> RollingWindowTickerParamsBuilder {
837 RollingWindowTickerParamsBuilder::default()
838 .symbol(symbol)
839 .window_size(window_size)
840 }
841}
/// Parameters for [`WebSocketStreamsApi::ticker`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TickerParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
860
861impl TickerParams {
862 #[must_use]
869 pub fn builder(symbol: String) -> TickerParamsBuilder {
870 TickerParamsBuilder::default().symbol(symbol)
871 }
872}
/// Parameters for [`WebSocketStreamsApi::trade`].
///
/// The derived builder takes its setters by value (`pattern = "owned"`) and reports build
/// failures as `ParamBuildError`.
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TradeParams {
    /// Fills the `<symbol>` placeholder of the stream name; pre-set by `builder`.
    #[builder(setter(into))]
    pub symbol: String,
    /// Optional stream identifier forwarded to the stream handler; `None` unless set.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}
891
892impl TradeParams {
893 #[must_use]
900 pub fn builder(symbol: String) -> TradeParamsBuilder {
901 TradeParamsBuilder::default().symbol(symbol)
902 }
903}
904
905#[async_trait]
906impl WebSocketStreamsApi for WebSocketStreamsApiClient {
907 async fn agg_trade(
908 &self,
909 params: AggTradeParams,
910 ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>> {
911 let AggTradeParams { symbol, id } = params;
912
913 let pairs: &[(&str, Option<String>)] =
914 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
915
916 let vars: HashMap<_, _> = pairs
917 .iter()
918 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
919 .collect();
920
921 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
922
923 let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
924
925 Ok(create_stream_handler::<models::AggTradeResponse>(
926 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
927 stream,
928 id_opt.map(|s| {
929 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
930 if let Ok(n) = s.parse::<u32>() {
931 return StreamId::Number(n);
932 }
933 }
934 StreamId::Str(s)
935 }),
936 None,
937 )
938 .await)
939 }
940
941 async fn all_market_rolling_window_ticker(
942 &self,
943 params: AllMarketRollingWindowTickerParams,
944 ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>
945 {
946 let AllMarketRollingWindowTickerParams { window_size, id } = params;
947
948 let pairs: &[(&str, Option<String>)] = &[
949 ("windowSize", Some(window_size.as_str().to_string())),
950 ("id", id.clone()),
951 ];
952
953 let vars: HashMap<_, _> = pairs
954 .iter()
955 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
956 .collect();
957
958 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
959
960 let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
961
962 Ok(
963 create_stream_handler::<Vec<models::AllMarketRollingWindowTickerResponseInner>>(
964 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
965 stream,
966 id_opt.map(|s| {
967 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
968 if let Ok(n) = s.parse::<u32>() {
969 return StreamId::Number(n);
970 }
971 }
972 StreamId::Str(s)
973 }),
974 None,
975 )
976 .await,
977 )
978 }
979
980 async fn all_mini_ticker(
981 &self,
982 params: AllMiniTickerParams,
983 ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>> {
984 let AllMiniTickerParams { id } = params;
985
986 let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
987
988 let vars: HashMap<_, _> = pairs
989 .iter()
990 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
991 .collect();
992
993 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
994
995 let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
996
997 Ok(
998 create_stream_handler::<Vec<models::AllMiniTickerResponseInner>>(
999 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1000 stream,
1001 id_opt.map(|s| {
1002 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1003 if let Ok(n) = s.parse::<u32>() {
1004 return StreamId::Number(n);
1005 }
1006 }
1007 StreamId::Str(s)
1008 }),
1009 None,
1010 )
1011 .await,
1012 )
1013 }
1014
1015 async fn avg_price(
1016 &self,
1017 params: AvgPriceParams,
1018 ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>> {
1019 let AvgPriceParams { symbol, id } = params;
1020
1021 let pairs: &[(&str, Option<String>)] =
1022 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1023
1024 let vars: HashMap<_, _> = pairs
1025 .iter()
1026 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1027 .collect();
1028
1029 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1030
1031 let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
1032
1033 Ok(create_stream_handler::<models::AvgPriceResponse>(
1034 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1035 stream,
1036 id_opt.map(|s| {
1037 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1038 if let Ok(n) = s.parse::<u32>() {
1039 return StreamId::Number(n);
1040 }
1041 }
1042 StreamId::Str(s)
1043 }),
1044 None,
1045 )
1046 .await)
1047 }
1048
1049 async fn block_trade(
1050 &self,
1051 params: BlockTradeParams,
1052 ) -> anyhow::Result<Arc<WebsocketStream<models::BlockTradeResponse>>> {
1053 let BlockTradeParams { symbol, id } = params;
1054
1055 let pairs: &[(&str, Option<String>)] =
1056 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1057
1058 let vars: HashMap<_, _> = pairs
1059 .iter()
1060 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1061 .collect();
1062
1063 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1064
1065 let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
1066
1067 Ok(create_stream_handler::<models::BlockTradeResponse>(
1068 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1069 stream,
1070 id_opt.map(|s| {
1071 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1072 if let Ok(n) = s.parse::<u32>() {
1073 return StreamId::Number(n);
1074 }
1075 }
1076 StreamId::Str(s)
1077 }),
1078 None,
1079 )
1080 .await)
1081 }
1082
1083 async fn book_ticker(
1084 &self,
1085 params: BookTickerParams,
1086 ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>> {
1087 let BookTickerParams { symbol, id } = params;
1088
1089 let pairs: &[(&str, Option<String>)] =
1090 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1091
1092 let vars: HashMap<_, _> = pairs
1093 .iter()
1094 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1095 .collect();
1096
1097 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1098
1099 let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
1100
1101 Ok(create_stream_handler::<models::BookTickerResponse>(
1102 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1103 stream,
1104 id_opt.map(|s| {
1105 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1106 if let Ok(n) = s.parse::<u32>() {
1107 return StreamId::Number(n);
1108 }
1109 }
1110 StreamId::Str(s)
1111 }),
1112 None,
1113 )
1114 .await)
1115 }
1116
1117 async fn diff_book_depth(
1118 &self,
1119 params: DiffBookDepthParams,
1120 ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>> {
1121 let DiffBookDepthParams {
1122 symbol,
1123 id,
1124 update_speed,
1125 } = params;
1126
1127 let pairs: &[(&str, Option<String>)] = &[
1128 ("symbol", Some(symbol.clone())),
1129 ("id", id.clone()),
1130 ("updateSpeed", update_speed.clone()),
1131 ];
1132
1133 let vars: HashMap<_, _> = pairs
1134 .iter()
1135 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1136 .collect();
1137
1138 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1139
1140 let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
1141
1142 Ok(create_stream_handler::<models::DiffBookDepthResponse>(
1143 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1144 stream,
1145 id_opt.map(|s| {
1146 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1147 if let Ok(n) = s.parse::<u32>() {
1148 return StreamId::Number(n);
1149 }
1150 }
1151 StreamId::Str(s)
1152 }),
1153 None,
1154 )
1155 .await)
1156 }
1157
1158 async fn kline(
1159 &self,
1160 params: KlineParams,
1161 ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>> {
1162 let KlineParams {
1163 symbol,
1164 interval,
1165 id,
1166 } = params;
1167
1168 let pairs: &[(&str, Option<String>)] = &[
1169 ("symbol", Some(symbol.clone())),
1170 ("interval", Some(interval.as_str().to_string())),
1171 ("id", id.clone()),
1172 ];
1173
1174 let vars: HashMap<_, _> = pairs
1175 .iter()
1176 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1177 .collect();
1178
1179 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1180
1181 let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
1182
1183 Ok(create_stream_handler::<models::KlineResponse>(
1184 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1185 stream,
1186 id_opt.map(|s| {
1187 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1188 if let Ok(n) = s.parse::<u32>() {
1189 return StreamId::Number(n);
1190 }
1191 }
1192 StreamId::Str(s)
1193 }),
1194 None,
1195 )
1196 .await)
1197 }
1198
1199 async fn kline_offset(
1200 &self,
1201 params: KlineOffsetParams,
1202 ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>> {
1203 let KlineOffsetParams {
1204 symbol,
1205 interval,
1206 id,
1207 } = params;
1208
1209 let pairs: &[(&str, Option<String>)] = &[
1210 ("symbol", Some(symbol.clone())),
1211 ("interval", Some(interval.as_str().to_string())),
1212 ("id", id.clone()),
1213 ];
1214
1215 let vars: HashMap<_, _> = pairs
1216 .iter()
1217 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1218 .collect();
1219
1220 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1221
1222 let stream =
1223 replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
1224
1225 Ok(create_stream_handler::<models::KlineOffsetResponse>(
1226 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1227 stream,
1228 id_opt.map(|s| {
1229 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1230 if let Ok(n) = s.parse::<u32>() {
1231 return StreamId::Number(n);
1232 }
1233 }
1234 StreamId::Str(s)
1235 }),
1236 None,
1237 )
1238 .await)
1239 }
1240
1241 async fn mini_ticker(
1242 &self,
1243 params: MiniTickerParams,
1244 ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>> {
1245 let MiniTickerParams { symbol, id } = params;
1246
1247 let pairs: &[(&str, Option<String>)] =
1248 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1249
1250 let vars: HashMap<_, _> = pairs
1251 .iter()
1252 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1253 .collect();
1254
1255 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1256
1257 let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
1258
1259 Ok(create_stream_handler::<models::MiniTickerResponse>(
1260 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1261 stream,
1262 id_opt.map(|s| {
1263 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1264 if let Ok(n) = s.parse::<u32>() {
1265 return StreamId::Number(n);
1266 }
1267 }
1268 StreamId::Str(s)
1269 }),
1270 None,
1271 )
1272 .await)
1273 }
1274
1275 async fn partial_book_depth(
1276 &self,
1277 params: PartialBookDepthParams,
1278 ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>> {
1279 let PartialBookDepthParams {
1280 symbol,
1281 levels,
1282 id,
1283 update_speed,
1284 } = params;
1285
1286 let pairs: &[(&str, Option<String>)] = &[
1287 ("symbol", Some(symbol.clone())),
1288 ("levels", Some(levels.as_str().to_string())),
1289 ("id", id.clone()),
1290 ("updateSpeed", update_speed.clone()),
1291 ];
1292
1293 let vars: HashMap<_, _> = pairs
1294 .iter()
1295 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1296 .collect();
1297
1298 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1299
1300 let stream =
1301 replace_websocket_streams_placeholders("/<symbol>@depth<levels>@<updateSpeed>", &vars);
1302
1303 Ok(create_stream_handler::<models::PartialBookDepthResponse>(
1304 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1305 stream,
1306 id_opt.map(|s| {
1307 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1308 if let Ok(n) = s.parse::<u32>() {
1309 return StreamId::Number(n);
1310 }
1311 }
1312 StreamId::Str(s)
1313 }),
1314 None,
1315 )
1316 .await)
1317 }
1318
1319 async fn reference_price(
1320 &self,
1321 params: ReferencePriceParams,
1322 ) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>> {
1323 let ReferencePriceParams { symbol, id } = params;
1324
1325 let pairs: &[(&str, Option<String>)] =
1326 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1327
1328 let vars: HashMap<_, _> = pairs
1329 .iter()
1330 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1331 .collect();
1332
1333 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1334
1335 let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
1336
1337 Ok(create_stream_handler::<models::ReferencePriceResponse>(
1338 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1339 stream,
1340 id_opt.map(|s| {
1341 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1342 if let Ok(n) = s.parse::<u32>() {
1343 return StreamId::Number(n);
1344 }
1345 }
1346 StreamId::Str(s)
1347 }),
1348 None,
1349 )
1350 .await)
1351 }
1352
1353 async fn rolling_window_ticker(
1354 &self,
1355 params: RollingWindowTickerParams,
1356 ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>> {
1357 let RollingWindowTickerParams {
1358 symbol,
1359 window_size,
1360 id,
1361 } = params;
1362
1363 let pairs: &[(&str, Option<String>)] = &[
1364 ("symbol", Some(symbol.clone())),
1365 ("windowSize", Some(window_size.as_str().to_string())),
1366 ("id", id.clone()),
1367 ];
1368
1369 let vars: HashMap<_, _> = pairs
1370 .iter()
1371 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1372 .collect();
1373
1374 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1375
1376 let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
1377
1378 Ok(
1379 create_stream_handler::<models::RollingWindowTickerResponse>(
1380 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1381 stream,
1382 id_opt.map(|s| {
1383 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1384 if let Ok(n) = s.parse::<u32>() {
1385 return StreamId::Number(n);
1386 }
1387 }
1388 StreamId::Str(s)
1389 }),
1390 None,
1391 )
1392 .await,
1393 )
1394 }
1395
1396 async fn ticker(
1397 &self,
1398 params: TickerParams,
1399 ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>> {
1400 let TickerParams { symbol, id } = params;
1401
1402 let pairs: &[(&str, Option<String>)] =
1403 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1404
1405 let vars: HashMap<_, _> = pairs
1406 .iter()
1407 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1408 .collect();
1409
1410 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1411
1412 let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
1413
1414 Ok(create_stream_handler::<models::TickerResponse>(
1415 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1416 stream,
1417 id_opt.map(|s| {
1418 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1419 if let Ok(n) = s.parse::<u32>() {
1420 return StreamId::Number(n);
1421 }
1422 }
1423 StreamId::Str(s)
1424 }),
1425 None,
1426 )
1427 .await)
1428 }
1429
1430 async fn trade(
1431 &self,
1432 params: TradeParams,
1433 ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>> {
1434 let TradeParams { symbol, id } = params;
1435
1436 let pairs: &[(&str, Option<String>)] =
1437 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1438
1439 let vars: HashMap<_, _> = pairs
1440 .iter()
1441 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1442 .collect();
1443
1444 let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1445
1446 let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
1447
1448 Ok(create_stream_handler::<models::TradeResponse>(
1449 WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1450 stream,
1451 id_opt.map(|s| {
1452 if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1453 if let Ok(n) = s.parse::<u32>() {
1454 return StreamId::Number(n);
1455 }
1456 }
1457 StreamId::Str(s)
1458 }),
1459 None,
1460 )
1461 .await)
1462 }
1463}
1464
1465#[cfg(all(test, feature = "spot"))]
1466mod tests {
1467 use super::*;
1468 use crate::TOKIO_SHARED_RT;
1469 use crate::{
1470 common::websocket::{WebsocketConnection, WebsocketHandler},
1471 config::ConfigurationWebsocketStreams,
1472 };
1473 use serde_json::json;
1474 use std::sync::atomic::{AtomicBool, Ordering};
1475 use tokio::task::yield_now;
1476
1477 async fn make_streams_base() -> (Arc<WebsocketStreams>, Arc<WebsocketConnection>) {
1478 let conn = WebsocketConnection::new("test");
1479 let config = ConfigurationWebsocketStreams::builder()
1480 .build()
1481 .expect("Failed to build configuration");
1482 let streams_base = WebsocketStreams::new(config, vec![conn.clone()], vec![]);
1483 conn.set_handler(streams_base.clone() as Arc<dyn WebsocketHandler>)
1484 .await;
1485 (streams_base, conn)
1486 }
1487
1488 #[test]
1489 fn agg_trade_should_execute_successfully() {
1490 TOKIO_SHARED_RT.block_on(async {
1491 let (streams_base, _) = make_streams_base().await;
1492 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1493
1494 let id = "test-id-123".to_string();
1495
1496 let params = AggTradeParams::builder("bnbusdt".to_string())
1497 .id(Some(id.clone()))
1498 .build()
1499 .unwrap();
1500
1501 let AggTradeParams { symbol, id } = params.clone();
1502
1503 let pairs: &[(&str, Option<String>)] =
1504 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1505
1506 let vars: HashMap<_, _> = pairs
1507 .iter()
1508 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1509 .collect();
1510 let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
1511 let ws_stream = api
1512 .agg_trade(params)
1513 .await
1514 .expect("agg_trade should return a WebsocketStream");
1515
1516 assert!(
1517 streams_base.is_subscribed(&stream).await,
1518 "expected stream '{stream}' to be subscribed"
1519 );
1520 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1521 });
1522 }
1523
1524 #[test]
1525 fn agg_trade_should_handle_incoming_message() {
1526 TOKIO_SHARED_RT.block_on(async {
1527 let (streams_base, conn) = make_streams_base().await;
1528 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1529
1530 let id = "test-id-123".to_string();
1531
1532 let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
1533
1534 let AggTradeParams {
1535 symbol,id,
1536 } = params.clone();
1537
1538 let pairs: &[(&str, Option<String>)] = &[
1539 ("symbol",
1540 Some(symbol.clone())
1541 ),
1542 ("id",
1543 id.clone()
1544 ),
1545 ];
1546
1547 let vars: HashMap<_, _> = pairs
1548 .iter()
1549 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1550 .collect();
1551 let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
1552
1553 let ws_stream = api.agg_trade(params).await.unwrap();
1554
1555 let called = Arc::new(AtomicBool::new(false));
1556 let called_with_message = called.clone();
1557 ws_stream.on_message(move |_payload: models::AggTradeResponse| {
1558 called_with_message.store(true, Ordering::SeqCst);
1559 });
1560
1561 let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
1562 let msg = json!({
1563 "stream": stream,
1564 "data": payload,
1565 });
1566
1567 streams_base.on_message(msg.to_string(), conn.clone()).await;
1568 yield_now().await;
1569
1570 assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1571 });
1572 }
1573
1574 #[test]
1575 fn agg_trade_should_not_fire_after_unsubscribe() {
1576 TOKIO_SHARED_RT.block_on(async {
1577 let (streams_base, conn) = make_streams_base().await;
1578 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1579
1580 let id = "test-id-123".to_string();
1581
1582 let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
1583
1584 let AggTradeParams {
1585 symbol,id,
1586 } = params.clone();
1587
1588 let pairs: &[(&str, Option<String>)] = &[
1589 ("symbol",
1590 Some(symbol.clone())
1591 ),
1592 ("id",
1593 id.clone()
1594 ),
1595 ];
1596
1597 let vars: HashMap<_, _> = pairs
1598 .iter()
1599 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1600 .collect();
1601 let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
1602
1603 let ws_stream = api.agg_trade(params).await.unwrap();
1604
1605 let called = Arc::new(AtomicBool::new(false));
1606 let called_clone = called.clone();
1607 ws_stream.on_message(move |_payload: models::AggTradeResponse| {
1608 called_clone.store(true, Ordering::SeqCst);
1609 });
1610
1611 assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
1612
1613 ws_stream.unsubscribe().await;
1614
1615 let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
1616 let msg = json!({
1617 "stream": stream,
1618 "data": payload,
1619 });
1620
1621 streams_base.on_message(msg.to_string(), conn.clone()).await;
1622
1623 yield_now().await;
1624
1625 assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
1626 });
1627 }
1628
1629 #[test]
1630 fn all_market_rolling_window_ticker_should_execute_successfully() {
1631 TOKIO_SHARED_RT.block_on(async {
1632 let (streams_base, _) = make_streams_base().await;
1633 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1634
1635 let id = "test-id-123".to_string();
1636
1637 let params = AllMarketRollingWindowTickerParams::builder(
1638 AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,
1639 )
1640 .id(Some(id.clone()))
1641 .build()
1642 .unwrap();
1643
1644 let AllMarketRollingWindowTickerParams { window_size, id } = params.clone();
1645
1646 let pairs: &[(&str, Option<String>)] = &[
1647 ("windowSize", Some(window_size.as_str().to_string())),
1648 ("id", id.clone()),
1649 ];
1650
1651 let vars: HashMap<_, _> = pairs
1652 .iter()
1653 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1654 .collect();
1655 let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
1656 let ws_stream = api
1657 .all_market_rolling_window_ticker(params)
1658 .await
1659 .expect("all_market_rolling_window_ticker should return a WebsocketStream");
1660
1661 assert!(
1662 streams_base.is_subscribed(&stream).await,
1663 "expected stream '{stream}' to be subscribed"
1664 );
1665 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1666 });
1667 }
1668
1669 #[test]
1670 fn all_market_rolling_window_ticker_should_handle_incoming_message() {
1671 TOKIO_SHARED_RT.block_on(async {
1672 let (streams_base, conn) = make_streams_base().await;
1673 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1674
1675 let id = "test-id-123".to_string();
1676
1677 let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
1678
1679 let AllMarketRollingWindowTickerParams {
1680 window_size,id,
1681 } = params.clone();
1682
1683 let pairs: &[(&str, Option<String>)] = &[
1684 ("windowSize",
1685 Some(window_size.as_str().to_string())
1686 ),
1687 ("id",
1688 id.clone()
1689 ),
1690 ];
1691
1692 let vars: HashMap<_, _> = pairs
1693 .iter()
1694 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1695 .collect();
1696 let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
1697
1698 let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();
1699
1700 let called = Arc::new(AtomicBool::new(false));
1701 let called_with_message = called.clone();
1702 ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
1703 called_with_message.store(true, Ordering::SeqCst);
1704 });
1705
1706 let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
1707 let msg = json!({
1708 "stream": stream,
1709 "data": payload,
1710 });
1711
1712 streams_base.on_message(msg.to_string(), conn.clone()).await;
1713 yield_now().await;
1714
1715 assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1716 });
1717 }
1718
1719 #[test]
1720 fn all_market_rolling_window_ticker_should_not_fire_after_unsubscribe() {
1721 TOKIO_SHARED_RT.block_on(async {
1722 let (streams_base, conn) = make_streams_base().await;
1723 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1724
1725 let id = "test-id-123".to_string();
1726
1727 let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
1728
1729 let AllMarketRollingWindowTickerParams {
1730 window_size,id,
1731 } = params.clone();
1732
1733 let pairs: &[(&str, Option<String>)] = &[
1734 ("windowSize",
1735 Some(window_size.as_str().to_string())
1736 ),
1737 ("id",
1738 id.clone()
1739 ),
1740 ];
1741
1742 let vars: HashMap<_, _> = pairs
1743 .iter()
1744 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1745 .collect();
1746 let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
1747
1748 let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();
1749
1750 let called = Arc::new(AtomicBool::new(false));
1751 let called_clone = called.clone();
1752 ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
1753 called_clone.store(true, Ordering::SeqCst);
1754 });
1755
1756 assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
1757
1758 ws_stream.unsubscribe().await;
1759
1760 let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
1761 let msg = json!({
1762 "stream": stream,
1763 "data": payload,
1764 });
1765
1766 streams_base.on_message(msg.to_string(), conn.clone()).await;
1767
1768 yield_now().await;
1769
1770 assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
1771 });
1772 }
1773
1774 #[test]
1775 fn all_mini_ticker_should_execute_successfully() {
1776 TOKIO_SHARED_RT.block_on(async {
1777 let (streams_base, _) = make_streams_base().await;
1778 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1779
1780 let id = "test-id-123".to_string();
1781
1782 let params = AllMiniTickerParams::builder()
1783 .id(Some(id.clone()))
1784 .build()
1785 .unwrap();
1786
1787 let AllMiniTickerParams { id } = params.clone();
1788
1789 let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
1790
1791 let vars: HashMap<_, _> = pairs
1792 .iter()
1793 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1794 .collect();
1795 let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
1796 let ws_stream = api
1797 .all_mini_ticker(params)
1798 .await
1799 .expect("all_mini_ticker should return a WebsocketStream");
1800
1801 assert!(
1802 streams_base.is_subscribed(&stream).await,
1803 "expected stream '{stream}' to be subscribed"
1804 );
1805 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1806 });
1807 }
1808
1809 #[test]
1810 fn all_mini_ticker_should_handle_incoming_message() {
1811 TOKIO_SHARED_RT.block_on(async {
1812 let (streams_base, conn) = make_streams_base().await;
1813 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1814
1815 let id = "test-id-123".to_string();
1816
1817 let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();
1818
1819 let AllMiniTickerParams {
1820 id,
1821 } = params.clone();
1822
1823 let pairs: &[(&str, Option<String>)] = &[
1824 ("id",
1825 id.clone()
1826 ),
1827 ];
1828
1829 let vars: HashMap<_, _> = pairs
1830 .iter()
1831 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1832 .collect();
1833 let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
1834
1835 let ws_stream = api.all_mini_ticker(params).await.unwrap();
1836
1837 let called = Arc::new(AtomicBool::new(false));
1838 let called_with_message = called.clone();
1839 ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
1840 called_with_message.store(true, Ordering::SeqCst);
1841 });
1842
1843 let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
1844 let msg = json!({
1845 "stream": stream,
1846 "data": payload,
1847 });
1848
1849 streams_base.on_message(msg.to_string(), conn.clone()).await;
1850 yield_now().await;
1851
1852 assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1853 });
1854 }
1855
1856 #[test]
1857 fn all_mini_ticker_should_not_fire_after_unsubscribe() {
1858 TOKIO_SHARED_RT.block_on(async {
1859 let (streams_base, conn) = make_streams_base().await;
1860 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1861
1862 let id = "test-id-123".to_string();
1863
1864 let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();
1865
1866 let AllMiniTickerParams {
1867 id,
1868 } = params.clone();
1869
1870 let pairs: &[(&str, Option<String>)] = &[
1871 ("id",
1872 id.clone()
1873 ),
1874 ];
1875
1876 let vars: HashMap<_, _> = pairs
1877 .iter()
1878 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1879 .collect();
1880 let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
1881
1882 let ws_stream = api.all_mini_ticker(params).await.unwrap();
1883
1884 let called = Arc::new(AtomicBool::new(false));
1885 let called_clone = called.clone();
1886 ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
1887 called_clone.store(true, Ordering::SeqCst);
1888 });
1889
1890 assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
1891
1892 ws_stream.unsubscribe().await;
1893
1894 let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
1895 let msg = json!({
1896 "stream": stream,
1897 "data": payload,
1898 });
1899
1900 streams_base.on_message(msg.to_string(), conn.clone()).await;
1901
1902 yield_now().await;
1903
1904 assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
1905 });
1906 }
1907
1908 #[test]
1909 fn avg_price_should_execute_successfully() {
1910 TOKIO_SHARED_RT.block_on(async {
1911 let (streams_base, _) = make_streams_base().await;
1912 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1913
1914 let id = "test-id-123".to_string();
1915
1916 let params = AvgPriceParams::builder("bnbusdt".to_string())
1917 .id(Some(id.clone()))
1918 .build()
1919 .unwrap();
1920
1921 let AvgPriceParams { symbol, id } = params.clone();
1922
1923 let pairs: &[(&str, Option<String>)] =
1924 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1925
1926 let vars: HashMap<_, _> = pairs
1927 .iter()
1928 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1929 .collect();
1930 let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
1931 let ws_stream = api
1932 .avg_price(params)
1933 .await
1934 .expect("avg_price should return a WebsocketStream");
1935
1936 assert!(
1937 streams_base.is_subscribed(&stream).await,
1938 "expected stream '{stream}' to be subscribed"
1939 );
1940 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1941 });
1942 }
1943
1944 #[test]
1945 fn avg_price_should_handle_incoming_message() {
1946 TOKIO_SHARED_RT.block_on(async {
1947 let (streams_base, conn) = make_streams_base().await;
1948 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1949
1950 let id = "test-id-123".to_string();
1951
1952 let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
1953
1954 let AvgPriceParams {
1955 symbol,id,
1956 } = params.clone();
1957
1958 let pairs: &[(&str, Option<String>)] = &[
1959 ("symbol",
1960 Some(symbol.clone())
1961 ),
1962 ("id",
1963 id.clone()
1964 ),
1965 ];
1966
1967 let vars: HashMap<_, _> = pairs
1968 .iter()
1969 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1970 .collect();
1971 let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
1972
1973 let ws_stream = api.avg_price(params).await.unwrap();
1974
1975 let called = Arc::new(AtomicBool::new(false));
1976 let called_with_message = called.clone();
1977 ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
1978 called_with_message.store(true, Ordering::SeqCst);
1979 });
1980
1981 let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
1982 let msg = json!({
1983 "stream": stream,
1984 "data": payload,
1985 });
1986
1987 streams_base.on_message(msg.to_string(), conn.clone()).await;
1988 yield_now().await;
1989
1990 assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1991 });
1992 }
1993
1994 #[test]
1995 fn avg_price_should_not_fire_after_unsubscribe() {
1996 TOKIO_SHARED_RT.block_on(async {
1997 let (streams_base, conn) = make_streams_base().await;
1998 let api = WebSocketStreamsApiClient::new(streams_base.clone());
1999
2000 let id = "test-id-123".to_string();
2001
2002 let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2003
2004 let AvgPriceParams {
2005 symbol,id,
2006 } = params.clone();
2007
2008 let pairs: &[(&str, Option<String>)] = &[
2009 ("symbol",
2010 Some(symbol.clone())
2011 ),
2012 ("id",
2013 id.clone()
2014 ),
2015 ];
2016
2017 let vars: HashMap<_, _> = pairs
2018 .iter()
2019 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2020 .collect();
2021 let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
2022
2023 let ws_stream = api.avg_price(params).await.unwrap();
2024
2025 let called = Arc::new(AtomicBool::new(false));
2026 let called_clone = called.clone();
2027 ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
2028 called_clone.store(true, Ordering::SeqCst);
2029 });
2030
2031 assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2032
2033 ws_stream.unsubscribe().await;
2034
2035 let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
2036 let msg = json!({
2037 "stream": stream,
2038 "data": payload,
2039 });
2040
2041 streams_base.on_message(msg.to_string(), conn.clone()).await;
2042
2043 yield_now().await;
2044
2045 assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2046 });
2047 }
2048
2049 #[test]
2050 fn block_trade_should_execute_successfully() {
2051 TOKIO_SHARED_RT.block_on(async {
2052 let (streams_base, _) = make_streams_base().await;
2053 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2054
2055 let id = "test-id-123".to_string();
2056
2057 let params = BlockTradeParams::builder("bnbusdt".to_string())
2058 .id(Some(id.clone()))
2059 .build()
2060 .unwrap();
2061
2062 let BlockTradeParams { symbol, id } = params.clone();
2063
2064 let pairs: &[(&str, Option<String>)] =
2065 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
2066
2067 let vars: HashMap<_, _> = pairs
2068 .iter()
2069 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2070 .collect();
2071 let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
2072 let ws_stream = api
2073 .block_trade(params)
2074 .await
2075 .expect("block_trade should return a WebsocketStream");
2076
2077 assert!(
2078 streams_base.is_subscribed(&stream).await,
2079 "expected stream '{stream}' to be subscribed"
2080 );
2081 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2082 });
2083 }
2084
2085 #[test]
2086 fn block_trade_should_handle_incoming_message() {
2087 TOKIO_SHARED_RT.block_on(async {
2088 let (streams_base, conn) = make_streams_base().await;
2089 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2090
2091 let id = "test-id-123".to_string();
2092
2093 let params = BlockTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2094
2095 let BlockTradeParams {
2096 symbol,id,
2097 } = params.clone();
2098
2099 let pairs: &[(&str, Option<String>)] = &[
2100 ("symbol",
2101 Some(symbol.clone())
2102 ),
2103 ("id",
2104 id.clone()
2105 ),
2106 ];
2107
2108 let vars: HashMap<_, _> = pairs
2109 .iter()
2110 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2111 .collect();
2112 let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
2113
2114 let ws_stream = api.block_trade(params).await.unwrap();
2115
2116 let called = Arc::new(AtomicBool::new(false));
2117 let called_with_message = called.clone();
2118 ws_stream.on_message(move |_payload: models::BlockTradeResponse| {
2119 called_with_message.store(true, Ordering::SeqCst);
2120 });
2121
2122 let payload: Value = serde_json::from_str(r#"{"e":"blockTrade","E":1772506983582,"s":"BNBBTC","t":582,"p":"0.052","q":"5838","T":1772506983321,"m":true}"#).unwrap();
2123 let msg = json!({
2124 "stream": stream,
2125 "data": payload,
2126 });
2127
2128 streams_base.on_message(msg.to_string(), conn.clone()).await;
2129 yield_now().await;
2130
2131 assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2132 });
2133 }
2134
2135 #[test]
2136 fn block_trade_should_not_fire_after_unsubscribe() {
2137 TOKIO_SHARED_RT.block_on(async {
2138 let (streams_base, conn) = make_streams_base().await;
2139 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2140
2141 let id = "test-id-123".to_string();
2142
2143 let params = BlockTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2144
2145 let BlockTradeParams {
2146 symbol,id,
2147 } = params.clone();
2148
2149 let pairs: &[(&str, Option<String>)] = &[
2150 ("symbol",
2151 Some(symbol.clone())
2152 ),
2153 ("id",
2154 id.clone()
2155 ),
2156 ];
2157
2158 let vars: HashMap<_, _> = pairs
2159 .iter()
2160 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2161 .collect();
2162 let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
2163
2164 let ws_stream = api.block_trade(params).await.unwrap();
2165
2166 let called = Arc::new(AtomicBool::new(false));
2167 let called_clone = called.clone();
2168 ws_stream.on_message(move |_payload: models::BlockTradeResponse| {
2169 called_clone.store(true, Ordering::SeqCst);
2170 });
2171
2172 assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2173
2174 ws_stream.unsubscribe().await;
2175
2176 let payload: Value = serde_json::from_str(r#"{"e":"blockTrade","E":1772506983582,"s":"BNBBTC","t":582,"p":"0.052","q":"5838","T":1772506983321,"m":true}"#).unwrap();
2177 let msg = json!({
2178 "stream": stream,
2179 "data": payload,
2180 });
2181
2182 streams_base.on_message(msg.to_string(), conn.clone()).await;
2183
2184 yield_now().await;
2185
2186 assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2187 });
2188 }
2189
2190 #[test]
2191 fn book_ticker_should_execute_successfully() {
2192 TOKIO_SHARED_RT.block_on(async {
2193 let (streams_base, _) = make_streams_base().await;
2194 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2195
2196 let id = "test-id-123".to_string();
2197
2198 let params = BookTickerParams::builder("bnbusdt".to_string())
2199 .id(Some(id.clone()))
2200 .build()
2201 .unwrap();
2202
2203 let BookTickerParams { symbol, id } = params.clone();
2204
2205 let pairs: &[(&str, Option<String>)] =
2206 &[("symbol", Some(symbol.clone())), ("id", id.clone())];
2207
2208 let vars: HashMap<_, _> = pairs
2209 .iter()
2210 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2211 .collect();
2212 let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
2213 let ws_stream = api
2214 .book_ticker(params)
2215 .await
2216 .expect("book_ticker should return a WebsocketStream");
2217
2218 assert!(
2219 streams_base.is_subscribed(&stream).await,
2220 "expected stream '{stream}' to be subscribed"
2221 );
2222 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2223 });
2224 }
2225
2226 #[test]
2227 fn book_ticker_should_handle_incoming_message() {
2228 TOKIO_SHARED_RT.block_on(async {
2229 let (streams_base, conn) = make_streams_base().await;
2230 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2231
2232 let id = "test-id-123".to_string();
2233
2234 let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2235
2236 let BookTickerParams {
2237 symbol,id,
2238 } = params.clone();
2239
2240 let pairs: &[(&str, Option<String>)] = &[
2241 ("symbol",
2242 Some(symbol.clone())
2243 ),
2244 ("id",
2245 id.clone()
2246 ),
2247 ];
2248
2249 let vars: HashMap<_, _> = pairs
2250 .iter()
2251 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2252 .collect();
2253 let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
2254
2255 let ws_stream = api.book_ticker(params).await.unwrap();
2256
2257 let called = Arc::new(AtomicBool::new(false));
2258 let called_with_message = called.clone();
2259 ws_stream.on_message(move |_payload: models::BookTickerResponse| {
2260 called_with_message.store(true, Ordering::SeqCst);
2261 });
2262
2263 let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
2264 let msg = json!({
2265 "stream": stream,
2266 "data": payload,
2267 });
2268
2269 streams_base.on_message(msg.to_string(), conn.clone()).await;
2270 yield_now().await;
2271
2272 assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2273 });
2274 }
2275
2276 #[test]
2277 fn book_ticker_should_not_fire_after_unsubscribe() {
2278 TOKIO_SHARED_RT.block_on(async {
2279 let (streams_base, conn) = make_streams_base().await;
2280 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2281
2282 let id = "test-id-123".to_string();
2283
2284 let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2285
2286 let BookTickerParams {
2287 symbol,id,
2288 } = params.clone();
2289
2290 let pairs: &[(&str, Option<String>)] = &[
2291 ("symbol",
2292 Some(symbol.clone())
2293 ),
2294 ("id",
2295 id.clone()
2296 ),
2297 ];
2298
2299 let vars: HashMap<_, _> = pairs
2300 .iter()
2301 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2302 .collect();
2303 let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
2304
2305 let ws_stream = api.book_ticker(params).await.unwrap();
2306
2307 let called = Arc::new(AtomicBool::new(false));
2308 let called_clone = called.clone();
2309 ws_stream.on_message(move |_payload: models::BookTickerResponse| {
2310 called_clone.store(true, Ordering::SeqCst);
2311 });
2312
2313 assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2314
2315 ws_stream.unsubscribe().await;
2316
2317 let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
2318 let msg = json!({
2319 "stream": stream,
2320 "data": payload,
2321 });
2322
2323 streams_base.on_message(msg.to_string(), conn.clone()).await;
2324
2325 yield_now().await;
2326
2327 assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2328 });
2329 }
2330
2331 #[test]
2332 fn diff_book_depth_should_execute_successfully() {
2333 TOKIO_SHARED_RT.block_on(async {
2334 let (streams_base, _) = make_streams_base().await;
2335 let api = WebSocketStreamsApiClient::new(streams_base.clone());
2336
2337 let id = "test-id-123".to_string();
2338
2339 let params = DiffBookDepthParams::builder("bnbusdt".to_string())
2340 .id(Some(id.clone()))
2341 .build()
2342 .unwrap();
2343
2344 let DiffBookDepthParams {
2345 symbol,
2346 id,
2347 update_speed,
2348 } = params.clone();
2349
2350 let pairs: &[(&str, Option<String>)] = &[
2351 ("symbol", Some(symbol.clone())),
2352 ("id", id.clone()),
2353 ("updateSpeed", update_speed.clone()),
2354 ];
2355
2356 let vars: HashMap<_, _> = pairs
2357 .iter()
2358 .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2359 .collect();
2360 let stream =
2361 replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
2362 let ws_stream = api
2363 .diff_book_depth(params)
2364 .await
2365 .expect("diff_book_depth should return a WebsocketStream");
2366
2367 assert!(
2368 streams_base.is_subscribed(&stream).await,
2369 "expected stream '{stream}' to be subscribed"
2370 );
2371 assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2372 });
2373 }
2374
/// A message pushed through the base for the subscribed diff-depth stream
/// must invoke the callback registered on the returned stream.
#[test]
fn diff_book_depth_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = DiffBookDepthParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let DiffBookDepthParams {
            symbol,
            id,
            update_speed,
        } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([("symbol", symbol)]);
        vars.extend(id.map(|value| ("id", value)));
        vars.extend(update_speed.map(|value| ("updateSpeed", value)));
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

        let ws_stream = api.diff_book_depth(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
2427
/// Once the diff-depth stream has been unsubscribed, a message addressed to
/// it must no longer reach the previously registered callback.
#[test]
fn diff_book_depth_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = DiffBookDepthParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let DiffBookDepthParams {
            symbol,
            id,
            update_speed,
        } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([("symbol", symbol)]);
        vars.extend(id.map(|value| ("id", value)));
        vars.extend(update_speed.map(|value| ("updateSpeed", value)));
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

        let ws_stream = api.diff_book_depth(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
2485
/// Subscribing via `kline` must mark the resolved stream name as subscribed
/// on the shared base and expose the supplied id on the returned stream.
#[test]
fn kline_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let expected_id = "test-id-123".to_string();
        let params = KlineParams::builder("bnbusdt".to_string(), KlineIntervalEnum::Interval1s)
            .id(Some(expected_id.clone()))
            .build()
            .unwrap();

        // Exhaustive destructuring: a new field on the params struct breaks this test at compile time.
        let KlineParams {
            symbol,
            interval,
            id,
        } = params.clone();

        // Only values that are actually set are offered for placeholder substitution.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", symbol);
        vars.insert("interval", interval.as_str().to_string());
        if let Some(value) = id {
            vars.insert("id", value);
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);

        let ws_stream = api
            .kline(params)
            .await
            .expect("kline should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(ws_stream.id, Some(StreamId::Str(expected_id)));
    });
}
2529
/// A message pushed through the base for the subscribed kline stream must
/// invoke the callback registered on the returned stream.
#[test]
fn kline_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = KlineParams::builder("bnbusdt".to_string(), KlineIntervalEnum::Interval1s)
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let KlineParams {
            symbol,
            interval,
            id,
        } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("interval", interval.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);

        let ws_stream = api.kline(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::KlineResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
2582
/// Once the kline stream has been unsubscribed, a message addressed to it
/// must no longer reach the previously registered callback.
#[test]
fn kline_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = KlineParams::builder("bnbusdt".to_string(), KlineIntervalEnum::Interval1s)
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let KlineParams {
            symbol,
            interval,
            id,
        } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("interval", interval.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);

        let ws_stream = api.kline(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::KlineResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
2640
/// Subscribing via `kline_offset` must mark the resolved stream name as
/// subscribed on the shared base and expose the supplied id on the returned stream.
#[test]
fn kline_offset_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let expected_id = "test-id-123".to_string();
        let params = KlineOffsetParams::builder(
            "bnbusdt".to_string(),
            KlineOffsetIntervalEnum::Interval1s,
        )
        .id(Some(expected_id.clone()))
        .build()
        .unwrap();

        // Exhaustive destructuring: a new field on the params struct breaks this test at compile time.
        let KlineOffsetParams {
            symbol,
            interval,
            id,
        } = params.clone();

        // Only values that are actually set are offered for placeholder substitution.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", symbol);
        vars.insert("interval", interval.as_str().to_string());
        if let Some(value) = id {
            vars.insert("id", value);
        }
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);

        let ws_stream = api
            .kline_offset(params)
            .await
            .expect("kline_offset should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(ws_stream.id, Some(StreamId::Str(expected_id)));
    });
}
2687
/// A message pushed through the base for the subscribed offset-kline stream
/// must invoke the callback registered on the returned stream.
#[test]
fn kline_offset_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = KlineOffsetParams::builder(
            "bnbusdt".to_string(),
            KlineOffsetIntervalEnum::Interval1s,
        )
        .id(Some("test-id-123".to_string()))
        .build()
        .unwrap();

        let KlineOffsetParams {
            symbol,
            interval,
            id,
        } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("interval", interval.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);

        let ws_stream = api.kline_offset(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
2740
/// Once the offset-kline stream has been unsubscribed, a message addressed
/// to it must no longer reach the previously registered callback.
#[test]
fn kline_offset_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = KlineOffsetParams::builder(
            "bnbusdt".to_string(),
            KlineOffsetIntervalEnum::Interval1s,
        )
        .id(Some("test-id-123".to_string()))
        .build()
        .unwrap();

        let KlineOffsetParams {
            symbol,
            interval,
            id,
        } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("interval", interval.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);

        let ws_stream = api.kline_offset(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
2798
/// Subscribing via `mini_ticker` must mark the resolved stream name as
/// subscribed on the shared base and expose the supplied id on the returned stream.
#[test]
fn mini_ticker_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let expected_id = "test-id-123".to_string();
        let params = MiniTickerParams::builder("bnbusdt".to_string())
            .id(Some(expected_id.clone()))
            .build()
            .unwrap();

        // Exhaustive destructuring: a new field on the params struct breaks this test at compile time.
        let MiniTickerParams { symbol, id } = params.clone();

        // Only values that are actually set are offered for placeholder substitution.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", symbol);
        if let Some(value) = id {
            vars.insert("id", value);
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);

        let ws_stream = api
            .mini_ticker(params)
            .await
            .expect("mini_ticker should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(ws_stream.id, Some(StreamId::Str(expected_id)));
    });
}
2834
/// A message pushed through the base for the subscribed mini-ticker stream
/// must invoke the callback registered on the returned stream.
#[test]
fn mini_ticker_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = MiniTickerParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let MiniTickerParams { symbol, id } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([("symbol", symbol)]);
        vars.extend(id.map(|value| ("id", value)));
        let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);

        let ws_stream = api.mini_ticker(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
2884
/// Once the mini-ticker stream has been unsubscribed, a message addressed to
/// it must no longer reach the previously registered callback.
#[test]
fn mini_ticker_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = MiniTickerParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let MiniTickerParams { symbol, id } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([("symbol", symbol)]);
        vars.extend(id.map(|value| ("id", value)));
        let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);

        let ws_stream = api.mini_ticker(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
2939
/// Subscribing via `partial_book_depth` must mark the resolved stream name as
/// subscribed on the shared base and expose the supplied id on the returned stream.
#[test]
fn partial_book_depth_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let expected_id = "test-id-123".to_string();
        let params = PartialBookDepthParams::builder(
            "bnbusdt".to_string(),
            PartialBookDepthLevelsEnum::Levels5,
        )
        .id(Some(expected_id.clone()))
        .build()
        .unwrap();

        // Exhaustive destructuring: a new field on the params struct breaks this test at compile time.
        let PartialBookDepthParams {
            symbol,
            levels,
            id,
            update_speed,
        } = params.clone();

        // Only values that are actually set are offered for placeholder substitution.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", symbol);
        vars.insert("levels", levels.as_str().to_string());
        if let Some(value) = id {
            vars.insert("id", value);
        }
        if let Some(value) = update_speed {
            vars.insert("updateSpeed", value);
        }
        let stream = replace_websocket_streams_placeholders(
            "/<symbol>@depth<levels>@<updateSpeed>",
            &vars,
        );

        let ws_stream = api
            .partial_book_depth(params)
            .await
            .expect("partial_book_depth should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(ws_stream.id, Some(StreamId::Str(expected_id)));
    });
}
2990
/// A message pushed through the base for the subscribed partial-depth stream
/// must invoke the callback registered on the returned stream.
#[test]
fn partial_book_depth_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = PartialBookDepthParams::builder(
            "bnbusdt".to_string(),
            PartialBookDepthLevelsEnum::Levels5,
        )
        .id(Some("test-id-123".to_string()))
        .build()
        .unwrap();

        let PartialBookDepthParams {
            symbol,
            levels,
            id,
            update_speed,
        } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("levels", levels.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        vars.extend(update_speed.map(|value| ("updateSpeed", value)));
        let stream = replace_websocket_streams_placeholders(
            "/<symbol>@depth<levels>@<updateSpeed>",
            &vars,
        );

        let ws_stream = api.partial_book_depth(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(
            r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
        )
        .unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
3056
/// Once the partial-depth stream has been unsubscribed, a message addressed
/// to it must no longer reach the previously registered callback.
#[test]
fn partial_book_depth_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = PartialBookDepthParams::builder(
            "bnbusdt".to_string(),
            PartialBookDepthLevelsEnum::Levels5,
        )
        .id(Some("test-id-123".to_string()))
        .build()
        .unwrap();

        let PartialBookDepthParams {
            symbol,
            levels,
            id,
            update_speed,
        } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("levels", levels.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        vars.extend(update_speed.map(|value| ("updateSpeed", value)));
        let stream = replace_websocket_streams_placeholders(
            "/<symbol>@depth<levels>@<updateSpeed>",
            &vars,
        );

        let ws_stream = api.partial_book_depth(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(
            r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
        )
        .unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
3130
/// Subscribing via `reference_price` must mark the resolved stream name as
/// subscribed on the shared base and expose the supplied id on the returned stream.
#[test]
fn reference_price_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let expected_id = "test-id-123".to_string();
        let params = ReferencePriceParams::builder("bnbusdt".to_string())
            .id(Some(expected_id.clone()))
            .build()
            .unwrap();

        // Exhaustive destructuring: a new field on the params struct breaks this test at compile time.
        let ReferencePriceParams { symbol, id } = params.clone();

        // Only values that are actually set are offered for placeholder substitution.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", symbol);
        if let Some(value) = id {
            vars.insert("id", value);
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);

        let ws_stream = api
            .reference_price(params)
            .await
            .expect("reference_price should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(ws_stream.id, Some(StreamId::Str(expected_id)));
    });
}
3166
/// A message pushed through the base for the subscribed reference-price
/// stream must invoke the callback registered on the returned stream.
#[test]
fn reference_price_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = ReferencePriceParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let ReferencePriceParams { symbol, id } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([("symbol", symbol)]);
        vars.extend(id.map(|value| ("id", value)));
        let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);

        let ws_stream = api.reference_price(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::ReferencePriceResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(
            r#"{"e":"referencePrice","s":"BAZUSD","r":"1.00","t":1770313263917}"#,
        )
        .unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
3217
/// Once the reference-price stream has been unsubscribed, a message addressed
/// to it must no longer reach the previously registered callback.
#[test]
fn reference_price_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = ReferencePriceParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        let ReferencePriceParams { symbol, id } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([("symbol", symbol)]);
        vars.extend(id.map(|value| ("id", value)));
        let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);

        let ws_stream = api.reference_price(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::ReferencePriceResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(
            r#"{"e":"referencePrice","s":"BAZUSD","r":"1.00","t":1770313263917}"#,
        )
        .unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
3276
/// Subscribing via `rolling_window_ticker` must mark the resolved stream name as
/// subscribed on the shared base and expose the supplied id on the returned stream.
#[test]
fn rolling_window_ticker_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let expected_id = "test-id-123".to_string();
        let params = RollingWindowTickerParams::builder(
            "bnbusdt".to_string(),
            RollingWindowTickerWindowSizeEnum::WindowSize1h,
        )
        .id(Some(expected_id.clone()))
        .build()
        .unwrap();

        // Exhaustive destructuring: a new field on the params struct breaks this test at compile time.
        let RollingWindowTickerParams {
            symbol,
            window_size,
            id,
        } = params.clone();

        // Only values that are actually set are offered for placeholder substitution.
        let mut vars: HashMap<&str, String> = HashMap::new();
        vars.insert("symbol", symbol);
        vars.insert("windowSize", window_size.as_str().to_string());
        if let Some(value) = id {
            vars.insert("id", value);
        }
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);

        let ws_stream = api
            .rolling_window_ticker(params)
            .await
            .expect("rolling_window_ticker should return a WebsocketStream");

        assert!(
            streams_base.is_subscribed(&stream).await,
            "expected stream '{stream}' to be subscribed"
        );
        assert_eq!(ws_stream.id, Some(StreamId::Str(expected_id)));
    });
}
3323
/// A message pushed through the base for the subscribed rolling-window ticker
/// stream must invoke the callback registered on the returned stream.
#[test]
fn rolling_window_ticker_should_handle_incoming_message() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = RollingWindowTickerParams::builder(
            "bnbusdt".to_string(),
            RollingWindowTickerWindowSizeEnum::WindowSize1h,
        )
        .id(Some("test-id-123".to_string()))
        .build()
        .unwrap();

        let RollingWindowTickerParams {
            symbol,
            window_size,
            id,
        } = params.clone();

        // Optional fields contribute an entry only when present.
        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("windowSize", window_size.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);

        let ws_stream = api.rolling_window_ticker(params).await.unwrap();

        // The flag is flipped exclusively from inside the registered callback.
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            fired.load(Ordering::SeqCst),
            "expected our callback to have been invoked"
        );
    });
}
3376
/// Once the rolling-window ticker stream has been unsubscribed, a message
/// addressed to it must no longer reach the previously registered callback.
#[test]
fn rolling_window_ticker_should_not_fire_after_unsubscribe() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = RollingWindowTickerParams::builder(
            "bnbusdt".to_string(),
            RollingWindowTickerWindowSizeEnum::WindowSize1h,
        )
        .id(Some("test-id-123".to_string()))
        .build()
        .unwrap();

        let RollingWindowTickerParams {
            symbol,
            window_size,
            id,
        } = params.clone();

        let mut vars: HashMap<&str, String> = HashMap::from([
            ("symbol", symbol),
            ("windowSize", window_size.as_str().to_string()),
        ]);
        vars.extend(id.map(|value| ("id", value)));
        let stream =
            replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);

        let ws_stream = api.rolling_window_ticker(params).await.unwrap();

        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_callback = Arc::clone(&fired);
        ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
            fired_in_callback.store(true, Ordering::SeqCst);
        });

        // Precondition: the subscription exists before it is torn down.
        assert!(
            streams_base.is_subscribed(&stream).await,
            "should be subscribed before unsubscribe"
        );

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
        let raw_message = json!({
            "stream": stream,
            "data": payload,
        })
        .to_string();

        streams_base.on_message(raw_message, conn.clone()).await;
        // Yield to the runtime before reading the flag.
        yield_now().await;

        assert!(
            !fired.load(Ordering::SeqCst),
            "callback should not be invoked after unsubscribe"
        );
    });
}
3434
/// Checks that `ticker` subscribes to the expected stream and keeps the supplied id.
///
/// The expected stream name is built from the same parameter values by substituting them into
/// the `/<symbol>@ticker` template.
#[test]
fn ticker_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = TickerParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        // Parameters without a value are left out of the substitution map.
        let mut vars = HashMap::new();
        for (key, value) in [
            ("symbol", Some(params.symbol.clone())),
            ("id", params.id.clone()),
        ] {
            if let Some(value) = value {
                vars.insert(key, value);
            }
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);

        let ws_stream = api
            .ticker(params)
            .await
            .expect("ticker should return a WebsocketStream");

        let subscribed = streams_base.is_subscribed(&stream).await;
        assert!(subscribed, "expected stream '{stream}' to be subscribed");
        assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
    });
}
3470
/// Checks that a message addressed to the ticker stream reaches the registered callback.
///
/// After subscribing through `ticker`, the test wraps a sample ticker event in a
/// `{"stream", "data"}` envelope, passes it to `streams_base.on_message`, and expects the
/// callback flag to be set.
#[test]
fn ticker_should_handle_incoming_message() {
    const RAW_EVENT: &str = r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#;

    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = TickerParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        // Parameters without a value are left out of the substitution map.
        let mut vars = HashMap::new();
        for (key, value) in [
            ("symbol", Some(params.symbol.clone())),
            ("id", params.id.clone()),
        ] {
            if let Some(value) = value {
                vars.insert(key, value);
            }
        }
        let topic = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);

        let ws_stream = api.ticker(params).await.unwrap();

        // Flag flipped by the callback once a message is delivered.
        let fired = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&fired);
        ws_stream.on_message(move |_event: models::TickerResponse| {
            flag.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(RAW_EVENT).unwrap();
        let envelope = json!({
            "stream": topic,
            "data": payload,
        });

        streams_base.on_message(envelope.to_string(), conn.clone()).await;
        yield_now().await;

        assert!(fired.load(Ordering::SeqCst), "expected our callback to have been invoked");
    });
}
3520
/// Checks that the ticker callback stays silent after `unsubscribe`.
///
/// The test subscribes through `ticker`, registers a callback, asserts the subscription is
/// active, unsubscribes, and then passes a message addressed to that stream to
/// `streams_base.on_message`. The callback flag must still be `false` afterwards.
#[test]
fn ticker_should_not_fire_after_unsubscribe() {
    const RAW_EVENT: &str = r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#;

    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = TickerParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        // Parameters without a value are left out of the substitution map.
        let mut vars = HashMap::new();
        for (key, value) in [
            ("symbol", Some(params.symbol.clone())),
            ("id", params.id.clone()),
        ] {
            if let Some(value) = value {
                vars.insert(key, value);
            }
        }
        let topic = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);

        let ws_stream = api.ticker(params).await.unwrap();

        // Flag flipped by the callback; it must remain `false` for the test to pass.
        let fired = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&fired);
        ws_stream.on_message(move |_event: models::TickerResponse| {
            flag.store(true, Ordering::SeqCst);
        });

        assert!(streams_base.is_subscribed(&topic).await, "should be subscribed before unsubscribe");

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(RAW_EVENT).unwrap();
        let envelope = json!({
            "stream": topic,
            "data": payload,
        });

        streams_base.on_message(envelope.to_string(), conn.clone()).await;
        yield_now().await;

        assert!(!fired.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
    });
}
3575
/// Checks that `trade` subscribes to the expected stream and keeps the supplied id.
///
/// The expected stream name is built from the same parameter values by substituting them into
/// the `/<symbol>@trade` template.
#[test]
fn trade_should_execute_successfully() {
    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, _) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = TradeParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        // Parameters without a value are left out of the substitution map.
        let mut vars = HashMap::new();
        for (key, value) in [
            ("symbol", Some(params.symbol.clone())),
            ("id", params.id.clone()),
        ] {
            if let Some(value) = value {
                vars.insert(key, value);
            }
        }
        let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);

        let ws_stream = api
            .trade(params)
            .await
            .expect("trade should return a WebsocketStream");

        let subscribed = streams_base.is_subscribed(&stream).await;
        assert!(subscribed, "expected stream '{stream}' to be subscribed");
        assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
    });
}
3611
/// Checks that a message addressed to the trade stream reaches the registered callback.
///
/// After subscribing through `trade`, the test wraps a sample trade event in a
/// `{"stream", "data"}` envelope, passes it to `streams_base.on_message`, and expects the
/// callback flag to be set.
#[test]
fn trade_should_handle_incoming_message() {
    const RAW_EVENT: &str = r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#;

    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = TradeParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        // Parameters without a value are left out of the substitution map.
        let mut vars = HashMap::new();
        for (key, value) in [
            ("symbol", Some(params.symbol.clone())),
            ("id", params.id.clone()),
        ] {
            if let Some(value) = value {
                vars.insert(key, value);
            }
        }
        let topic = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);

        let ws_stream = api.trade(params).await.unwrap();

        // Flag flipped by the callback once a message is delivered.
        let fired = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&fired);
        ws_stream.on_message(move |_event: models::TradeResponse| {
            flag.store(true, Ordering::SeqCst);
        });

        let payload: Value = serde_json::from_str(RAW_EVENT).unwrap();
        let envelope = json!({
            "stream": topic,
            "data": payload,
        });

        streams_base.on_message(envelope.to_string(), conn.clone()).await;
        yield_now().await;

        assert!(fired.load(Ordering::SeqCst), "expected our callback to have been invoked");
    });
}
3661
/// Checks that the trade callback stays silent after `unsubscribe`.
///
/// The test subscribes through `trade`, registers a callback, asserts the subscription is
/// active, unsubscribes, and then passes a message addressed to that stream to
/// `streams_base.on_message`. The callback flag must still be `false` afterwards.
#[test]
fn trade_should_not_fire_after_unsubscribe() {
    const RAW_EVENT: &str = r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#;

    TOKIO_SHARED_RT.block_on(async {
        let (streams_base, conn) = make_streams_base().await;
        let api = WebSocketStreamsApiClient::new(streams_base.clone());

        let params = TradeParams::builder("bnbusdt".to_string())
            .id(Some("test-id-123".to_string()))
            .build()
            .unwrap();

        // Parameters without a value are left out of the substitution map.
        let mut vars = HashMap::new();
        for (key, value) in [
            ("symbol", Some(params.symbol.clone())),
            ("id", params.id.clone()),
        ] {
            if let Some(value) = value {
                vars.insert(key, value);
            }
        }
        let topic = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);

        let ws_stream = api.trade(params).await.unwrap();

        // Flag flipped by the callback; it must remain `false` for the test to pass.
        let fired = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&fired);
        ws_stream.on_message(move |_event: models::TradeResponse| {
            flag.store(true, Ordering::SeqCst);
        });

        assert!(streams_base.is_subscribed(&topic).await, "should be subscribed before unsubscribe");

        ws_stream.unsubscribe().await;

        let payload: Value = serde_json::from_str(RAW_EVENT).unwrap();
        let envelope = json!({
            "stream": topic,
            "data": payload,
        });

        streams_base.on_message(envelope.to_string(), conn.clone()).await;
        yield_now().await;

        assert!(!fired.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
    });
}
3716}