bybit/
ws.rs

1use crate::prelude::*;
2
3use futures::{SinkExt, StreamExt};
4use log::trace;
5use std::time::Instant;
6use tokio::net::TcpStream;
7use tokio::sync::mpsc;
8use tokio::time::Duration;
9use tokio_tungstenite::WebSocketStream;
10use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
11
/// Entry point for the WebSocket streaming API.
///
/// Every method on this type opens its connection through the wrapped
/// `client` (via `Client::wss_connect`), so cloning a `Stream` clones that
/// client handle.
#[derive(Clone)]
pub struct Stream {
    /// Client used to establish WebSocket connections.
    pub client: Client,
}
16
17impl Stream {
18    /// Tests for connectivity by sending a ping request to the Bybit server.
19    ///
20    /// # Returns
21    ///
22    /// Returns a `Result` containing a `String` with the response message if successful,
23
24    /// * `private` is set to `true` if the request is for a private endpoint
25    /// or a `BybitError` if an error occurs.
26    pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
27        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
28        parameters.insert("req_id".into(), generate_random_uid(8).into());
29        parameters.insert("op".into(), "ping".into());
30        let request = build_json_request(&parameters);
31        let endpoint = if private {
32            WebsocketAPI::Private
33        } else {
34            WebsocketAPI::PublicLinear
35        };
36        let mut response = self
37            .client
38            .wss_connect(endpoint, Some(request), private, None)
39            .await?;
40        let Some(data) = response.next().await else {
41            return Err(BybitError::Base(
42                "Failed to receive ping response".to_string(),
43            ));
44        };
45
46        let data = data
47            .map_err(|e| BybitError::Base(format!("Failed to get ping response, error {}", e)))?;
48        if let WsMessage::Text(data) = data {
49            let response: PongResponse = serde_json::from_str(&data)?;
50            match response {
51                PongResponse::PublicPong(pong) => {
52                    trace!("Pong received successfully: {:#?}", pong);
53                }
54                PongResponse::PrivatePong(pong) => {
55                    trace!("Pong received successfully: {:#?}", pong);
56                }
57            }
58        }
59        Ok(())
60    }
61
62    pub async fn ws_priv_subscribe<'b, F>(
63        &self,
64        req: Subscription<'_>,
65        handler: F,
66    ) -> Result<(), BybitError>
67    where
68        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
69    {
70        let request = Self::build_subscription(req);
71        let response = self
72            .client
73            .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
74            .await?;
75        if let Ok(_) = Self::event_loop(response, handler, None).await {}
76        Ok(())
77    }
78
79    pub async fn ws_subscribe<'b, F>(
80        &self,
81        req: Subscription<'_>,
82        category: Category,
83        handler: F,
84    ) -> Result<(), BybitError>
85    where
86        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
87    {
88        let endpoint = {
89            match category {
90                Category::Linear => WebsocketAPI::PublicLinear,
91                Category::Inverse => WebsocketAPI::PublicInverse,
92                Category::Spot => WebsocketAPI::PublicSpot,
93                _ => unimplemented!("Option has not been implemented"),
94            }
95        };
96        let request = Self::build_subscription(req);
97        let response = self
98            .client
99            .wss_connect(endpoint, Some(request), false, None)
100            .await?;
101        Self::event_loop(response, handler, None).await?;
102        Ok(())
103    }
104
105    pub fn build_subscription(action: Subscription) -> String {
106        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
107        parameters.insert("req_id".into(), generate_random_uid(8).into());
108        parameters.insert("op".into(), action.op.into());
109        let args_value: Value = action
110            .args
111            .iter()
112            .map(ToString::to_string)
113            .collect::<Vec<_>>()
114            .into();
115        parameters.insert("args".into(), args_value);
116
117        build_json_request(&parameters)
118    }
119
120    pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
121        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
122        parameters.insert("reqId".into(), generate_random_uid(16).into());
123        let mut header_map: BTreeMap<String, String> = BTreeMap::new();
124        header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
125        header_map.insert(
126            "X-BAPI-RECV-WINDOW".into(),
127            recv_window.unwrap_or(5000).to_string(),
128        );
129        parameters.insert("header".into(), json!(header_map));
130        match orders {
131            RequestType::Create(order) => {
132                parameters.insert("op".into(), "order.create".into());
133                parameters.insert("args".into(), build_ws_orders(RequestType::Create(order)));
134            }
135            RequestType::CreateBatch(order) => {
136                parameters.insert("op".into(), "order.create-batch".into());
137                parameters.insert(
138                    "args".into(),
139                    build_ws_orders(RequestType::CreateBatch(order)),
140                );
141            }
142            RequestType::Amend(order) => {
143                parameters.insert("op".into(), "order.amend".into());
144                parameters.insert("args".into(), build_ws_orders(RequestType::Amend(order)));
145            }
146            RequestType::AmendBatch(order) => {
147                parameters.insert("op".into(), "order.amend-batch".into());
148                parameters.insert(
149                    "args".into(),
150                    build_ws_orders(RequestType::AmendBatch(order)),
151                );
152            }
153            RequestType::Cancel(order) => {
154                parameters.insert("op".into(), "order.cancel".into());
155                parameters.insert("args".into(), build_ws_orders(RequestType::Cancel(order)));
156            }
157            RequestType::CancelBatch(order) => {
158                parameters.insert("op".into(), "order.cancel-batch".into());
159                parameters.insert(
160                    "args".into(),
161                    build_ws_orders(RequestType::CancelBatch(order)),
162                );
163            }
164        }
165        build_json_request(&parameters)
166    }
167
168    /// Subscribes to the specified order book updates and handles the order book events
169    ///
170    /// # Arguments
171    ///
172    /// * `subs` - A vector of tuples containing the order book ID and symbol
173    /// * `category` - The category of the order book
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// use your_crate_name::Category;
179    /// let subs = vec![(1, "BTC"), (2, "ETH")];
180    /// ```
181    pub async fn ws_orderbook(
182        &self,
183        subs: Vec<(i32, &str)>,
184        category: Category,
185        sender: mpsc::UnboundedSender<OrderBookUpdate>,
186    ) -> Result<(), BybitError> {
187        let arr: Vec<String> = subs
188            .into_iter()
189            .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
190            .collect();
191        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
192        self.ws_subscribe(request, category, move |event| {
193            if let WebsocketEvents::OrderBookEvent(order_book) = event {
194                sender
195                    .send(order_book)
196                    .map_err(|e| BybitError::ChannelSendError {
197                        underlying: e.to_string(),
198                    })?;
199            }
200            Ok(())
201        })
202        .await
203    }
204
205    /// This function subscribes to the specified trades and handles the trade events.
206    /// # Arguments
207    ///
208    /// * `subs` - A vector of trade subscriptions
209    /// * `category` - The category of the trades
210    ///
211    /// # Example
212    ///
213    /// ```
214    /// use your_crate_name::Category;
215    /// let subs = vec!["BTCUSD", "ETHUSD"];
216    /// let category = Category::Linear;
217    /// ws_trades(subs, category);
218    /// ```
219    pub async fn ws_trades(
220        &self,
221        subs: Vec<&str>,
222        category: Category,
223        sender: mpsc::UnboundedSender<WsTrade>,
224    ) -> Result<(), BybitError> {
225        let arr: Vec<String> = subs
226            .iter()
227            .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
228            .collect();
229        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
230        let handler = move |event| {
231            if let WebsocketEvents::TradeEvent(trades) = event {
232                for trade in trades.data {
233                    sender
234                        .send(trade)
235                        .map_err(|e| BybitError::ChannelSendError {
236                            underlying: e.to_string(),
237                        })?;
238                }
239            }
240            Ok(())
241        };
242
243        self.ws_subscribe(request, category, handler).await
244    }
245
246    /// Subscribes to ticker events for the specified symbols and category.
247    ///
248    /// # Arguments
249    ///
250    /// * `subs` - A vector of symbols for which ticker events are subscribed.
251    /// * `category` - The category for which ticker events are subscribed.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// use your_crate_name::Category;
257    /// let subs = vec!["BTCUSD", "ETHUSD"];
258    /// let category = Category::Linear;
259    /// let sender = UnboundedSender<Ticker>;
260    /// ws_tickers(subs, category, sender);
261    /// ```
262    pub async fn ws_tickers(
263        &self,
264        subs: Vec<&str>,
265        category: Category,
266        sender: mpsc::UnboundedSender<Ticker>,
267    ) -> Result<(), BybitError> {
268        self._ws_tickers(subs, category, sender, |ws_ticker| ws_ticker.data)
269            .await
270    }
271
272    /// Subscribes to ticker events with timestamp for the specified symbols and category.
273    ///
274    /// # Arguments
275    ///
276    /// * `subs` - A vector of symbols for which ticker events are subscribed.
277    /// * `category` - The category for which ticker events are subscribed.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// use your_crate_name::Category;
283    /// let subs = vec!["BTCUSD", "ETHUSD"];
284    /// let category = Category::Linear;
285    /// let sender = UnboundedSender<Ticker>;
286    /// ws_timed_tickers(subs, category, sender);
287    /// ```
288    pub async fn ws_timed_tickers(
289        &self,
290        subs: Vec<&str>,
291        category: Category,
292        sender: mpsc::UnboundedSender<Timed<Ticker>>,
293    ) -> Result<(), BybitError> {
294        self._ws_tickers(subs, category, sender, |ticker| Timed {
295            time: ticker.ts,
296            data: ticker.data,
297        })
298        .await
299    }
300
301    async fn _ws_tickers<T, F>(
302        &self,
303        subs: Vec<&str>,
304        category: Category,
305        sender: mpsc::UnboundedSender<T>,
306        map: F,
307    ) -> Result<(), BybitError>
308    where
309        T: 'static + Sync + Send,
310        F: 'static + Sync + Send + Fn(WsTicker) -> T,
311    {
312        let arr: Vec<String> = subs
313            .into_iter()
314            .map(|sub| format!("tickers.{}", sub.to_uppercase()))
315            .collect();
316        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
317
318        let handler = move |event| {
319            if let WebsocketEvents::TickerEvent(ticker) = event {
320                let mapped = map(ticker);
321                sender
322                    .send(mapped)
323                    .map_err(|e| BybitError::ChannelSendError {
324                        underlying: e.to_string(),
325                    })?;
326            }
327            Ok(())
328        };
329
330        self.ws_subscribe(request, category, handler).await
331    }
332    pub async fn ws_liquidations(
333        &self,
334        subs: Vec<&str>,
335        category: Category,
336        sender: mpsc::UnboundedSender<LiquidationData>,
337    ) -> Result<(), BybitError> {
338        let arr: Vec<String> = subs
339            .into_iter()
340            .map(|sub| format!("liquidation.{}", sub.to_uppercase()))
341            .collect();
342        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
343
344        let handler = move |event| {
345            if let WebsocketEvents::LiquidationEvent(liquidation) = event {
346                sender
347                    .send(liquidation.data)
348                    .map_err(|e| BybitError::ChannelSendError {
349                        underlying: e.to_string(),
350                    })?;
351            }
352            Ok(())
353        };
354
355        self.ws_subscribe(request, category, handler).await
356    }
357    pub async fn ws_klines(
358        &self,
359        subs: Vec<(&str, &str)>,
360        category: Category,
361        sender: mpsc::UnboundedSender<WsKline>,
362    ) -> Result<(), BybitError> {
363        let arr: Vec<String> = subs
364            .into_iter()
365            .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
366            .collect();
367        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
368        self.ws_subscribe(request, category, move |event| {
369            if let WebsocketEvents::KlineEvent(kline) = event {
370                sender
371                    .send(kline)
372                    .map_err(|e| BybitError::ChannelSendError {
373                        underlying: e.to_string(),
374                    })?;
375            }
376            Ok(())
377        })
378        .await
379    }
380
381    pub async fn ws_position(
382        &self,
383        cat: Option<Category>,
384        sender: mpsc::UnboundedSender<PositionData>,
385    ) -> Result<(), BybitError> {
386        let sub_str = if let Some(v) = cat {
387            match v {
388                Category::Linear => "position.linear",
389                Category::Inverse => "position.inverse",
390                _ => "",
391            }
392        } else {
393            "position"
394        };
395
396        let request = Subscription::new("subscribe", vec![sub_str]);
397        self.ws_priv_subscribe(request, move |event| {
398            if let WebsocketEvents::PositionEvent(position) = event {
399                for v in position.data {
400                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
401                        underlying: e.to_string(),
402                    })?;
403                }
404            }
405            Ok(())
406        })
407        .await
408    }
409
410    pub async fn ws_executions(
411        &self,
412        cat: Option<Category>,
413        sender: mpsc::UnboundedSender<ExecutionData>,
414    ) -> Result<(), BybitError> {
415        let sub_str = if let Some(v) = cat {
416            match v {
417                Category::Linear => "execution.linear",
418                Category::Inverse => "execution.inverse",
419                Category::Spot => "execution.spot",
420                Category::Option => "execution.option",
421            }
422        } else {
423            "execution"
424        };
425
426        let request = Subscription::new("subscribe", vec![sub_str]);
427        self.ws_priv_subscribe(request, move |event| {
428            if let WebsocketEvents::ExecutionEvent(execute) = event {
429                for v in execute.data {
430                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
431                        underlying: e.to_string(),
432                    })?;
433                }
434            }
435            Ok(())
436        })
437        .await
438    }
439
440    pub async fn ws_fast_exec(
441        &self,
442        sender: mpsc::UnboundedSender<FastExecData>,
443    ) -> Result<(), BybitError> {
444        let sub_str = "execution.fast";
445        let request = Subscription::new("subscribe", vec![sub_str]);
446
447        self.ws_priv_subscribe(request, move |event| {
448            if let WebsocketEvents::FastExecEvent(execution) = event {
449                for v in execution.data {
450                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
451                        underlying: e.to_string(),
452                    })?;
453                }
454            }
455            Ok(())
456        })
457        .await
458    }
459
460    pub async fn ws_orders(
461        &self,
462        cat: Option<Category>,
463        sender: mpsc::UnboundedSender<OrderData>,
464    ) -> Result<(), BybitError> {
465        let sub_str = if let Some(v) = cat {
466            match v {
467                Category::Linear => "order.linear",
468                Category::Inverse => "order.inverse",
469                Category::Spot => "order.spot",
470                Category::Option => "order.option",
471            }
472        } else {
473            "order"
474        };
475
476        let request = Subscription::new("subscribe", vec![sub_str]);
477        self.ws_priv_subscribe(request, move |event| {
478            if let WebsocketEvents::OrderEvent(order) = event {
479                for v in order.data {
480                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
481                        underlying: e.to_string(),
482                    })?;
483                }
484            }
485            Ok(())
486        })
487        .await
488    }
489
490    pub async fn ws_wallet(
491        &self,
492        sender: mpsc::UnboundedSender<WalletData>,
493    ) -> Result<(), BybitError> {
494        let sub_str = "wallet";
495        let request = Subscription::new("subscribe", vec![sub_str]);
496        self.ws_priv_subscribe(request, move |event| {
497            if let WebsocketEvents::Wallet(wallet) = event {
498                for v in wallet.data {
499                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
500                        underlying: e.to_string(),
501                    })?;
502                }
503            }
504            Ok(())
505        })
506        .await
507    }
508
509    pub async fn ws_trade_stream<'a, F>(
510        &self,
511        req: mpsc::UnboundedReceiver<RequestType<'a>>,
512        handler: F,
513    ) -> Result<(), BybitError>
514    where
515        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
516        'a: 'static,
517    {
518        let response = self
519            .client
520            .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
521            .await?;
522        Self::event_loop(response, handler, Some(req)).await?;
523
524        Ok(())
525    }
526
527    pub async fn event_loop<'a, H>(
528        mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
529        mut handler: H,
530        mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
531    ) -> Result<(), BybitError>
532    where
533        H: WebSocketHandler,
534    {
535        let mut interval = Instant::now();
536        loop {
537            let msg = stream.next().await;
538            match msg {
539                Some(Ok(WsMessage::Text(msg))) => {
540                    if let Err(_) = handler.handle_msg(&msg) {
541                        return Err(BybitError::Base(
542                            "Error handling stream message".to_string(),
543                        ));
544                    }
545                }
546                Some(Err(e)) => {
547                    return Err(BybitError::from(e.to_string()));
548                }
549                None => {
550                    return Err(BybitError::Base("Stream was closed".to_string()));
551                }
552                _ => {}
553            }
554            if let Some(sender) = order_sender.as_mut() {
555                if let Some(v) = sender.recv().await {
556                    let order_req = Self::build_trade_subscription(v, Some(3000));
557                    stream.send(WsMessage::Text(order_req)).await?;
558                }
559            }
560
561            if interval.elapsed() > Duration::from_secs(300) {
562                let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
563                if order_sender.is_none() {
564                    parameters.insert("req_id".into(), generate_random_uid(8).into());
565                }
566                parameters.insert("op".into(), "ping".into());
567                let request = build_json_request(&parameters);
568                let _ = stream
569                    .send(WsMessage::Text(request))
570                    .await
571                    .map_err(BybitError::from);
572                interval = Instant::now();
573            }
574        }
575    }
576}
577
/// Callback interface the stream event loop uses to process text frames.
pub trait WebSocketHandler {
    /// Event type associated with the handler. The closure implementation in
    /// this file sets it to `WebsocketEvents`; `handle_msg` itself does not
    /// reference it.
    type Event;
    /// Processes the raw text of one WebSocket message.
    ///
    /// Returning an error makes `Stream::event_loop` stop and return an error
    /// to its caller.
    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
}
582
583impl<F> WebSocketHandler for F
584where
585    F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
586{
587    type Event = WebsocketEvents;
588    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
589        let update: Value = serde_json::from_str(msg)?;
590        if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
591            self(event)?;
592        }
593
594        Ok(())
595    }
596}