Skip to main content

binance_sdk/margin_trading/websocket_streams/
mod.rs

1/*
2 * Binance Margin Trading WebSocket Market Streams
3 *
4 * OpenAPI Specification for the Binance Margin Trading WebSocket Market Streams
5 *
6 * The version of the OpenAPI document: 1.0.0
7 *
8 *
9 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
10 * https://openapi-generator.tech
11 * Do not edit the class manually.
12 */
13
14#![allow(unused_imports)]
15use serde_json::Value;
16use std::sync::{Arc, atomic::Ordering};
17use tokio::spawn;
18
19use crate::common::config::ConfigurationWebsocketStreams;
20use crate::common::websocket::{
21    Subscription, WebsocketBase, WebsocketStream, WebsocketStreams as WebsocketStreamsBase,
22    create_stream_handler,
23};
24use crate::models::{StreamId, WebsocketEvent, WebsocketMode};
25
26mod apis;
27mod handle;
28mod models;
29
30pub use apis::*;
31pub use handle::*;
32pub use models::*;
33
34pub struct WebsocketStreams {
35    websocket_streams_base: Arc<WebsocketStreamsBase>,
36}
37
38impl WebsocketStreams {
39    pub(crate) async fn connect(
40        config: ConfigurationWebsocketStreams,
41        streams: Vec<String>,
42        mode: Option<WebsocketMode>,
43    ) -> anyhow::Result<Self> {
44        let mut cfg = config;
45        if let Some(m) = mode {
46            cfg.mode = m;
47        }
48
49        let websocket_streams_base = WebsocketStreamsBase::new(cfg, vec![], vec![]);
50
51        websocket_streams_base.clone().connect(streams).await?;
52
53        Ok(Self {
54            websocket_streams_base: websocket_streams_base.clone(),
55        })
56    }
57
58    /// Subscribes to WebSocket events with a provided callback function.
59    ///
60    /// # Arguments
61    ///
62    /// * `callback` - A mutable function that takes a `WebsocketEvent` and is `Send` and `'static`.
63    ///
64    /// # Returns
65    ///
66    /// A `Subscription` that can be used to manage the event subscription.
67    ///
68    /// # Examples
69    ///
70    ///
71    /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
72    ///     // Handle WebSocket event
73    /// });
74    ///
75    pub fn subscribe_on_ws_events<F>(&self, callback: F) -> Subscription
76    where
77        F: FnMut(WebsocketEvent) + Send + 'static,
78    {
79        let base = Arc::clone(&self.websocket_streams_base);
80        base.common.events.subscribe(callback)
81    }
82
83    /// Unsubscribes from WebSocket events for a given `Subscription`.
84    ///
85    /// # Arguments
86    ///
87    /// * `subscription` - The `Subscription` to unsubscribe from WebSocket events.
88    ///
89    /// # Examples
90    ///
91    ///
92    /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
93    ///     // Handle WebSocket event
94    /// });
95    /// `websocket_streams.unsubscribe_from_ws_events(subscription)`;
96    ///
97    pub fn unsubscribe_from_ws_events(&self, subscription: Subscription) {
98        subscription.unsubscribe();
99    }
100
101    /// Disconnects the WebSocket connection.
102    ///
103    /// # Returns
104    ///
105    /// A `Result` indicating whether the disconnection was successful.
106    /// Returns an error if the disconnection fails.
107    ///
108    /// # Errors
109    ///
110    /// Returns an [`anyhow::Error`] if the connection fails.
111    ///
112    /// # Examples
113    ///
114    ///
115    /// let `websocket_streams` = `WebSocketStreams::new`(...);
116    /// `websocket_streams.disconnect().await`?;
117    ///
118    pub async fn disconnect(&self) -> anyhow::Result<()> {
119        self.websocket_streams_base
120            .disconnect()
121            .await
122            .map_err(anyhow::Error::msg)
123    }
124
125    /// Checks if the WebSocket connection is currently active.
126    ///
127    /// # Returns
128    ///
129    /// A `bool` indicating whether the WebSocket connection is established and connected.
130    ///
131    /// # Examples
132    ///
133    ///
134    /// let `is_active` = `websocket_streams.is_connected().await`;
135    /// if `is_active` {
136    ///     // WebSocket connection is active
137    /// }
138    ///
139    pub async fn is_connected(&self) -> bool {
140        self.websocket_streams_base.is_connected().await
141    }
142
143    /// Sends a ping to the WebSocket server to maintain the connection.
144    ///
145    /// # Examples
146    ///
147    ///
148    /// `websocket_streams.ping_server().await`;
149    ///
150    ///
151    /// This method sends a ping request to the WebSocket server to keep the connection alive
152    /// and check the server's responsiveness.
153    pub async fn ping_server(&self) {
154        self.websocket_streams_base.ping_server().await;
155    }
156
157    /// Subscribes to specified WebSocket streams.
158    ///
159    /// # Arguments
160    ///
161    /// * `streams` - A vector of stream names to subscribe to
162    /// * `id` - An optional identifier for the subscription request
163    ///
164    /// # Examples
165    ///
166    ///
167    /// `websocket_streams.subscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
168    ///
169    ///
170    /// This method initiates an asynchronous subscription to the specified WebSocket streams.
171    /// The subscription is performed in a separate task using `spawn`.
172    pub fn subscribe(&self, streams: Vec<String>, id: Option<String>) {
173        let base = Arc::clone(&self.websocket_streams_base);
174        spawn(async move { base.subscribe(streams, id.map(StreamId::from), None).await });
175    }
176
177    /// Unsubscribes from specified WebSocket streams.
178    ///
179    /// # Arguments
180    ///
181    /// * `streams` - A vector of stream names to unsubscribe from
182    /// * `id` - An optional identifier for the unsubscription request
183    ///
184    /// # Examples
185    ///
186    ///
187    /// `websocket_streams.unsubscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
188    ///
189    ///
190    /// This method initiates an asynchronous unsubscription from the specified WebSocket streams.
191    /// The unsubscription is performed in a separate task using `spawn`.
192    pub fn unsubscribe(&self, streams: Vec<String>, id: Option<String>) {
193        let base = Arc::clone(&self.websocket_streams_base);
194        spawn(async move {
195            base.unsubscribe(streams, id.map(StreamId::from), None)
196                .await;
197        });
198    }
199
200    /// Checks if the current WebSocket stream is subscribed to a specific stream.
201    ///
202    /// # Arguments
203    ///
204    /// * `stream` - The name of the stream to check for subscription
205    ///
206    /// # Returns
207    ///
208    /// A boolean indicating whether the stream is currently subscribed
209    ///
210    /// # Examples
211    ///
212    ///
213    /// let `is_subscribed` = `websocket_streams.is_subscribed("btcusdt@trade").await`;
214    ///
215    ///
216    /// This method checks the subscription status of a specific WebSocket stream.
217    pub async fn is_subscribed(&self, stream: &str) -> bool {
218        self.websocket_streams_base.is_subscribed(stream).await
219    }
220
221    /// Risk Data Stream
222    ///
223    /// Establishes a WebSocket stream for risk-specific data events.
224    ///
225    /// # Arguments
226    ///
227    /// - `listen_key`: A unique key for identifying the risk data stream
228    /// - `id`: An optional identifier for the stream request
229    ///
230    /// # Returns
231    ///
232    /// [`Arc<WebsocketStream<RiskDataStreamEventsResponse>>`] on success.
233    ///
234    /// # Errors
235    ///
236    /// Returns an [`anyhow::Error`] if the stream creation fails or if parsing the response encounters issues.
237    ///
238    /// # Examples
239    ///
240    ///
241    /// let `risk_stream` = `websocket_streams.risk_data(listen_key`, None).await?;
242    ///
243    pub async fn risk_data(
244        &self,
245        listen_key: String,
246        id: Option<String>,
247    ) -> anyhow::Result<Arc<WebsocketStream<RiskDataStreamEventsResponse>>> {
248        Ok(create_stream_handler::<RiskDataStreamEventsResponse>(
249            WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
250            listen_key,
251            id.map(StreamId::from),
252            None,
253        )
254        .await)
255    }
256
257    /// Trade Data Stream
258    ///
259    /// Establishes a WebSocket stream for trade-specific data events.
260    ///
261    /// # Arguments
262    ///
263    /// - `listen_key`: A unique key for identifying the trade data stream
264    /// - `id`: An optional identifier for the stream request
265    ///
266    /// # Returns
267    ///
268    /// [`Arc<WebsocketStream<TradeDataStreamEventsResponse>>`] on success.
269    ///
270    /// # Errors
271    ///
272    /// Returns an [`anyhow::Error`] if the stream creation fails or if parsing the response encounters issues.
273    ///
274    /// # Examples
275    ///
276    ///
277    /// let `trade_stream` = `websocket_streams.trade_data(listen_key`, None).await?;
278    ///
279    pub async fn trade_data(
280        &self,
281        listen_key: String,
282        id: Option<String>,
283    ) -> anyhow::Result<Arc<WebsocketStream<TradeDataStreamEventsResponse>>> {
284        Ok(create_stream_handler::<TradeDataStreamEventsResponse>(
285            WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
286            listen_key,
287            id.map(StreamId::from),
288            None,
289        )
290        .await)
291    }
292}