bybit/
ws.rs

1use crate::api::WebsocketAPI;
2use crate::client::Client;
3use crate::errors::BybitError;
4use crate::model::{
5    Category, ExecutionData, FastExecData, LiquidationData, OrderBookUpdate, OrderData,
6    PongResponse, PositionData, RequestType, Subscription, Tickers, WalletData, WebsocketEvents,
7    WsKline, WsTrade,
8};
9use crate::trade::build_ws_orders;
10use crate::util::{build_json_request, generate_random_uid, get_timestamp};
11use futures::{SinkExt, StreamExt};
12use serde_json::{json, Value};
13use std::collections::BTreeMap;
14use std::time::Instant;
15use tokio::net::TcpStream;
16use tokio::sync::mpsc;
17use tokio::time::Duration;
18use tokio_tungstenite::WebSocketStream;
19use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
20
21#[derive(Clone)]
22pub struct Stream {
23    pub client: Client,
24}
25
26impl Stream {
27    /// Tests for connectivity by sending a ping request to the Bybit server.
28    ///
29    /// # Returns
30    ///
31    /// Returns a `Result` containing a `String` with the response message if successful,
32    /// * `private` is set to `true` if the request is for a private endpoint
33    /// or a `BybitError` if an error occurs.
34    pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
35        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
36        parameters.insert("req_id".into(), generate_random_uid(8).into());
37        parameters.insert("op".into(), "ping".into());
38        let request = build_json_request(&parameters);
39        let endpoint = if private {
40            WebsocketAPI::Private
41        } else {
42            WebsocketAPI::PublicLinear
43        };
44        let mut response = self
45            .client
46            .wss_connect(endpoint, Some(request), private, None)
47            .await?;
48        let data = response.next().await.unwrap()?;
49        match data {
50            WsMessage::Text(data) => {
51                let response: PongResponse = serde_json::from_str(&data)?;
52                match response {
53                    PongResponse::PublicPong(pong) => {
54                        println!("Pong received successfully: {:#?}", pong);
55                    }
56                    PongResponse::PrivatePong(pong) => {
57                        println!("Pong received successfully: {:#?}", pong);
58                    }
59                }
60            }
61            _ => {}
62        }
63        Ok(())
64    }
65
66    pub async fn ws_priv_subscribe<'b, F>(
67        &self,
68        req: Subscription<'_>,
69        handler: F,
70    ) -> Result<(), BybitError>
71    where
72        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
73    {
74        let request = Self::build_subscription(req);
75        let response = self
76            .client
77            .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
78            .await?;
79        match Self::event_loop(response, handler, None).await {
80            Ok(_) => {}
81            Err(_) => {}
82        }
83        Ok(())
84    }
85
86    pub async fn ws_subscribe<'b, F>(
87        &self,
88        req: Subscription<'_>,
89        category: Category,
90        handler: F,
91    ) -> Result<(), BybitError>
92    where
93        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
94    {
95        let endpoint = {
96            match category {
97                Category::Linear => WebsocketAPI::PublicLinear,
98                Category::Inverse => WebsocketAPI::PublicInverse,
99                Category::Spot => WebsocketAPI::PublicSpot,
100                _ => unimplemented!("Option has not been implemented"),
101            }
102        };
103        let request = Self::build_subscription(req);
104        let response = self
105            .client
106            .wss_connect(endpoint, Some(request), false, None)
107            .await?;
108        Self::event_loop(response, handler, None).await?;
109        Ok(())
110    }
111
112    pub fn build_subscription(action: Subscription) -> String {
113        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
114        parameters.insert("req_id".into(), generate_random_uid(8).into());
115        parameters.insert("op".into(), action.op.into());
116        let args_value: Value = action
117            .args
118            .iter()
119            .map(ToString::to_string)
120            .collect::<Vec<_>>()
121            .into();
122        parameters.insert("args".into(), args_value);
123
124        build_json_request(&parameters)
125    }
126
127    pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
128        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
129        parameters.insert("reqId".into(), generate_random_uid(16).into());
130        let mut header_map: BTreeMap<String, String> = BTreeMap::new();
131        header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
132        header_map.insert(
133            "X-BAPI-RECV-WINDOW".into(),
134            recv_window.unwrap_or(5000).to_string(),
135        );
136        parameters.insert("header".into(), json!(header_map).into());
137        match orders {
138            RequestType::Create(order) => {
139                parameters.insert("op".into(), "order.create".into());
140                parameters.insert(
141                    "args".into(),
142                    build_ws_orders(RequestType::Create(order)).into(),
143                );
144            }
145            RequestType::Cancel(order) => {
146                parameters.insert("op".into(), "order.cancel".into());
147                parameters.insert(
148                    "args".into(),
149                    build_ws_orders(RequestType::Cancel(order)).into(),
150                );
151            }
152
153            RequestType::Amend(order) => {
154                parameters.insert("op".into(), "order.amend".into());
155                parameters.insert(
156                    "args".into(),
157                    build_ws_orders(RequestType::Amend(order)).into(),
158                );
159            }
160        }
161        build_json_request(&parameters)
162    }
163
164    /// Subscribes to the specified order book updates and handles the order book events
165    ///
166    /// # Arguments
167    ///
168    /// * `subs` - A vector of tuples containing the order book ID and symbol
169    /// * `category` - The category of the order book
170    ///
171    /// # Example
172    ///
173    /// ```
174    /// use your_crate_name::Category;
175    /// let subs = vec![(1, "BTC"), (2, "ETH")];
176    /// ```
177    pub async fn ws_orderbook(
178        &self,
179        subs: Vec<(i32, &str)>,
180        category: Category,
181        sender: mpsc::UnboundedSender<OrderBookUpdate>,
182    ) -> Result<(), BybitError> {
183        let arr: Vec<String> = subs
184            .into_iter()
185            .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
186            .collect();
187        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
188        self.ws_subscribe(request, category, move |event| {
189            if let WebsocketEvents::OrderBookEvent(order_book) = event {
190                sender.send(order_book).unwrap();
191            }
192            Ok(())
193        })
194        .await
195    }
196
197    /// This function subscribes to the specified trades and handles the trade events.
198    /// # Arguments
199    ///
200    /// * `subs` - A vector of trade subscriptions
201    /// * `category` - The category of the trades
202    ///
203    /// # Example
204    ///
205    /// ```
206    /// use your_crate_name::Category;
207    /// let subs = vec!["BTCUSD", "ETHUSD"];
208    /// let category = Category::Linear;
209    /// ws_trades(subs, category);
210    /// ```
211    pub async fn ws_trades(
212        &self,
213        subs: Vec<&str>,
214        category: Category,
215        sender: mpsc::UnboundedSender<WsTrade>,
216    ) -> Result<(), BybitError> {
217        let arr: Vec<String> = subs
218            .iter()
219            .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
220            .collect();
221        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
222        let handler = move |event| {
223            if let WebsocketEvents::TradeEvent(trades) = event {
224                for trade in trades.data {
225                    sender.send(trade).unwrap();
226                }
227            }
228            Ok(())
229        };
230
231        self.ws_subscribe(request, category, handler).await
232    }
233
234    /// Subscribes to ticker events for the specified symbols and category.
235    ///
236    /// # Arguments
237    ///
238    /// * `subs` - A vector of symbols for which ticker events are subscribed.
239    /// * `category` - The category for which ticker events are subscribed.
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// use your_crate_name::Category;
245    /// let subs = vec!["BTCUSD", "ETHUSD"];
246    /// let category = Category::Linear;
247    /// let sender = UnboundedSender<Tickers>;
248    /// ws_tickers(subs, category, sender);
249    /// ```
250    pub async fn ws_tickers(
251        &self,
252        subs: Vec<&str>,
253        category: Category,
254        sender: mpsc::UnboundedSender<Tickers>,
255    ) -> Result<(), BybitError> {
256        let arr: Vec<String> = subs
257            .into_iter()
258            .map(|sub| format!("tickers.{}", sub.to_uppercase()))
259            .collect();
260        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
261
262        let handler = move |event| {
263            if let WebsocketEvents::TickerEvent(tickers) = event {
264                match tickers.data {
265                    Tickers::Linear(linear_ticker) => {
266                        sender.send(Tickers::Linear(linear_ticker)).unwrap()
267                    }
268                    Tickers::Spot(spot_ticker) => sender.send(Tickers::Spot(spot_ticker)).unwrap(),
269                }
270            }
271            Ok(())
272        };
273
274        self.ws_subscribe(request, category, handler).await
275    }
276    pub async fn ws_liquidations(
277        &self,
278        subs: Vec<&str>,
279        category: Category,
280        sender: mpsc::UnboundedSender<LiquidationData>,
281    ) -> Result<(), BybitError> {
282        let arr: Vec<String> = subs
283            .into_iter()
284            .map(|sub| format!("liquidation.{}", sub.to_uppercase()))
285            .collect();
286        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
287
288        let handler = move |event| {
289            if let WebsocketEvents::LiquidationEvent(liquidation) = event {
290                sender.send(liquidation.data).unwrap();
291            }
292            Ok(())
293        };
294
295        self.ws_subscribe(request, category, handler).await
296    }
297    pub async fn ws_klines(
298        &self,
299        subs: Vec<(&str, &str)>,
300        category: Category,
301        sender: mpsc::UnboundedSender<WsKline>,
302    ) -> Result<(), BybitError> {
303        let arr: Vec<String> = subs
304            .into_iter()
305            .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
306            .collect();
307        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
308        self.ws_subscribe(request, category, move |event| {
309            if let WebsocketEvents::KlineEvent(kline) = event {
310                sender.send(kline).unwrap();
311            }
312            Ok(())
313        })
314        .await
315    }
316
317    pub async fn ws_position(
318        &self,
319        cat: Option<Category>,
320        sender: mpsc::UnboundedSender<PositionData>,
321    ) -> Result<(), BybitError> {
322        let sub_str = if let Some(v) = cat {
323            match v {
324                Category::Linear => "position.linear",
325                Category::Inverse => "position.inverse",
326                _ => "",
327            }
328        } else {
329            "position"
330        };
331
332        let request = Subscription::new("subscribe", vec![sub_str]);
333        self.ws_priv_subscribe(request, move |event| {
334            if let WebsocketEvents::PositionEvent(position) = event {
335                for v in position.data {
336                    sender.send(v).unwrap();
337                }
338            }
339            Ok(())
340        })
341        .await
342    }
343
344    pub async fn ws_executions(
345        &self,
346        cat: Option<Category>,
347        sender: mpsc::UnboundedSender<ExecutionData>,
348    ) -> Result<(), BybitError> {
349        let sub_str = if let Some(v) = cat {
350            match v {
351                Category::Linear => "execution.linear",
352                Category::Inverse => "execution.inverse",
353                Category::Spot => "execution.spot",
354                Category::Option => "execution.option",
355            }
356        } else {
357            "execution"
358        };
359
360        let request = Subscription::new("subscribe", vec![sub_str]);
361        self.ws_priv_subscribe(request, move |event| {
362            if let WebsocketEvents::ExecutionEvent(execute) = event {
363                for v in execute.data {
364                    sender.send(v).unwrap();
365                }
366            }
367            Ok(())
368        })
369        .await
370    }
371
372    pub async fn ws_fast_exec(
373        &self,
374        sender: mpsc::UnboundedSender<FastExecData>,
375    ) -> Result<(), BybitError> {
376        let sub_str = "execution.fast";
377        let request = Subscription::new("subscribe", vec![sub_str]);
378
379        self.ws_priv_subscribe(request, move |event| {
380            if let WebsocketEvents::FastExecEvent(execution) = event {
381                for v in execution.data {
382                    sender.send(v).unwrap();
383                }
384            }
385            Ok(())
386        })
387        .await
388    }
389
390    pub async fn ws_orders(
391        &self,
392        cat: Option<Category>,
393        sender: mpsc::UnboundedSender<OrderData>,
394    ) -> Result<(), BybitError> {
395        let sub_str = if let Some(v) = cat {
396            match v {
397                Category::Linear => "order.linear",
398                Category::Inverse => "order.inverse",
399                Category::Spot => "order.spot",
400                Category::Option => "order.option",
401            }
402        } else {
403            "order"
404        };
405
406        let request = Subscription::new("subscribe", vec![sub_str]);
407        self.ws_priv_subscribe(request, move |event| {
408            if let WebsocketEvents::OrderEvent(order) = event {
409                for v in order.data {
410                    sender.send(v).unwrap();
411                }
412            }
413            Ok(())
414        })
415        .await
416    }
417
418    pub async fn ws_wallet(
419        &self,
420        sender: mpsc::UnboundedSender<WalletData>,
421    ) -> Result<(), BybitError> {
422        let sub_str = "wallet";
423        let request = Subscription::new("subscribe", vec![sub_str]);
424        self.ws_priv_subscribe(request, move |event| {
425            if let WebsocketEvents::Wallet(wallet) = event {
426                for v in wallet.data {
427                    sender.send(v).unwrap();
428                }
429            }
430            Ok(())
431        })
432        .await
433    }
434
435    pub async fn ws_trade_stream<'a, F>(
436        &self,
437        req: mpsc::UnboundedReceiver<RequestType<'a>>,
438        handler: F,
439    ) -> Result<(), BybitError>
440    where
441        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
442        'a: 'static,
443    {
444        let response = self
445            .client
446            .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
447            .await?;
448        Self::event_loop(response, handler, Some(req)).await?;
449
450        Ok(())
451    }
452
453    pub async fn event_loop<'a, H>(
454        mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
455        mut handler: H,
456        mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
457    ) -> Result<(), BybitError>
458    where
459        H: WebSocketHandler,
460    {
461        let mut interval = Instant::now();
462        loop {
463            let msg = stream.next().await;
464            match msg {
465                Some(Ok(WsMessage::Text(msg))) => {
466                    if let Err(_) = handler.handle_msg(&msg) {
467                        return Err(BybitError::Base(
468                            "Error handling stream message".to_string(),
469                        ));
470                    }
471                }
472                Some(Err(e)) => {
473                    return Err(BybitError::from(e.to_string()));
474                }
475                None => {
476                    return Err(BybitError::Base("Stream was closed".to_string()));
477                }
478                _ => {}
479            }
480            if let Some(sender) = order_sender.as_mut() {
481                if let Some(v) = sender.recv().await {
482                    let order_req = Self::build_trade_subscription(v, Some(3000));
483                    stream.send(WsMessage::Text(order_req)).await?;
484                }
485            }
486
487            if interval.elapsed() > Duration::from_secs(300) {
488                let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
489                if order_sender.is_none() {
490                    parameters.insert("req_id".into(), generate_random_uid(8).into());
491                }
492                parameters.insert("op".into(), "ping".into());
493                let request = build_json_request(&parameters);
494                let _ = stream
495                    .send(WsMessage::Text(request))
496                    .await
497                    .map_err(BybitError::from);
498                interval = Instant::now();
499            }
500        }
501    }
502}
503
504pub trait WebSocketHandler {
505    type Event;
506    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
507}
508
509impl<F> WebSocketHandler for F
510where
511    F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
512{
513    type Event = WebsocketEvents;
514    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
515        let update: Value = serde_json::from_str(msg)?;
516        if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
517            self(event)?;
518        }
519
520        Ok(())
521    }
522}