binance_sdk/margin_trading/websocket_streams/mod.rs
1/*
2 * Binance Margin Trading WebSocket Market Streams
3 *
4 * OpenAPI Specification for the Binance Margin Trading WebSocket Market Streams
5 *
6 * The version of the OpenAPI document: 1.0.0
7 *
8 *
9 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
10 * https://openapi-generator.tech
11 * Do not edit the class manually.
12 */
13
14#![allow(unused_imports)]
15use serde_json::Value;
16use std::sync::{Arc, atomic::Ordering};
17use tokio::spawn;
18
19use crate::common::config::ConfigurationWebsocketStreams;
20use crate::common::websocket::{
21 Subscription, WebsocketBase, WebsocketStream, WebsocketStreams as WebsocketStreamsBase,
22 create_stream_handler,
23};
24use crate::models::{StreamId, WebsocketEvent, WebsocketMode};
25
26mod apis;
27mod handle;
28mod models;
29
30pub use apis::*;
31pub use handle::*;
32pub use models::*;
33
34pub struct WebsocketStreams {
35 websocket_streams_base: Arc<WebsocketStreamsBase>,
36}
37
38impl WebsocketStreams {
39 pub(crate) async fn connect(
40 config: ConfigurationWebsocketStreams,
41 streams: Vec<String>,
42 mode: Option<WebsocketMode>,
43 ) -> anyhow::Result<Self> {
44 let mut cfg = config;
45 if let Some(m) = mode {
46 cfg.mode = m;
47 }
48
49 let websocket_streams_base = WebsocketStreamsBase::new(cfg, vec![], vec![]);
50
51 websocket_streams_base.clone().connect(streams).await?;
52
53 Ok(Self {
54 websocket_streams_base: websocket_streams_base.clone(),
55 })
56 }
57
58 /// Subscribes to WebSocket events with a provided callback function.
59 ///
60 /// # Arguments
61 ///
62 /// * `callback` - A mutable function that takes a `WebsocketEvent` and is `Send` and `'static`.
63 ///
64 /// # Returns
65 ///
66 /// A `Subscription` that can be used to manage the event subscription.
67 ///
68 /// # Examples
69 ///
70 ///
71 /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
72 /// // Handle WebSocket event
73 /// });
74 ///
75 pub fn subscribe_on_ws_events<F>(&self, callback: F) -> Subscription
76 where
77 F: FnMut(WebsocketEvent) + Send + 'static,
78 {
79 let base = Arc::clone(&self.websocket_streams_base);
80 base.common.events.subscribe(callback)
81 }
82
83 /// Unsubscribes from WebSocket events for a given `Subscription`.
84 ///
85 /// # Arguments
86 ///
87 /// * `subscription` - The `Subscription` to unsubscribe from WebSocket events.
88 ///
89 /// # Examples
90 ///
91 ///
92 /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
93 /// // Handle WebSocket event
94 /// });
95 /// `websocket_streams.unsubscribe_from_ws_events(subscription)`;
96 ///
97 pub fn unsubscribe_from_ws_events(&self, subscription: Subscription) {
98 subscription.unsubscribe();
99 }
100
101 /// Disconnects the WebSocket connection.
102 ///
103 /// # Returns
104 ///
105 /// A `Result` indicating whether the disconnection was successful.
106 /// Returns an error if the disconnection fails.
107 ///
108 /// # Errors
109 ///
110 /// Returns an [`anyhow::Error`] if the connection fails.
111 ///
112 /// # Examples
113 ///
114 ///
115 /// let `websocket_streams` = `WebSocketStreams::new`(...);
116 /// `websocket_streams.disconnect().await`?;
117 ///
118 pub async fn disconnect(&self) -> anyhow::Result<()> {
119 self.websocket_streams_base
120 .disconnect()
121 .await
122 .map_err(anyhow::Error::msg)
123 }
124
125 /// Checks if the WebSocket connection is currently active.
126 ///
127 /// # Returns
128 ///
129 /// A `bool` indicating whether the WebSocket connection is established and connected.
130 ///
131 /// # Examples
132 ///
133 ///
134 /// let `is_active` = `websocket_streams.is_connected().await`;
135 /// if `is_active` {
136 /// // WebSocket connection is active
137 /// }
138 ///
139 pub async fn is_connected(&self) -> bool {
140 self.websocket_streams_base.is_connected().await
141 }
142
143 /// Sends a ping to the WebSocket server to maintain the connection.
144 ///
145 /// # Examples
146 ///
147 ///
148 /// `websocket_streams.ping_server().await`;
149 ///
150 ///
151 /// This method sends a ping request to the WebSocket server to keep the connection alive
152 /// and check the server's responsiveness.
153 pub async fn ping_server(&self) {
154 self.websocket_streams_base.ping_server().await;
155 }
156
157 /// Subscribes to specified WebSocket streams.
158 ///
159 /// # Arguments
160 ///
161 /// * `streams` - A vector of stream names to subscribe to
162 /// * `id` - An optional identifier for the subscription request
163 ///
164 /// # Examples
165 ///
166 ///
167 /// `websocket_streams.subscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
168 ///
169 ///
170 /// This method initiates an asynchronous subscription to the specified WebSocket streams.
171 /// The subscription is performed in a separate task using `spawn`.
172 pub fn subscribe(&self, streams: Vec<String>, id: Option<String>) {
173 let base = Arc::clone(&self.websocket_streams_base);
174 spawn(async move { base.subscribe(streams, id.map(StreamId::from), None).await });
175 }
176
177 /// Unsubscribes from specified WebSocket streams.
178 ///
179 /// # Arguments
180 ///
181 /// * `streams` - A vector of stream names to unsubscribe from
182 /// * `id` - An optional identifier for the unsubscription request
183 ///
184 /// # Examples
185 ///
186 ///
187 /// `websocket_streams.unsubscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
188 ///
189 ///
190 /// This method initiates an asynchronous unsubscription from the specified WebSocket streams.
191 /// The unsubscription is performed in a separate task using `spawn`.
192 pub fn unsubscribe(&self, streams: Vec<String>, id: Option<String>) {
193 let base = Arc::clone(&self.websocket_streams_base);
194 spawn(async move {
195 base.unsubscribe(streams, id.map(StreamId::from), None)
196 .await;
197 });
198 }
199
200 /// Checks if the current WebSocket stream is subscribed to a specific stream.
201 ///
202 /// # Arguments
203 ///
204 /// * `stream` - The name of the stream to check for subscription
205 ///
206 /// # Returns
207 ///
208 /// A boolean indicating whether the stream is currently subscribed
209 ///
210 /// # Examples
211 ///
212 ///
213 /// let `is_subscribed` = `websocket_streams.is_subscribed("btcusdt@trade").await`;
214 ///
215 ///
216 /// This method checks the subscription status of a specific WebSocket stream.
217 pub async fn is_subscribed(&self, stream: &str) -> bool {
218 self.websocket_streams_base.is_subscribed(stream).await
219 }
220
221 /// Risk Data Stream
222 ///
223 /// Establishes a WebSocket stream for risk-specific data events.
224 ///
225 /// # Arguments
226 ///
227 /// - `listen_key`: A unique key for identifying the risk data stream
228 /// - `id`: An optional identifier for the stream request
229 ///
230 /// # Returns
231 ///
232 /// [`Arc<WebsocketStream<RiskDataStreamEventsResponse>>`] on success.
233 ///
234 /// # Errors
235 ///
236 /// Returns an [`anyhow::Error`] if the stream creation fails or if parsing the response encounters issues.
237 ///
238 /// # Examples
239 ///
240 ///
241 /// let `risk_stream` = `websocket_streams.risk_data(listen_key`, None).await?;
242 ///
243 pub async fn risk_data(
244 &self,
245 listen_key: String,
246 id: Option<String>,
247 ) -> anyhow::Result<Arc<WebsocketStream<RiskDataStreamEventsResponse>>> {
248 Ok(create_stream_handler::<RiskDataStreamEventsResponse>(
249 WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
250 listen_key,
251 id.map(StreamId::from),
252 None,
253 )
254 .await)
255 }
256
257 /// Trade Data Stream
258 ///
259 /// Establishes a WebSocket stream for trade-specific data events.
260 ///
261 /// # Arguments
262 ///
263 /// - `listen_key`: A unique key for identifying the trade data stream
264 /// - `id`: An optional identifier for the stream request
265 ///
266 /// # Returns
267 ///
268 /// [`Arc<WebsocketStream<TradeDataStreamEventsResponse>>`] on success.
269 ///
270 /// # Errors
271 ///
272 /// Returns an [`anyhow::Error`] if the stream creation fails or if parsing the response encounters issues.
273 ///
274 /// # Examples
275 ///
276 ///
277 /// let `trade_stream` = `websocket_streams.trade_data(listen_key`, None).await?;
278 ///
279 pub async fn trade_data(
280 &self,
281 listen_key: String,
282 id: Option<String>,
283 ) -> anyhow::Result<Arc<WebsocketStream<TradeDataStreamEventsResponse>>> {
284 Ok(create_stream_handler::<TradeDataStreamEventsResponse>(
285 WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
286 listen_key,
287 id.map(StreamId::from),
288 None,
289 )
290 .await)
291 }
292}