binance_sdk/spot/websocket_streams/mod.rs
/*
 * Binance Spot WebSocket Streams
 *
 * OpenAPI Specifications for the Binance Spot WebSocket Streams
 *
 * API documents:
 * - [Github web-socket-streams documentation file](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md)
 * - [General API information for web-socket-streams on website](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams)
 *
 *
 * The version of the OpenAPI document: 1.0.0
 *
 *
 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
 * https://openapi-generator.tech
 * Do not edit the class manually.
 */
18
19#![allow(unused_imports)]
20use serde_json::Value;
21use std::sync::{Arc, atomic::Ordering};
22use tokio::spawn;
23
24use crate::common::config::ConfigurationWebsocketStreams;
25use crate::common::websocket::{
26 Subscription, WebsocketBase, WebsocketStream, WebsocketStreams as WebsocketStreamsBase,
27 create_stream_handler,
28};
29use crate::models::{StreamId, WebsocketEvent, WebsocketMode};
30
31mod apis;
32mod handle;
33mod models;
34
35pub use apis::*;
36pub use handle::*;
37pub use models::*;
38
/// Whether the `time_unit` configuration option is kept for this API.
///
/// `WebsocketStreams::connect` checks this flag and resets `time_unit` to `None`
/// in the configuration when it is `false`.
const HAS_TIME_UNIT: bool = true;
40
/// Entry point for the Binance Spot WebSocket Streams API.
///
/// Bundles the shared streams base with the generated API client. In `connect`
/// both fields are created from the same `Arc<WebsocketStreamsBase>`.
pub struct WebsocketStreams {
    /// Shared streams base used for the connection-level calls (event
    /// subscription, disconnect, ping, subscribe/unsubscribe).
    websocket_streams_base: Arc<WebsocketStreamsBase>,
    /// Generated API client that the per-stream methods (`agg_trade`, `kline`, ...)
    /// delegate to.
    web_socket_streams_api_client: WebSocketStreamsApiClient,
}
45
46impl WebsocketStreams {
47 pub(crate) async fn connect(
48 config: ConfigurationWebsocketStreams,
49 streams: Vec<String>,
50 mode: Option<WebsocketMode>,
51 ) -> anyhow::Result<Self> {
52 let mut cfg = config;
53 if let Some(m) = mode {
54 cfg.mode = m;
55 }
56
57 if !HAS_TIME_UNIT {
58 cfg.time_unit = None;
59 }
60
61 let websocket_streams_base = WebsocketStreamsBase::new(cfg, vec![], vec![]);
62
63 websocket_streams_base.clone().connect(streams).await?;
64
65 Ok(Self {
66 websocket_streams_base: websocket_streams_base.clone(),
67 web_socket_streams_api_client: WebSocketStreamsApiClient::new(
68 websocket_streams_base.clone(),
69 ),
70 })
71 }
72
73 /// Subscribes to WebSocket events with a provided callback function.
74 ///
75 /// # Arguments
76 ///
77 /// * `callback` - A mutable function that takes a `WebsocketEvent` and is `Send` and `'static`.
78 ///
79 /// # Returns
80 ///
81 /// A `Subscription` that can be used to manage the event subscription.
82 ///
83 /// # Examples
84 ///
85 ///
86 /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
87 /// // Handle WebSocket event
88 /// });
89 ///
90 pub fn subscribe_on_ws_events<F>(&self, callback: F) -> Subscription
91 where
92 F: FnMut(WebsocketEvent) + Send + 'static,
93 {
94 let base = Arc::clone(&self.websocket_streams_base);
95 base.common.events.subscribe(callback)
96 }
97
98 /// Unsubscribes from WebSocket events for a given `Subscription`.
99 ///
100 /// # Arguments
101 ///
102 /// * `subscription` - The `Subscription` to unsubscribe from WebSocket events.
103 ///
104 /// # Examples
105 ///
106 ///
107 /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
108 /// // Handle WebSocket event
109 /// });
110 /// `websocket_streams.unsubscribe_from_ws_events(subscription)`;
111 ///
112 pub fn unsubscribe_from_ws_events(&self, subscription: Subscription) {
113 subscription.unsubscribe();
114 }
115
116 /// Disconnects the WebSocket connection.
117 ///
118 /// # Returns
119 ///
120 /// A `Result` indicating whether the disconnection was successful.
121 /// Returns an error if the disconnection fails.
122 ///
123 /// # Errors
124 ///
125 /// Returns an [`anyhow::Error`] if the connection fails.
126 ///
127 /// # Examples
128 ///
129 ///
130 /// let `websocket_streams` = `WebSocketStreams::new`(...);
131 /// `websocket_streams.disconnect().await`?;
132 ///
133 pub async fn disconnect(&self) -> anyhow::Result<()> {
134 self.websocket_streams_base
135 .disconnect()
136 .await
137 .map_err(anyhow::Error::msg)
138 }
139
140 /// Checks if the WebSocket connection is currently active.
141 ///
142 /// # Returns
143 ///
144 /// A `bool` indicating whether the WebSocket connection is established and connected.
145 ///
146 /// # Examples
147 ///
148 ///
149 /// let `is_active` = `websocket_streams.is_connected().await`;
150 /// if `is_active` {
151 /// // WebSocket connection is active
152 /// }
153 ///
154 pub async fn is_connected(&self) -> bool {
155 self.websocket_streams_base.is_connected().await
156 }
157
158 /// Sends a ping to the WebSocket server to maintain the connection.
159 ///
160 /// # Examples
161 ///
162 ///
163 /// `websocket_streams.ping_server().await`;
164 ///
165 ///
166 /// This method sends a ping request to the WebSocket server to keep the connection alive
167 /// and check the server's responsiveness.
168 pub async fn ping_server(&self) {
169 self.websocket_streams_base.ping_server().await;
170 }
171
172 /// Subscribes to specified WebSocket streams.
173 ///
174 /// # Arguments
175 ///
176 /// * `streams` - A vector of stream names to subscribe to
177 /// * `id` - An optional identifier for the subscription request
178 ///
179 /// # Examples
180 ///
181 ///
182 /// `websocket_streams.subscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
183 ///
184 ///
185 /// This method initiates an asynchronous subscription to the specified WebSocket streams.
186 /// The subscription is performed in a separate task using `spawn`.
187 pub fn subscribe(&self, streams: Vec<String>, id: Option<String>) {
188 let base = Arc::clone(&self.websocket_streams_base);
189 spawn(async move { base.subscribe(streams, id.map(StreamId::from), None).await });
190 }
191
192 /// Unsubscribes from specified WebSocket streams.
193 ///
194 /// # Arguments
195 ///
196 /// * `streams` - A vector of stream names to unsubscribe from
197 /// * `id` - An optional identifier for the unsubscription request
198 ///
199 /// # Examples
200 ///
201 ///
202 /// `websocket_streams.unsubscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
203 ///
204 ///
205 /// This method initiates an asynchronous unsubscription from the specified WebSocket streams.
206 /// The unsubscription is performed in a separate task using `spawn`.
207 pub fn unsubscribe(&self, streams: Vec<String>, id: Option<String>) {
208 let base = Arc::clone(&self.websocket_streams_base);
209 spawn(async move {
210 base.unsubscribe(streams, id.map(StreamId::from), None)
211 .await;
212 });
213 }
214
215 /// Checks if the current WebSocket stream is subscribed to a specific stream.
216 ///
217 /// # Arguments
218 ///
219 /// * `stream` - The name of the stream to check for subscription
220 ///
221 /// # Returns
222 ///
223 /// A boolean indicating whether the stream is currently subscribed
224 ///
225 /// # Examples
226 ///
227 ///
228 /// let `is_subscribed` = `websocket_streams.is_subscribed("btcusdt@trade").await`;
229 ///
230 ///
231 /// This method checks the subscription status of a specific WebSocket stream.
232 pub async fn is_subscribed(&self, stream: &str) -> bool {
233 self.websocket_streams_base.is_subscribed(stream).await
234 }
235
236 /// WebSocket Aggregate Trade Streams
237 ///
238 /// The Aggregate Trade Streams push trade information that is aggregated for a single taker order.
239 ///
240 /// # Arguments
241 ///
242 /// - `params`: [`AggTradeParams`]
243 /// The parameters for this operation.
244 ///
245 /// # Returns
246 ///
247 /// [`Arc<WebsocketStream<models::AggTradeResponse>>`] on success.
248 ///
249 /// # Errors
250 ///
251 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
252 ///
253 ///
254 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#aggregate-trade-streams).
255 ///
256 pub async fn agg_trade(
257 &self,
258 params: AggTradeParams,
259 ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>> {
260 self.web_socket_streams_api_client.agg_trade(params).await
261 }
262
263 /// WebSocket All Market Rolling Window Statistics Streams
264 ///
265 /// Rolling window ticker statistics for all market symbols, computed over multiple windows.
266 /// Note that only tickers that have changed will be present in the array.
267 ///
268 /// # Arguments
269 ///
270 /// - `params`: [`AllMarketRollingWindowTickerParams`]
271 /// The parameters for this operation.
272 ///
273 /// # Returns
274 ///
275 /// [`Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>`] on success.
276 ///
277 /// # Errors
278 ///
279 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
280 ///
281 ///
282 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#all-market-rolling-window-statistics-streams).
283 ///
284 pub async fn all_market_rolling_window_ticker(
285 &self,
286 params: AllMarketRollingWindowTickerParams,
287 ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>
288 {
289 self.web_socket_streams_api_client
290 .all_market_rolling_window_ticker(params)
291 .await
292 }
293
294 /// WebSocket All Market Mini Tickers Stream
295 ///
296 /// 24hr rolling window mini-ticker statistics for all symbols that changed in an array. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs. Note that only tickers that have changed will be present in the array.
297 ///
298 /// # Arguments
299 ///
300 /// - `params`: [`AllMiniTickerParams`]
301 /// The parameters for this operation.
302 ///
303 /// # Returns
304 ///
305 /// [`Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>`] on success.
306 ///
307 /// # Errors
308 ///
309 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
310 ///
311 ///
312 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#all-market-mini-tickers-stream).
313 ///
314 pub async fn all_mini_ticker(
315 &self,
316 params: AllMiniTickerParams,
317 ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>> {
318 self.web_socket_streams_api_client
319 .all_mini_ticker(params)
320 .await
321 }
322
323 /// WebSocket Average Price
324 ///
325 /// Average price streams push changes in the average price over a fixed time interval.
326 ///
327 /// # Arguments
328 ///
329 /// - `params`: [`AvgPriceParams`]
330 /// The parameters for this operation.
331 ///
332 /// # Returns
333 ///
334 /// [`Arc<WebsocketStream<models::AvgPriceResponse>>`] on success.
335 ///
336 /// # Errors
337 ///
338 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
339 ///
340 ///
341 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#average-price).
342 ///
343 pub async fn avg_price(
344 &self,
345 params: AvgPriceParams,
346 ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>> {
347 self.web_socket_streams_api_client.avg_price(params).await
348 }
349
350 /// WebSocket Block Trade Streams
351 ///
352 ///
353 ///
354 /// # Arguments
355 ///
356 /// - `params`: [`BlockTradeParams`]
357 /// The parameters for this operation.
358 ///
359 /// # Returns
360 ///
361 /// [`Arc<WebsocketStream<models::BlockTradeResponse>>`] on success.
362 ///
363 /// # Errors
364 ///
365 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
366 ///
367 ///
368 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#block-trade-streams).
369 ///
370 pub async fn block_trade(
371 &self,
372 params: BlockTradeParams,
373 ) -> anyhow::Result<Arc<WebsocketStream<models::BlockTradeResponse>>> {
374 self.web_socket_streams_api_client.block_trade(params).await
375 }
376
377 /// WebSocket Individual Symbol Book Ticker Streams
378 ///
379 /// Pushes any update to the best bid or ask's price or quantity in real-time for a specified symbol.
380 /// Multiple `<symbol>@bookTicker` streams can be subscribed to over one connection.
381 ///
382 /// # Arguments
383 ///
384 /// - `params`: [`BookTickerParams`]
385 /// The parameters for this operation.
386 ///
387 /// # Returns
388 ///
389 /// [`Arc<WebsocketStream<models::BookTickerResponse>>`] on success.
390 ///
391 /// # Errors
392 ///
393 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
394 ///
395 ///
396 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-book-ticker-streams).
397 ///
398 pub async fn book_ticker(
399 &self,
400 params: BookTickerParams,
401 ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>> {
402 self.web_socket_streams_api_client.book_ticker(params).await
403 }
404
405 /// WebSocket Diff. Depth Stream
406 ///
407 /// Order book price and quantity depth updates used to locally manage an order book.
408 ///
409 /// # Arguments
410 ///
411 /// - `params`: [`DiffBookDepthParams`]
412 /// The parameters for this operation.
413 ///
414 /// # Returns
415 ///
416 /// [`Arc<WebsocketStream<models::DiffBookDepthResponse>>`] on success.
417 ///
418 /// # Errors
419 ///
420 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
421 ///
422 ///
423 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#diff-depth-stream).
424 ///
425 pub async fn diff_book_depth(
426 &self,
427 params: DiffBookDepthParams,
428 ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>> {
429 self.web_socket_streams_api_client
430 .diff_book_depth(params)
431 .await
432 }
433
434 /// WebSocket Kline/Candlestick Streams for UTC
435 ///
436 /// The Kline/Candlestick Stream push updates to the current klines/candlestick every second in `UTC+0` timezone
437 ///
438 /// <a id="kline-intervals"></a>
439 ///
440 /// # Arguments
441 ///
442 /// - `params`: [`KlineParams`]
443 /// The parameters for this operation.
444 ///
445 /// # Returns
446 ///
447 /// [`Arc<WebsocketStream<models::KlineResponse>>`] on success.
448 ///
449 /// # Errors
450 ///
451 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
452 ///
453 ///
454 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#klinecandlestick-streams-for-utc).
455 ///
456 pub async fn kline(
457 &self,
458 params: KlineParams,
459 ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>> {
460 self.web_socket_streams_api_client.kline(params).await
461 }
462
463 /// WebSocket Kline/Candlestick Streams with timezone offset
464 ///
465 /// The Kline/Candlestick Stream push updates to the current klines/candlestick every second in `UTC+8` timezone
466 ///
467 /// # Arguments
468 ///
469 /// - `params`: [`KlineOffsetParams`]
470 /// The parameters for this operation.
471 ///
472 /// # Returns
473 ///
474 /// [`Arc<WebsocketStream<models::KlineOffsetResponse>>`] on success.
475 ///
476 /// # Errors
477 ///
478 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
479 ///
480 ///
481 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#klinecandlestick-streams-with-timezone-offset).
482 ///
483 pub async fn kline_offset(
484 &self,
485 params: KlineOffsetParams,
486 ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>> {
487 self.web_socket_streams_api_client
488 .kline_offset(params)
489 .await
490 }
491
492 /// WebSocket Individual Symbol Mini Ticker Stream
493 ///
494 /// 24hr rolling window mini-ticker statistics. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs.
495 ///
496 /// # Arguments
497 ///
498 /// - `params`: [`MiniTickerParams`]
499 /// The parameters for this operation.
500 ///
501 /// # Returns
502 ///
503 /// [`Arc<WebsocketStream<models::MiniTickerResponse>>`] on success.
504 ///
505 /// # Errors
506 ///
507 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
508 ///
509 ///
510 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-mini-ticker-stream).
511 ///
512 pub async fn mini_ticker(
513 &self,
514 params: MiniTickerParams,
515 ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>> {
516 self.web_socket_streams_api_client.mini_ticker(params).await
517 }
518
519 /// WebSocket Partial Book Depth Streams
520 ///
521 /// Top **\<levels\>** bids and asks, pushed every second. Valid **\<levels\>** are 5, 10, or 20.
522 ///
523 /// # Arguments
524 ///
525 /// - `params`: [`PartialBookDepthParams`]
526 /// The parameters for this operation.
527 ///
528 /// # Returns
529 ///
530 /// [`Arc<WebsocketStream<models::PartialBookDepthResponse>>`] on success.
531 ///
532 /// # Errors
533 ///
534 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
535 ///
536 ///
537 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#partial-book-depth-streams).
538 ///
539 pub async fn partial_book_depth(
540 &self,
541 params: PartialBookDepthParams,
542 ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>> {
543 self.web_socket_streams_api_client
544 .partial_book_depth(params)
545 .await
546 }
547
548 /// WebSocket Reference Price Streams
549 ///
550 ///
551 ///
552 /// # Arguments
553 ///
554 /// - `params`: [`ReferencePriceParams`]
555 /// The parameters for this operation.
556 ///
557 /// # Returns
558 ///
559 /// [`Arc<WebsocketStream<models::ReferencePriceResponse>>`] on success.
560 ///
561 /// # Errors
562 ///
563 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
564 ///
565 ///
566 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#reference-price-streams).
567 ///
568 pub async fn reference_price(
569 &self,
570 params: ReferencePriceParams,
571 ) -> anyhow::Result<Arc<WebsocketStream<models::ReferencePriceResponse>>> {
572 self.web_socket_streams_api_client
573 .reference_price(params)
574 .await
575 }
576
577 /// WebSocket Individual Symbol Rolling Window Statistics Streams
578 ///
579 /// Rolling window ticker statistics for a single symbol, computed over multiple windows.
580 ///
581 /// # Arguments
582 ///
583 /// - `params`: [`RollingWindowTickerParams`]
584 /// The parameters for this operation.
585 ///
586 /// # Returns
587 ///
588 /// [`Arc<WebsocketStream<models::RollingWindowTickerResponse>>`] on success.
589 ///
590 /// # Errors
591 ///
592 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
593 ///
594 ///
595 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-rolling-window-statistics-streams).
596 ///
597 pub async fn rolling_window_ticker(
598 &self,
599 params: RollingWindowTickerParams,
600 ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>> {
601 self.web_socket_streams_api_client
602 .rolling_window_ticker(params)
603 .await
604 }
605
606 /// WebSocket Individual Symbol Ticker Streams
607 ///
608 /// 24hr rolling window ticker statistics for a single symbol. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs.
609 ///
610 /// # Arguments
611 ///
612 /// - `params`: [`TickerParams`]
613 /// The parameters for this operation.
614 ///
615 /// # Returns
616 ///
617 /// [`Arc<WebsocketStream<models::TickerResponse>>`] on success.
618 ///
619 /// # Errors
620 ///
621 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
622 ///
623 ///
624 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#individual-symbol-ticker-streams).
625 ///
626 pub async fn ticker(
627 &self,
628 params: TickerParams,
629 ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>> {
630 self.web_socket_streams_api_client.ticker(params).await
631 }
632
633 /// WebSocket Trade Streams
634 ///
635 /// The Trade Streams push raw trade information; each trade has a unique buyer and seller.
636 ///
637 /// # Arguments
638 ///
639 /// - `params`: [`TradeParams`]
640 /// The parameters for this operation.
641 ///
642 /// # Returns
643 ///
644 /// [`Arc<WebsocketStream<models::TradeResponse>>`] on success.
645 ///
646 /// # Errors
647 ///
648 /// Returns an [`anyhow::Error`] if the stream request fails, if parameters are invalid, or if parsing the response fails.
649 ///
650 ///
651 /// For full API details, see the [Binance API Documentation](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams#trade-streams).
652 ///
653 pub async fn trade(
654 &self,
655 params: TradeParams,
656 ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>> {
657 self.web_socket_streams_api_client.trade(params).await
658 }
659}