Skip to main content

bybit/
ws.rs

1use crate::prelude::*;
2
3use futures::{SinkExt, StreamExt};
4use log::trace;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::net::TcpStream;
9use tokio::sync::mpsc;
10use tokio::time::Duration;
11use tokio_tungstenite::WebSocketStream;
12use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
13
14#[derive(Clone)]
15pub struct Stream {
16    pub client: Client,
17}
18
19impl Stream {
20    /// Tests for connectivity by sending a ping request to the Bybit server.
21    ///
22    /// # Returns
23    ///
24    /// Returns a `Result` containing a `String` with the response message if successful,
25
26    /// * `private` is set to `true` if the request is for a private endpoint
27    /// or a `BybitError` if an error occurs.
28    pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
29        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
30        parameters.insert("req_id".into(), generate_random_uid(8).into());
31        parameters.insert("op".into(), "ping".into());
32        let request = build_json_request(&parameters);
33        let endpoint = if private {
34            WebsocketAPI::Private
35        } else {
36            WebsocketAPI::PublicLinear
37        };
38        let mut response = self
39            .client
40            .wss_connect(endpoint, Some(request), private, None)
41            .await?;
42        let Some(data) = response.next().await else {
43            return Err(BybitError::Base(
44                "Failed to receive ping response".to_string(),
45            ));
46        };
47
48        let data = data
49            .map_err(|e| BybitError::Base(format!("Failed to get ping response, error {}", e)))?;
50        if let WsMessage::Text(data) = data {
51            let response: PongResponse = serde_json::from_str(&data)?;
52            match response {
53                PongResponse::PublicPong(pong) => {
54                    trace!("Pong received successfully: {:#?}", pong);
55                }
56                PongResponse::PrivatePong(pong) => {
57                    trace!("Pong received successfully: {:#?}", pong);
58                }
59            }
60        }
61        Ok(())
62    }
63
64    pub async fn ws_priv_subscribe<'b, F>(
65        &self,
66        req: Subscription<'_>,
67        handler: F,
68    ) -> Result<(), BybitError>
69    where
70        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
71    {
72        let request = Self::build_subscription(req);
73        let response = self
74            .client
75            .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
76            .await?;
77        if let Ok(_) = Self::event_loop(response, handler, None).await {}
78        Ok(())
79    }
80
81    pub async fn ws_subscribe<'b, F>(
82        &self,
83        req: Subscription<'_>,
84        category: Category,
85        handler: F,
86    ) -> Result<(), BybitError>
87    where
88        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
89    {
90        let endpoint = {
91            match category {
92                Category::Linear => WebsocketAPI::PublicLinear,
93                Category::Inverse => WebsocketAPI::PublicInverse,
94                Category::Spot => WebsocketAPI::PublicSpot,
95                Category::Option => WebsocketAPI::PublicOption,
96            }
97        };
98        let request = Self::build_subscription(req);
99        let response = self
100            .client
101            .wss_connect(endpoint, Some(request), false, None)
102            .await?;
103        Self::event_loop(response, handler, None).await?;
104        Ok(())
105    }
106
107    pub fn build_subscription(action: Subscription) -> String {
108        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
109        parameters.insert("req_id".into(), generate_random_uid(8).into());
110        parameters.insert("op".into(), action.op.into());
111        let args_value: Value = action
112            .args
113            .iter()
114            .map(ToString::to_string)
115            .collect::<Vec<_>>()
116            .into();
117        parameters.insert("args".into(), args_value);
118
119        build_json_request(&parameters)
120    }
121
122    pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
123        let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
124        parameters.insert("reqId".into(), generate_random_uid(16).into());
125        let mut header_map: BTreeMap<String, String> = BTreeMap::new();
126        header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
127        header_map.insert(
128            "X-BAPI-RECV-WINDOW".into(),
129            recv_window.unwrap_or(5000).to_string(),
130        );
131        parameters.insert("header".into(), json!(header_map));
132        match orders {
133            RequestType::Create(order) => {
134                parameters.insert("op".into(), "order.create".into());
135                parameters.insert("args".into(), build_ws_orders(RequestType::Create(order)));
136            }
137            RequestType::CreateBatch(order) => {
138                parameters.insert("op".into(), "order.create-batch".into());
139                parameters.insert(
140                    "args".into(),
141                    build_ws_orders(RequestType::CreateBatch(order)),
142                );
143            }
144            RequestType::Amend(order) => {
145                parameters.insert("op".into(), "order.amend".into());
146                parameters.insert("args".into(), build_ws_orders(RequestType::Amend(order)));
147            }
148            RequestType::AmendBatch(order) => {
149                parameters.insert("op".into(), "order.amend-batch".into());
150                parameters.insert(
151                    "args".into(),
152                    build_ws_orders(RequestType::AmendBatch(order)),
153                );
154            }
155            RequestType::Cancel(order) => {
156                parameters.insert("op".into(), "order.cancel".into());
157                parameters.insert("args".into(), build_ws_orders(RequestType::Cancel(order)));
158            }
159            RequestType::CancelBatch(order) => {
160                parameters.insert("op".into(), "order.cancel-batch".into());
161                parameters.insert(
162                    "args".into(),
163                    build_ws_orders(RequestType::CancelBatch(order)),
164                );
165            }
166        }
167        build_json_request(&parameters)
168    }
169
170    /// Subscribes to the specified order book updates and handles the order book events
171    ///
172    /// # Arguments
173    ///
174    /// * `subs` - A vector of tuples containing the order book ID and symbol
175    /// * `category` - The category of the order book
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// use your_crate_name::Category;
181    /// let subs = vec![(1, "BTC"), (2, "ETH")];
182    /// ```
183    pub async fn ws_orderbook(
184        &self,
185        subs: Vec<(i32, &str)>,
186        category: Category,
187        sender: mpsc::UnboundedSender<OrderBookUpdate>,
188    ) -> Result<(), BybitError> {
189        let arr: Vec<String> = subs
190            .into_iter()
191            .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
192            .collect();
193        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
194        self.ws_subscribe(request, category, move |event| {
195            if let WebsocketEvents::OrderBookEvent(order_book) = event {
196                sender
197                    .send(order_book)
198                    .map_err(|e| BybitError::ChannelSendError {
199                        underlying: e.to_string(),
200                    })?;
201            }
202            Ok(())
203        })
204        .await
205    }
206
207    /// Subscribes to RPI (Real-time Price Improvement) orderbook stream.
208    ///
209    /// RPI orderbooks show both regular orders and RPI orders, which can provide price improvement for takers.
210    /// Push frequency: 100ms for Spot, Perpetual & Futures (level 50 data).
211    /// Topic format: `orderbook.rpi.{symbol}`
212    ///
213    /// # Arguments
214    ///
215    /// * `subs` - Vector of symbol strings to subscribe to (e.g., `vec!["BTCUSDT", "ETHUSDT"]`)
216    /// * `category` - Product category (Linear, Inverse, or Spot)
217    /// * `sender` - Channel sender for RPI orderbook updates
218    ///
219    /// # Returns
220    ///
221    /// Returns `Ok(())` if subscription succeeds, otherwise returns a `BybitError`.
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// use rs_bybit::prelude::*;
227    /// use tokio::sync::mpsc;
228    ///
229    /// # #[tokio::main]
230    /// # async fn main() -> Result<(), BybitError> {
231    /// let client = Client::new("api_key", "api_secret", None, None)?;
232    /// let stream = Stream { client };
233    /// let (tx, mut rx) = mpsc::unbounded_channel();
234    ///
235    /// stream.ws_rpi_orderbook(vec!["BTCUSDT"], Category::Linear, tx).await?;
236    ///
237    /// while let Some(update) = rx.recv().await {
238    ///     println!("RPI Orderbook update: {:?}", update);
239    /// }
240    /// # Ok(())
241    /// # }
242    /// ```
243    pub async fn ws_rpi_orderbook(
244        &self,
245        subs: Vec<&str>,
246        category: Category,
247        sender: mpsc::UnboundedSender<RPIOrderbookUpdate>,
248    ) -> Result<(), BybitError> {
249        let arr: Vec<String> = subs
250            .into_iter()
251            .map(|sym| format!("orderbook.rpi.{}", sym.to_uppercase()))
252            .collect();
253        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
254        self.ws_subscribe(request, category, move |event| {
255            if let WebsocketEvents::RPIOrderBookEvent(rpi_order_book) = event {
256                sender
257                    .send(rpi_order_book)
258                    .map_err(|e| BybitError::ChannelSendError {
259                        underlying: e.to_string(),
260                    })?;
261            }
262            Ok(())
263        })
264        .await
265    }
266
267    /// This function subscribes to the specified trades and handles the trade events.
268    /// # Arguments
269    ///
270    /// * `subs` - A vector of trade subscriptions
271    /// * `category` - The category of the trades
272    ///
273    /// # Example
274    ///
275    /// ```
276    /// use your_crate_name::Category;
277    /// let subs = vec!["BTCUSD", "ETHUSD"];
278    /// let category = Category::Linear;
279    /// ws_trades(subs, category);
280    /// ```
281    pub async fn ws_trades(
282        &self,
283        subs: Vec<&str>,
284        category: Category,
285        sender: mpsc::UnboundedSender<WsTrade>,
286    ) -> Result<(), BybitError> {
287        let arr: Vec<String> = subs
288            .iter()
289            .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
290            .collect();
291        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
292        let handler = move |event| {
293            if let WebsocketEvents::TradeEvent(trades) = event {
294                for trade in trades.data {
295                    sender
296                        .send(trade)
297                        .map_err(|e| BybitError::ChannelSendError {
298                            underlying: e.to_string(),
299                        })?;
300                }
301            }
302            Ok(())
303        };
304
305        self.ws_subscribe(request, category, handler).await
306    }
307
308    /// Subscribes to ticker events for the specified symbols and category.
309    ///
310    /// # Arguments
311    ///
312    /// * `subs` - A vector of symbols for which ticker events are subscribed.
313    /// * `category` - The category for which ticker events are subscribed.
314    ///
315    /// # Examples
316    ///
317    /// ```
318    /// use your_crate_name::Category;
319    /// let subs = vec!["BTCUSD", "ETHUSD"];
320    /// let category = Category::Linear;
321    /// let sender = UnboundedSender<Ticker>;
322    /// ws_tickers(subs, category, sender);
323    /// ```
324    pub async fn ws_tickers(
325        &self,
326        subs: Vec<&str>,
327        category: Category,
328        sender: mpsc::UnboundedSender<Ticker>,
329    ) -> Result<(), BybitError> {
330        self._ws_tickers_internal(subs, category, sender, |ws_ticker: WsTicker| {
331            Some(ws_ticker.data)
332        })
333        .await
334    }
335
336    /// Subscribes to ticker events with timestamp for the specified symbols and category.
337    ///
338    /// # Arguments
339    ///
340    /// * `subs` - A vector of symbols for which ticker events are subscribed.
341    /// * `category` - The category for which ticker events are subscribed.
342    ///
343    /// # Examples
344    ///
345    /// ```
346    /// use your_crate_name::Category;
347    /// let subs = vec!["BTCUSD", "ETHUSD"];
348    /// let category = Category::Linear;
349    /// let sender = UnboundedSender<Ticker>;
350    /// ws_timed_tickers(subs, category, sender);
351    /// ```
352    pub async fn ws_timed_tickers(
353        &self,
354        subs: Vec<&str>,
355        category: Category,
356        sender: mpsc::UnboundedSender<Timed<Ticker>>,
357    ) -> Result<(), BybitError> {
358        self._ws_tickers_internal(subs, category, sender, |ticker: WsTicker| {
359            Some(Timed {
360                time: ticker.ts,
361                data: ticker.data,
362            })
363        })
364        .await
365    }
366
367    /// A high abstraction level stream of timed linear snapshots, which you can
368    /// subscribe to using the receiver of the sender. Internally this method
369    /// consumes the linear ticker API but instead of returning a stream of deltas
370    /// we update the initial snapshot with all subsequent streams, and thanks
371    /// to internally using `.scan` we you get `Timed<LinearTickerDataSnapshot>`,
372    /// instead of `Timed<LinearTickerDataDelta>`.
373    ///
374    /// If you provide multiple symbols, the `LinearTickerDataSnapshot` values
375    /// will be interleaved.
376    ///
377    /// # Usage
378    /// ```no_run
379    /// use bybit::prelude::*;
380    /// use tokio::sync::mpsc;
381    /// use std::sync::Arc;
382    ///
383    /// #[tokio::main]
384    /// async fn main() {
385    ///
386    /// let ws: Arc<Stream> = Arc::new(Bybit::new(None, None));
387    /// let (tx, mut rx) = mpsc::unbounded_channel::<Timed<LinearTickerDataSnapshot>>();
388    /// tokio::spawn(async move {
389    ///     ws.ws_timed_linear_tickers(vec!["BTCUSDT".to_owned(), "ETHUSDT".to_owned()], tx)
390    ///         .await
391    ///         .unwrap();
392    /// });
393    /// while let Some(ticker_snapshot) = rx.recv().await {
394    ///     println!("{:#?}", ticker_snapshot);
395    /// }
396    /// }
397    /// ```
398    pub async fn ws_timed_linear_tickers(
399        self: Arc<Self>,
400        subs: Vec<String>,
401        sender: mpsc::UnboundedSender<Timed<LinearTickerDataSnapshot>>,
402    ) -> Result<(), BybitError> {
403        let (tx, mut rx) = mpsc::unbounded_channel::<Timed<LinearTickerData>>();
404        // Spawn the WebSocket task
405        tokio::spawn({
406            let self_arc = Arc::clone(&self);
407            let subs = subs.clone();
408            async move {
409                self_arc
410                    ._ws_tickers_internal(
411                        subs.iter().map(|s| s.as_str()).collect(),
412                        Category::Linear,
413                        tx,
414                        |ticker: WsTicker| match &ticker.data {
415                            Ticker::Linear(linear) => Some(Timed {
416                                time: ticker.ts,
417                                data: linear.clone(),
418                            }),
419                            Ticker::Spot(_) => None,
420                            Ticker::Options(_) => None,
421                            Ticker::Futures(_) => None,
422                        },
423                    )
424                    .await
425            }
426        });
427
428        // State to store snapshots for each symbol
429        let mut snapshots: HashMap<String, Timed<LinearTickerDataSnapshot>> = HashMap::new();
430
431        // Process incoming messages
432        while let Some(ticker) = rx.recv().await {
433            match ticker.data {
434                LinearTickerData::Snapshot(snapshot) => {
435                    let symbol = snapshot.symbol.clone();
436                    let timed_snapshot = Timed {
437                        time: ticker.time,
438                        data: snapshot,
439                    };
440                    // Store the snapshot and send it
441                    snapshots.insert(symbol.clone(), timed_snapshot.clone());
442                    sender
443                        .send(timed_snapshot)
444                        .map_err(|e| BybitError::ChannelSendError {
445                            underlying: e.to_string(),
446                        })?
447                }
448                LinearTickerData::Delta(delta) => {
449                    let symbol = delta.symbol.clone();
450                    if let Some(snapshot_timed) = snapshots.get_mut(&symbol) {
451                        let mut snapshot = snapshot_timed.data.clone();
452                        snapshot.update(delta);
453                        let new = Timed {
454                            data: snapshot,
455                            time: ticker.time,
456                        };
457                        *snapshot_timed = new.clone();
458                        sender.send(new).map_err(|e| BybitError::ChannelSendError {
459                            underlying: e.to_string(),
460                        })?
461                    }
462                    // If no snapshot exists for the symbol, skip the delta
463                }
464            }
465        }
466
467        Ok(())
468    }
469
470    async fn _ws_tickers_internal<T, F>(
471        &self,
472        subs: Vec<&str>,
473        category: Category,
474        sender: mpsc::UnboundedSender<T>,
475        filter_map: F,
476    ) -> Result<(), BybitError>
477    where
478        T: 'static + Sync + Send,
479        F: 'static + Sync + Send + Fn(WsTicker) -> Option<T>,
480    {
481        let arr: Vec<String> = subs
482            .into_iter()
483            .map(|sub| format!("tickers.{}", sub.to_uppercase()))
484            .collect();
485        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
486
487        let handler = move |event| {
488            if let WebsocketEvents::TickerEvent(ticker) = event {
489                if let Some(mapped) = filter_map(ticker) {
490                    sender
491                        .send(mapped)
492                        .map_err(|e| BybitError::ChannelSendError {
493                            underlying: e.to_string(),
494                        })?;
495                }
496            }
497            Ok(())
498        };
499
500        self.ws_subscribe(request, category, handler).await
501    }
502    pub async fn ws_liquidations(
503        &self,
504        subs: Vec<&str>,
505        category: Category,
506        sender: mpsc::UnboundedSender<LiquidationData>,
507    ) -> Result<(), BybitError> {
508        let arr: Vec<String> = subs
509            .into_iter()
510            .map(|sub| format!("allLiquidation.{}", sub.to_uppercase()))
511            .collect();
512        let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
513
514        let handler = move |event| {
515            if let WebsocketEvents::LiquidationEvent(liquidation) = event {
516                sender
517                    .send(liquidation.data)
518                    .map_err(|e| BybitError::ChannelSendError {
519                        underlying: e.to_string(),
520                    })?;
521            }
522            Ok(())
523        };
524
525        self.ws_subscribe(request, category, handler).await
526    }
527    pub async fn ws_klines(
528        &self,
529        subs: Vec<(&str, &str)>,
530        category: Category,
531        sender: mpsc::UnboundedSender<WsKline>,
532    ) -> Result<(), BybitError> {
533        let arr: Vec<String> = subs
534            .into_iter()
535            .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
536            .collect();
537        let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
538        self.ws_subscribe(request, category, move |event| {
539            if let WebsocketEvents::KlineEvent(kline) = event {
540                sender
541                    .send(kline)
542                    .map_err(|e| BybitError::ChannelSendError {
543                        underlying: e.to_string(),
544                    })?;
545            }
546            Ok(())
547        })
548        .await
549    }
550
551    pub async fn ws_position(
552        &self,
553        cat: Option<Category>,
554        sender: mpsc::UnboundedSender<PositionData>,
555    ) -> Result<(), BybitError> {
556        let sub_str = if let Some(v) = cat {
557            match v {
558                Category::Linear => "position.linear",
559                Category::Inverse => "position.inverse",
560                _ => "",
561            }
562        } else {
563            "position"
564        };
565
566        let request = Subscription::new("subscribe", vec![sub_str]);
567        self.ws_priv_subscribe(request, move |event| {
568            if let WebsocketEvents::PositionEvent(position) = event {
569                for v in position.data {
570                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
571                        underlying: e.to_string(),
572                    })?;
573                }
574            }
575            Ok(())
576        })
577        .await
578    }
579
580    pub async fn ws_executions(
581        &self,
582        cat: Option<Category>,
583        sender: mpsc::UnboundedSender<ExecutionData>,
584    ) -> Result<(), BybitError> {
585        let sub_str = if let Some(v) = cat {
586            match v {
587                Category::Linear => "execution.linear",
588                Category::Inverse => "execution.inverse",
589                Category::Spot => "execution.spot",
590                Category::Option => "execution.option",
591            }
592        } else {
593            "execution"
594        };
595
596        let request = Subscription::new("subscribe", vec![sub_str]);
597        self.ws_priv_subscribe(request, move |event| {
598            if let WebsocketEvents::ExecutionEvent(execute) = event {
599                for v in execute.data {
600                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
601                        underlying: e.to_string(),
602                    })?;
603                }
604            }
605            Ok(())
606        })
607        .await
608    }
609
610    pub async fn ws_fast_exec(
611        &self,
612        sender: mpsc::UnboundedSender<FastExecData>,
613    ) -> Result<(), BybitError> {
614        let sub_str = "execution.fast";
615        let request = Subscription::new("subscribe", vec![sub_str]);
616
617        self.ws_priv_subscribe(request, move |event| {
618            if let WebsocketEvents::FastExecEvent(execution) = event {
619                for v in execution.data {
620                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
621                        underlying: e.to_string(),
622                    })?;
623                }
624            }
625            Ok(())
626        })
627        .await
628    }
629
630    pub async fn ws_orders(
631        &self,
632        cat: Option<Category>,
633        sender: mpsc::UnboundedSender<OrderData>,
634    ) -> Result<(), BybitError> {
635        let sub_str = if let Some(v) = cat {
636            match v {
637                Category::Linear => "order.linear",
638                Category::Inverse => "order.inverse",
639                Category::Spot => "order.spot",
640                Category::Option => "order.option",
641            }
642        } else {
643            "order"
644        };
645
646        let request = Subscription::new("subscribe", vec![sub_str]);
647        self.ws_priv_subscribe(request, move |event| {
648            if let WebsocketEvents::OrderEvent(order) = event {
649                for v in order.data {
650                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
651                        underlying: e.to_string(),
652                    })?;
653                }
654            }
655            Ok(())
656        })
657        .await
658    }
659
660    pub async fn ws_wallet(
661        &self,
662        sender: mpsc::UnboundedSender<WalletData>,
663    ) -> Result<(), BybitError> {
664        let sub_str = "wallet";
665        let request = Subscription::new("subscribe", vec![sub_str]);
666        self.ws_priv_subscribe(request, move |event| {
667            if let WebsocketEvents::Wallet(wallet) = event {
668                for v in wallet.data {
669                    sender.send(v).map_err(|e| BybitError::ChannelSendError {
670                        underlying: e.to_string(),
671                    })?;
672                }
673            }
674            Ok(())
675        })
676        .await
677    }
678
679    /// Subscribes to system status updates via WebSocket.
680    ///
681    /// System status updates provide real-time information about platform maintenance
682    /// or service incidents. This is useful for monitoring exchange health and
683    /// planning trading activities around maintenance windows.
684    ///
685    /// # Arguments
686    ///
687    /// * `sender` - Channel sender for system status updates
688    ///
689    /// # Returns
690    ///
691    /// Returns `Ok(())` if subscription succeeds, otherwise returns a `BybitError`.
692    ///
693    /// # Example
694    ///
695    /// ```no_run
696    /// use rs_bybit::prelude::*;
697    /// use tokio::sync::mpsc;
698    ///
699    /// # #[tokio::main]
700    /// # async fn main() -> Result<(), BybitError> {
701    /// let client = Client::new("api_key", "api_secret", None, None)?;
702    /// let stream = Stream { client };
703    /// let (tx, mut rx) = mpsc::unbounded_channel();
704    ///
705    /// stream.ws_system_status(tx).await?;
706    ///
707    /// while let Some(update) = rx.recv().await {
708    ///     println!("System status update: {:?}", update);
709    /// }
710    /// # Ok(())
711    /// # }
712    /// ```
713    pub async fn ws_system_status(
714        &self,
715        sender: mpsc::UnboundedSender<SystemStatusUpdate>,
716    ) -> Result<(), BybitError> {
717        let request = Subscription::new("subscribe", vec!["system.status"]);
718        let request_str = Self::build_subscription(request);
719
720        // System status uses the misc/status endpoint
721        let endpoint = WebsocketAPI::PublicMiscStatus;
722        let response = self
723            .client
724            .wss_connect(endpoint, Some(request_str), false, None)
725            .await?;
726
727        let handler = move |event| {
728            if let WebsocketEvents::SystemStatusEvent(status_update) = event {
729                sender
730                    .send(status_update)
731                    .map_err(|e| BybitError::ChannelSendError {
732                        underlying: e.to_string(),
733                    })?;
734            }
735            Ok(())
736        };
737
738        Self::event_loop(response, handler, None).await?;
739        Ok(())
740    }
741
742    pub async fn ws_trade_stream<'a, F>(
743        &self,
744        req: mpsc::UnboundedReceiver<RequestType<'a>>,
745        handler: F,
746    ) -> Result<(), BybitError>
747    where
748        F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
749        'a: 'static,
750    {
751        let response = self
752            .client
753            .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
754            .await?;
755        Self::event_loop(response, handler, Some(req)).await?;
756
757        Ok(())
758    }
759
760    pub async fn event_loop<'a, H>(
761        mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
762        mut handler: H,
763        mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
764    ) -> Result<(), BybitError>
765    where
766        H: WebSocketHandler,
767    {
768        let mut interval = Instant::now();
769        loop {
770            let msg = stream.next().await;
771            match msg {
772                Some(Ok(WsMessage::Text(msg))) => {
773                    if let Err(_) = handler.handle_msg(&msg) {
774                        return Err(BybitError::Base(
775                            "Error handling stream message".to_string(),
776                        ));
777                    }
778                }
779                Some(Err(e)) => {
780                    return Err(BybitError::from(e.to_string()));
781                }
782                None => {
783                    return Err(BybitError::Base("Stream was closed".to_string()));
784                }
785                _ => {}
786            }
787            if let Some(sender) = order_sender.as_mut() {
788                if let Some(v) = sender.recv().await {
789                    let order_req = Self::build_trade_subscription(v, Some(3000));
790                    stream.send(WsMessage::Text(order_req)).await?;
791                }
792            }
793
794            if interval.elapsed() > Duration::from_secs(30) {
795                let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
796                if order_sender.is_none() {
797                    parameters.insert("req_id".into(), generate_random_uid(8).into());
798                }
799                parameters.insert("op".into(), "ping".into());
800                let request = build_json_request(&parameters);
801                let _ = stream
802                    .send(WsMessage::Text(request))
803                    .await
804                    .map_err(BybitError::from);
805                interval = Instant::now();
806            }
807        }
808    }
809}
810
811pub trait WebSocketHandler {
812    type Event;
813    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
814}
815
816impl<F> WebSocketHandler for F
817where
818    F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
819{
820    type Event = WebsocketEvents;
821    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
822        let update: Value = serde_json::from_str(msg)?;
823        if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
824            self(event)?;
825        }
826
827        Ok(())
828    }
829}