Skip to main content

binance_sdk/spot/websocket_streams/
mod.rs

1/*
2 * Binance Spot WebSocket Streams
3 *
4 * OpenAPI Specifications for the Binance Spot WebSocket Streams
5 *
6 * API documents:
7 * - [Github web-socket-streams documentation file](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md)
8 * - [General API information for web-socket-streams on website](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams)
9 *
10 *
11 * The version of the OpenAPI document: 1.0.0
12 *
13 *
14 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
15 * https://openapi-generator.tech
16 * Do not edit the class manually.
17 */
18
19#![allow(unused_imports)]
20use serde_json::Value;
21use std::sync::{Arc, atomic::Ordering};
22use tokio::spawn;
23
24use crate::common::config::ConfigurationWebsocketStreams;
25use crate::common::websocket::{
26    Subscription, WebsocketBase, WebsocketStream, WebsocketStreams as WebsocketStreamsBase,
27    create_stream_handler,
28};
29use crate::models::{StreamId, WebsocketEvent, WebsocketMode};
30
31mod apis;
32mod handle;
33mod models;
34
35pub use apis::*;
36pub use handle::*;
37pub use models::*;
38
39const HAS_TIME_UNIT: bool = true;
40
41pub struct WebsocketStreams {
42    websocket_streams_base: Arc<WebsocketStreamsBase>,
43    web_socket_streams_api_client: WebSocketStreamsApiClient,
44}
45
46impl WebsocketStreams {
47    pub(crate) async fn connect(
48        config: ConfigurationWebsocketStreams,
49        streams: Vec<String>,
50        mode: Option<WebsocketMode>,
51    ) -> anyhow::Result<Self> {
52        let mut cfg = config;
53        if let Some(m) = mode {
54            cfg.mode = m;
55        }
56
57        if !HAS_TIME_UNIT {
58            cfg.time_unit = None;
59        }
60
61        let websocket_streams_base = WebsocketStreamsBase::new(cfg, vec![], vec![]);
62
63        websocket_streams_base.clone().connect(streams).await?;
64
65        Ok(Self {
66            websocket_streams_base: websocket_streams_base.clone(),
67            web_socket_streams_api_client: WebSocketStreamsApiClient::new(
68                websocket_streams_base.clone(),
69            ),
70        })
71    }
72
73    /// Subscribes to WebSocket events with a provided callback function.
74    ///
75    /// # Arguments
76    ///
77    /// * `callback` - A mutable function that takes a `WebsocketEvent` and is `Send` and `'static`.
78    ///
79    /// # Returns
80    ///
81    /// A `Subscription` that can be used to manage the event subscription.
82    ///
83    /// # Examples
84    ///
85    ///
86    /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
87    ///     // Handle WebSocket event
88    /// });
89    ///
90    pub fn subscribe_on_ws_events<F>(&self, callback: F) -> Subscription
91    where
92        F: FnMut(WebsocketEvent) + Send + 'static,
93    {
94        let base = Arc::clone(&self.websocket_streams_base);
95        base.common.events.subscribe(callback)
96    }
97
98    /// Unsubscribes from WebSocket events for a given `Subscription`.
99    ///
100    /// # Arguments
101    ///
102    /// * `subscription` - The `Subscription` to unsubscribe from WebSocket events.
103    ///
104    /// # Examples
105    ///
106    ///
107    /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
108    ///     // Handle WebSocket event
109    /// });
110    /// `websocket_streams.unsubscribe_from_ws_events(subscription)`;
111    ///
112    pub fn unsubscribe_from_ws_events(&self, subscription: Subscription) {
113        subscription.unsubscribe();
114    }
115
116    /// Disconnects the WebSocket connection.
117    ///
118    /// # Returns
119    ///
120    /// A `Result` indicating whether the disconnection was successful.
121    /// Returns an error if the disconnection fails.
122    ///
123    /// # Errors
124    ///
125    /// Returns an [`anyhow::Error`] if the connection fails.
126    ///
127    /// # Examples
128    ///
129    ///
130    /// let `websocket_streams` = `WebSocketStreams::new`(...);
131    /// `websocket_streams.disconnect().await`?;
132    ///
133    pub async fn disconnect(&self) -> anyhow::Result<()> {
134        self.websocket_streams_base
135            .disconnect()
136            .await
137            .map_err(anyhow::Error::msg)
138    }
139
140    /// Checks if the WebSocket connection is currently active.
141    ///
142    /// # Returns
143    ///
144    /// A `bool` indicating whether the WebSocket connection is established and connected.
145    ///
146    /// # Examples
147    ///
148    ///
149    /// let `is_active` = `websocket_streams.is_connected().await`;
150    /// if `is_active` {
151    ///     // WebSocket connection is active
152    /// }
153    ///
154    pub async fn is_connected(&self) -> bool {
155        self.websocket_streams_base.is_connected().await
156    }
157
158    /// Sends a ping to the WebSocket server to maintain the connection.
159    ///
160    /// # Examples
161    ///
162    ///
163    /// `websocket_streams.ping_server().await`;
164    ///
165    ///
166    /// This method sends a ping request to the WebSocket server to keep the connection alive
167    /// and check the server's responsiveness.
168    pub async fn ping_server(&self) {
169        self.websocket_streams_base.ping_server().await;
170    }
171
172    /// Subscribes to specified WebSocket streams.
173    ///
174    /// # Arguments
175    ///
176    /// * `streams` - A vector of stream names to subscribe to
177    /// * `id` - An optional identifier for the subscription request
178    ///
179    /// # Examples
180    ///
181    ///
182    /// `websocket_streams.subscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
183    ///
184    ///
185    /// This method initiates an asynchronous subscription to the specified WebSocket streams.
186    /// The subscription is performed in a separate task using `spawn`.
187    pub fn subscribe(&self, streams: Vec<String>, id: Option<String>) {
188        let base = Arc::clone(&self.websocket_streams_base);
189        spawn(async move { base.subscribe(streams, id.map(StreamId::from), None).await });
190    }
191
192    /// Unsubscribes from specified WebSocket streams.
193    ///
194    /// # Arguments
195    ///
196    /// * `streams` - A vector of stream names to unsubscribe from
197    /// * `id` - An optional identifier for the unsubscription request
198    ///
199    /// # Examples
200    ///
201    ///
202    /// `websocket_streams.unsubscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
203    ///
204    ///
205    /// This method initiates an asynchronous unsubscription from the specified WebSocket streams.
206    /// The unsubscription is performed in a separate task using `spawn`.
207    pub fn unsubscribe(&self, streams: Vec<String>, id: Option<String>) {
208        let base = Arc::clone(&self.websocket_streams_base);
209        spawn(async move {
210            base.unsubscribe(streams, id.map(StreamId::from), None)
211                .await;
212        });
213    }
214
215    /// Checks if the current WebSocket stream is subscribed to a specific stream.
216    ///
217    /// # Arguments
218    ///
219    /// * `stream` - The name of the stream to check for subscription
220    ///
221    /// # Returns
222    ///
223    /// A boolean indicating whether the stream is currently subscribed
224    ///
225    /// # Examples
226    ///
227    ///
228    /// let `is_subscribed` = `websocket_streams.is_subscribed("btcusdt@trade").await`;
229    ///
230    ///
231    /// This method checks the subscription status of a specific WebSocket stream.
232    pub async fn is_subscribed(&self, stream: &str) -> bool {
233        self.websocket_streams_base.is_subscribed(stream).await
234    }
235
236    /// WebSocket Aggregate Trade Streams
237    ///
238    /// The Aggregate Trade Streams push trade information that is aggregated for a single taker order.
239    ///
240    /// # Arguments
241    ///
242    /// - `params`: [`AggTradeParams`]
243    ///   The parameters for this operation.
244    ///
245    /// # Returns
246    ///
247    /// [`Arc<WebsocketStream<models::AggTradeResponse>>`] on success.
248    ///
249    /// # Errors
250    ///
251    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
252    ///
253    ///
254    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#aggregate-trade-streams).
255    ///
256    pub async fn agg_trade(
257        &self,
258        params: AggTradeParams,
259    ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>> {
260        self.web_socket_streams_api_client.agg_trade(params).await
261    }
262
263    /// WebSocket All Market Rolling Window Statistics Streams
264    ///
265    /// Rolling window ticker statistics for all market symbols, computed over multiple windows.
266    /// Note that only tickers that have changed will be present in the array.
267    ///
268    /// # Arguments
269    ///
270    /// - `params`: [`AllMarketRollingWindowTickerParams`]
271    ///   The parameters for this operation.
272    ///
273    /// # Returns
274    ///
275    /// [`Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>`] on success.
276    ///
277    /// # Errors
278    ///
279    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
280    ///
281    ///
282    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#all-market-rolling-window-statistics-streams).
283    ///
284    pub async fn all_market_rolling_window_ticker(
285        &self,
286        params: AllMarketRollingWindowTickerParams,
287    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>
288    {
289        self.web_socket_streams_api_client
290            .all_market_rolling_window_ticker(params)
291            .await
292    }
293
294    /// WebSocket All Market Mini Tickers Stream
295    ///
296    /// 24hr rolling window mini-ticker statistics for all symbols that changed in an array. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs. Note that only tickers that have changed will be present in the array.
297    ///
298    /// # Arguments
299    ///
300    /// - `params`: [`AllMiniTickerParams`]
301    ///   The parameters for this operation.
302    ///
303    /// # Returns
304    ///
305    /// [`Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>`] on success.
306    ///
307    /// # Errors
308    ///
309    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
310    ///
311    ///
312    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#all-market-mini-tickers-stream).
313    ///
314    pub async fn all_mini_ticker(
315        &self,
316        params: AllMiniTickerParams,
317    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>> {
318        self.web_socket_streams_api_client
319            .all_mini_ticker(params)
320            .await
321    }
322
323    /// WebSocket Average Price
324    ///
325    /// Average price streams push changes in the average price over a fixed time interval.
326    ///
327    /// # Arguments
328    ///
329    /// - `params`: [`AvgPriceParams`]
330    ///   The parameters for this operation.
331    ///
332    /// # Returns
333    ///
334    /// [`Arc<WebsocketStream<models::AvgPriceResponse>>`] on success.
335    ///
336    /// # Errors
337    ///
338    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
339    ///
340    ///
341    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#average-price).
342    ///
343    pub async fn avg_price(
344        &self,
345        params: AvgPriceParams,
346    ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>> {
347        self.web_socket_streams_api_client.avg_price(params).await
348    }
349
350    /// WebSocket Block Trade Streams
351    ///
352    ///
353    ///
354    /// # Arguments
355    ///
356    /// - `params`: [`BlockTradeParams`]
357    ///   The parameters for this operation.
358    ///
359    /// # Returns
360    ///
361    /// [`Arc<WebsocketStream<models::BlockTradeResponse>>`] on success.
362    ///
363    /// # Errors
364    ///
365    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
366    ///
367    ///
368    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#block-trade-streams).
369    ///
370    pub async fn block_trade(
371        &self,
372        params: BlockTradeParams,
373    ) -> anyhow::Result<Arc<WebsocketStream<models::BlockTradeResponse>>> {
374        self.web_socket_streams_api_client.block_trade(params).await
375    }
376
377    /// WebSocket Individual Symbol Book Ticker Streams
378    ///
379    /// Pushes any update to the best bid or ask's price or quantity in real-time for a specified symbol.
380    /// Multiple `<symbol>@bookTicker` streams can be subscribed to over one connection.
381    ///
382    /// # Arguments
383    ///
384    /// - `params`: [`BookTickerParams`]
385    ///   The parameters for this operation.
386    ///
387    /// # Returns
388    ///
389    /// [`Arc<WebsocketStream<models::BookTickerResponse>>`] on success.
390    ///
391    /// # Errors
392    ///
393    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
394    ///
395    ///
396    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-book-ticker-streams).
397    ///
398    pub async fn book_ticker(
399        &self,
400        params: BookTickerParams,
401    ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>> {
402        self.web_socket_streams_api_client.book_ticker(params).await
403    }
404
405    /// WebSocket Diff. Depth Stream
406    ///
407    /// Order book price and quantity depth updates used to locally manage an order book.
408    ///
409    /// # Arguments
410    ///
411    /// - `params`: [`DiffBookDepthParams`]
412    ///   The parameters for this operation.
413    ///
414    /// # Returns
415    ///
416    /// [`Arc<WebsocketStream<models::DiffBookDepthResponse>>`] on success.
417    ///
418    /// # Errors
419    ///
420    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
421    ///
422    ///
423    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#diff-depth-stream).
424    ///
425    pub async fn diff_book_depth(
426        &self,
427        params: DiffBookDepthParams,
428    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>> {
429        self.web_socket_streams_api_client
430            .diff_book_depth(params)
431            .await
432    }
433
434    /// WebSocket Kline/Candlestick Streams for UTC
435    ///
436    /// The Kline/Candlestick Stream push updates to the current klines/candlestick every second in `UTC+0` timezone
437    ///
438    /// <a id="kline-intervals"></a>
439    ///
440    /// # Arguments
441    ///
442    /// - `params`: [`KlineParams`]
443    ///   The parameters for this operation.
444    ///
445    /// # Returns
446    ///
447    /// [`Arc<WebsocketStream<models::KlineResponse>>`] on success.
448    ///
449    /// # Errors
450    ///
451    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
452    ///
453    ///
454    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#klinecandlestick-streams-for-utc).
455    ///
456    pub async fn kline(
457        &self,
458        params: KlineParams,
459    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>> {
460        self.web_socket_streams_api_client.kline(params).await
461    }
462
463    /// WebSocket Kline/Candlestick Streams with timezone offset
464    ///
465    /// The Kline/Candlestick Stream push updates to the current klines/candlestick every second in `UTC+8` timezone
466    ///
467    /// # Arguments
468    ///
469    /// - `params`: [`KlineOffsetParams`]
470    ///   The parameters for this operation.
471    ///
472    /// # Returns
473    ///
474    /// [`Arc<WebsocketStream<models::KlineOffsetResponse>>`] on success.
475    ///
476    /// # Errors
477    ///
478    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
479    ///
480    ///
481    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#klinecandlestick-streams-with-timezone-offset).
482    ///
483    pub async fn kline_offset(
484        &self,
485        params: KlineOffsetParams,
486    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>> {
487        self.web_socket_streams_api_client
488            .kline_offset(params)
489            .await
490    }
491
492    /// WebSocket Individual Symbol Mini Ticker Stream
493    ///
494    /// 24hr rolling window mini-ticker statistics. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs.
495    ///
496    /// # Arguments
497    ///
498    /// - `params`: [`MiniTickerParams`]
499    ///   The parameters for this operation.
500    ///
501    /// # Returns
502    ///
503    /// [`Arc<WebsocketStream<models::MiniTickerResponse>>`] on success.
504    ///
505    /// # Errors
506    ///
507    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
508    ///
509    ///
510    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-mini-ticker-stream).
511    ///
512    pub async fn mini_ticker(
513        &self,
514        params: MiniTickerParams,
515    ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>> {
516        self.web_socket_streams_api_client.mini_ticker(params).await
517    }
518
519    /// WebSocket Partial Book Depth Streams
520    ///
521    /// Top **\<levels\>** bids and asks, pushed every second. Valid **\<levels\>** are 5, 10, or 20.
522    ///
523    /// # Arguments
524    ///
525    /// - `params`: [`PartialBookDepthParams`]
526    ///   The parameters for this operation.
527    ///
528    /// # Returns
529    ///
530    /// [`Arc<WebsocketStream<models::PartialBookDepthResponse>>`] on success.
531    ///
532    /// # Errors
533    ///
534    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
535    ///
536    ///
537    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#partial-book-depth-streams).
538    ///
539    pub async fn partial_book_depth(
540        &self,
541        params: PartialBookDepthParams,
542    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>> {
543        self.web_socket_streams_api_client
544            .partial_book_depth(params)
545            .await
546    }
547
548    /// WebSocket Reference Price Streams
549    ///
550    ///
551    ///
552    /// # Arguments
553    ///
554    /// - `params`: [`ReferencePriceParams`]
555    ///   The parameters for this operation.
556    ///
557    /// # Returns
558    ///
559    /// [`Arc<WebsocketStream<models::ReferencePriceResponse>>`] on success.
560    ///
561    /// # Errors
562    ///
563    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
564    ///
565    ///
566    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#reference-price-streams).
567    ///
568    pub async fn reference_price(
569        &self,
570        params: ReferencePriceParams,
571    ) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>> {
572        self.web_socket_streams_api_client
573            .reference_price(params)
574            .await
575    }
576
577    /// WebSocket Individual Symbol Rolling Window Statistics Streams
578    ///
579    /// Rolling window ticker statistics for a single symbol, computed over multiple windows.
580    ///
581    /// # Arguments
582    ///
583    /// - `params`: [`RollingWindowTickerParams`]
584    ///   The parameters for this operation.
585    ///
586    /// # Returns
587    ///
588    /// [`Arc<WebsocketStream<models::RollingWindowTickerResponse>>`] on success.
589    ///
590    /// # Errors
591    ///
592    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
593    ///
594    ///
595    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-rolling-window-statistics-streams).
596    ///
597    pub async fn rolling_window_ticker(
598        &self,
599        params: RollingWindowTickerParams,
600    ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>> {
601        self.web_socket_streams_api_client
602            .rolling_window_ticker(params)
603            .await
604    }
605
606    /// WebSocket Individual Symbol Ticker Streams
607    ///
608    /// 24hr rolling window ticker statistics for a single symbol. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs.
609    ///
610    /// # Arguments
611    ///
612    /// - `params`: [`TickerParams`]
613    ///   The parameters for this operation.
614    ///
615    /// # Returns
616    ///
617    /// [`Arc<WebsocketStream<models::TickerResponse>>`] on success.
618    ///
619    /// # Errors
620    ///
621    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
622    ///
623    ///
624    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-ticker-streams).
625    ///
626    pub async fn ticker(
627        &self,
628        params: TickerParams,
629    ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>> {
630        self.web_socket_streams_api_client.ticker(params).await
631    }
632
633    /// WebSocket Trade Streams
634    ///
635    /// The Trade Streams push raw trade information; each trade has a unique buyer and seller.
636    ///
637    /// # Arguments
638    ///
639    /// - `params`: [`TradeParams`]
640    ///   The parameters for this operation.
641    ///
642    /// # Returns
643    ///
644    /// [`Arc<WebsocketStream<models::TradeResponse>>`] on success.
645    ///
646    /// # Errors
647    ///
648    /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
649    ///
650    ///
651    /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#trade-streams).
652    ///
653    pub async fn trade(
654        &self,
655        params: TradeParams,
656    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>> {
657        self.web_socket_streams_api_client.trade(params).await
658    }
659}