bybit/
ws.rs

1use crate::api::WebsocketAPI;
2use crate::client::Client;
3use crate::errors::BybitError;
4use crate::model::{
5    Category, ExecutionData, FastExecData, LiquidationData, OrderBookUpdate, OrderData,
6    PongResponse, PositionData, RequestType, Subscription, Tickers, WalletData, WebsocketEvents,
7    WsKline, WsTrade,
8};
9use crate::trade::build_ws_orders;
10use crate::util::{build_json_request, generate_random_uid, get_timestamp};
11use futures::{SinkExt, StreamExt};
12use log::trace;
13use serde_json::{json, Value};
14use std::collections::BTreeMap;
15use std::time::Instant;
16use tokio::net::TcpStream;
17use tokio::sync::mpsc;
18use tokio::time::Duration;
19use tokio_tungstenite::WebSocketStream;
20use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
21
22#[derive(Clone)]
23pub struct Stream {
24    pub client: Client,
25}
26
27impl Stream {
28    /// Tests for connectivity by sending a ping request to the Bybit server.
29    ///
30    /// # Returns
31    ///
32    /// Returns a `Result` containing a `String` with the response message if successful,
33    /// * `private` is set to `true` if the request is for a private endpoint
34    /// or a `BybitError` if an error occurs.
35    pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
36        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
37        parameters.insert("req_id".into(), generate_random_uid(8).into());
38        parameters.insert("op".into(), "ping".into());
39        let request = build_json_request(&parameters);
40        let endpoint = if private {
41            WebsocketAPI::Private
42        } else {
43            WebsocketAPI::PublicLinear
44        };
45        let mut response = self
46            .client
47            .wss_connect(endpoint, Some(request), private, None)
48            .await?;
49        let Some(data) = response.next().await else {
50            return Err(BybitError::Base(
51                "Failed to receive ping response".to_string(),
52            ));
53        };
54
55        let data = data
56            .map_err(|e| BybitError::Base(format!("Failed to get ping response, error {}", e)))?;
57        match data {
58            WsMessage::Text(data) => {
59                let response: PongResponse = serde_json::from_str(&data)?;
60                match response {
61                    PongResponse::PublicPong(pong) => {
62                        trace!("Pong received successfully: {:#?}", pong);
63                    }
64                    PongResponse::PrivatePong(pong) => {
65                        trace!("Pong received successfully: {:#?}", pong);
66                    }
67                }
68            }
69            _ => {}
70        }
71        Ok(())
72    }
73
74    pub async fn ws_priv_subscribe<'b, F>(
75        &self,
76        req: Subscription<'_>,
77        handler: F,
78    ) -> Result<(), BybitError>
79    where
80        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
81    {
82        let request = Self::build_subscription(req);
83        let response = self
84            .client
85            .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
86            .await?;
87        match Self::event_loop(response, handler, None).await {
88            Ok(_) => {}
89            Err(_) => {}
90        }
91        Ok(())
92    }
93
94    pub async fn ws_subscribe<'b, F>(
95        &self,
96        req: Subscription<'_>,
97        category: Category,
98        handler: F,
99    ) -> Result<(), BybitError>
100    where
101        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
102    {
103        let endpoint = {
104            match category {
105                Category::Linear => WebsocketAPI::PublicLinear,
106                Category::Inverse => WebsocketAPI::PublicInverse,
107                Category::Spot => WebsocketAPI::PublicSpot,
108                _ => unimplemented!("Option has not been implemented"),
109            }
110        };
111        let request = Self::build_subscription(req);
112        let response = self
113            .client
114            .wss_connect(endpoint, Some(request), false, None)
115            .await?;
116        Self::event_loop(response, handler, None).await?;
117        Ok(())
118    }
119
120    pub fn build_subscription(action: Subscription) -> String {
121        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
122        parameters.insert("req_id".into(), generate_random_uid(8).into());
123        parameters.insert("op".into(), action.op.into());
124        let args_value: Value = action
125            .args
126            .iter()
127            .map(ToString::to_string)
128            .collect::<Vec<_>>()
129            .into();
130        parameters.insert("args".into(), args_value);
131
132        build_json_request(&parameters)
133    }
134
135    pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
136        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
137        parameters.insert("reqId".into(), generate_random_uid(16).into());
138        let mut header_map: BTreeMap<String, String> = BTreeMap::new();
139        header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
140        header_map.insert(
141            "X-BAPI-RECV-WINDOW".into(),
142            recv_window.unwrap_or(5000).to_string(),
143        );
144        parameters.insert("header".into(), json!(header_map).into());
145        match orders {
146            RequestType::Create(order) => {
147                parameters.insert("op".into(), "order.create".into());
148                parameters.insert(
149                    "args".into(),
150                    build_ws_orders(RequestType::Create(order)).into(),
151                );
152            }
153            RequestType::Cancel(order) => {
154                parameters.insert("op".into(), "order.cancel".into());
155                parameters.insert(
156                    "args".into(),
157                    build_ws_orders(RequestType::Cancel(order)).into(),
158                );
159            }
160
161            RequestType::Amend(order) => {
162                parameters.insert("op".into(), "order.amend".into());
163                parameters.insert(
164                    "args".into(),
165                    build_ws_orders(RequestType::Amend(order)).into(),
166                );
167            }
168        }
169        build_json_request(&parameters)
170    }
171
172    /// Subscribes to the specified order book updates and handles the order book events
173    ///
174    /// # Arguments
175    ///
176    /// * `subs` - A vector of tuples containing the order book ID and symbol
177    /// * `category` - The category of the order book
178    ///
179    /// # Example
180    ///
181    /// ```
182    /// use your_crate_name::Category;
183    /// let subs = vec![(1, "BTC"), (2, "ETH")];
184    /// ```
185    pub async fn ws_orderbook(
186        &self,
187        subs: Vec<(i32, &str)>,
188        category: Category,
189        sender: mpsc::UnboundedSender<OrderBookUpdate>,
190    ) -> Result<(), BybitError> {
191        let arr: Vec<String> = subs
192            .into_iter()
193            .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
194            .collect();
195        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
196        self.ws_subscribe(request, category, move |event| {
197            if let WebsocketEvents::OrderBookEvent(order_book) = event {
198                sender
199                    .send(order_book)
200                    .map_err(|e| BybitError::ChannelSendError {
201                        underlying: e.to_string(),
202                    })?;
203            }
204            Ok(())
205        })
206        .await
207    }
208
209    /// This function subscribes to the specified trades and handles the trade events.
210    /// # Arguments
211    ///
212    /// * `subs` - A vector of trade subscriptions
213    /// * `category` - The category of the trades
214    ///
215    /// # Example
216    ///
217    /// ```
218    /// use your_crate_name::Category;
219    /// let subs = vec!["BTCUSD", "ETHUSD"];
220    /// let category = Category::Linear;
221    /// ws_trades(subs, category);
222    /// ```
223    pub async fn ws_trades(
224        &self,
225        subs: Vec<&str>,
226        category: Category,
227        sender: mpsc::UnboundedSender<WsTrade>,
228    ) -> Result<(), BybitError> {
229        let arr: Vec<String> = subs
230            .iter()
231            .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
232            .collect();
233        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
234        let handler = move |event| {
235            if let WebsocketEvents::TradeEvent(trades) = event {
236                for trade in trades.data {
237                    sender
238                        .send(trade)
239                        .map_err(|e| BybitError::ChannelSendError {
240                            underlying: e.to_string(),
241                        })?;
242                }
243            }
244            Ok(())
245        };
246
247        self.ws_subscribe(request, category, handler).await
248    }
249
250    /// Subscribes to ticker events for the specified symbols and category.
251    ///
252    /// # Arguments
253    ///
254    /// * `subs` - A vector of symbols for which ticker events are subscribed.
255    /// * `category` - The category for which ticker events are subscribed.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// use your_crate_name::Category;
261    /// let subs = vec!["BTCUSD", "ETHUSD"];
262    /// let category = Category::Linear;
263    /// let sender = UnboundedSender<Tickers>;
264    /// ws_tickers(subs, category, sender);
265    /// ```
266    pub async fn ws_tickers(
267        &self,
268        subs: Vec<&str>,
269        category: Category,
270        sender: mpsc::UnboundedSender<Tickers>,
271    ) -> Result<(), BybitError> {
272        let arr: Vec<String> = subs
273            .into_iter()
274            .map(|sub| format!("tickers.{}", sub.to_uppercase()))
275            .collect();
276        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
277
278        let handler =
279            move |event| {
280                if let WebsocketEvents::TickerEvent(tickers) = event {
281                    match tickers.data {
282                        Tickers::Linear(linear_ticker) => {
283                            sender.send(Tickers::Linear(linear_ticker)).map_err(|e| {
284                                BybitError::ChannelSendError {
285                                    underlying: e.to_string(),
286                                }
287                            })?;
288                        }
289                        Tickers::Spot(spot_ticker) => sender
290                            .send(Tickers::Spot(spot_ticker))
291                            .map_err(|e| BybitError::ChannelSendError {
292                                underlying: e.to_string(),
293                            })?,
294                    }
295                }
296                Ok(())
297            };
298
299        self.ws_subscribe(request, category, handler).await
300    }
301    pub async fn ws_liquidations(
302        &self,
303        subs: Vec<&str>,
304        category: Category,
305        sender: mpsc::UnboundedSender<LiquidationData>,
306    ) -> Result<(), BybitError> {
307        let arr: Vec<String> = subs
308            .into_iter()
309            .map(|sub| format!("liquidation.{}", sub.to_uppercase()))
310            .collect();
311        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
312
313        let handler = move |event| {
314            if let WebsocketEvents::LiquidationEvent(liquidation) = event {
315                sender
316                    .send(liquidation.data)
317                    .map_err(|e| BybitError::ChannelSendError {
318                        underlying: e.to_string(),
319                    })?;
320            }
321            Ok(())
322        };
323
324        self.ws_subscribe(request, category, handler).await
325    }
326    pub async fn ws_klines(
327        &self,
328        subs: Vec<(&str, &str)>,
329        category: Category,
330        sender: mpsc::UnboundedSender<WsKline>,
331    ) -> Result<(), BybitError> {
332        let arr: Vec<String> = subs
333            .into_iter()
334            .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
335            .collect();
336        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
337        self.ws_subscribe(request, category, move |event| {
338            if let WebsocketEvents::KlineEvent(kline) = event {
339                sender
340                    .send(kline)
341                    .map_err(|e| BybitError::ChannelSendError {
342                        underlying: e.to_string(),
343                    })?;
344            }
345            Ok(())
346        })
347        .await
348    }
349
350    pub async fn ws_position(
351        &self,
352        cat: Option<Category>,
353        sender: mpsc::UnboundedSender<PositionData>,
354    ) -> Result<(), BybitError> {
355        let sub_str = if let Some(v) = cat {
356            match v {
357                Category::Linear => "position.linear",
358                Category::Inverse => "position.inverse",
359                _ => "",
360            }
361        } else {
362            "position"
363        };
364
365        let request = Subscription::new("subscribe", vec![sub_str]);
366        self.ws_priv_subscribe(request, move |event| {
367            if let WebsocketEvents::PositionEvent(position) = event {
368                for v in position.data {
369                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
370                        underlying: e.to_string(),
371                    })?;
372                }
373            }
374            Ok(())
375        })
376        .await
377    }
378
379    pub async fn ws_executions(
380        &self,
381        cat: Option<Category>,
382        sender: mpsc::UnboundedSender<ExecutionData>,
383    ) -> Result<(), BybitError> {
384        let sub_str = if let Some(v) = cat {
385            match v {
386                Category::Linear => "execution.linear",
387                Category::Inverse => "execution.inverse",
388                Category::Spot => "execution.spot",
389                Category::Option => "execution.option",
390            }
391        } else {
392            "execution"
393        };
394
395        let request = Subscription::new("subscribe", vec![sub_str]);
396        self.ws_priv_subscribe(request, move |event| {
397            if let WebsocketEvents::ExecutionEvent(execute) = event {
398                for v in execute.data {
399                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
400                        underlying: e.to_string(),
401                    })?;
402                }
403            }
404            Ok(())
405        })
406        .await
407    }
408
409    pub async fn ws_fast_exec(
410        &self,
411        sender: mpsc::UnboundedSender<FastExecData>,
412    ) -> Result<(), BybitError> {
413        let sub_str = "execution.fast";
414        let request = Subscription::new("subscribe", vec![sub_str]);
415
416        self.ws_priv_subscribe(request, move |event| {
417            if let WebsocketEvents::FastExecEvent(execution) = event {
418                for v in execution.data {
419                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
420                        underlying: e.to_string(),
421                    })?;
422                }
423            }
424            Ok(())
425        })
426        .await
427    }
428
429    pub async fn ws_orders(
430        &self,
431        cat: Option<Category>,
432        sender: mpsc::UnboundedSender<OrderData>,
433    ) -> Result<(), BybitError> {
434        let sub_str = if let Some(v) = cat {
435            match v {
436                Category::Linear => "order.linear",
437                Category::Inverse => "order.inverse",
438                Category::Spot => "order.spot",
439                Category::Option => "order.option",
440            }
441        } else {
442            "order"
443        };
444
445        let request = Subscription::new("subscribe", vec![sub_str]);
446        self.ws_priv_subscribe(request, move |event| {
447            if let WebsocketEvents::OrderEvent(order) = event {
448                for v in order.data {
449                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
450                        underlying: e.to_string(),
451                    })?;
452                }
453            }
454            Ok(())
455        })
456        .await
457    }
458
459    pub async fn ws_wallet(
460        &self,
461        sender: mpsc::UnboundedSender<WalletData>,
462    ) -> Result<(), BybitError> {
463        let sub_str = "wallet";
464        let request = Subscription::new("subscribe", vec![sub_str]);
465        self.ws_priv_subscribe(request, move |event| {
466            if let WebsocketEvents::Wallet(wallet) = event {
467                for v in wallet.data {
468                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
469                        underlying: e.to_string(),
470                    })?;
471                }
472            }
473            Ok(())
474        })
475        .await
476    }
477
478    pub async fn ws_trade_stream<'a, F>(
479        &self,
480        req: mpsc::UnboundedReceiver<RequestType<'a>>,
481        handler: F,
482    ) -> Result<(), BybitError>
483    where
484        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
485        'a: 'static,
486    {
487        let response = self
488            .client
489            .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
490            .await?;
491        Self::event_loop(response, handler, Some(req)).await?;
492
493        Ok(())
494    }
495
496    pub async fn event_loop<'a, H>(
497        mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
498        mut handler: H,
499        mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
500    ) -> Result<(), BybitError>
501    where
502        H: WebSocketHandler,
503    {
504        let mut interval = Instant::now();
505        loop {
506            let msg = stream.next().await;
507            match msg {
508                Some(Ok(WsMessage::Text(msg))) => {
509                    if let Err(_) = handler.handle_msg(&msg) {
510                        return Err(BybitError::Base(
511                            "Error handling stream message".to_string(),
512                        ));
513                    }
514                }
515                Some(Err(e)) => {
516                    return Err(BybitError::from(e.to_string()));
517                }
518                None => {
519                    return Err(BybitError::Base("Stream was closed".to_string()));
520                }
521                _ => {}
522            }
523            if let Some(sender) = order_sender.as_mut() {
524                if let Some(v) = sender.recv().await {
525                    let order_req = Self::build_trade_subscription(v, Some(3000));
526                    stream.send(WsMessage::Text(order_req)).await?;
527                }
528            }
529
530            if interval.elapsed() > Duration::from_secs(300) {
531                let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
532                if order_sender.is_none() {
533                    parameters.insert("req_id".into(), generate_random_uid(8).into());
534                }
535                parameters.insert("op".into(), "ping".into());
536                let request = build_json_request(&parameters);
537                let _ = stream
538                    .send(WsMessage::Text(request))
539                    .await
540                    .map_err(BybitError::from);
541                interval = Instant::now();
542            }
543        }
544    }
545}
546
547pub trait WebSocketHandler {
548    type Event;
549    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
550}
551
552impl<F> WebSocketHandler for F
553where
554    F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
555{
556    type Event = WebsocketEvents;
557    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
558        let update: Value = serde_json::from_str(msg)?;
559        if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
560            self(event)?;
561        }
562
563        Ok(())
564    }
565}