1use crate::prelude::*;
2
3use futures::{SinkExt, StreamExt};
4use log::trace;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::net::TcpStream;
9use tokio::sync::mpsc;
10use tokio::time::Duration;
11use tokio_tungstenite::WebSocketStream;
12use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
13
/// Handle for the WebSocket operations implemented on this type.
///
/// Every method opens its connection through `client.wss_connect`.
#[derive(Clone)]
pub struct Stream {
    /// Client used to establish WebSocket connections.
    pub client: Client,
}
18
19impl Stream {
20 pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
29 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
30 parameters.insert("req_id".into(), generate_random_uid(8).into());
31 parameters.insert("op".into(), "ping".into());
32 let request = build_json_request(¶meters);
33 let endpoint = if private {
34 WebsocketAPI::Private
35 } else {
36 WebsocketAPI::PublicLinear
37 };
38 let mut response = self
39 .client
40 .wss_connect(endpoint, Some(request), private, None)
41 .await?;
42 let Some(data) = response.next().await else {
43 return Err(BybitError::Base(
44 "Failed to receive ping response".to_string(),
45 ));
46 };
47
48 let data = data
49 .map_err(|e| BybitError::Base(format!("Failed to get ping response, error {}", e)))?;
50 if let WsMessage::Text(data) = data {
51 let response: PongResponse = serde_json::from_str(&data)?;
52 match response {
53 PongResponse::PublicPong(pong) => {
54 trace!("Pong received successfully: {:#?}", pong);
55 }
56 PongResponse::PrivatePong(pong) => {
57 trace!("Pong received successfully: {:#?}", pong);
58 }
59 }
60 }
61 Ok(())
62 }
63
64 pub async fn ws_priv_subscribe<'b, F>(
65 &self,
66 req: Subscription<'_>,
67 handler: F,
68 ) -> Result<(), BybitError>
69 where
70 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
71 {
72 let request = Self::build_subscription(req);
73 let response = self
74 .client
75 .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
76 .await?;
77 if let Ok(_) = Self::event_loop(response, handler, None).await {}
78 Ok(())
79 }
80
81 pub async fn ws_subscribe<'b, F>(
82 &self,
83 req: Subscription<'_>,
84 category: Category,
85 handler: F,
86 ) -> Result<(), BybitError>
87 where
88 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
89 {
90 let endpoint = {
91 match category {
92 Category::Linear => WebsocketAPI::PublicLinear,
93 Category::Inverse => WebsocketAPI::PublicInverse,
94 Category::Spot => WebsocketAPI::PublicSpot,
95 Category::Option => WebsocketAPI::PublicOption,
96 }
97 };
98 let request = Self::build_subscription(req);
99 let response = self
100 .client
101 .wss_connect(endpoint, Some(request), false, None)
102 .await?;
103 Self::event_loop(response, handler, None).await?;
104 Ok(())
105 }
106
107 pub fn build_subscription(action: Subscription) -> String {
108 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
109 parameters.insert("req_id".into(), generate_random_uid(8).into());
110 parameters.insert("op".into(), action.op.into());
111 let args_value: Value = action
112 .args
113 .iter()
114 .map(ToString::to_string)
115 .collect::<Vec<_>>()
116 .into();
117 parameters.insert("args".into(), args_value);
118
119 build_json_request(¶meters)
120 }
121
122 pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
123 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
124 parameters.insert("reqId".into(), generate_random_uid(16).into());
125 let mut header_map: BTreeMap<String, String> = BTreeMap::new();
126 header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
127 header_map.insert(
128 "X-BAPI-RECV-WINDOW".into(),
129 recv_window.unwrap_or(5000).to_string(),
130 );
131 parameters.insert("header".into(), json!(header_map));
132 match orders {
133 RequestType::Create(order) => {
134 parameters.insert("op".into(), "order.create".into());
135 parameters.insert("args".into(), build_ws_orders(RequestType::Create(order)));
136 }
137 RequestType::CreateBatch(order) => {
138 parameters.insert("op".into(), "order.create-batch".into());
139 parameters.insert(
140 "args".into(),
141 build_ws_orders(RequestType::CreateBatch(order)),
142 );
143 }
144 RequestType::Amend(order) => {
145 parameters.insert("op".into(), "order.amend".into());
146 parameters.insert("args".into(), build_ws_orders(RequestType::Amend(order)));
147 }
148 RequestType::AmendBatch(order) => {
149 parameters.insert("op".into(), "order.amend-batch".into());
150 parameters.insert(
151 "args".into(),
152 build_ws_orders(RequestType::AmendBatch(order)),
153 );
154 }
155 RequestType::Cancel(order) => {
156 parameters.insert("op".into(), "order.cancel".into());
157 parameters.insert("args".into(), build_ws_orders(RequestType::Cancel(order)));
158 }
159 RequestType::CancelBatch(order) => {
160 parameters.insert("op".into(), "order.cancel-batch".into());
161 parameters.insert(
162 "args".into(),
163 build_ws_orders(RequestType::CancelBatch(order)),
164 );
165 }
166 }
167 build_json_request(¶meters)
168 }
169
170 pub async fn ws_orderbook(
184 &self,
185 subs: Vec<(i32, &str)>,
186 category: Category,
187 sender: mpsc::UnboundedSender<OrderBookUpdate>,
188 ) -> Result<(), BybitError> {
189 let arr: Vec<String> = subs
190 .into_iter()
191 .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
192 .collect();
193 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
194 self.ws_subscribe(request, category, move |event| {
195 if let WebsocketEvents::OrderBookEvent(order_book) = event {
196 sender
197 .send(order_book)
198 .map_err(|e| BybitError::ChannelSendError {
199 underlying: e.to_string(),
200 })?;
201 }
202 Ok(())
203 })
204 .await
205 }
206
207 pub async fn ws_rpi_orderbook(
244 &self,
245 subs: Vec<&str>,
246 category: Category,
247 sender: mpsc::UnboundedSender<RPIOrderbookUpdate>,
248 ) -> Result<(), BybitError> {
249 let arr: Vec<String> = subs
250 .into_iter()
251 .map(|sym| format!("orderbook.rpi.{}", sym.to_uppercase()))
252 .collect();
253 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
254 self.ws_subscribe(request, category, move |event| {
255 if let WebsocketEvents::RPIOrderBookEvent(rpi_order_book) = event {
256 sender
257 .send(rpi_order_book)
258 .map_err(|e| BybitError::ChannelSendError {
259 underlying: e.to_string(),
260 })?;
261 }
262 Ok(())
263 })
264 .await
265 }
266
267 pub async fn ws_trades(
282 &self,
283 subs: Vec<&str>,
284 category: Category,
285 sender: mpsc::UnboundedSender<WsTrade>,
286 ) -> Result<(), BybitError> {
287 let arr: Vec<String> = subs
288 .iter()
289 .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
290 .collect();
291 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
292 let handler = move |event| {
293 if let WebsocketEvents::TradeEvent(trades) = event {
294 for trade in trades.data {
295 sender
296 .send(trade)
297 .map_err(|e| BybitError::ChannelSendError {
298 underlying: e.to_string(),
299 })?;
300 }
301 }
302 Ok(())
303 };
304
305 self.ws_subscribe(request, category, handler).await
306 }
307
308 pub async fn ws_tickers(
325 &self,
326 subs: Vec<&str>,
327 category: Category,
328 sender: mpsc::UnboundedSender<Ticker>,
329 ) -> Result<(), BybitError> {
330 self._ws_tickers_internal(subs, category, sender, |ws_ticker: WsTicker| {
331 Some(ws_ticker.data)
332 })
333 .await
334 }
335
336 pub async fn ws_timed_tickers(
353 &self,
354 subs: Vec<&str>,
355 category: Category,
356 sender: mpsc::UnboundedSender<Timed<Ticker>>,
357 ) -> Result<(), BybitError> {
358 self._ws_tickers_internal(subs, category, sender, |ticker: WsTicker| {
359 Some(Timed {
360 time: ticker.ts,
361 data: ticker.data,
362 })
363 })
364 .await
365 }
366
367 pub async fn ws_timed_linear_tickers(
399 self: Arc<Self>,
400 subs: Vec<String>,
401 sender: mpsc::UnboundedSender<Timed<LinearTickerDataSnapshot>>,
402 ) -> Result<(), BybitError> {
403 let (tx, mut rx) = mpsc::unbounded_channel::<Timed<LinearTickerData>>();
404 tokio::spawn({
406 let self_arc = Arc::clone(&self);
407 let subs = subs.clone();
408 async move {
409 self_arc
410 ._ws_tickers_internal(
411 subs.iter().map(|s| s.as_str()).collect(),
412 Category::Linear,
413 tx,
414 |ticker: WsTicker| match &ticker.data {
415 Ticker::Linear(linear) => Some(Timed {
416 time: ticker.ts,
417 data: linear.clone(),
418 }),
419 Ticker::Spot(_) => None,
420 Ticker::Options(_) => None,
421 Ticker::Futures(_) => None,
422 },
423 )
424 .await
425 }
426 });
427
428 let mut snapshots: HashMap<String, Timed<LinearTickerDataSnapshot>> = HashMap::new();
430
431 while let Some(ticker) = rx.recv().await {
433 match ticker.data {
434 LinearTickerData::Snapshot(snapshot) => {
435 let symbol = snapshot.symbol.clone();
436 let timed_snapshot = Timed {
437 time: ticker.time,
438 data: snapshot,
439 };
440 snapshots.insert(symbol.clone(), timed_snapshot.clone());
442 sender
443 .send(timed_snapshot)
444 .map_err(|e| BybitError::ChannelSendError {
445 underlying: e.to_string(),
446 })?
447 }
448 LinearTickerData::Delta(delta) => {
449 let symbol = delta.symbol.clone();
450 if let Some(snapshot_timed) = snapshots.get_mut(&symbol) {
451 let mut snapshot = snapshot_timed.data.clone();
452 snapshot.update(delta);
453 let new = Timed {
454 data: snapshot,
455 time: ticker.time,
456 };
457 *snapshot_timed = new.clone();
458 sender.send(new).map_err(|e| BybitError::ChannelSendError {
459 underlying: e.to_string(),
460 })?
461 }
462 }
464 }
465 }
466
467 Ok(())
468 }
469
470 async fn _ws_tickers_internal<T, F>(
471 &self,
472 subs: Vec<&str>,
473 category: Category,
474 sender: mpsc::UnboundedSender<T>,
475 filter_map: F,
476 ) -> Result<(), BybitError>
477 where
478 T: 'static + Sync + Send,
479 F: 'static + Sync + Send + Fn(WsTicker) -> Option<T>,
480 {
481 let arr: Vec<String> = subs
482 .into_iter()
483 .map(|sub| format!("tickers.{}", sub.to_uppercase()))
484 .collect();
485 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
486
487 let handler = move |event| {
488 if let WebsocketEvents::TickerEvent(ticker) = event {
489 if let Some(mapped) = filter_map(ticker) {
490 sender
491 .send(mapped)
492 .map_err(|e| BybitError::ChannelSendError {
493 underlying: e.to_string(),
494 })?;
495 }
496 }
497 Ok(())
498 };
499
500 self.ws_subscribe(request, category, handler).await
501 }
502 pub async fn ws_liquidations(
503 &self,
504 subs: Vec<&str>,
505 category: Category,
506 sender: mpsc::UnboundedSender<LiquidationData>,
507 ) -> Result<(), BybitError> {
508 let arr: Vec<String> = subs
509 .into_iter()
510 .map(|sub| format!("allLiquidation.{}", sub.to_uppercase()))
511 .collect();
512 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
513
514 let handler = move |event| {
515 if let WebsocketEvents::LiquidationEvent(liquidation) = event {
516 sender
517 .send(liquidation.data)
518 .map_err(|e| BybitError::ChannelSendError {
519 underlying: e.to_string(),
520 })?;
521 }
522 Ok(())
523 };
524
525 self.ws_subscribe(request, category, handler).await
526 }
527 pub async fn ws_klines(
528 &self,
529 subs: Vec<(&str, &str)>,
530 category: Category,
531 sender: mpsc::UnboundedSender<WsKline>,
532 ) -> Result<(), BybitError> {
533 let arr: Vec<String> = subs
534 .into_iter()
535 .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
536 .collect();
537 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
538 self.ws_subscribe(request, category, move |event| {
539 if let WebsocketEvents::KlineEvent(kline) = event {
540 sender
541 .send(kline)
542 .map_err(|e| BybitError::ChannelSendError {
543 underlying: e.to_string(),
544 })?;
545 }
546 Ok(())
547 })
548 .await
549 }
550
551 pub async fn ws_position(
552 &self,
553 cat: Option<Category>,
554 sender: mpsc::UnboundedSender<PositionData>,
555 ) -> Result<(), BybitError> {
556 let sub_str = if let Some(v) = cat {
557 match v {
558 Category::Linear => "position.linear",
559 Category::Inverse => "position.inverse",
560 _ => "",
561 }
562 } else {
563 "position"
564 };
565
566 let request = Subscription::new("subscribe", vec![sub_str]);
567 self.ws_priv_subscribe(request, move |event| {
568 if let WebsocketEvents::PositionEvent(position) = event {
569 for v in position.data {
570 sender.send(v).map_err(|e| BybitError::ChannelSendError {
571 underlying: e.to_string(),
572 })?;
573 }
574 }
575 Ok(())
576 })
577 .await
578 }
579
580 pub async fn ws_executions(
581 &self,
582 cat: Option<Category>,
583 sender: mpsc::UnboundedSender<ExecutionData>,
584 ) -> Result<(), BybitError> {
585 let sub_str = if let Some(v) = cat {
586 match v {
587 Category::Linear => "execution.linear",
588 Category::Inverse => "execution.inverse",
589 Category::Spot => "execution.spot",
590 Category::Option => "execution.option",
591 }
592 } else {
593 "execution"
594 };
595
596 let request = Subscription::new("subscribe", vec![sub_str]);
597 self.ws_priv_subscribe(request, move |event| {
598 if let WebsocketEvents::ExecutionEvent(execute) = event {
599 for v in execute.data {
600 sender.send(v).map_err(|e| BybitError::ChannelSendError {
601 underlying: e.to_string(),
602 })?;
603 }
604 }
605 Ok(())
606 })
607 .await
608 }
609
610 pub async fn ws_fast_exec(
611 &self,
612 sender: mpsc::UnboundedSender<FastExecData>,
613 ) -> Result<(), BybitError> {
614 let sub_str = "execution.fast";
615 let request = Subscription::new("subscribe", vec![sub_str]);
616
617 self.ws_priv_subscribe(request, move |event| {
618 if let WebsocketEvents::FastExecEvent(execution) = event {
619 for v in execution.data {
620 sender.send(v).map_err(|e| BybitError::ChannelSendError {
621 underlying: e.to_string(),
622 })?;
623 }
624 }
625 Ok(())
626 })
627 .await
628 }
629
630 pub async fn ws_orders(
631 &self,
632 cat: Option<Category>,
633 sender: mpsc::UnboundedSender<OrderData>,
634 ) -> Result<(), BybitError> {
635 let sub_str = if let Some(v) = cat {
636 match v {
637 Category::Linear => "order.linear",
638 Category::Inverse => "order.inverse",
639 Category::Spot => "order.spot",
640 Category::Option => "order.option",
641 }
642 } else {
643 "order"
644 };
645
646 let request = Subscription::new("subscribe", vec![sub_str]);
647 self.ws_priv_subscribe(request, move |event| {
648 if let WebsocketEvents::OrderEvent(order) = event {
649 for v in order.data {
650 sender.send(v).map_err(|e| BybitError::ChannelSendError {
651 underlying: e.to_string(),
652 })?;
653 }
654 }
655 Ok(())
656 })
657 .await
658 }
659
660 pub async fn ws_wallet(
661 &self,
662 sender: mpsc::UnboundedSender<WalletData>,
663 ) -> Result<(), BybitError> {
664 let sub_str = "wallet";
665 let request = Subscription::new("subscribe", vec![sub_str]);
666 self.ws_priv_subscribe(request, move |event| {
667 if let WebsocketEvents::Wallet(wallet) = event {
668 for v in wallet.data {
669 sender.send(v).map_err(|e| BybitError::ChannelSendError {
670 underlying: e.to_string(),
671 })?;
672 }
673 }
674 Ok(())
675 })
676 .await
677 }
678
679 pub async fn ws_system_status(
714 &self,
715 sender: mpsc::UnboundedSender<SystemStatusUpdate>,
716 ) -> Result<(), BybitError> {
717 let request = Subscription::new("subscribe", vec!["system.status"]);
718 let request_str = Self::build_subscription(request);
719
720 let endpoint = WebsocketAPI::PublicMiscStatus;
722 let response = self
723 .client
724 .wss_connect(endpoint, Some(request_str), false, None)
725 .await?;
726
727 let handler = move |event| {
728 if let WebsocketEvents::SystemStatusEvent(status_update) = event {
729 sender
730 .send(status_update)
731 .map_err(|e| BybitError::ChannelSendError {
732 underlying: e.to_string(),
733 })?;
734 }
735 Ok(())
736 };
737
738 Self::event_loop(response, handler, None).await?;
739 Ok(())
740 }
741
742 pub async fn ws_trade_stream<'a, F>(
743 &self,
744 req: mpsc::UnboundedReceiver<RequestType<'a>>,
745 handler: F,
746 ) -> Result<(), BybitError>
747 where
748 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
749 'a: 'static,
750 {
751 let response = self
752 .client
753 .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
754 .await?;
755 Self::event_loop(response, handler, Some(req)).await?;
756
757 Ok(())
758 }
759
760 pub async fn event_loop<'a, H>(
761 mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
762 mut handler: H,
763 mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
764 ) -> Result<(), BybitError>
765 where
766 H: WebSocketHandler,
767 {
768 let mut interval = Instant::now();
769 loop {
770 let msg = stream.next().await;
771 match msg {
772 Some(Ok(WsMessage::Text(msg))) => {
773 if let Err(_) = handler.handle_msg(&msg) {
774 return Err(BybitError::Base(
775 "Error handling stream message".to_string(),
776 ));
777 }
778 }
779 Some(Err(e)) => {
780 return Err(BybitError::from(e.to_string()));
781 }
782 None => {
783 return Err(BybitError::Base("Stream was closed".to_string()));
784 }
785 _ => {}
786 }
787 if let Some(sender) = order_sender.as_mut() {
788 if let Some(v) = sender.recv().await {
789 let order_req = Self::build_trade_subscription(v, Some(3000));
790 stream.send(WsMessage::Text(order_req)).await?;
791 }
792 }
793
794 if interval.elapsed() > Duration::from_secs(30) {
795 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
796 if order_sender.is_none() {
797 parameters.insert("req_id".into(), generate_random_uid(8).into());
798 }
799 parameters.insert("op".into(), "ping".into());
800 let request = build_json_request(¶meters);
801 let _ = stream
802 .send(WsMessage::Text(request))
803 .await
804 .map_err(BybitError::from);
805 interval = Instant::now();
806 }
807 }
808 }
809}
810
/// Consumer of raw text messages read from a WebSocket.
pub trait WebSocketHandler {
    /// Event type associated with this handler.
    type Event;
    /// Processes one text message; an `Err` signals that handling failed.
    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
}
815
816impl<F> WebSocketHandler for F
817where
818 F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
819{
820 type Event = WebsocketEvents;
821 fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
822 let update: Value = serde_json::from_str(msg)?;
823 if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
824 self(event)?;
825 }
826
827 Ok(())
828 }
829}