1use crate::api::WebsocketAPI;
2use crate::client::Client;
3use crate::errors::BybitError;
4use crate::model::{
5 Category, ExecutionData, FastExecData, LiquidationData, OrderBookUpdate, OrderData,
6 PongResponse, PositionData, RequestType, Subscription, Tickers, WalletData, WebsocketEvents,
7 WsKline, WsTrade,
8};
9use crate::trade::build_ws_orders;
10use crate::util::{build_json_request, generate_random_uid, get_timestamp};
11use futures::{SinkExt, StreamExt};
12use log::trace;
13use serde_json::{json, Value};
14use std::collections::BTreeMap;
15use std::time::Instant;
16use tokio::net::TcpStream;
17use tokio::sync::mpsc;
18use tokio::time::Duration;
19use tokio_tungstenite::WebSocketStream;
20use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
21
/// Entry point for the websocket helpers defined in this file.
///
/// Wraps the [`Client`] whose `wss_connect` method is used to open every
/// websocket connection made by the methods on this type.
#[derive(Clone)]
pub struct Stream {
    /// Client used to establish websocket connections.
    pub client: Client,
}
26
27impl Stream {
28 pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
36 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
37 parameters.insert("req_id".into(), generate_random_uid(8).into());
38 parameters.insert("op".into(), "ping".into());
39 let request = build_json_request(¶meters);
40 let endpoint = if private {
41 WebsocketAPI::Private
42 } else {
43 WebsocketAPI::PublicLinear
44 };
45 let mut response = self
46 .client
47 .wss_connect(endpoint, Some(request), private, None)
48 .await?;
49 let Some(data) = response.next().await else {
50 return Err(BybitError::Base(
51 "Failed to receive ping response".to_string(),
52 ));
53 };
54
55 let data = data
56 .map_err(|e| BybitError::Base(format!("Failed to get ping response, error {}", e)))?;
57 match data {
58 WsMessage::Text(data) => {
59 let response: PongResponse = serde_json::from_str(&data)?;
60 match response {
61 PongResponse::PublicPong(pong) => {
62 trace!("Pong received successfully: {:#?}", pong);
63 }
64 PongResponse::PrivatePong(pong) => {
65 trace!("Pong received successfully: {:#?}", pong);
66 }
67 }
68 }
69 _ => {}
70 }
71 Ok(())
72 }
73
74 pub async fn ws_priv_subscribe<'b, F>(
75 &self,
76 req: Subscription<'_>,
77 handler: F,
78 ) -> Result<(), BybitError>
79 where
80 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
81 {
82 let request = Self::build_subscription(req);
83 let response = self
84 .client
85 .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
86 .await?;
87 match Self::event_loop(response, handler, None).await {
88 Ok(_) => {}
89 Err(_) => {}
90 }
91 Ok(())
92 }
93
94 pub async fn ws_subscribe<'b, F>(
95 &self,
96 req: Subscription<'_>,
97 category: Category,
98 handler: F,
99 ) -> Result<(), BybitError>
100 where
101 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
102 {
103 let endpoint = {
104 match category {
105 Category::Linear => WebsocketAPI::PublicLinear,
106 Category::Inverse => WebsocketAPI::PublicInverse,
107 Category::Spot => WebsocketAPI::PublicSpot,
108 _ => unimplemented!("Option has not been implemented"),
109 }
110 };
111 let request = Self::build_subscription(req);
112 let response = self
113 .client
114 .wss_connect(endpoint, Some(request), false, None)
115 .await?;
116 Self::event_loop(response, handler, None).await?;
117 Ok(())
118 }
119
120 pub fn build_subscription(action: Subscription) -> String {
121 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
122 parameters.insert("req_id".into(), generate_random_uid(8).into());
123 parameters.insert("op".into(), action.op.into());
124 let args_value: Value = action
125 .args
126 .iter()
127 .map(ToString::to_string)
128 .collect::<Vec<_>>()
129 .into();
130 parameters.insert("args".into(), args_value);
131
132 build_json_request(¶meters)
133 }
134
135 pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
136 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
137 parameters.insert("reqId".into(), generate_random_uid(16).into());
138 let mut header_map: BTreeMap<String, String> = BTreeMap::new();
139 header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
140 header_map.insert(
141 "X-BAPI-RECV-WINDOW".into(),
142 recv_window.unwrap_or(5000).to_string(),
143 );
144 parameters.insert("header".into(), json!(header_map).into());
145 match orders {
146 RequestType::Create(order) => {
147 parameters.insert("op".into(), "order.create".into());
148 parameters.insert(
149 "args".into(),
150 build_ws_orders(RequestType::Create(order)).into(),
151 );
152 }
153 RequestType::Cancel(order) => {
154 parameters.insert("op".into(), "order.cancel".into());
155 parameters.insert(
156 "args".into(),
157 build_ws_orders(RequestType::Cancel(order)).into(),
158 );
159 }
160
161 RequestType::Amend(order) => {
162 parameters.insert("op".into(), "order.amend".into());
163 parameters.insert(
164 "args".into(),
165 build_ws_orders(RequestType::Amend(order)).into(),
166 );
167 }
168 }
169 build_json_request(¶meters)
170 }
171
172 pub async fn ws_orderbook(
186 &self,
187 subs: Vec<(i32, &str)>,
188 category: Category,
189 sender: mpsc::UnboundedSender<OrderBookUpdate>,
190 ) -> Result<(), BybitError> {
191 let arr: Vec<String> = subs
192 .into_iter()
193 .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
194 .collect();
195 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
196 self.ws_subscribe(request, category, move |event| {
197 if let WebsocketEvents::OrderBookEvent(order_book) = event {
198 sender
199 .send(order_book)
200 .map_err(|e| BybitError::ChannelSendError {
201 underlying: e.to_string(),
202 })?;
203 }
204 Ok(())
205 })
206 .await
207 }
208
209 pub async fn ws_trades(
224 &self,
225 subs: Vec<&str>,
226 category: Category,
227 sender: mpsc::UnboundedSender<WsTrade>,
228 ) -> Result<(), BybitError> {
229 let arr: Vec<String> = subs
230 .iter()
231 .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
232 .collect();
233 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
234 let handler = move |event| {
235 if let WebsocketEvents::TradeEvent(trades) = event {
236 for trade in trades.data {
237 sender
238 .send(trade)
239 .map_err(|e| BybitError::ChannelSendError {
240 underlying: e.to_string(),
241 })?;
242 }
243 }
244 Ok(())
245 };
246
247 self.ws_subscribe(request, category, handler).await
248 }
249
250 pub async fn ws_tickers(
267 &self,
268 subs: Vec<&str>,
269 category: Category,
270 sender: mpsc::UnboundedSender<Tickers>,
271 ) -> Result<(), BybitError> {
272 let arr: Vec<String> = subs
273 .into_iter()
274 .map(|sub| format!("tickers.{}", sub.to_uppercase()))
275 .collect();
276 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
277
278 let handler =
279 move |event| {
280 if let WebsocketEvents::TickerEvent(tickers) = event {
281 match tickers.data {
282 Tickers::Linear(linear_ticker) => {
283 sender.send(Tickers::Linear(linear_ticker)).map_err(|e| {
284 BybitError::ChannelSendError {
285 underlying: e.to_string(),
286 }
287 })?;
288 }
289 Tickers::Spot(spot_ticker) => sender
290 .send(Tickers::Spot(spot_ticker))
291 .map_err(|e| BybitError::ChannelSendError {
292 underlying: e.to_string(),
293 })?,
294 }
295 }
296 Ok(())
297 };
298
299 self.ws_subscribe(request, category, handler).await
300 }
301 pub async fn ws_liquidations(
302 &self,
303 subs: Vec<&str>,
304 category: Category,
305 sender: mpsc::UnboundedSender<LiquidationData>,
306 ) -> Result<(), BybitError> {
307 let arr: Vec<String> = subs
308 .into_iter()
309 .map(|sub| format!("liquidation.{}", sub.to_uppercase()))
310 .collect();
311 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
312
313 let handler = move |event| {
314 if let WebsocketEvents::LiquidationEvent(liquidation) = event {
315 sender
316 .send(liquidation.data)
317 .map_err(|e| BybitError::ChannelSendError {
318 underlying: e.to_string(),
319 })?;
320 }
321 Ok(())
322 };
323
324 self.ws_subscribe(request, category, handler).await
325 }
326 pub async fn ws_klines(
327 &self,
328 subs: Vec<(&str, &str)>,
329 category: Category,
330 sender: mpsc::UnboundedSender<WsKline>,
331 ) -> Result<(), BybitError> {
332 let arr: Vec<String> = subs
333 .into_iter()
334 .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
335 .collect();
336 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
337 self.ws_subscribe(request, category, move |event| {
338 if let WebsocketEvents::KlineEvent(kline) = event {
339 sender
340 .send(kline)
341 .map_err(|e| BybitError::ChannelSendError {
342 underlying: e.to_string(),
343 })?;
344 }
345 Ok(())
346 })
347 .await
348 }
349
350 pub async fn ws_position(
351 &self,
352 cat: Option<Category>,
353 sender: mpsc::UnboundedSender<PositionData>,
354 ) -> Result<(), BybitError> {
355 let sub_str = if let Some(v) = cat {
356 match v {
357 Category::Linear => "position.linear",
358 Category::Inverse => "position.inverse",
359 _ => "",
360 }
361 } else {
362 "position"
363 };
364
365 let request = Subscription::new("subscribe", vec![sub_str]);
366 self.ws_priv_subscribe(request, move |event| {
367 if let WebsocketEvents::PositionEvent(position) = event {
368 for v in position.data {
369 sender.send(v).map_err(|e| BybitError::ChannelSendError {
370 underlying: e.to_string(),
371 })?;
372 }
373 }
374 Ok(())
375 })
376 .await
377 }
378
379 pub async fn ws_executions(
380 &self,
381 cat: Option<Category>,
382 sender: mpsc::UnboundedSender<ExecutionData>,
383 ) -> Result<(), BybitError> {
384 let sub_str = if let Some(v) = cat {
385 match v {
386 Category::Linear => "execution.linear",
387 Category::Inverse => "execution.inverse",
388 Category::Spot => "execution.spot",
389 Category::Option => "execution.option",
390 }
391 } else {
392 "execution"
393 };
394
395 let request = Subscription::new("subscribe", vec![sub_str]);
396 self.ws_priv_subscribe(request, move |event| {
397 if let WebsocketEvents::ExecutionEvent(execute) = event {
398 for v in execute.data {
399 sender.send(v).map_err(|e| BybitError::ChannelSendError {
400 underlying: e.to_string(),
401 })?;
402 }
403 }
404 Ok(())
405 })
406 .await
407 }
408
409 pub async fn ws_fast_exec(
410 &self,
411 sender: mpsc::UnboundedSender<FastExecData>,
412 ) -> Result<(), BybitError> {
413 let sub_str = "execution.fast";
414 let request = Subscription::new("subscribe", vec![sub_str]);
415
416 self.ws_priv_subscribe(request, move |event| {
417 if let WebsocketEvents::FastExecEvent(execution) = event {
418 for v in execution.data {
419 sender.send(v).map_err(|e| BybitError::ChannelSendError {
420 underlying: e.to_string(),
421 })?;
422 }
423 }
424 Ok(())
425 })
426 .await
427 }
428
429 pub async fn ws_orders(
430 &self,
431 cat: Option<Category>,
432 sender: mpsc::UnboundedSender<OrderData>,
433 ) -> Result<(), BybitError> {
434 let sub_str = if let Some(v) = cat {
435 match v {
436 Category::Linear => "order.linear",
437 Category::Inverse => "order.inverse",
438 Category::Spot => "order.spot",
439 Category::Option => "order.option",
440 }
441 } else {
442 "order"
443 };
444
445 let request = Subscription::new("subscribe", vec![sub_str]);
446 self.ws_priv_subscribe(request, move |event| {
447 if let WebsocketEvents::OrderEvent(order) = event {
448 for v in order.data {
449 sender.send(v).map_err(|e| BybitError::ChannelSendError {
450 underlying: e.to_string(),
451 })?;
452 }
453 }
454 Ok(())
455 })
456 .await
457 }
458
459 pub async fn ws_wallet(
460 &self,
461 sender: mpsc::UnboundedSender<WalletData>,
462 ) -> Result<(), BybitError> {
463 let sub_str = "wallet";
464 let request = Subscription::new("subscribe", vec![sub_str]);
465 self.ws_priv_subscribe(request, move |event| {
466 if let WebsocketEvents::Wallet(wallet) = event {
467 for v in wallet.data {
468 sender.send(v).map_err(|e| BybitError::ChannelSendError {
469 underlying: e.to_string(),
470 })?;
471 }
472 }
473 Ok(())
474 })
475 .await
476 }
477
478 pub async fn ws_trade_stream<'a, F>(
479 &self,
480 req: mpsc::UnboundedReceiver<RequestType<'a>>,
481 handler: F,
482 ) -> Result<(), BybitError>
483 where
484 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
485 'a: 'static,
486 {
487 let response = self
488 .client
489 .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
490 .await?;
491 Self::event_loop(response, handler, Some(req)).await?;
492
493 Ok(())
494 }
495
496 pub async fn event_loop<'a, H>(
497 mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
498 mut handler: H,
499 mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
500 ) -> Result<(), BybitError>
501 where
502 H: WebSocketHandler,
503 {
504 let mut interval = Instant::now();
505 loop {
506 let msg = stream.next().await;
507 match msg {
508 Some(Ok(WsMessage::Text(msg))) => {
509 if let Err(_) = handler.handle_msg(&msg) {
510 return Err(BybitError::Base(
511 "Error handling stream message".to_string(),
512 ));
513 }
514 }
515 Some(Err(e)) => {
516 return Err(BybitError::from(e.to_string()));
517 }
518 None => {
519 return Err(BybitError::Base("Stream was closed".to_string()));
520 }
521 _ => {}
522 }
523 if let Some(sender) = order_sender.as_mut() {
524 if let Some(v) = sender.recv().await {
525 let order_req = Self::build_trade_subscription(v, Some(3000));
526 stream.send(WsMessage::Text(order_req)).await?;
527 }
528 }
529
530 if interval.elapsed() > Duration::from_secs(300) {
531 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
532 if order_sender.is_none() {
533 parameters.insert("req_id".into(), generate_random_uid(8).into());
534 }
535 parameters.insert("op".into(), "ping".into());
536 let request = build_json_request(¶meters);
537 let _ = stream
538 .send(WsMessage::Text(request))
539 .await
540 .map_err(BybitError::from);
541 interval = Instant::now();
542 }
543 }
544 }
545}
546
/// Callback interface used by `Stream::event_loop` to process the text
/// frames it reads from the websocket.
pub trait WebSocketHandler {
    /// Event type associated with the handler. It is not referenced by
    /// `handle_msg`'s signature.
    type Event;
    /// Handles one raw text frame. Returning an error makes
    /// `Stream::event_loop` stop and return an error.
    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
}
551
552impl<F> WebSocketHandler for F
553where
554 F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
555{
556 type Event = WebsocketEvents;
557 fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
558 let update: Value = serde_json::from_str(msg)?;
559 if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
560 self(event)?;
561 }
562
563 Ok(())
564 }
565}