1use crate::prelude::*;
2
3use futures::{SinkExt, StreamExt};
4use log::trace;
5use std::time::Instant;
6use tokio::net::TcpStream;
7use tokio::sync::mpsc;
8use tokio::time::Duration;
9use tokio_tungstenite::WebSocketStream;
10use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
11
/// Handle for the WebSocket side of the API.
///
/// Every method on this type opens its connection through the wrapped
/// [`Client`] (via `wss_connect`).
#[derive(Clone)]
pub struct Stream {
    /// Client used to establish WebSocket connections.
    pub client: Client,
}
16
17impl Stream {
18 pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
27 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
28 parameters.insert("req_id".into(), generate_random_uid(8).into());
29 parameters.insert("op".into(), "ping".into());
30 let request = build_json_request(¶meters);
31 let endpoint = if private {
32 WebsocketAPI::Private
33 } else {
34 WebsocketAPI::PublicLinear
35 };
36 let mut response = self
37 .client
38 .wss_connect(endpoint, Some(request), private, None)
39 .await?;
40 let Some(data) = response.next().await else {
41 return Err(BybitError::Base(
42 "Failed to receive ping response".to_string(),
43 ));
44 };
45
46 let data = data
47 .map_err(|e| BybitError::Base(format!("Failed to get ping response, error {}", e)))?;
48 if let WsMessage::Text(data) = data {
49 let response: PongResponse = serde_json::from_str(&data)?;
50 match response {
51 PongResponse::PublicPong(pong) => {
52 trace!("Pong received successfully: {:#?}", pong);
53 }
54 PongResponse::PrivatePong(pong) => {
55 trace!("Pong received successfully: {:#?}", pong);
56 }
57 }
58 }
59 Ok(())
60 }
61
62 pub async fn ws_priv_subscribe<'b, F>(
63 &self,
64 req: Subscription<'_>,
65 handler: F,
66 ) -> Result<(), BybitError>
67 where
68 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
69 {
70 let request = Self::build_subscription(req);
71 let response = self
72 .client
73 .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
74 .await?;
75 if let Ok(_) = Self::event_loop(response, handler, None).await {}
76 Ok(())
77 }
78
79 pub async fn ws_subscribe<'b, F>(
80 &self,
81 req: Subscription<'_>,
82 category: Category,
83 handler: F,
84 ) -> Result<(), BybitError>
85 where
86 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
87 {
88 let endpoint = {
89 match category {
90 Category::Linear => WebsocketAPI::PublicLinear,
91 Category::Inverse => WebsocketAPI::PublicInverse,
92 Category::Spot => WebsocketAPI::PublicSpot,
93 _ => unimplemented!("Option has not been implemented"),
94 }
95 };
96 let request = Self::build_subscription(req);
97 let response = self
98 .client
99 .wss_connect(endpoint, Some(request), false, None)
100 .await?;
101 Self::event_loop(response, handler, None).await?;
102 Ok(())
103 }
104
105 pub fn build_subscription(action: Subscription) -> String {
106 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
107 parameters.insert("req_id".into(), generate_random_uid(8).into());
108 parameters.insert("op".into(), action.op.into());
109 let args_value: Value = action
110 .args
111 .iter()
112 .map(ToString::to_string)
113 .collect::<Vec<_>>()
114 .into();
115 parameters.insert("args".into(), args_value);
116
117 build_json_request(¶meters)
118 }
119
120 pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
121 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
122 parameters.insert("reqId".into(), generate_random_uid(16).into());
123 let mut header_map: BTreeMap<String, String> = BTreeMap::new();
124 header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
125 header_map.insert(
126 "X-BAPI-RECV-WINDOW".into(),
127 recv_window.unwrap_or(5000).to_string(),
128 );
129 parameters.insert("header".into(), json!(header_map));
130 match orders {
131 RequestType::Create(order) => {
132 parameters.insert("op".into(), "order.create".into());
133 parameters.insert("args".into(), build_ws_orders(RequestType::Create(order)));
134 }
135 RequestType::CreateBatch(order) => {
136 parameters.insert("op".into(), "order.create-batch".into());
137 parameters.insert(
138 "args".into(),
139 build_ws_orders(RequestType::CreateBatch(order)),
140 );
141 }
142 RequestType::Amend(order) => {
143 parameters.insert("op".into(), "order.amend".into());
144 parameters.insert("args".into(), build_ws_orders(RequestType::Amend(order)));
145 }
146 RequestType::AmendBatch(order) => {
147 parameters.insert("op".into(), "order.amend-batch".into());
148 parameters.insert(
149 "args".into(),
150 build_ws_orders(RequestType::AmendBatch(order)),
151 );
152 }
153 RequestType::Cancel(order) => {
154 parameters.insert("op".into(), "order.cancel".into());
155 parameters.insert("args".into(), build_ws_orders(RequestType::Cancel(order)));
156 }
157 RequestType::CancelBatch(order) => {
158 parameters.insert("op".into(), "order.cancel-batch".into());
159 parameters.insert(
160 "args".into(),
161 build_ws_orders(RequestType::CancelBatch(order)),
162 );
163 }
164 }
165 build_json_request(¶meters)
166 }
167
168 pub async fn ws_orderbook(
182 &self,
183 subs: Vec<(i32, &str)>,
184 category: Category,
185 sender: mpsc::UnboundedSender<OrderBookUpdate>,
186 ) -> Result<(), BybitError> {
187 let arr: Vec<String> = subs
188 .into_iter()
189 .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
190 .collect();
191 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
192 self.ws_subscribe(request, category, move |event| {
193 if let WebsocketEvents::OrderBookEvent(order_book) = event {
194 sender
195 .send(order_book)
196 .map_err(|e| BybitError::ChannelSendError {
197 underlying: e.to_string(),
198 })?;
199 }
200 Ok(())
201 })
202 .await
203 }
204
205 pub async fn ws_trades(
220 &self,
221 subs: Vec<&str>,
222 category: Category,
223 sender: mpsc::UnboundedSender<WsTrade>,
224 ) -> Result<(), BybitError> {
225 let arr: Vec<String> = subs
226 .iter()
227 .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
228 .collect();
229 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
230 let handler = move |event| {
231 if let WebsocketEvents::TradeEvent(trades) = event {
232 for trade in trades.data {
233 sender
234 .send(trade)
235 .map_err(|e| BybitError::ChannelSendError {
236 underlying: e.to_string(),
237 })?;
238 }
239 }
240 Ok(())
241 };
242
243 self.ws_subscribe(request, category, handler).await
244 }
245
246 pub async fn ws_tickers(
263 &self,
264 subs: Vec<&str>,
265 category: Category,
266 sender: mpsc::UnboundedSender<Ticker>,
267 ) -> Result<(), BybitError> {
268 self._ws_tickers(subs, category, sender, |ws_ticker| ws_ticker.data)
269 .await
270 }
271
272 pub async fn ws_timed_tickers(
289 &self,
290 subs: Vec<&str>,
291 category: Category,
292 sender: mpsc::UnboundedSender<Timed<Ticker>>,
293 ) -> Result<(), BybitError> {
294 self._ws_tickers(subs, category, sender, |ticker| Timed {
295 time: ticker.ts,
296 data: ticker.data,
297 })
298 .await
299 }
300
301 async fn _ws_tickers<T, F>(
302 &self,
303 subs: Vec<&str>,
304 category: Category,
305 sender: mpsc::UnboundedSender<T>,
306 map: F,
307 ) -> Result<(), BybitError>
308 where
309 T: 'static + Sync + Send,
310 F: 'static + Sync + Send + Fn(WsTicker) -> T,
311 {
312 let arr: Vec<String> = subs
313 .into_iter()
314 .map(|sub| format!("tickers.{}", sub.to_uppercase()))
315 .collect();
316 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
317
318 let handler = move |event| {
319 if let WebsocketEvents::TickerEvent(ticker) = event {
320 let mapped = map(ticker);
321 sender
322 .send(mapped)
323 .map_err(|e| BybitError::ChannelSendError {
324 underlying: e.to_string(),
325 })?;
326 }
327 Ok(())
328 };
329
330 self.ws_subscribe(request, category, handler).await
331 }
332 pub async fn ws_liquidations(
333 &self,
334 subs: Vec<&str>,
335 category: Category,
336 sender: mpsc::UnboundedSender<LiquidationData>,
337 ) -> Result<(), BybitError> {
338 let arr: Vec<String> = subs
339 .into_iter()
340 .map(|sub| format!("liquidation.{}", sub.to_uppercase()))
341 .collect();
342 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
343
344 let handler = move |event| {
345 if let WebsocketEvents::LiquidationEvent(liquidation) = event {
346 sender
347 .send(liquidation.data)
348 .map_err(|e| BybitError::ChannelSendError {
349 underlying: e.to_string(),
350 })?;
351 }
352 Ok(())
353 };
354
355 self.ws_subscribe(request, category, handler).await
356 }
357 pub async fn ws_klines(
358 &self,
359 subs: Vec<(&str, &str)>,
360 category: Category,
361 sender: mpsc::UnboundedSender<WsKline>,
362 ) -> Result<(), BybitError> {
363 let arr: Vec<String> = subs
364 .into_iter()
365 .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
366 .collect();
367 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
368 self.ws_subscribe(request, category, move |event| {
369 if let WebsocketEvents::KlineEvent(kline) = event {
370 sender
371 .send(kline)
372 .map_err(|e| BybitError::ChannelSendError {
373 underlying: e.to_string(),
374 })?;
375 }
376 Ok(())
377 })
378 .await
379 }
380
381 pub async fn ws_position(
382 &self,
383 cat: Option<Category>,
384 sender: mpsc::UnboundedSender<PositionData>,
385 ) -> Result<(), BybitError> {
386 let sub_str = if let Some(v) = cat {
387 match v {
388 Category::Linear => "position.linear",
389 Category::Inverse => "position.inverse",
390 _ => "",
391 }
392 } else {
393 "position"
394 };
395
396 let request = Subscription::new("subscribe", vec![sub_str]);
397 self.ws_priv_subscribe(request, move |event| {
398 if let WebsocketEvents::PositionEvent(position) = event {
399 for v in position.data {
400 sender.send(v).map_err(|e| BybitError::ChannelSendError {
401 underlying: e.to_string(),
402 })?;
403 }
404 }
405 Ok(())
406 })
407 .await
408 }
409
410 pub async fn ws_executions(
411 &self,
412 cat: Option<Category>,
413 sender: mpsc::UnboundedSender<ExecutionData>,
414 ) -> Result<(), BybitError> {
415 let sub_str = if let Some(v) = cat {
416 match v {
417 Category::Linear => "execution.linear",
418 Category::Inverse => "execution.inverse",
419 Category::Spot => "execution.spot",
420 Category::Option => "execution.option",
421 }
422 } else {
423 "execution"
424 };
425
426 let request = Subscription::new("subscribe", vec![sub_str]);
427 self.ws_priv_subscribe(request, move |event| {
428 if let WebsocketEvents::ExecutionEvent(execute) = event {
429 for v in execute.data {
430 sender.send(v).map_err(|e| BybitError::ChannelSendError {
431 underlying: e.to_string(),
432 })?;
433 }
434 }
435 Ok(())
436 })
437 .await
438 }
439
440 pub async fn ws_fast_exec(
441 &self,
442 sender: mpsc::UnboundedSender<FastExecData>,
443 ) -> Result<(), BybitError> {
444 let sub_str = "execution.fast";
445 let request = Subscription::new("subscribe", vec![sub_str]);
446
447 self.ws_priv_subscribe(request, move |event| {
448 if let WebsocketEvents::FastExecEvent(execution) = event {
449 for v in execution.data {
450 sender.send(v).map_err(|e| BybitError::ChannelSendError {
451 underlying: e.to_string(),
452 })?;
453 }
454 }
455 Ok(())
456 })
457 .await
458 }
459
460 pub async fn ws_orders(
461 &self,
462 cat: Option<Category>,
463 sender: mpsc::UnboundedSender<OrderData>,
464 ) -> Result<(), BybitError> {
465 let sub_str = if let Some(v) = cat {
466 match v {
467 Category::Linear => "order.linear",
468 Category::Inverse => "order.inverse",
469 Category::Spot => "order.spot",
470 Category::Option => "order.option",
471 }
472 } else {
473 "order"
474 };
475
476 let request = Subscription::new("subscribe", vec![sub_str]);
477 self.ws_priv_subscribe(request, move |event| {
478 if let WebsocketEvents::OrderEvent(order) = event {
479 for v in order.data {
480 sender.send(v).map_err(|e| BybitError::ChannelSendError {
481 underlying: e.to_string(),
482 })?;
483 }
484 }
485 Ok(())
486 })
487 .await
488 }
489
490 pub async fn ws_wallet(
491 &self,
492 sender: mpsc::UnboundedSender<WalletData>,
493 ) -> Result<(), BybitError> {
494 let sub_str = "wallet";
495 let request = Subscription::new("subscribe", vec![sub_str]);
496 self.ws_priv_subscribe(request, move |event| {
497 if let WebsocketEvents::Wallet(wallet) = event {
498 for v in wallet.data {
499 sender.send(v).map_err(|e| BybitError::ChannelSendError {
500 underlying: e.to_string(),
501 })?;
502 }
503 }
504 Ok(())
505 })
506 .await
507 }
508
509 pub async fn ws_trade_stream<'a, F>(
510 &self,
511 req: mpsc::UnboundedReceiver<RequestType<'a>>,
512 handler: F,
513 ) -> Result<(), BybitError>
514 where
515 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
516 'a: 'static,
517 {
518 let response = self
519 .client
520 .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
521 .await?;
522 Self::event_loop(response, handler, Some(req)).await?;
523
524 Ok(())
525 }
526
527 pub async fn event_loop<'a, H>(
528 mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
529 mut handler: H,
530 mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
531 ) -> Result<(), BybitError>
532 where
533 H: WebSocketHandler,
534 {
535 let mut interval = Instant::now();
536 loop {
537 let msg = stream.next().await;
538 match msg {
539 Some(Ok(WsMessage::Text(msg))) => {
540 if let Err(_) = handler.handle_msg(&msg) {
541 return Err(BybitError::Base(
542 "Error handling stream message".to_string(),
543 ));
544 }
545 }
546 Some(Err(e)) => {
547 return Err(BybitError::from(e.to_string()));
548 }
549 None => {
550 return Err(BybitError::Base("Stream was closed".to_string()));
551 }
552 _ => {}
553 }
554 if let Some(sender) = order_sender.as_mut() {
555 if let Some(v) = sender.recv().await {
556 let order_req = Self::build_trade_subscription(v, Some(3000));
557 stream.send(WsMessage::Text(order_req)).await?;
558 }
559 }
560
561 if interval.elapsed() > Duration::from_secs(300) {
562 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
563 if order_sender.is_none() {
564 parameters.insert("req_id".into(), generate_random_uid(8).into());
565 }
566 parameters.insert("op".into(), "ping".into());
567 let request = build_json_request(¶meters);
568 let _ = stream
569 .send(WsMessage::Text(request))
570 .await
571 .map_err(BybitError::from);
572 interval = Instant::now();
573 }
574 }
575 }
576}
577
/// Consumer of the raw text frames read by `Stream::event_loop`.
pub trait WebSocketHandler {
    /// Event type associated with the handler; `handle_msg` itself does not
    /// mention it in its signature.
    type Event;
    /// Processes one raw text message. An `Err` return makes
    /// `Stream::event_loop` stop with an error.
    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
}
582
583impl<F> WebSocketHandler for F
584where
585 F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
586{
587 type Event = WebsocketEvents;
588 fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
589 let update: Value = serde_json::from_str(msg)?;
590 if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
591 self(event)?;
592 }
593
594 Ok(())
595 }
596}