Skip to main content

binance_sdk/spot/websocket_streams/apis/
web_socket_streams_api.rs

/*
 * Binance Spot WebSocket Streams
 *
 * OpenAPI Specifications for the Binance Spot WebSocket Streams
 *
 * API documents:
 * - [Github web-socket-streams documentation file](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md)
 * - [General API information for web-socket-streams on website](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams)
 *
 *
 * The version of the OpenAPI document: 1.0.0
 *
 *
 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
 * https://openapi-generator.tech
 * Do not edit the class manually.
 */
18
19#![allow(unused_imports)]
20use async_trait::async_trait;
21use derive_builder::Builder;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use std::{collections::HashMap, sync::Arc};
25
26use crate::common::{
27    models::ParamBuildError,
28    utils::replace_websocket_streams_placeholders,
29    websocket::{WebsocketBase, WebsocketStream, WebsocketStreams, create_stream_handler},
30};
31use crate::models::StreamId;
32use crate::spot::websocket_streams::models;
33
34#[async_trait]
35pub trait WebSocketStreamsApi: Send + Sync {
36    async fn agg_trade(
37        &self,
38        params: AggTradeParams,
39    ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>>;
40    async fn all_market_rolling_window_ticker(
41        &self,
42        params: AllMarketRollingWindowTickerParams,
43    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>;
44    async fn all_mini_ticker(
45        &self,
46        params: AllMiniTickerParams,
47    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>>;
48    async fn avg_price(
49        &self,
50        params: AvgPriceParams,
51    ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>>;
52    async fn block_trade(
53        &self,
54        params: BlockTradeParams,
55    ) -> anyhow::Result<Arc<WebsocketStream<models::BlockTradeResponse>>>;
56    async fn book_ticker(
57        &self,
58        params: BookTickerParams,
59    ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>>;
60    async fn diff_book_depth(
61        &self,
62        params: DiffBookDepthParams,
63    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>>;
64    async fn kline(
65        &self,
66        params: KlineParams,
67    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>>;
68    async fn kline_offset(
69        &self,
70        params: KlineOffsetParams,
71    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>>;
72    async fn mini_ticker(
73        &self,
74        params: MiniTickerParams,
75    ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>>;
76    async fn partial_book_depth(
77        &self,
78        params: PartialBookDepthParams,
79    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>>;
80    async fn reference_price(
81        &self,
82        params: ReferencePriceParams,
83    ) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>>;
84    async fn rolling_window_ticker(
85        &self,
86        params: RollingWindowTickerParams,
87    ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>>;
88    async fn ticker(
89        &self,
90        params: TickerParams,
91    ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>>;
92    async fn trade(
93        &self,
94        params: TradeParams,
95    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>>;
96}
97
98pub struct WebSocketStreamsApiClient {
99    websocket_streams_base: Arc<WebsocketStreams>,
100}
101
102impl WebSocketStreamsApiClient {
103    pub fn new(websocket_streams_base: Arc<WebsocketStreams>) -> Self {
104        Self {
105            websocket_streams_base,
106        }
107    }
108}
109
110#[allow(non_camel_case_types)]
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum AllMarketRollingWindowTickerWindowSizeEnum {
113    #[serde(rename = "1h")]
114    WindowSize1h,
115    #[serde(rename = "4h")]
116    WindowSize4h,
117    #[serde(rename = "1d")]
118    WindowSize1d,
119}
120
121impl AllMarketRollingWindowTickerWindowSizeEnum {
122    #[must_use]
123    pub fn as_str(&self) -> &'static str {
124        match self {
125            Self::WindowSize1h => "1h",
126            Self::WindowSize4h => "4h",
127            Self::WindowSize1d => "1d",
128        }
129    }
130}
131
132impl std::str::FromStr for AllMarketRollingWindowTickerWindowSizeEnum {
133    type Err = Box<dyn std::error::Error + Send + Sync>;
134
135    fn from_str(s: &str) -> Result<Self, Self::Err> {
136        match s {
137            "1h" => Ok(Self::WindowSize1h),
138            "4h" => Ok(Self::WindowSize4h),
139            "1d" => Ok(Self::WindowSize1d),
140            other => Err(format!(
141                "invalid AllMarketRollingWindowTickerWindowSizeEnum: {}",
142                other
143            )
144            .into()),
145        }
146    }
147}
148
149#[allow(non_camel_case_types)]
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub enum KlineIntervalEnum {
152    #[serde(rename = "1s")]
153    Interval1s,
154    #[serde(rename = "1m")]
155    Interval1m,
156    #[serde(rename = "3m")]
157    Interval3m,
158    #[serde(rename = "5m")]
159    Interval5m,
160    #[serde(rename = "15m")]
161    Interval15m,
162    #[serde(rename = "30m")]
163    Interval30m,
164    #[serde(rename = "1h")]
165    Interval1h,
166    #[serde(rename = "2h")]
167    Interval2h,
168    #[serde(rename = "4h")]
169    Interval4h,
170    #[serde(rename = "6h")]
171    Interval6h,
172    #[serde(rename = "8h")]
173    Interval8h,
174    #[serde(rename = "12h")]
175    Interval12h,
176    #[serde(rename = "1d")]
177    Interval1d,
178    #[serde(rename = "3d")]
179    Interval3d,
180    #[serde(rename = "1w")]
181    Interval1w,
182    #[serde(rename = "1M")]
183    Interval1M,
184}
185
186impl KlineIntervalEnum {
187    #[must_use]
188    pub fn as_str(&self) -> &'static str {
189        match self {
190            Self::Interval1s => "1s",
191            Self::Interval1m => "1m",
192            Self::Interval3m => "3m",
193            Self::Interval5m => "5m",
194            Self::Interval15m => "15m",
195            Self::Interval30m => "30m",
196            Self::Interval1h => "1h",
197            Self::Interval2h => "2h",
198            Self::Interval4h => "4h",
199            Self::Interval6h => "6h",
200            Self::Interval8h => "8h",
201            Self::Interval12h => "12h",
202            Self::Interval1d => "1d",
203            Self::Interval3d => "3d",
204            Self::Interval1w => "1w",
205            Self::Interval1M => "1M",
206        }
207    }
208}
209
210impl std::str::FromStr for KlineIntervalEnum {
211    type Err = Box<dyn std::error::Error + Send + Sync>;
212
213    fn from_str(s: &str) -> Result<Self, Self::Err> {
214        match s {
215            "1s" => Ok(Self::Interval1s),
216            "1m" => Ok(Self::Interval1m),
217            "3m" => Ok(Self::Interval3m),
218            "5m" => Ok(Self::Interval5m),
219            "15m" => Ok(Self::Interval15m),
220            "30m" => Ok(Self::Interval30m),
221            "1h" => Ok(Self::Interval1h),
222            "2h" => Ok(Self::Interval2h),
223            "4h" => Ok(Self::Interval4h),
224            "6h" => Ok(Self::Interval6h),
225            "8h" => Ok(Self::Interval8h),
226            "12h" => Ok(Self::Interval12h),
227            "1d" => Ok(Self::Interval1d),
228            "3d" => Ok(Self::Interval3d),
229            "1w" => Ok(Self::Interval1w),
230            "1M" => Ok(Self::Interval1M),
231            other => Err(format!("invalid KlineIntervalEnum: {}", other).into()),
232        }
233    }
234}
235
236#[allow(non_camel_case_types)]
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub enum KlineOffsetIntervalEnum {
239    #[serde(rename = "1s")]
240    Interval1s,
241    #[serde(rename = "1m")]
242    Interval1m,
243    #[serde(rename = "3m")]
244    Interval3m,
245    #[serde(rename = "5m")]
246    Interval5m,
247    #[serde(rename = "15m")]
248    Interval15m,
249    #[serde(rename = "30m")]
250    Interval30m,
251    #[serde(rename = "1h")]
252    Interval1h,
253    #[serde(rename = "2h")]
254    Interval2h,
255    #[serde(rename = "4h")]
256    Interval4h,
257    #[serde(rename = "6h")]
258    Interval6h,
259    #[serde(rename = "8h")]
260    Interval8h,
261    #[serde(rename = "12h")]
262    Interval12h,
263    #[serde(rename = "1d")]
264    Interval1d,
265    #[serde(rename = "3d")]
266    Interval3d,
267    #[serde(rename = "1w")]
268    Interval1w,
269    #[serde(rename = "1M")]
270    Interval1M,
271}
272
273impl KlineOffsetIntervalEnum {
274    #[must_use]
275    pub fn as_str(&self) -> &'static str {
276        match self {
277            Self::Interval1s => "1s",
278            Self::Interval1m => "1m",
279            Self::Interval3m => "3m",
280            Self::Interval5m => "5m",
281            Self::Interval15m => "15m",
282            Self::Interval30m => "30m",
283            Self::Interval1h => "1h",
284            Self::Interval2h => "2h",
285            Self::Interval4h => "4h",
286            Self::Interval6h => "6h",
287            Self::Interval8h => "8h",
288            Self::Interval12h => "12h",
289            Self::Interval1d => "1d",
290            Self::Interval3d => "3d",
291            Self::Interval1w => "1w",
292            Self::Interval1M => "1M",
293        }
294    }
295}
296
297impl std::str::FromStr for KlineOffsetIntervalEnum {
298    type Err = Box<dyn std::error::Error + Send + Sync>;
299
300    fn from_str(s: &str) -> Result<Self, Self::Err> {
301        match s {
302            "1s" => Ok(Self::Interval1s),
303            "1m" => Ok(Self::Interval1m),
304            "3m" => Ok(Self::Interval3m),
305            "5m" => Ok(Self::Interval5m),
306            "15m" => Ok(Self::Interval15m),
307            "30m" => Ok(Self::Interval30m),
308            "1h" => Ok(Self::Interval1h),
309            "2h" => Ok(Self::Interval2h),
310            "4h" => Ok(Self::Interval4h),
311            "6h" => Ok(Self::Interval6h),
312            "8h" => Ok(Self::Interval8h),
313            "12h" => Ok(Self::Interval12h),
314            "1d" => Ok(Self::Interval1d),
315            "3d" => Ok(Self::Interval3d),
316            "1w" => Ok(Self::Interval1w),
317            "1M" => Ok(Self::Interval1M),
318            other => Err(format!("invalid KlineOffsetIntervalEnum: {}", other).into()),
319        }
320    }
321}
322
323#[allow(non_camel_case_types)]
324#[derive(Debug, Clone, Serialize, Deserialize)]
325pub enum PartialBookDepthLevelsEnum {
326    #[serde(rename = "5")]
327    Levels5,
328    #[serde(rename = "10")]
329    Levels10,
330    #[serde(rename = "20")]
331    Levels20,
332}
333
334impl PartialBookDepthLevelsEnum {
335    #[must_use]
336    pub fn as_str(&self) -> &'static str {
337        match self {
338            Self::Levels5 => "5",
339            Self::Levels10 => "10",
340            Self::Levels20 => "20",
341        }
342    }
343}
344
345impl std::str::FromStr for PartialBookDepthLevelsEnum {
346    type Err = Box<dyn std::error::Error + Send + Sync>;
347
348    fn from_str(s: &str) -> Result<Self, Self::Err> {
349        match s {
350            "5" => Ok(Self::Levels5),
351            "10" => Ok(Self::Levels10),
352            "20" => Ok(Self::Levels20),
353            other => Err(format!("invalid PartialBookDepthLevelsEnum: {}", other).into()),
354        }
355    }
356}
357
358#[allow(non_camel_case_types)]
359#[derive(Debug, Clone, Serialize, Deserialize)]
360pub enum RollingWindowTickerWindowSizeEnum {
361    #[serde(rename = "1h")]
362    WindowSize1h,
363    #[serde(rename = "4h")]
364    WindowSize4h,
365    #[serde(rename = "1d")]
366    WindowSize1d,
367}
368
369impl RollingWindowTickerWindowSizeEnum {
370    #[must_use]
371    pub fn as_str(&self) -> &'static str {
372        match self {
373            Self::WindowSize1h => "1h",
374            Self::WindowSize4h => "4h",
375            Self::WindowSize1d => "1d",
376        }
377    }
378}
379
380impl std::str::FromStr for RollingWindowTickerWindowSizeEnum {
381    type Err = Box<dyn std::error::Error + Send + Sync>;
382
383    fn from_str(s: &str) -> Result<Self, Self::Err> {
384        match s {
385            "1h" => Ok(Self::WindowSize1h),
386            "4h" => Ok(Self::WindowSize4h),
387            "1d" => Ok(Self::WindowSize1d),
388            other => Err(format!("invalid RollingWindowTickerWindowSizeEnum: {}", other).into()),
389        }
390    }
391}
392
393/// Request parameters for the [`agg_trade`] operation.
394///
395/// This struct holds all of the inputs you can pass when calling
396/// [`agg_trade`](#method.agg_trade).
397#[derive(Clone, Debug, Builder)]
398#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
399pub struct AggTradeParams {
400    /// Symbol to query
401    ///
402    /// This field is **required.
403    #[builder(setter(into))]
404    pub symbol: String,
405    /// Unique WebSocket request ID.
406    ///
407    /// This field is **optional.
408    #[builder(setter(into), default)]
409    pub id: Option<String>,
410}
411
412impl AggTradeParams {
413    /// Create a builder for [`agg_trade`].
414    ///
415    /// Required parameters:
416    ///
417    /// * `symbol` — Symbol to query
418    ///
419    #[must_use]
420    pub fn builder(symbol: String) -> AggTradeParamsBuilder {
421        AggTradeParamsBuilder::default().symbol(symbol)
422    }
423}
424/// Request parameters for the [`all_market_rolling_window_ticker`] operation.
425///
426/// This struct holds all of the inputs you can pass when calling
427/// [`all_market_rolling_window_ticker`](#method.all_market_rolling_window_ticker).
428#[derive(Clone, Debug, Builder)]
429#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
430pub struct AllMarketRollingWindowTickerParams {
431    ///
432    /// The `window_size` parameter.
433    ///
434    /// This field is **required.
435    #[builder(setter(into))]
436    pub window_size: AllMarketRollingWindowTickerWindowSizeEnum,
437    /// Unique WebSocket request ID.
438    ///
439    /// This field is **optional.
440    #[builder(setter(into), default)]
441    pub id: Option<String>,
442}
443
444impl AllMarketRollingWindowTickerParams {
445    /// Create a builder for [`all_market_rolling_window_ticker`].
446    ///
447    /// Required parameters:
448    ///
449    /// * `window_size` — String
450    ///
451    #[must_use]
452    pub fn builder(
453        window_size: AllMarketRollingWindowTickerWindowSizeEnum,
454    ) -> AllMarketRollingWindowTickerParamsBuilder {
455        AllMarketRollingWindowTickerParamsBuilder::default().window_size(window_size)
456    }
457}
458/// Request parameters for the [`all_mini_ticker`] operation.
459///
460/// This struct holds all of the inputs you can pass when calling
461/// [`all_mini_ticker`](#method.all_mini_ticker).
462#[derive(Clone, Debug, Builder, Default)]
463#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
464pub struct AllMiniTickerParams {
465    /// Unique WebSocket request ID.
466    ///
467    /// This field is **optional.
468    #[builder(setter(into), default)]
469    pub id: Option<String>,
470}
471
472impl AllMiniTickerParams {
473    /// Create a builder for [`all_mini_ticker`].
474    ///
475    #[must_use]
476    pub fn builder() -> AllMiniTickerParamsBuilder {
477        AllMiniTickerParamsBuilder::default()
478    }
479}
480/// Request parameters for the [`avg_price`] operation.
481///
482/// This struct holds all of the inputs you can pass when calling
483/// [`avg_price`](#method.avg_price).
484#[derive(Clone, Debug, Builder)]
485#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
486pub struct AvgPriceParams {
487    /// Symbol to query
488    ///
489    /// This field is **required.
490    #[builder(setter(into))]
491    pub symbol: String,
492    /// Unique WebSocket request ID.
493    ///
494    /// This field is **optional.
495    #[builder(setter(into), default)]
496    pub id: Option<String>,
497}
498
499impl AvgPriceParams {
500    /// Create a builder for [`avg_price`].
501    ///
502    /// Required parameters:
503    ///
504    /// * `symbol` — Symbol to query
505    ///
506    #[must_use]
507    pub fn builder(symbol: String) -> AvgPriceParamsBuilder {
508        AvgPriceParamsBuilder::default().symbol(symbol)
509    }
510}
511/// Request parameters for the [`block_trade`] operation.
512///
513/// This struct holds all of the inputs you can pass when calling
514/// [`block_trade`](#method.block_trade).
515#[derive(Clone, Debug, Builder)]
516#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
517pub struct BlockTradeParams {
518    /// Symbol to query
519    ///
520    /// This field is **required.
521    #[builder(setter(into))]
522    pub symbol: String,
523    /// Unique WebSocket request ID.
524    ///
525    /// This field is **optional.
526    #[builder(setter(into), default)]
527    pub id: Option<String>,
528}
529
530impl BlockTradeParams {
531    /// Create a builder for [`block_trade`].
532    ///
533    /// Required parameters:
534    ///
535    /// * `symbol` — Symbol to query
536    ///
537    #[must_use]
538    pub fn builder(symbol: String) -> BlockTradeParamsBuilder {
539        BlockTradeParamsBuilder::default().symbol(symbol)
540    }
541}
542/// Request parameters for the [`book_ticker`] operation.
543///
544/// This struct holds all of the inputs you can pass when calling
545/// [`book_ticker`](#method.book_ticker).
546#[derive(Clone, Debug, Builder)]
547#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
548pub struct BookTickerParams {
549    /// Symbol to query
550    ///
551    /// This field is **required.
552    #[builder(setter(into))]
553    pub symbol: String,
554    /// Unique WebSocket request ID.
555    ///
556    /// This field is **optional.
557    #[builder(setter(into), default)]
558    pub id: Option<String>,
559}
560
561impl BookTickerParams {
562    /// Create a builder for [`book_ticker`].
563    ///
564    /// Required parameters:
565    ///
566    /// * `symbol` — Symbol to query
567    ///
568    #[must_use]
569    pub fn builder(symbol: String) -> BookTickerParamsBuilder {
570        BookTickerParamsBuilder::default().symbol(symbol)
571    }
572}
573/// Request parameters for the [`diff_book_depth`] operation.
574///
575/// This struct holds all of the inputs you can pass when calling
576/// [`diff_book_depth`](#method.diff_book_depth).
577#[derive(Clone, Debug, Builder)]
578#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
579pub struct DiffBookDepthParams {
580    /// Symbol to query
581    ///
582    /// This field is **required.
583    #[builder(setter(into))]
584    pub symbol: String,
585    /// Unique WebSocket request ID.
586    ///
587    /// This field is **optional.
588    #[builder(setter(into), default)]
589    pub id: Option<String>,
590    /// 1000ms or 100ms
591    ///
592    /// This field is **optional.
593    #[builder(setter(into), default)]
594    pub update_speed: Option<String>,
595}
596
597impl DiffBookDepthParams {
598    /// Create a builder for [`diff_book_depth`].
599    ///
600    /// Required parameters:
601    ///
602    /// * `symbol` — Symbol to query
603    ///
604    #[must_use]
605    pub fn builder(symbol: String) -> DiffBookDepthParamsBuilder {
606        DiffBookDepthParamsBuilder::default().symbol(symbol)
607    }
608}
609/// Request parameters for the [`kline`] operation.
610///
611/// This struct holds all of the inputs you can pass when calling
612/// [`kline`](#method.kline).
613#[derive(Clone, Debug, Builder)]
614#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
615pub struct KlineParams {
616    /// Symbol to query
617    ///
618    /// This field is **required.
619    #[builder(setter(into))]
620    pub symbol: String,
621    ///
622    /// The `interval` parameter.
623    ///
624    /// This field is **required.
625    #[builder(setter(into))]
626    pub interval: KlineIntervalEnum,
627    /// Unique WebSocket request ID.
628    ///
629    /// This field is **optional.
630    #[builder(setter(into), default)]
631    pub id: Option<String>,
632}
633
634impl KlineParams {
635    /// Create a builder for [`kline`].
636    ///
637    /// Required parameters:
638    ///
639    /// * `symbol` — Symbol to query
640    /// * `interval` — String
641    ///
642    #[must_use]
643    pub fn builder(symbol: String, interval: KlineIntervalEnum) -> KlineParamsBuilder {
644        KlineParamsBuilder::default()
645            .symbol(symbol)
646            .interval(interval)
647    }
648}
649/// Request parameters for the [`kline_offset`] operation.
650///
651/// This struct holds all of the inputs you can pass when calling
652/// [`kline_offset`](#method.kline_offset).
653#[derive(Clone, Debug, Builder)]
654#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
655pub struct KlineOffsetParams {
656    /// Symbol to query
657    ///
658    /// This field is **required.
659    #[builder(setter(into))]
660    pub symbol: String,
661    ///
662    /// The `interval` parameter.
663    ///
664    /// This field is **required.
665    #[builder(setter(into))]
666    pub interval: KlineOffsetIntervalEnum,
667    /// Unique WebSocket request ID.
668    ///
669    /// This field is **optional.
670    #[builder(setter(into), default)]
671    pub id: Option<String>,
672}
673
674impl KlineOffsetParams {
675    /// Create a builder for [`kline_offset`].
676    ///
677    /// Required parameters:
678    ///
679    /// * `symbol` — Symbol to query
680    /// * `interval` — String
681    ///
682    #[must_use]
683    pub fn builder(symbol: String, interval: KlineOffsetIntervalEnum) -> KlineOffsetParamsBuilder {
684        KlineOffsetParamsBuilder::default()
685            .symbol(symbol)
686            .interval(interval)
687    }
688}
689/// Request parameters for the [`mini_ticker`] operation.
690///
691/// This struct holds all of the inputs you can pass when calling
692/// [`mini_ticker`](#method.mini_ticker).
693#[derive(Clone, Debug, Builder)]
694#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
695pub struct MiniTickerParams {
696    /// Symbol to query
697    ///
698    /// This field is **required.
699    #[builder(setter(into))]
700    pub symbol: String,
701    /// Unique WebSocket request ID.
702    ///
703    /// This field is **optional.
704    #[builder(setter(into), default)]
705    pub id: Option<String>,
706}
707
708impl MiniTickerParams {
709    /// Create a builder for [`mini_ticker`].
710    ///
711    /// Required parameters:
712    ///
713    /// * `symbol` — Symbol to query
714    ///
715    #[must_use]
716    pub fn builder(symbol: String) -> MiniTickerParamsBuilder {
717        MiniTickerParamsBuilder::default().symbol(symbol)
718    }
719}
720/// Request parameters for the [`partial_book_depth`] operation.
721///
722/// This struct holds all of the inputs you can pass when calling
723/// [`partial_book_depth`](#method.partial_book_depth).
724#[derive(Clone, Debug, Builder)]
725#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
726pub struct PartialBookDepthParams {
727    /// Symbol to query
728    ///
729    /// This field is **required.
730    #[builder(setter(into))]
731    pub symbol: String,
732    ///
733    /// The `levels` parameter.
734    ///
735    /// This field is **required.
736    #[builder(setter(into))]
737    pub levels: PartialBookDepthLevelsEnum,
738    /// Unique WebSocket request ID.
739    ///
740    /// This field is **optional.
741    #[builder(setter(into), default)]
742    pub id: Option<String>,
743    /// 1000ms or 100ms
744    ///
745    /// This field is **optional.
746    #[builder(setter(into), default)]
747    pub update_speed: Option<String>,
748}
749
750impl PartialBookDepthParams {
751    /// Create a builder for [`partial_book_depth`].
752    ///
753    /// Required parameters:
754    ///
755    /// * `symbol` — Symbol to query
756    /// * `levels` — String
757    ///
758    #[must_use]
759    pub fn builder(
760        symbol: String,
761        levels: PartialBookDepthLevelsEnum,
762    ) -> PartialBookDepthParamsBuilder {
763        PartialBookDepthParamsBuilder::default()
764            .symbol(symbol)
765            .levels(levels)
766    }
767}
768/// Request parameters for the [`reference_price`] operation.
769///
770/// This struct holds all of the inputs you can pass when calling
771/// [`reference_price`](#method.reference_price).
772#[derive(Clone, Debug, Builder)]
773#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
774pub struct ReferencePriceParams {
775    /// Symbol to query
776    ///
777    /// This field is **required.
778    #[builder(setter(into))]
779    pub symbol: String,
780    /// Unique WebSocket request ID.
781    ///
782    /// This field is **optional.
783    #[builder(setter(into), default)]
784    pub id: Option<String>,
785}
786
787impl ReferencePriceParams {
788    /// Create a builder for [`reference_price`].
789    ///
790    /// Required parameters:
791    ///
792    /// * `symbol` — Symbol to query
793    ///
794    #[must_use]
795    pub fn builder(symbol: String) -> ReferencePriceParamsBuilder {
796        ReferencePriceParamsBuilder::default().symbol(symbol)
797    }
798}
799/// Request parameters for the [`rolling_window_ticker`] operation.
800///
801/// This struct holds all of the inputs you can pass when calling
802/// [`rolling_window_ticker`](#method.rolling_window_ticker).
803#[derive(Clone, Debug, Builder)]
804#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
805pub struct RollingWindowTickerParams {
806    /// Symbol to query
807    ///
808    /// This field is **required.
809    #[builder(setter(into))]
810    pub symbol: String,
811    ///
812    /// The `window_size` parameter.
813    ///
814    /// This field is **required.
815    #[builder(setter(into))]
816    pub window_size: RollingWindowTickerWindowSizeEnum,
817    /// Unique WebSocket request ID.
818    ///
819    /// This field is **optional.
820    #[builder(setter(into), default)]
821    pub id: Option<String>,
822}
823
824impl RollingWindowTickerParams {
825    /// Create a builder for [`rolling_window_ticker`].
826    ///
827    /// Required parameters:
828    ///
829    /// * `symbol` — Symbol to query
830    /// * `window_size` — String
831    ///
832    #[must_use]
833    pub fn builder(
834        symbol: String,
835        window_size: RollingWindowTickerWindowSizeEnum,
836    ) -> RollingWindowTickerParamsBuilder {
837        RollingWindowTickerParamsBuilder::default()
838            .symbol(symbol)
839            .window_size(window_size)
840    }
841}
842/// Request parameters for the [`ticker`] operation.
843///
844/// This struct holds all of the inputs you can pass when calling
845/// [`ticker`](#method.ticker).
846#[derive(Clone, Debug, Builder)]
847#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
848pub struct TickerParams {
849    /// Symbol to query
850    ///
851    /// This field is **required.
852    #[builder(setter(into))]
853    pub symbol: String,
854    /// Unique WebSocket request ID.
855    ///
856    /// This field is **optional.
857    #[builder(setter(into), default)]
858    pub id: Option<String>,
859}
860
861impl TickerParams {
862    /// Create a builder for [`ticker`].
863    ///
864    /// Required parameters:
865    ///
866    /// * `symbol` — Symbol to query
867    ///
868    #[must_use]
869    pub fn builder(symbol: String) -> TickerParamsBuilder {
870        TickerParamsBuilder::default().symbol(symbol)
871    }
872}
873/// Request parameters for the [`trade`] operation.
874///
875/// This struct holds all of the inputs you can pass when calling
876/// [`trade`](#method.trade).
877#[derive(Clone, Debug, Builder)]
878#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
879pub struct TradeParams {
880    /// Symbol to query
881    ///
882    /// This field is **required.
883    #[builder(setter(into))]
884    pub symbol: String,
885    /// Unique WebSocket request ID.
886    ///
887    /// This field is **optional.
888    #[builder(setter(into), default)]
889    pub id: Option<String>,
890}
891
892impl TradeParams {
893    /// Create a builder for [`trade`].
894    ///
895    /// Required parameters:
896    ///
897    /// * `symbol` — Symbol to query
898    ///
899    #[must_use]
900    pub fn builder(symbol: String) -> TradeParamsBuilder {
901        TradeParamsBuilder::default().symbol(symbol)
902    }
903}
904
905#[async_trait]
906impl WebSocketStreamsApi for WebSocketStreamsApiClient {
907    async fn agg_trade(
908        &self,
909        params: AggTradeParams,
910    ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>> {
911        let AggTradeParams { symbol, id } = params;
912
913        let pairs: &[(&str, Option<String>)] =
914            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
915
916        let vars: HashMap<_, _> = pairs
917            .iter()
918            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
919            .collect();
920
921        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
922
923        let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
924
925        Ok(create_stream_handler::<models::AggTradeResponse>(
926            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
927            stream,
928            id_opt.map(|s| {
929                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
930                    if let Ok(n) = s.parse::<u32>() {
931                        return StreamId::Number(n);
932                    }
933                }
934                StreamId::Str(s)
935            }),
936            None,
937        )
938        .await)
939    }
940
941    async fn all_market_rolling_window_ticker(
942        &self,
943        params: AllMarketRollingWindowTickerParams,
944    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>
945    {
946        let AllMarketRollingWindowTickerParams { window_size, id } = params;
947
948        let pairs: &[(&str, Option<String>)] = &[
949            ("windowSize", Some(window_size.as_str().to_string())),
950            ("id", id.clone()),
951        ];
952
953        let vars: HashMap<_, _> = pairs
954            .iter()
955            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
956            .collect();
957
958        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
959
960        let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
961
962        Ok(
963            create_stream_handler::<Vec<models::AllMarketRollingWindowTickerResponseInner>>(
964                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
965                stream,
966                id_opt.map(|s| {
967                    if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
968                        if let Ok(n) = s.parse::<u32>() {
969                            return StreamId::Number(n);
970                        }
971                    }
972                    StreamId::Str(s)
973                }),
974                None,
975            )
976            .await,
977        )
978    }
979
980    async fn all_mini_ticker(
981        &self,
982        params: AllMiniTickerParams,
983    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>> {
984        let AllMiniTickerParams { id } = params;
985
986        let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
987
988        let vars: HashMap<_, _> = pairs
989            .iter()
990            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
991            .collect();
992
993        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
994
995        let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
996
997        Ok(
998            create_stream_handler::<Vec<models::AllMiniTickerResponseInner>>(
999                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1000                stream,
1001                id_opt.map(|s| {
1002                    if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1003                        if let Ok(n) = s.parse::<u32>() {
1004                            return StreamId::Number(n);
1005                        }
1006                    }
1007                    StreamId::Str(s)
1008                }),
1009                None,
1010            )
1011            .await,
1012        )
1013    }
1014
1015    async fn avg_price(
1016        &self,
1017        params: AvgPriceParams,
1018    ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>> {
1019        let AvgPriceParams { symbol, id } = params;
1020
1021        let pairs: &[(&str, Option<String>)] =
1022            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1023
1024        let vars: HashMap<_, _> = pairs
1025            .iter()
1026            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1027            .collect();
1028
1029        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1030
1031        let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
1032
1033        Ok(create_stream_handler::<models::AvgPriceResponse>(
1034            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1035            stream,
1036            id_opt.map(|s| {
1037                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1038                    if let Ok(n) = s.parse::<u32>() {
1039                        return StreamId::Number(n);
1040                    }
1041                }
1042                StreamId::Str(s)
1043            }),
1044            None,
1045        )
1046        .await)
1047    }
1048
1049    async fn block_trade(
1050        &self,
1051        params: BlockTradeParams,
1052    ) -> anyhow::Result<Arc<WebsocketStream<models::BlockTradeResponse>>> {
1053        let BlockTradeParams { symbol, id } = params;
1054
1055        let pairs: &[(&str, Option<String>)] =
1056            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1057
1058        let vars: HashMap<_, _> = pairs
1059            .iter()
1060            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1061            .collect();
1062
1063        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1064
1065        let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
1066
1067        Ok(create_stream_handler::<models::BlockTradeResponse>(
1068            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1069            stream,
1070            id_opt.map(|s| {
1071                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1072                    if let Ok(n) = s.parse::<u32>() {
1073                        return StreamId::Number(n);
1074                    }
1075                }
1076                StreamId::Str(s)
1077            }),
1078            None,
1079        )
1080        .await)
1081    }
1082
1083    async fn book_ticker(
1084        &self,
1085        params: BookTickerParams,
1086    ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>> {
1087        let BookTickerParams { symbol, id } = params;
1088
1089        let pairs: &[(&str, Option<String>)] =
1090            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1091
1092        let vars: HashMap<_, _> = pairs
1093            .iter()
1094            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1095            .collect();
1096
1097        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1098
1099        let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
1100
1101        Ok(create_stream_handler::<models::BookTickerResponse>(
1102            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1103            stream,
1104            id_opt.map(|s| {
1105                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1106                    if let Ok(n) = s.parse::<u32>() {
1107                        return StreamId::Number(n);
1108                    }
1109                }
1110                StreamId::Str(s)
1111            }),
1112            None,
1113        )
1114        .await)
1115    }
1116
1117    async fn diff_book_depth(
1118        &self,
1119        params: DiffBookDepthParams,
1120    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>> {
1121        let DiffBookDepthParams {
1122            symbol,
1123            id,
1124            update_speed,
1125        } = params;
1126
1127        let pairs: &[(&str, Option<String>)] = &[
1128            ("symbol", Some(symbol.clone())),
1129            ("id", id.clone()),
1130            ("updateSpeed", update_speed.clone()),
1131        ];
1132
1133        let vars: HashMap<_, _> = pairs
1134            .iter()
1135            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1136            .collect();
1137
1138        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1139
1140        let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
1141
1142        Ok(create_stream_handler::<models::DiffBookDepthResponse>(
1143            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1144            stream,
1145            id_opt.map(|s| {
1146                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1147                    if let Ok(n) = s.parse::<u32>() {
1148                        return StreamId::Number(n);
1149                    }
1150                }
1151                StreamId::Str(s)
1152            }),
1153            None,
1154        )
1155        .await)
1156    }
1157
1158    async fn kline(
1159        &self,
1160        params: KlineParams,
1161    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>> {
1162        let KlineParams {
1163            symbol,
1164            interval,
1165            id,
1166        } = params;
1167
1168        let pairs: &[(&str, Option<String>)] = &[
1169            ("symbol", Some(symbol.clone())),
1170            ("interval", Some(interval.as_str().to_string())),
1171            ("id", id.clone()),
1172        ];
1173
1174        let vars: HashMap<_, _> = pairs
1175            .iter()
1176            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1177            .collect();
1178
1179        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1180
1181        let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
1182
1183        Ok(create_stream_handler::<models::KlineResponse>(
1184            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1185            stream,
1186            id_opt.map(|s| {
1187                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1188                    if let Ok(n) = s.parse::<u32>() {
1189                        return StreamId::Number(n);
1190                    }
1191                }
1192                StreamId::Str(s)
1193            }),
1194            None,
1195        )
1196        .await)
1197    }
1198
1199    async fn kline_offset(
1200        &self,
1201        params: KlineOffsetParams,
1202    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>> {
1203        let KlineOffsetParams {
1204            symbol,
1205            interval,
1206            id,
1207        } = params;
1208
1209        let pairs: &[(&str, Option<String>)] = &[
1210            ("symbol", Some(symbol.clone())),
1211            ("interval", Some(interval.as_str().to_string())),
1212            ("id", id.clone()),
1213        ];
1214
1215        let vars: HashMap<_, _> = pairs
1216            .iter()
1217            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1218            .collect();
1219
1220        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1221
1222        let stream =
1223            replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
1224
1225        Ok(create_stream_handler::<models::KlineOffsetResponse>(
1226            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1227            stream,
1228            id_opt.map(|s| {
1229                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1230                    if let Ok(n) = s.parse::<u32>() {
1231                        return StreamId::Number(n);
1232                    }
1233                }
1234                StreamId::Str(s)
1235            }),
1236            None,
1237        )
1238        .await)
1239    }
1240
1241    async fn mini_ticker(
1242        &self,
1243        params: MiniTickerParams,
1244    ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>> {
1245        let MiniTickerParams { symbol, id } = params;
1246
1247        let pairs: &[(&str, Option<String>)] =
1248            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1249
1250        let vars: HashMap<_, _> = pairs
1251            .iter()
1252            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1253            .collect();
1254
1255        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1256
1257        let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
1258
1259        Ok(create_stream_handler::<models::MiniTickerResponse>(
1260            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1261            stream,
1262            id_opt.map(|s| {
1263                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1264                    if let Ok(n) = s.parse::<u32>() {
1265                        return StreamId::Number(n);
1266                    }
1267                }
1268                StreamId::Str(s)
1269            }),
1270            None,
1271        )
1272        .await)
1273    }
1274
1275    async fn partial_book_depth(
1276        &self,
1277        params: PartialBookDepthParams,
1278    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>> {
1279        let PartialBookDepthParams {
1280            symbol,
1281            levels,
1282            id,
1283            update_speed,
1284        } = params;
1285
1286        let pairs: &[(&str, Option<String>)] = &[
1287            ("symbol", Some(symbol.clone())),
1288            ("levels", Some(levels.as_str().to_string())),
1289            ("id", id.clone()),
1290            ("updateSpeed", update_speed.clone()),
1291        ];
1292
1293        let vars: HashMap<_, _> = pairs
1294            .iter()
1295            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1296            .collect();
1297
1298        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1299
1300        let stream =
1301            replace_websocket_streams_placeholders("/<symbol>@depth<levels>@<updateSpeed>", &vars);
1302
1303        Ok(create_stream_handler::<models::PartialBookDepthResponse>(
1304            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1305            stream,
1306            id_opt.map(|s| {
1307                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1308                    if let Ok(n) = s.parse::<u32>() {
1309                        return StreamId::Number(n);
1310                    }
1311                }
1312                StreamId::Str(s)
1313            }),
1314            None,
1315        )
1316        .await)
1317    }
1318
1319    async fn reference_price(
1320        &self,
1321        params: ReferencePriceParams,
1322    ) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>> {
1323        let ReferencePriceParams { symbol, id } = params;
1324
1325        let pairs: &[(&str, Option<String>)] =
1326            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1327
1328        let vars: HashMap<_, _> = pairs
1329            .iter()
1330            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1331            .collect();
1332
1333        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1334
1335        let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
1336
1337        Ok(create_stream_handler::<models::ReferencePriceResponse>(
1338            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1339            stream,
1340            id_opt.map(|s| {
1341                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1342                    if let Ok(n) = s.parse::<u32>() {
1343                        return StreamId::Number(n);
1344                    }
1345                }
1346                StreamId::Str(s)
1347            }),
1348            None,
1349        )
1350        .await)
1351    }
1352
1353    async fn rolling_window_ticker(
1354        &self,
1355        params: RollingWindowTickerParams,
1356    ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>> {
1357        let RollingWindowTickerParams {
1358            symbol,
1359            window_size,
1360            id,
1361        } = params;
1362
1363        let pairs: &[(&str, Option<String>)] = &[
1364            ("symbol", Some(symbol.clone())),
1365            ("windowSize", Some(window_size.as_str().to_string())),
1366            ("id", id.clone()),
1367        ];
1368
1369        let vars: HashMap<_, _> = pairs
1370            .iter()
1371            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1372            .collect();
1373
1374        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1375
1376        let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
1377
1378        Ok(
1379            create_stream_handler::<models::RollingWindowTickerResponse>(
1380                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1381                stream,
1382                id_opt.map(|s| {
1383                    if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1384                        if let Ok(n) = s.parse::<u32>() {
1385                            return StreamId::Number(n);
1386                        }
1387                    }
1388                    StreamId::Str(s)
1389                }),
1390                None,
1391            )
1392            .await,
1393        )
1394    }
1395
1396    async fn ticker(
1397        &self,
1398        params: TickerParams,
1399    ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>> {
1400        let TickerParams { symbol, id } = params;
1401
1402        let pairs: &[(&str, Option<String>)] =
1403            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1404
1405        let vars: HashMap<_, _> = pairs
1406            .iter()
1407            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1408            .collect();
1409
1410        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1411
1412        let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
1413
1414        Ok(create_stream_handler::<models::TickerResponse>(
1415            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1416            stream,
1417            id_opt.map(|s| {
1418                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1419                    if let Ok(n) = s.parse::<u32>() {
1420                        return StreamId::Number(n);
1421                    }
1422                }
1423                StreamId::Str(s)
1424            }),
1425            None,
1426        )
1427        .await)
1428    }
1429
1430    async fn trade(
1431        &self,
1432        params: TradeParams,
1433    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>> {
1434        let TradeParams { symbol, id } = params;
1435
1436        let pairs: &[(&str, Option<String>)] =
1437            &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1438
1439        let vars: HashMap<_, _> = pairs
1440            .iter()
1441            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1442            .collect();
1443
1444        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);
1445
1446        let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
1447
1448        Ok(create_stream_handler::<models::TradeResponse>(
1449            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
1450            stream,
1451            id_opt.map(|s| {
1452                if !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit()) {
1453                    if let Ok(n) = s.parse::<u32>() {
1454                        return StreamId::Number(n);
1455                    }
1456                }
1457                StreamId::Str(s)
1458            }),
1459            None,
1460        )
1461        .await)
1462    }
1463}
1464
1465#[cfg(all(test, feature = "spot"))]
1466mod tests {
1467    use super::*;
1468    use crate::TOKIO_SHARED_RT;
1469    use crate::{
1470        common::websocket::{WebsocketConnection, WebsocketHandler},
1471        config::ConfigurationWebsocketStreams,
1472    };
1473    use serde_json::json;
1474    use std::sync::atomic::{AtomicBool, Ordering};
1475    use tokio::task::yield_now;
1476
1477    async fn make_streams_base() -> (Arc<WebsocketStreams>, Arc<WebsocketConnection>) {
1478        let conn = WebsocketConnection::new("test");
1479        let config = ConfigurationWebsocketStreams::builder()
1480            .build()
1481            .expect("Failed to build configuration");
1482        let streams_base = WebsocketStreams::new(config, vec![conn.clone()], vec![]);
1483        conn.set_handler(streams_base.clone() as Arc<dyn WebsocketHandler>)
1484            .await;
1485        (streams_base, conn)
1486    }
1487
1488    #[test]
1489    fn agg_trade_should_execute_successfully() {
1490        TOKIO_SHARED_RT.block_on(async {
1491            let (streams_base, _) = make_streams_base().await;
1492            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1493
1494            let id = "test-id-123".to_string();
1495
1496            let params = AggTradeParams::builder("bnbusdt".to_string())
1497                .id(Some(id.clone()))
1498                .build()
1499                .unwrap();
1500
1501            let AggTradeParams { symbol, id } = params.clone();
1502
1503            let pairs: &[(&str, Option<String>)] =
1504                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1505
1506            let vars: HashMap<_, _> = pairs
1507                .iter()
1508                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1509                .collect();
1510            let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
1511            let ws_stream = api
1512                .agg_trade(params)
1513                .await
1514                .expect("agg_trade should return a WebsocketStream");
1515
1516            assert!(
1517                streams_base.is_subscribed(&stream).await,
1518                "expected stream '{stream}' to be subscribed"
1519            );
1520            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1521        });
1522    }
1523
1524    #[test]
1525    fn agg_trade_should_handle_incoming_message() {
1526        TOKIO_SHARED_RT.block_on(async {
1527            let (streams_base, conn) = make_streams_base().await;
1528            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1529
1530            let id = "test-id-123".to_string();
1531
1532            let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
1533
1534            let AggTradeParams {
1535                symbol,id,
1536            } = params.clone();
1537
1538            let pairs: &[(&str, Option<String>)] = &[
1539                ("symbol",
1540                        Some(symbol.clone())
1541                ),
1542                ("id",
1543                        id.clone()
1544                ),
1545            ];
1546
1547            let vars: HashMap<_, _> = pairs
1548                .iter()
1549                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1550                .collect();
1551            let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
1552
1553            let ws_stream = api.agg_trade(params).await.unwrap();
1554
1555            let called = Arc::new(AtomicBool::new(false));
1556            let called_with_message = called.clone();
1557            ws_stream.on_message(move |_payload: models::AggTradeResponse| {
1558                called_with_message.store(true, Ordering::SeqCst);
1559            });
1560
1561            let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
1562            let msg = json!({
1563                "stream": stream,
1564                "data": payload,
1565            });
1566
1567            streams_base.on_message(msg.to_string(), conn.clone()).await;
1568            yield_now().await;
1569
1570            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1571        });
1572    }
1573
1574    #[test]
1575    fn agg_trade_should_not_fire_after_unsubscribe() {
1576        TOKIO_SHARED_RT.block_on(async {
1577            let (streams_base, conn) = make_streams_base().await;
1578            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1579
1580            let id = "test-id-123".to_string();
1581
1582            let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
1583
1584            let AggTradeParams {
1585                symbol,id,
1586            } = params.clone();
1587
1588            let pairs: &[(&str, Option<String>)] = &[
1589                ("symbol",
1590                        Some(symbol.clone())
1591                ),
1592                ("id",
1593                        id.clone()
1594                ),
1595            ];
1596
1597            let vars: HashMap<_, _> = pairs
1598                .iter()
1599                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1600                .collect();
1601            let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
1602
1603            let ws_stream = api.agg_trade(params).await.unwrap();
1604
1605            let called = Arc::new(AtomicBool::new(false));
1606            let called_clone = called.clone();
1607            ws_stream.on_message(move |_payload: models::AggTradeResponse| {
1608                called_clone.store(true, Ordering::SeqCst);
1609            });
1610
1611            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
1612
1613            ws_stream.unsubscribe().await;
1614
1615            let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
1616            let msg = json!({
1617                "stream": stream,
1618                "data": payload,
1619            });
1620
1621            streams_base.on_message(msg.to_string(), conn.clone()).await;
1622
1623            yield_now().await;
1624
1625            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
1626        });
1627    }
1628
1629    #[test]
1630    fn all_market_rolling_window_ticker_should_execute_successfully() {
1631        TOKIO_SHARED_RT.block_on(async {
1632            let (streams_base, _) = make_streams_base().await;
1633            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1634
1635            let id = "test-id-123".to_string();
1636
1637            let params = AllMarketRollingWindowTickerParams::builder(
1638                AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,
1639            )
1640            .id(Some(id.clone()))
1641            .build()
1642            .unwrap();
1643
1644            let AllMarketRollingWindowTickerParams { window_size, id } = params.clone();
1645
1646            let pairs: &[(&str, Option<String>)] = &[
1647                ("windowSize", Some(window_size.as_str().to_string())),
1648                ("id", id.clone()),
1649            ];
1650
1651            let vars: HashMap<_, _> = pairs
1652                .iter()
1653                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1654                .collect();
1655            let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
1656            let ws_stream = api
1657                .all_market_rolling_window_ticker(params)
1658                .await
1659                .expect("all_market_rolling_window_ticker should return a WebsocketStream");
1660
1661            assert!(
1662                streams_base.is_subscribed(&stream).await,
1663                "expected stream '{stream}' to be subscribed"
1664            );
1665            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1666        });
1667    }
1668
1669    #[test]
1670    fn all_market_rolling_window_ticker_should_handle_incoming_message() {
1671        TOKIO_SHARED_RT.block_on(async {
1672            let (streams_base, conn) = make_streams_base().await;
1673            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1674
1675            let id = "test-id-123".to_string();
1676
1677            let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
1678
1679            let AllMarketRollingWindowTickerParams {
1680                window_size,id,
1681            } = params.clone();
1682
1683            let pairs: &[(&str, Option<String>)] = &[
1684                ("windowSize",
1685                        Some(window_size.as_str().to_string())
1686                ),
1687                ("id",
1688                        id.clone()
1689                ),
1690            ];
1691
1692            let vars: HashMap<_, _> = pairs
1693                .iter()
1694                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1695                .collect();
1696            let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
1697
1698            let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();
1699
1700            let called = Arc::new(AtomicBool::new(false));
1701            let called_with_message = called.clone();
1702            ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
1703                called_with_message.store(true, Ordering::SeqCst);
1704            });
1705
1706            let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
1707            let msg = json!({
1708                "stream": stream,
1709                "data": payload,
1710            });
1711
1712            streams_base.on_message(msg.to_string(), conn.clone()).await;
1713            yield_now().await;
1714
1715            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1716        });
1717    }
1718
1719    #[test]
1720    fn all_market_rolling_window_ticker_should_not_fire_after_unsubscribe() {
1721        TOKIO_SHARED_RT.block_on(async {
1722            let (streams_base, conn) = make_streams_base().await;
1723            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1724
1725            let id = "test-id-123".to_string();
1726
1727            let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
1728
1729            let AllMarketRollingWindowTickerParams {
1730                window_size,id,
1731            } = params.clone();
1732
1733            let pairs: &[(&str, Option<String>)] = &[
1734                ("windowSize",
1735                        Some(window_size.as_str().to_string())
1736                ),
1737                ("id",
1738                        id.clone()
1739                ),
1740            ];
1741
1742            let vars: HashMap<_, _> = pairs
1743                .iter()
1744                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1745                .collect();
1746            let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
1747
1748            let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();
1749
1750            let called = Arc::new(AtomicBool::new(false));
1751            let called_clone = called.clone();
1752            ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
1753                called_clone.store(true, Ordering::SeqCst);
1754            });
1755
1756            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
1757
1758            ws_stream.unsubscribe().await;
1759
1760            let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
1761            let msg = json!({
1762                "stream": stream,
1763                "data": payload,
1764            });
1765
1766            streams_base.on_message(msg.to_string(), conn.clone()).await;
1767
1768            yield_now().await;
1769
1770            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
1771        });
1772    }
1773
1774    #[test]
1775    fn all_mini_ticker_should_execute_successfully() {
1776        TOKIO_SHARED_RT.block_on(async {
1777            let (streams_base, _) = make_streams_base().await;
1778            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1779
1780            let id = "test-id-123".to_string();
1781
1782            let params = AllMiniTickerParams::builder()
1783                .id(Some(id.clone()))
1784                .build()
1785                .unwrap();
1786
1787            let AllMiniTickerParams { id } = params.clone();
1788
1789            let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];
1790
1791            let vars: HashMap<_, _> = pairs
1792                .iter()
1793                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1794                .collect();
1795            let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
1796            let ws_stream = api
1797                .all_mini_ticker(params)
1798                .await
1799                .expect("all_mini_ticker should return a WebsocketStream");
1800
1801            assert!(
1802                streams_base.is_subscribed(&stream).await,
1803                "expected stream '{stream}' to be subscribed"
1804            );
1805            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1806        });
1807    }
1808
1809    #[test]
1810    fn all_mini_ticker_should_handle_incoming_message() {
1811        TOKIO_SHARED_RT.block_on(async {
1812            let (streams_base, conn) = make_streams_base().await;
1813            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1814
1815            let id = "test-id-123".to_string();
1816
1817            let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();
1818
1819            let AllMiniTickerParams {
1820                id,
1821            } = params.clone();
1822
1823            let pairs: &[(&str, Option<String>)] = &[
1824                ("id",
1825                        id.clone()
1826                ),
1827            ];
1828
1829            let vars: HashMap<_, _> = pairs
1830                .iter()
1831                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1832                .collect();
1833            let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
1834
1835            let ws_stream = api.all_mini_ticker(params).await.unwrap();
1836
1837            let called = Arc::new(AtomicBool::new(false));
1838            let called_with_message = called.clone();
1839            ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
1840                called_with_message.store(true, Ordering::SeqCst);
1841            });
1842
1843            let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
1844            let msg = json!({
1845                "stream": stream,
1846                "data": payload,
1847            });
1848
1849            streams_base.on_message(msg.to_string(), conn.clone()).await;
1850            yield_now().await;
1851
1852            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1853        });
1854    }
1855
1856    #[test]
1857    fn all_mini_ticker_should_not_fire_after_unsubscribe() {
1858        TOKIO_SHARED_RT.block_on(async {
1859            let (streams_base, conn) = make_streams_base().await;
1860            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1861
1862            let id = "test-id-123".to_string();
1863
1864            let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();
1865
1866            let AllMiniTickerParams {
1867                id,
1868            } = params.clone();
1869
1870            let pairs: &[(&str, Option<String>)] = &[
1871                ("id",
1872                        id.clone()
1873                ),
1874            ];
1875
1876            let vars: HashMap<_, _> = pairs
1877                .iter()
1878                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1879                .collect();
1880            let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
1881
1882            let ws_stream = api.all_mini_ticker(params).await.unwrap();
1883
1884            let called = Arc::new(AtomicBool::new(false));
1885            let called_clone = called.clone();
1886            ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
1887                called_clone.store(true, Ordering::SeqCst);
1888            });
1889
1890            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
1891
1892            ws_stream.unsubscribe().await;
1893
1894            let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
1895            let msg = json!({
1896                "stream": stream,
1897                "data": payload,
1898            });
1899
1900            streams_base.on_message(msg.to_string(), conn.clone()).await;
1901
1902            yield_now().await;
1903
1904            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
1905        });
1906    }
1907
1908    #[test]
1909    fn avg_price_should_execute_successfully() {
1910        TOKIO_SHARED_RT.block_on(async {
1911            let (streams_base, _) = make_streams_base().await;
1912            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1913
1914            let id = "test-id-123".to_string();
1915
1916            let params = AvgPriceParams::builder("bnbusdt".to_string())
1917                .id(Some(id.clone()))
1918                .build()
1919                .unwrap();
1920
1921            let AvgPriceParams { symbol, id } = params.clone();
1922
1923            let pairs: &[(&str, Option<String>)] =
1924                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
1925
1926            let vars: HashMap<_, _> = pairs
1927                .iter()
1928                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1929                .collect();
1930            let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
1931            let ws_stream = api
1932                .avg_price(params)
1933                .await
1934                .expect("avg_price should return a WebsocketStream");
1935
1936            assert!(
1937                streams_base.is_subscribed(&stream).await,
1938                "expected stream '{stream}' to be subscribed"
1939            );
1940            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
1941        });
1942    }
1943
1944    #[test]
1945    fn avg_price_should_handle_incoming_message() {
1946        TOKIO_SHARED_RT.block_on(async {
1947            let (streams_base, conn) = make_streams_base().await;
1948            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1949
1950            let id = "test-id-123".to_string();
1951
1952            let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
1953
1954            let AvgPriceParams {
1955                symbol,id,
1956            } = params.clone();
1957
1958            let pairs: &[(&str, Option<String>)] = &[
1959                ("symbol",
1960                        Some(symbol.clone())
1961                ),
1962                ("id",
1963                        id.clone()
1964                ),
1965            ];
1966
1967            let vars: HashMap<_, _> = pairs
1968                .iter()
1969                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
1970                .collect();
1971            let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
1972
1973            let ws_stream = api.avg_price(params).await.unwrap();
1974
1975            let called = Arc::new(AtomicBool::new(false));
1976            let called_with_message = called.clone();
1977            ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
1978                called_with_message.store(true, Ordering::SeqCst);
1979            });
1980
1981            let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
1982            let msg = json!({
1983                "stream": stream,
1984                "data": payload,
1985            });
1986
1987            streams_base.on_message(msg.to_string(), conn.clone()).await;
1988            yield_now().await;
1989
1990            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
1991        });
1992    }
1993
1994    #[test]
1995    fn avg_price_should_not_fire_after_unsubscribe() {
1996        TOKIO_SHARED_RT.block_on(async {
1997            let (streams_base, conn) = make_streams_base().await;
1998            let api = WebSocketStreamsApiClient::new(streams_base.clone());
1999
2000            let id = "test-id-123".to_string();
2001
2002            let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2003
2004            let AvgPriceParams {
2005                symbol,id,
2006            } = params.clone();
2007
2008            let pairs: &[(&str, Option<String>)] = &[
2009                ("symbol",
2010                        Some(symbol.clone())
2011                ),
2012                ("id",
2013                        id.clone()
2014                ),
2015            ];
2016
2017            let vars: HashMap<_, _> = pairs
2018                .iter()
2019                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2020                .collect();
2021            let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
2022
2023            let ws_stream = api.avg_price(params).await.unwrap();
2024
2025            let called = Arc::new(AtomicBool::new(false));
2026            let called_clone = called.clone();
2027            ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
2028                called_clone.store(true, Ordering::SeqCst);
2029            });
2030
2031            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2032
2033            ws_stream.unsubscribe().await;
2034
2035            let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
2036            let msg = json!({
2037                "stream": stream,
2038                "data": payload,
2039            });
2040
2041            streams_base.on_message(msg.to_string(), conn.clone()).await;
2042
2043            yield_now().await;
2044
2045            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2046        });
2047    }
2048
2049    #[test]
2050    fn block_trade_should_execute_successfully() {
2051        TOKIO_SHARED_RT.block_on(async {
2052            let (streams_base, _) = make_streams_base().await;
2053            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2054
2055            let id = "test-id-123".to_string();
2056
2057            let params = BlockTradeParams::builder("bnbusdt".to_string())
2058                .id(Some(id.clone()))
2059                .build()
2060                .unwrap();
2061
2062            let BlockTradeParams { symbol, id } = params.clone();
2063
2064            let pairs: &[(&str, Option<String>)] =
2065                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
2066
2067            let vars: HashMap<_, _> = pairs
2068                .iter()
2069                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2070                .collect();
2071            let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
2072            let ws_stream = api
2073                .block_trade(params)
2074                .await
2075                .expect("block_trade should return a WebsocketStream");
2076
2077            assert!(
2078                streams_base.is_subscribed(&stream).await,
2079                "expected stream '{stream}' to be subscribed"
2080            );
2081            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2082        });
2083    }
2084
2085    #[test]
2086    fn block_trade_should_handle_incoming_message() {
2087        TOKIO_SHARED_RT.block_on(async {
2088            let (streams_base, conn) = make_streams_base().await;
2089            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2090
2091            let id = "test-id-123".to_string();
2092
2093            let params = BlockTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2094
2095            let BlockTradeParams {
2096                symbol,id,
2097            } = params.clone();
2098
2099            let pairs: &[(&str, Option<String>)] = &[
2100                ("symbol",
2101                        Some(symbol.clone())
2102                ),
2103                ("id",
2104                        id.clone()
2105                ),
2106            ];
2107
2108            let vars: HashMap<_, _> = pairs
2109                .iter()
2110                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2111                .collect();
2112            let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
2113
2114            let ws_stream = api.block_trade(params).await.unwrap();
2115
2116            let called = Arc::new(AtomicBool::new(false));
2117            let called_with_message = called.clone();
2118            ws_stream.on_message(move |_payload: models::BlockTradeResponse| {
2119                called_with_message.store(true, Ordering::SeqCst);
2120            });
2121
2122            let payload: Value = serde_json::from_str(r#"{"e":"blockTrade","E":1772506983582,"s":"BNBBTC","t":582,"p":"0.052","q":"5838","T":1772506983321,"m":true}"#).unwrap();
2123            let msg = json!({
2124                "stream": stream,
2125                "data": payload,
2126            });
2127
2128            streams_base.on_message(msg.to_string(), conn.clone()).await;
2129            yield_now().await;
2130
2131            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2132        });
2133    }
2134
2135    #[test]
2136    fn block_trade_should_not_fire_after_unsubscribe() {
2137        TOKIO_SHARED_RT.block_on(async {
2138            let (streams_base, conn) = make_streams_base().await;
2139            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2140
2141            let id = "test-id-123".to_string();
2142
2143            let params = BlockTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2144
2145            let BlockTradeParams {
2146                symbol,id,
2147            } = params.clone();
2148
2149            let pairs: &[(&str, Option<String>)] = &[
2150                ("symbol",
2151                        Some(symbol.clone())
2152                ),
2153                ("id",
2154                        id.clone()
2155                ),
2156            ];
2157
2158            let vars: HashMap<_, _> = pairs
2159                .iter()
2160                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2161                .collect();
2162            let stream = replace_websocket_streams_placeholders("/<symbol>@blockTrade", &vars);
2163
2164            let ws_stream = api.block_trade(params).await.unwrap();
2165
2166            let called = Arc::new(AtomicBool::new(false));
2167            let called_clone = called.clone();
2168            ws_stream.on_message(move |_payload: models::BlockTradeResponse| {
2169                called_clone.store(true, Ordering::SeqCst);
2170            });
2171
2172            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2173
2174            ws_stream.unsubscribe().await;
2175
2176            let payload: Value = serde_json::from_str(r#"{"e":"blockTrade","E":1772506983582,"s":"BNBBTC","t":582,"p":"0.052","q":"5838","T":1772506983321,"m":true}"#).unwrap();
2177            let msg = json!({
2178                "stream": stream,
2179                "data": payload,
2180            });
2181
2182            streams_base.on_message(msg.to_string(), conn.clone()).await;
2183
2184            yield_now().await;
2185
2186            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2187        });
2188    }
2189
2190    #[test]
2191    fn book_ticker_should_execute_successfully() {
2192        TOKIO_SHARED_RT.block_on(async {
2193            let (streams_base, _) = make_streams_base().await;
2194            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2195
2196            let id = "test-id-123".to_string();
2197
2198            let params = BookTickerParams::builder("bnbusdt".to_string())
2199                .id(Some(id.clone()))
2200                .build()
2201                .unwrap();
2202
2203            let BookTickerParams { symbol, id } = params.clone();
2204
2205            let pairs: &[(&str, Option<String>)] =
2206                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
2207
2208            let vars: HashMap<_, _> = pairs
2209                .iter()
2210                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2211                .collect();
2212            let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
2213            let ws_stream = api
2214                .book_ticker(params)
2215                .await
2216                .expect("book_ticker should return a WebsocketStream");
2217
2218            assert!(
2219                streams_base.is_subscribed(&stream).await,
2220                "expected stream '{stream}' to be subscribed"
2221            );
2222            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2223        });
2224    }
2225
2226    #[test]
2227    fn book_ticker_should_handle_incoming_message() {
2228        TOKIO_SHARED_RT.block_on(async {
2229            let (streams_base, conn) = make_streams_base().await;
2230            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2231
2232            let id = "test-id-123".to_string();
2233
2234            let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2235
2236            let BookTickerParams {
2237                symbol,id,
2238            } = params.clone();
2239
2240            let pairs: &[(&str, Option<String>)] = &[
2241                ("symbol",
2242                        Some(symbol.clone())
2243                ),
2244                ("id",
2245                        id.clone()
2246                ),
2247            ];
2248
2249            let vars: HashMap<_, _> = pairs
2250                .iter()
2251                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2252                .collect();
2253            let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
2254
2255            let ws_stream = api.book_ticker(params).await.unwrap();
2256
2257            let called = Arc::new(AtomicBool::new(false));
2258            let called_with_message = called.clone();
2259            ws_stream.on_message(move |_payload: models::BookTickerResponse| {
2260                called_with_message.store(true, Ordering::SeqCst);
2261            });
2262
2263            let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
2264            let msg = json!({
2265                "stream": stream,
2266                "data": payload,
2267            });
2268
2269            streams_base.on_message(msg.to_string(), conn.clone()).await;
2270            yield_now().await;
2271
2272            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2273        });
2274    }
2275
2276    #[test]
2277    fn book_ticker_should_not_fire_after_unsubscribe() {
2278        TOKIO_SHARED_RT.block_on(async {
2279            let (streams_base, conn) = make_streams_base().await;
2280            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2281
2282            let id = "test-id-123".to_string();
2283
2284            let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2285
2286            let BookTickerParams {
2287                symbol,id,
2288            } = params.clone();
2289
2290            let pairs: &[(&str, Option<String>)] = &[
2291                ("symbol",
2292                        Some(symbol.clone())
2293                ),
2294                ("id",
2295                        id.clone()
2296                ),
2297            ];
2298
2299            let vars: HashMap<_, _> = pairs
2300                .iter()
2301                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2302                .collect();
2303            let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
2304
2305            let ws_stream = api.book_ticker(params).await.unwrap();
2306
2307            let called = Arc::new(AtomicBool::new(false));
2308            let called_clone = called.clone();
2309            ws_stream.on_message(move |_payload: models::BookTickerResponse| {
2310                called_clone.store(true, Ordering::SeqCst);
2311            });
2312
2313            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2314
2315            ws_stream.unsubscribe().await;
2316
2317            let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
2318            let msg = json!({
2319                "stream": stream,
2320                "data": payload,
2321            });
2322
2323            streams_base.on_message(msg.to_string(), conn.clone()).await;
2324
2325            yield_now().await;
2326
2327            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2328        });
2329    }
2330
2331    #[test]
2332    fn diff_book_depth_should_execute_successfully() {
2333        TOKIO_SHARED_RT.block_on(async {
2334            let (streams_base, _) = make_streams_base().await;
2335            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2336
2337            let id = "test-id-123".to_string();
2338
2339            let params = DiffBookDepthParams::builder("bnbusdt".to_string())
2340                .id(Some(id.clone()))
2341                .build()
2342                .unwrap();
2343
2344            let DiffBookDepthParams {
2345                symbol,
2346                id,
2347                update_speed,
2348            } = params.clone();
2349
2350            let pairs: &[(&str, Option<String>)] = &[
2351                ("symbol", Some(symbol.clone())),
2352                ("id", id.clone()),
2353                ("updateSpeed", update_speed.clone()),
2354            ];
2355
2356            let vars: HashMap<_, _> = pairs
2357                .iter()
2358                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2359                .collect();
2360            let stream =
2361                replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
2362            let ws_stream = api
2363                .diff_book_depth(params)
2364                .await
2365                .expect("diff_book_depth should return a WebsocketStream");
2366
2367            assert!(
2368                streams_base.is_subscribed(&stream).await,
2369                "expected stream '{stream}' to be subscribed"
2370            );
2371            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2372        });
2373    }
2374
2375    #[test]
2376    fn diff_book_depth_should_handle_incoming_message() {
2377        TOKIO_SHARED_RT.block_on(async {
2378            let (streams_base, conn) = make_streams_base().await;
2379            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2380
2381            let id = "test-id-123".to_string();
2382
2383            let params = DiffBookDepthParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2384
2385            let DiffBookDepthParams {
2386                symbol,id,update_speed,
2387            } = params.clone();
2388
2389            let pairs: &[(&str, Option<String>)] = &[
2390                ("symbol",
2391                        Some(symbol.clone())
2392                ),
2393                ("id",
2394                        id.clone()
2395                ),
2396                ("updateSpeed",
2397                        update_speed.clone()
2398                ),
2399            ];
2400
2401            let vars: HashMap<_, _> = pairs
2402                .iter()
2403                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2404                .collect();
2405            let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
2406
2407            let ws_stream = api.diff_book_depth(params).await.unwrap();
2408
2409            let called = Arc::new(AtomicBool::new(false));
2410            let called_with_message = called.clone();
2411            ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
2412                called_with_message.store(true, Ordering::SeqCst);
2413            });
2414
2415            let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
2416            let msg = json!({
2417                "stream": stream,
2418                "data": payload,
2419            });
2420
2421            streams_base.on_message(msg.to_string(), conn.clone()).await;
2422            yield_now().await;
2423
2424            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2425        });
2426    }
2427
2428    #[test]
2429    fn diff_book_depth_should_not_fire_after_unsubscribe() {
2430        TOKIO_SHARED_RT.block_on(async {
2431            let (streams_base, conn) = make_streams_base().await;
2432            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2433
2434            let id = "test-id-123".to_string();
2435
2436            let params = DiffBookDepthParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2437
2438            let DiffBookDepthParams {
2439                symbol,id,update_speed,
2440            } = params.clone();
2441
2442            let pairs: &[(&str, Option<String>)] = &[
2443                ("symbol",
2444                        Some(symbol.clone())
2445                ),
2446                ("id",
2447                        id.clone()
2448                ),
2449                ("updateSpeed",
2450                        update_speed.clone()
2451                ),
2452            ];
2453
2454            let vars: HashMap<_, _> = pairs
2455                .iter()
2456                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2457                .collect();
2458            let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
2459
2460            let ws_stream = api.diff_book_depth(params).await.unwrap();
2461
2462            let called = Arc::new(AtomicBool::new(false));
2463            let called_clone = called.clone();
2464            ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
2465                called_clone.store(true, Ordering::SeqCst);
2466            });
2467
2468            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2469
2470            ws_stream.unsubscribe().await;
2471
2472            let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
2473            let msg = json!({
2474                "stream": stream,
2475                "data": payload,
2476            });
2477
2478            streams_base.on_message(msg.to_string(), conn.clone()).await;
2479
2480            yield_now().await;
2481
2482            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2483        });
2484    }
2485
2486    #[test]
2487    fn kline_should_execute_successfully() {
2488        TOKIO_SHARED_RT.block_on(async {
2489            let (streams_base, _) = make_streams_base().await;
2490            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2491
2492            let id = "test-id-123".to_string();
2493
2494            let params = KlineParams::builder("bnbusdt".to_string(), KlineIntervalEnum::Interval1s)
2495                .id(Some(id.clone()))
2496                .build()
2497                .unwrap();
2498
2499            let KlineParams {
2500                symbol,
2501                interval,
2502                id,
2503            } = params.clone();
2504
2505            let pairs: &[(&str, Option<String>)] = &[
2506                ("symbol", Some(symbol.clone())),
2507                ("interval", Some(interval.as_str().to_string())),
2508                ("id", id.clone()),
2509            ];
2510
2511            let vars: HashMap<_, _> = pairs
2512                .iter()
2513                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2514                .collect();
2515            let stream =
2516                replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
2517            let ws_stream = api
2518                .kline(params)
2519                .await
2520                .expect("kline should return a WebsocketStream");
2521
2522            assert!(
2523                streams_base.is_subscribed(&stream).await,
2524                "expected stream '{stream}' to be subscribed"
2525            );
2526            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2527        });
2528    }
2529
2530    #[test]
2531    fn kline_should_handle_incoming_message() {
2532        TOKIO_SHARED_RT.block_on(async {
2533            let (streams_base, conn) = make_streams_base().await;
2534            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2535
2536            let id = "test-id-123".to_string();
2537
2538            let params = KlineParams::builder("bnbusdt".to_string(),KlineIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
2539
2540            let KlineParams {
2541                symbol,interval,id,
2542            } = params.clone();
2543
2544            let pairs: &[(&str, Option<String>)] = &[
2545                ("symbol",
2546                        Some(symbol.clone())
2547                ),
2548                ("interval",
2549                        Some(interval.as_str().to_string())
2550                ),
2551                ("id",
2552                        id.clone()
2553                ),
2554            ];
2555
2556            let vars: HashMap<_, _> = pairs
2557                .iter()
2558                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2559                .collect();
2560            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
2561
2562            let ws_stream = api.kline(params).await.unwrap();
2563
2564            let called = Arc::new(AtomicBool::new(false));
2565            let called_with_message = called.clone();
2566            ws_stream.on_message(move |_payload: models::KlineResponse| {
2567                called_with_message.store(true, Ordering::SeqCst);
2568            });
2569
2570            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
2571            let msg = json!({
2572                "stream": stream,
2573                "data": payload,
2574            });
2575
2576            streams_base.on_message(msg.to_string(), conn.clone()).await;
2577            yield_now().await;
2578
2579            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2580        });
2581    }
2582
2583    #[test]
2584    fn kline_should_not_fire_after_unsubscribe() {
2585        TOKIO_SHARED_RT.block_on(async {
2586            let (streams_base, conn) = make_streams_base().await;
2587            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2588
2589            let id = "test-id-123".to_string();
2590
2591            let params = KlineParams::builder("bnbusdt".to_string(),KlineIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
2592
2593            let KlineParams {
2594                symbol,interval,id,
2595            } = params.clone();
2596
2597            let pairs: &[(&str, Option<String>)] = &[
2598                ("symbol",
2599                        Some(symbol.clone())
2600                ),
2601                ("interval",
2602                        Some(interval.as_str().to_string())
2603                ),
2604                ("id",
2605                        id.clone()
2606                ),
2607            ];
2608
2609            let vars: HashMap<_, _> = pairs
2610                .iter()
2611                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2612                .collect();
2613            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
2614
2615            let ws_stream = api.kline(params).await.unwrap();
2616
2617            let called = Arc::new(AtomicBool::new(false));
2618            let called_clone = called.clone();
2619            ws_stream.on_message(move |_payload: models::KlineResponse| {
2620                called_clone.store(true, Ordering::SeqCst);
2621            });
2622
2623            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2624
2625            ws_stream.unsubscribe().await;
2626
2627            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
2628            let msg = json!({
2629                "stream": stream,
2630                "data": payload,
2631            });
2632
2633            streams_base.on_message(msg.to_string(), conn.clone()).await;
2634
2635            yield_now().await;
2636
2637            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2638        });
2639    }
2640
2641    #[test]
2642    fn kline_offset_should_execute_successfully() {
2643        TOKIO_SHARED_RT.block_on(async {
2644            let (streams_base, _) = make_streams_base().await;
2645            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2646
2647            let id = "test-id-123".to_string();
2648
2649            let params = KlineOffsetParams::builder(
2650                "bnbusdt".to_string(),
2651                KlineOffsetIntervalEnum::Interval1s,
2652            )
2653            .id(Some(id.clone()))
2654            .build()
2655            .unwrap();
2656
2657            let KlineOffsetParams {
2658                symbol,
2659                interval,
2660                id,
2661            } = params.clone();
2662
2663            let pairs: &[(&str, Option<String>)] = &[
2664                ("symbol", Some(symbol.clone())),
2665                ("interval", Some(interval.as_str().to_string())),
2666                ("id", id.clone()),
2667            ];
2668
2669            let vars: HashMap<_, _> = pairs
2670                .iter()
2671                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2672                .collect();
2673            let stream =
2674                replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
2675            let ws_stream = api
2676                .kline_offset(params)
2677                .await
2678                .expect("kline_offset should return a WebsocketStream");
2679
2680            assert!(
2681                streams_base.is_subscribed(&stream).await,
2682                "expected stream '{stream}' to be subscribed"
2683            );
2684            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2685        });
2686    }
2687
2688    #[test]
2689    fn kline_offset_should_handle_incoming_message() {
2690        TOKIO_SHARED_RT.block_on(async {
2691            let (streams_base, conn) = make_streams_base().await;
2692            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2693
2694            let id = "test-id-123".to_string();
2695
2696            let params = KlineOffsetParams::builder("bnbusdt".to_string(),KlineOffsetIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
2697
2698            let KlineOffsetParams {
2699                symbol,interval,id,
2700            } = params.clone();
2701
2702            let pairs: &[(&str, Option<String>)] = &[
2703                ("symbol",
2704                        Some(symbol.clone())
2705                ),
2706                ("interval",
2707                        Some(interval.as_str().to_string())
2708                ),
2709                ("id",
2710                        id.clone()
2711                ),
2712            ];
2713
2714            let vars: HashMap<_, _> = pairs
2715                .iter()
2716                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2717                .collect();
2718            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
2719
2720            let ws_stream = api.kline_offset(params).await.unwrap();
2721
2722            let called = Arc::new(AtomicBool::new(false));
2723            let called_with_message = called.clone();
2724            ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
2725                called_with_message.store(true, Ordering::SeqCst);
2726            });
2727
2728            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
2729            let msg = json!({
2730                "stream": stream,
2731                "data": payload,
2732            });
2733
2734            streams_base.on_message(msg.to_string(), conn.clone()).await;
2735            yield_now().await;
2736
2737            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2738        });
2739    }
2740
2741    #[test]
2742    fn kline_offset_should_not_fire_after_unsubscribe() {
2743        TOKIO_SHARED_RT.block_on(async {
2744            let (streams_base, conn) = make_streams_base().await;
2745            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2746
2747            let id = "test-id-123".to_string();
2748
2749            let params = KlineOffsetParams::builder("bnbusdt".to_string(),KlineOffsetIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();
2750
2751            let KlineOffsetParams {
2752                symbol,interval,id,
2753            } = params.clone();
2754
2755            let pairs: &[(&str, Option<String>)] = &[
2756                ("symbol",
2757                        Some(symbol.clone())
2758                ),
2759                ("interval",
2760                        Some(interval.as_str().to_string())
2761                ),
2762                ("id",
2763                        id.clone()
2764                ),
2765            ];
2766
2767            let vars: HashMap<_, _> = pairs
2768                .iter()
2769                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2770                .collect();
2771            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
2772
2773            let ws_stream = api.kline_offset(params).await.unwrap();
2774
2775            let called = Arc::new(AtomicBool::new(false));
2776            let called_clone = called.clone();
2777            ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
2778                called_clone.store(true, Ordering::SeqCst);
2779            });
2780
2781            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2782
2783            ws_stream.unsubscribe().await;
2784
2785            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
2786            let msg = json!({
2787                "stream": stream,
2788                "data": payload,
2789            });
2790
2791            streams_base.on_message(msg.to_string(), conn.clone()).await;
2792
2793            yield_now().await;
2794
2795            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2796        });
2797    }
2798
2799    #[test]
2800    fn mini_ticker_should_execute_successfully() {
2801        TOKIO_SHARED_RT.block_on(async {
2802            let (streams_base, _) = make_streams_base().await;
2803            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2804
2805            let id = "test-id-123".to_string();
2806
2807            let params = MiniTickerParams::builder("bnbusdt".to_string())
2808                .id(Some(id.clone()))
2809                .build()
2810                .unwrap();
2811
2812            let MiniTickerParams { symbol, id } = params.clone();
2813
2814            let pairs: &[(&str, Option<String>)] =
2815                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
2816
2817            let vars: HashMap<_, _> = pairs
2818                .iter()
2819                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2820                .collect();
2821            let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
2822            let ws_stream = api
2823                .mini_ticker(params)
2824                .await
2825                .expect("mini_ticker should return a WebsocketStream");
2826
2827            assert!(
2828                streams_base.is_subscribed(&stream).await,
2829                "expected stream '{stream}' to be subscribed"
2830            );
2831            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2832        });
2833    }
2834
2835    #[test]
2836    fn mini_ticker_should_handle_incoming_message() {
2837        TOKIO_SHARED_RT.block_on(async {
2838            let (streams_base, conn) = make_streams_base().await;
2839            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2840
2841            let id = "test-id-123".to_string();
2842
2843            let params = MiniTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2844
2845            let MiniTickerParams {
2846                symbol,id,
2847            } = params.clone();
2848
2849            let pairs: &[(&str, Option<String>)] = &[
2850                ("symbol",
2851                        Some(symbol.clone())
2852                ),
2853                ("id",
2854                        id.clone()
2855                ),
2856            ];
2857
2858            let vars: HashMap<_, _> = pairs
2859                .iter()
2860                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2861                .collect();
2862            let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
2863
2864            let ws_stream = api.mini_ticker(params).await.unwrap();
2865
2866            let called = Arc::new(AtomicBool::new(false));
2867            let called_with_message = called.clone();
2868            ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
2869                called_with_message.store(true, Ordering::SeqCst);
2870            });
2871
2872            let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
2873            let msg = json!({
2874                "stream": stream,
2875                "data": payload,
2876            });
2877
2878            streams_base.on_message(msg.to_string(), conn.clone()).await;
2879            yield_now().await;
2880
2881            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
2882        });
2883    }
2884
2885    #[test]
2886    fn mini_ticker_should_not_fire_after_unsubscribe() {
2887        TOKIO_SHARED_RT.block_on(async {
2888            let (streams_base, conn) = make_streams_base().await;
2889            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2890
2891            let id = "test-id-123".to_string();
2892
2893            let params = MiniTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
2894
2895            let MiniTickerParams {
2896                symbol,id,
2897            } = params.clone();
2898
2899            let pairs: &[(&str, Option<String>)] = &[
2900                ("symbol",
2901                        Some(symbol.clone())
2902                ),
2903                ("id",
2904                        id.clone()
2905                ),
2906            ];
2907
2908            let vars: HashMap<_, _> = pairs
2909                .iter()
2910                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2911                .collect();
2912            let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
2913
2914            let ws_stream = api.mini_ticker(params).await.unwrap();
2915
2916            let called = Arc::new(AtomicBool::new(false));
2917            let called_clone = called.clone();
2918            ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
2919                called_clone.store(true, Ordering::SeqCst);
2920            });
2921
2922            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
2923
2924            ws_stream.unsubscribe().await;
2925
2926            let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
2927            let msg = json!({
2928                "stream": stream,
2929                "data": payload,
2930            });
2931
2932            streams_base.on_message(msg.to_string(), conn.clone()).await;
2933
2934            yield_now().await;
2935
2936            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
2937        });
2938    }
2939
2940    #[test]
2941    fn partial_book_depth_should_execute_successfully() {
2942        TOKIO_SHARED_RT.block_on(async {
2943            let (streams_base, _) = make_streams_base().await;
2944            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2945
2946            let id = "test-id-123".to_string();
2947
2948            let params = PartialBookDepthParams::builder(
2949                "bnbusdt".to_string(),
2950                PartialBookDepthLevelsEnum::Levels5,
2951            )
2952            .id(Some(id.clone()))
2953            .build()
2954            .unwrap();
2955
2956            let PartialBookDepthParams {
2957                symbol,
2958                levels,
2959                id,
2960                update_speed,
2961            } = params.clone();
2962
2963            let pairs: &[(&str, Option<String>)] = &[
2964                ("symbol", Some(symbol.clone())),
2965                ("levels", Some(levels.as_str().to_string())),
2966                ("id", id.clone()),
2967                ("updateSpeed", update_speed.clone()),
2968            ];
2969
2970            let vars: HashMap<_, _> = pairs
2971                .iter()
2972                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
2973                .collect();
2974            let stream = replace_websocket_streams_placeholders(
2975                "/<symbol>@depth<levels>@<updateSpeed>",
2976                &vars,
2977            );
2978            let ws_stream = api
2979                .partial_book_depth(params)
2980                .await
2981                .expect("partial_book_depth should return a WebsocketStream");
2982
2983            assert!(
2984                streams_base.is_subscribed(&stream).await,
2985                "expected stream '{stream}' to be subscribed"
2986            );
2987            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
2988        });
2989    }
2990
2991    #[test]
2992    fn partial_book_depth_should_handle_incoming_message() {
2993        TOKIO_SHARED_RT.block_on(async {
2994            let (streams_base, conn) = make_streams_base().await;
2995            let api = WebSocketStreamsApiClient::new(streams_base.clone());
2996
2997            let id = "test-id-123".to_string();
2998
2999            let params = PartialBookDepthParams::builder(
3000                "bnbusdt".to_string(),
3001                PartialBookDepthLevelsEnum::Levels5,
3002            )
3003            .id(Some(id.clone()))
3004            .build()
3005            .unwrap();
3006
3007            let PartialBookDepthParams {
3008                symbol,
3009                levels,
3010                id,
3011                update_speed,
3012            } = params.clone();
3013
3014            let pairs: &[(&str, Option<String>)] = &[
3015                ("symbol", Some(symbol.clone())),
3016                ("levels", Some(levels.as_str().to_string())),
3017                ("id", id.clone()),
3018                ("updateSpeed", update_speed.clone()),
3019            ];
3020
3021            let vars: HashMap<_, _> = pairs
3022                .iter()
3023                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3024                .collect();
3025            let stream = replace_websocket_streams_placeholders(
3026                "/<symbol>@depth<levels>@<updateSpeed>",
3027                &vars,
3028            );
3029
3030            let ws_stream = api.partial_book_depth(params).await.unwrap();
3031
3032            let called = Arc::new(AtomicBool::new(false));
3033            let called_with_message = called.clone();
3034            ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
3035                called_with_message.store(true, Ordering::SeqCst);
3036            });
3037
3038            let payload: Value = serde_json::from_str(
3039                r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
3040            )
3041            .unwrap();
3042            let msg = json!({
3043                "stream": stream,
3044                "data": payload,
3045            });
3046
3047            streams_base.on_message(msg.to_string(), conn.clone()).await;
3048            yield_now().await;
3049
3050            assert!(
3051                called.load(Ordering::SeqCst),
3052                "expected our callback to have been invoked"
3053            );
3054        });
3055    }
3056
3057    #[test]
3058    fn partial_book_depth_should_not_fire_after_unsubscribe() {
3059        TOKIO_SHARED_RT.block_on(async {
3060            let (streams_base, conn) = make_streams_base().await;
3061            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3062
3063            let id = "test-id-123".to_string();
3064
3065            let params = PartialBookDepthParams::builder(
3066                "bnbusdt".to_string(),
3067                PartialBookDepthLevelsEnum::Levels5,
3068            )
3069            .id(Some(id.clone()))
3070            .build()
3071            .unwrap();
3072
3073            let PartialBookDepthParams {
3074                symbol,
3075                levels,
3076                id,
3077                update_speed,
3078            } = params.clone();
3079
3080            let pairs: &[(&str, Option<String>)] = &[
3081                ("symbol", Some(symbol.clone())),
3082                ("levels", Some(levels.as_str().to_string())),
3083                ("id", id.clone()),
3084                ("updateSpeed", update_speed.clone()),
3085            ];
3086
3087            let vars: HashMap<_, _> = pairs
3088                .iter()
3089                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3090                .collect();
3091            let stream = replace_websocket_streams_placeholders(
3092                "/<symbol>@depth<levels>@<updateSpeed>",
3093                &vars,
3094            );
3095
3096            let ws_stream = api.partial_book_depth(params).await.unwrap();
3097
3098            let called = Arc::new(AtomicBool::new(false));
3099            let called_clone = called.clone();
3100            ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
3101                called_clone.store(true, Ordering::SeqCst);
3102            });
3103
3104            assert!(
3105                streams_base.is_subscribed(&stream).await,
3106                "should be subscribed before unsubscribe"
3107            );
3108
3109            ws_stream.unsubscribe().await;
3110
3111            let payload: Value = serde_json::from_str(
3112                r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
3113            )
3114            .unwrap();
3115            let msg = json!({
3116                "stream": stream,
3117                "data": payload,
3118            });
3119
3120            streams_base.on_message(msg.to_string(), conn.clone()).await;
3121
3122            yield_now().await;
3123
3124            assert!(
3125                !called.load(Ordering::SeqCst),
3126                "callback should not be invoked after unsubscribe"
3127            );
3128        });
3129    }
3130
3131    #[test]
3132    fn reference_price_should_execute_successfully() {
3133        TOKIO_SHARED_RT.block_on(async {
3134            let (streams_base, _) = make_streams_base().await;
3135            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3136
3137            let id = "test-id-123".to_string();
3138
3139            let params = ReferencePriceParams::builder("bnbusdt".to_string())
3140                .id(Some(id.clone()))
3141                .build()
3142                .unwrap();
3143
3144            let ReferencePriceParams { symbol, id } = params.clone();
3145
3146            let pairs: &[(&str, Option<String>)] =
3147                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
3148
3149            let vars: HashMap<_, _> = pairs
3150                .iter()
3151                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3152                .collect();
3153            let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
3154            let ws_stream = api
3155                .reference_price(params)
3156                .await
3157                .expect("reference_price should return a WebsocketStream");
3158
3159            assert!(
3160                streams_base.is_subscribed(&stream).await,
3161                "expected stream '{stream}' to be subscribed"
3162            );
3163            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
3164        });
3165    }
3166
3167    #[test]
3168    fn reference_price_should_handle_incoming_message() {
3169        TOKIO_SHARED_RT.block_on(async {
3170            let (streams_base, conn) = make_streams_base().await;
3171            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3172
3173            let id = "test-id-123".to_string();
3174
3175            let params = ReferencePriceParams::builder("bnbusdt".to_string())
3176                .id(Some(id.clone()))
3177                .build()
3178                .unwrap();
3179
3180            let ReferencePriceParams { symbol, id } = params.clone();
3181
3182            let pairs: &[(&str, Option<String>)] =
3183                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
3184
3185            let vars: HashMap<_, _> = pairs
3186                .iter()
3187                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3188                .collect();
3189            let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
3190
3191            let ws_stream = api.reference_price(params).await.unwrap();
3192
3193            let called = Arc::new(AtomicBool::new(false));
3194            let called_with_message = called.clone();
3195            ws_stream.on_message(move |_payload: models::ReferencePriceResponse| {
3196                called_with_message.store(true, Ordering::SeqCst);
3197            });
3198
3199            let payload: Value = serde_json::from_str(
3200                r#"{"e":"referencePrice","s":"BAZUSD","r":"1.00","t":1770313263917}"#,
3201            )
3202            .unwrap();
3203            let msg = json!({
3204                "stream": stream,
3205                "data": payload,
3206            });
3207
3208            streams_base.on_message(msg.to_string(), conn.clone()).await;
3209            yield_now().await;
3210
3211            assert!(
3212                called.load(Ordering::SeqCst),
3213                "expected our callback to have been invoked"
3214            );
3215        });
3216    }
3217
3218    #[test]
3219    fn reference_price_should_not_fire_after_unsubscribe() {
3220        TOKIO_SHARED_RT.block_on(async {
3221            let (streams_base, conn) = make_streams_base().await;
3222            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3223
3224            let id = "test-id-123".to_string();
3225
3226            let params = ReferencePriceParams::builder("bnbusdt".to_string())
3227                .id(Some(id.clone()))
3228                .build()
3229                .unwrap();
3230
3231            let ReferencePriceParams { symbol, id } = params.clone();
3232
3233            let pairs: &[(&str, Option<String>)] =
3234                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
3235
3236            let vars: HashMap<_, _> = pairs
3237                .iter()
3238                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3239                .collect();
3240            let stream = replace_websocket_streams_placeholders("/<symbol>@referencePrice", &vars);
3241
3242            let ws_stream = api.reference_price(params).await.unwrap();
3243
3244            let called = Arc::new(AtomicBool::new(false));
3245            let called_clone = called.clone();
3246            ws_stream.on_message(move |_payload: models::ReferencePriceResponse| {
3247                called_clone.store(true, Ordering::SeqCst);
3248            });
3249
3250            assert!(
3251                streams_base.is_subscribed(&stream).await,
3252                "should be subscribed before unsubscribe"
3253            );
3254
3255            ws_stream.unsubscribe().await;
3256
3257            let payload: Value = serde_json::from_str(
3258                r#"{"e":"referencePrice","s":"BAZUSD","r":"1.00","t":1770313263917}"#,
3259            )
3260            .unwrap();
3261            let msg = json!({
3262                "stream": stream,
3263                "data": payload,
3264            });
3265
3266            streams_base.on_message(msg.to_string(), conn.clone()).await;
3267
3268            yield_now().await;
3269
3270            assert!(
3271                !called.load(Ordering::SeqCst),
3272                "callback should not be invoked after unsubscribe"
3273            );
3274        });
3275    }
3276
3277    #[test]
3278    fn rolling_window_ticker_should_execute_successfully() {
3279        TOKIO_SHARED_RT.block_on(async {
3280            let (streams_base, _) = make_streams_base().await;
3281            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3282
3283            let id = "test-id-123".to_string();
3284
3285            let params = RollingWindowTickerParams::builder(
3286                "bnbusdt".to_string(),
3287                RollingWindowTickerWindowSizeEnum::WindowSize1h,
3288            )
3289            .id(Some(id.clone()))
3290            .build()
3291            .unwrap();
3292
3293            let RollingWindowTickerParams {
3294                symbol,
3295                window_size,
3296                id,
3297            } = params.clone();
3298
3299            let pairs: &[(&str, Option<String>)] = &[
3300                ("symbol", Some(symbol.clone())),
3301                ("windowSize", Some(window_size.as_str().to_string())),
3302                ("id", id.clone()),
3303            ];
3304
3305            let vars: HashMap<_, _> = pairs
3306                .iter()
3307                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3308                .collect();
3309            let stream =
3310                replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
3311            let ws_stream = api
3312                .rolling_window_ticker(params)
3313                .await
3314                .expect("rolling_window_ticker should return a WebsocketStream");
3315
3316            assert!(
3317                streams_base.is_subscribed(&stream).await,
3318                "expected stream '{stream}' to be subscribed"
3319            );
3320            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
3321        });
3322    }
3323
3324    #[test]
3325    fn rolling_window_ticker_should_handle_incoming_message() {
3326        TOKIO_SHARED_RT.block_on(async {
3327            let (streams_base, conn) = make_streams_base().await;
3328            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3329
3330            let id = "test-id-123".to_string();
3331
3332            let params = RollingWindowTickerParams::builder("bnbusdt".to_string(),RollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
3333
3334            let RollingWindowTickerParams {
3335                symbol,window_size,id,
3336            } = params.clone();
3337
3338            let pairs: &[(&str, Option<String>)] = &[
3339                ("symbol",
3340                        Some(symbol.clone())
3341                ),
3342                ("windowSize",
3343                        Some(window_size.as_str().to_string())
3344                ),
3345                ("id",
3346                        id.clone()
3347                ),
3348            ];
3349
3350            let vars: HashMap<_, _> = pairs
3351                .iter()
3352                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3353                .collect();
3354            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
3355
3356            let ws_stream = api.rolling_window_ticker(params).await.unwrap();
3357
3358            let called = Arc::new(AtomicBool::new(false));
3359            let called_with_message = called.clone();
3360            ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
3361                called_with_message.store(true, Ordering::SeqCst);
3362            });
3363
3364            let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
3365            let msg = json!({
3366                "stream": stream,
3367                "data": payload,
3368            });
3369
3370            streams_base.on_message(msg.to_string(), conn.clone()).await;
3371            yield_now().await;
3372
3373            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
3374        });
3375    }
3376
3377    #[test]
3378    fn rolling_window_ticker_should_not_fire_after_unsubscribe() {
3379        TOKIO_SHARED_RT.block_on(async {
3380            let (streams_base, conn) = make_streams_base().await;
3381            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3382
3383            let id = "test-id-123".to_string();
3384
3385            let params = RollingWindowTickerParams::builder("bnbusdt".to_string(),RollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();
3386
3387            let RollingWindowTickerParams {
3388                symbol,window_size,id,
3389            } = params.clone();
3390
3391            let pairs: &[(&str, Option<String>)] = &[
3392                ("symbol",
3393                        Some(symbol.clone())
3394                ),
3395                ("windowSize",
3396                        Some(window_size.as_str().to_string())
3397                ),
3398                ("id",
3399                        id.clone()
3400                ),
3401            ];
3402
3403            let vars: HashMap<_, _> = pairs
3404                .iter()
3405                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3406                .collect();
3407            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
3408
3409            let ws_stream = api.rolling_window_ticker(params).await.unwrap();
3410
3411            let called = Arc::new(AtomicBool::new(false));
3412            let called_clone = called.clone();
3413            ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
3414                called_clone.store(true, Ordering::SeqCst);
3415            });
3416
3417            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
3418
3419            ws_stream.unsubscribe().await;
3420
3421            let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
3422            let msg = json!({
3423                "stream": stream,
3424                "data": payload,
3425            });
3426
3427            streams_base.on_message(msg.to_string(), conn.clone()).await;
3428
3429            yield_now().await;
3430
3431            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
3432        });
3433    }
3434
3435    #[test]
3436    fn ticker_should_execute_successfully() {
3437        TOKIO_SHARED_RT.block_on(async {
3438            let (streams_base, _) = make_streams_base().await;
3439            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3440
3441            let id = "test-id-123".to_string();
3442
3443            let params = TickerParams::builder("bnbusdt".to_string())
3444                .id(Some(id.clone()))
3445                .build()
3446                .unwrap();
3447
3448            let TickerParams { symbol, id } = params.clone();
3449
3450            let pairs: &[(&str, Option<String>)] =
3451                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
3452
3453            let vars: HashMap<_, _> = pairs
3454                .iter()
3455                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3456                .collect();
3457            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
3458            let ws_stream = api
3459                .ticker(params)
3460                .await
3461                .expect("ticker should return a WebsocketStream");
3462
3463            assert!(
3464                streams_base.is_subscribed(&stream).await,
3465                "expected stream '{stream}' to be subscribed"
3466            );
3467            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
3468        });
3469    }
3470
3471    #[test]
3472    fn ticker_should_handle_incoming_message() {
3473        TOKIO_SHARED_RT.block_on(async {
3474            let (streams_base, conn) = make_streams_base().await;
3475            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3476
3477            let id = "test-id-123".to_string();
3478
3479            let params = TickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
3480
3481            let TickerParams {
3482                symbol,id,
3483            } = params.clone();
3484
3485            let pairs: &[(&str, Option<String>)] = &[
3486                ("symbol",
3487                        Some(symbol.clone())
3488                ),
3489                ("id",
3490                        id.clone()
3491                ),
3492            ];
3493
3494            let vars: HashMap<_, _> = pairs
3495                .iter()
3496                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3497                .collect();
3498            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
3499
3500            let ws_stream = api.ticker(params).await.unwrap();
3501
3502            let called = Arc::new(AtomicBool::new(false));
3503            let called_with_message = called.clone();
3504            ws_stream.on_message(move |_payload: models::TickerResponse| {
3505                called_with_message.store(true, Ordering::SeqCst);
3506            });
3507
3508            let payload: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#).unwrap();
3509            let msg = json!({
3510                "stream": stream,
3511                "data": payload,
3512            });
3513
3514            streams_base.on_message(msg.to_string(), conn.clone()).await;
3515            yield_now().await;
3516
3517            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
3518        });
3519    }
3520
3521    #[test]
3522    fn ticker_should_not_fire_after_unsubscribe() {
3523        TOKIO_SHARED_RT.block_on(async {
3524            let (streams_base, conn) = make_streams_base().await;
3525            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3526
3527            let id = "test-id-123".to_string();
3528
3529            let params = TickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
3530
3531            let TickerParams {
3532                symbol,id,
3533            } = params.clone();
3534
3535            let pairs: &[(&str, Option<String>)] = &[
3536                ("symbol",
3537                        Some(symbol.clone())
3538                ),
3539                ("id",
3540                        id.clone()
3541                ),
3542            ];
3543
3544            let vars: HashMap<_, _> = pairs
3545                .iter()
3546                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3547                .collect();
3548            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
3549
3550            let ws_stream = api.ticker(params).await.unwrap();
3551
3552            let called = Arc::new(AtomicBool::new(false));
3553            let called_clone = called.clone();
3554            ws_stream.on_message(move |_payload: models::TickerResponse| {
3555                called_clone.store(true, Ordering::SeqCst);
3556            });
3557
3558            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
3559
3560            ws_stream.unsubscribe().await;
3561
3562            let payload: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#).unwrap();
3563            let msg = json!({
3564                "stream": stream,
3565                "data": payload,
3566            });
3567
3568            streams_base.on_message(msg.to_string(), conn.clone()).await;
3569
3570            yield_now().await;
3571
3572            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
3573        });
3574    }
3575
3576    #[test]
3577    fn trade_should_execute_successfully() {
3578        TOKIO_SHARED_RT.block_on(async {
3579            let (streams_base, _) = make_streams_base().await;
3580            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3581
3582            let id = "test-id-123".to_string();
3583
3584            let params = TradeParams::builder("bnbusdt".to_string())
3585                .id(Some(id.clone()))
3586                .build()
3587                .unwrap();
3588
3589            let TradeParams { symbol, id } = params.clone();
3590
3591            let pairs: &[(&str, Option<String>)] =
3592                &[("symbol", Some(symbol.clone())), ("id", id.clone())];
3593
3594            let vars: HashMap<_, _> = pairs
3595                .iter()
3596                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3597                .collect();
3598            let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
3599            let ws_stream = api
3600                .trade(params)
3601                .await
3602                .expect("trade should return a WebsocketStream");
3603
3604            assert!(
3605                streams_base.is_subscribed(&stream).await,
3606                "expected stream '{stream}' to be subscribed"
3607            );
3608            assert_eq!(ws_stream.id, Some(StreamId::Str("test-id-123".to_string())));
3609        });
3610    }
3611
3612    #[test]
3613    fn trade_should_handle_incoming_message() {
3614        TOKIO_SHARED_RT.block_on(async {
3615            let (streams_base, conn) = make_streams_base().await;
3616            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3617
3618            let id = "test-id-123".to_string();
3619
3620            let params = TradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
3621
3622            let TradeParams {
3623                symbol,id,
3624            } = params.clone();
3625
3626            let pairs: &[(&str, Option<String>)] = &[
3627                ("symbol",
3628                        Some(symbol.clone())
3629                ),
3630                ("id",
3631                        id.clone()
3632                ),
3633            ];
3634
3635            let vars: HashMap<_, _> = pairs
3636                .iter()
3637                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3638                .collect();
3639            let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
3640
3641            let ws_stream = api.trade(params).await.unwrap();
3642
3643            let called = Arc::new(AtomicBool::new(false));
3644            let called_with_message = called.clone();
3645            ws_stream.on_message(move |_payload: models::TradeResponse| {
3646                called_with_message.store(true, Ordering::SeqCst);
3647            });
3648
3649            let payload: Value = serde_json::from_str(r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#).unwrap();
3650            let msg = json!({
3651                "stream": stream,
3652                "data": payload,
3653            });
3654
3655            streams_base.on_message(msg.to_string(), conn.clone()).await;
3656            yield_now().await;
3657
3658            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
3659        });
3660    }
3661
3662    #[test]
3663    fn trade_should_not_fire_after_unsubscribe() {
3664        TOKIO_SHARED_RT.block_on(async {
3665            let (streams_base, conn) = make_streams_base().await;
3666            let api = WebSocketStreamsApiClient::new(streams_base.clone());
3667
3668            let id = "test-id-123".to_string();
3669
3670            let params = TradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();
3671
3672            let TradeParams {
3673                symbol,id,
3674            } = params.clone();
3675
3676            let pairs: &[(&str, Option<String>)] = &[
3677                ("symbol",
3678                        Some(symbol.clone())
3679                ),
3680                ("id",
3681                        id.clone()
3682                ),
3683            ];
3684
3685            let vars: HashMap<_, _> = pairs
3686                .iter()
3687                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
3688                .collect();
3689            let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
3690
3691            let ws_stream = api.trade(params).await.unwrap();
3692
3693            let called = Arc::new(AtomicBool::new(false));
3694            let called_clone = called.clone();
3695            ws_stream.on_message(move |_payload: models::TradeResponse| {
3696                called_clone.store(true, Ordering::SeqCst);
3697            });
3698
3699            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");
3700
3701            ws_stream.unsubscribe().await;
3702
3703            let payload: Value = serde_json::from_str(r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#).unwrap();
3704            let msg = json!({
3705                "stream": stream,
3706                "data": payload,
3707            });
3708
3709            streams_base.on_message(msg.to_string(), conn.clone()).await;
3710
3711            yield_now().await;
3712
3713            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
3714        });
3715    }
3716}