1use crate::api::WebsocketAPI;
2use crate::client::Client;
3use crate::errors::BybitError;
4use crate::model::{
5 Category, ExecutionData, FastExecData, LiquidationData, OrderBookUpdate, OrderData,
6 PongResponse, PositionData, RequestType, Subscription, Tickers, WalletData, WebsocketEvents,
7 WsKline, WsTrade,
8};
9use crate::trade::build_ws_orders;
10use crate::util::{build_json_request, generate_random_uid, get_timestamp};
11use futures::{SinkExt, StreamExt};
12use serde_json::{json, Value};
13use std::collections::BTreeMap;
14use std::time::Instant;
15use tokio::net::TcpStream;
16use tokio::sync::mpsc;
17use tokio::time::Duration;
18use tokio_tungstenite::WebSocketStream;
19use tokio_tungstenite::{tungstenite::Message as WsMessage, MaybeTlsStream};
20
/// Entry point for the WebSocket streaming API.
///
/// Every method on this type opens its connection through
/// `self.client.wss_connect`, so the wrapped [`Client`] determines how and
/// where connections are made.
#[derive(Clone)]
pub struct Stream {
    /// Client used to establish the WebSocket connections.
    pub client: Client,
}
25
26impl Stream {
27 pub async fn ws_ping(&self, private: bool) -> Result<(), BybitError> {
35 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
36 parameters.insert("req_id".into(), generate_random_uid(8).into());
37 parameters.insert("op".into(), "ping".into());
38 let request = build_json_request(¶meters);
39 let endpoint = if private {
40 WebsocketAPI::Private
41 } else {
42 WebsocketAPI::PublicLinear
43 };
44 let mut response = self
45 .client
46 .wss_connect(endpoint, Some(request), private, None)
47 .await?;
48 let data = response.next().await.unwrap()?;
49 match data {
50 WsMessage::Text(data) => {
51 let response: PongResponse = serde_json::from_str(&data)?;
52 match response {
53 PongResponse::PublicPong(pong) => {
54 println!("Pong received successfully: {:#?}", pong);
55 }
56 PongResponse::PrivatePong(pong) => {
57 println!("Pong received successfully: {:#?}", pong);
58 }
59 }
60 }
61 _ => {}
62 }
63 Ok(())
64 }
65
66 pub async fn ws_priv_subscribe<'b, F>(
67 &self,
68 req: Subscription<'_>,
69 handler: F,
70 ) -> Result<(), BybitError>
71 where
72 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
73 {
74 let request = Self::build_subscription(req);
75 let response = self
76 .client
77 .wss_connect(WebsocketAPI::Private, Some(request), true, Some(10))
78 .await?;
79 match Self::event_loop(response, handler, None).await {
80 Ok(_) => {}
81 Err(_) => {}
82 }
83 Ok(())
84 }
85
86 pub async fn ws_subscribe<'b, F>(
87 &self,
88 req: Subscription<'_>,
89 category: Category,
90 handler: F,
91 ) -> Result<(), BybitError>
92 where
93 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
94 {
95 let endpoint = {
96 match category {
97 Category::Linear => WebsocketAPI::PublicLinear,
98 Category::Inverse => WebsocketAPI::PublicInverse,
99 Category::Spot => WebsocketAPI::PublicSpot,
100 _ => unimplemented!("Option has not been implemented"),
101 }
102 };
103 let request = Self::build_subscription(req);
104 let response = self
105 .client
106 .wss_connect(endpoint, Some(request), false, None)
107 .await?;
108 Self::event_loop(response, handler, None).await?;
109 Ok(())
110 }
111
112 pub fn build_subscription(action: Subscription) -> String {
113 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
114 parameters.insert("req_id".into(), generate_random_uid(8).into());
115 parameters.insert("op".into(), action.op.into());
116 let args_value: Value = action
117 .args
118 .iter()
119 .map(ToString::to_string)
120 .collect::<Vec<_>>()
121 .into();
122 parameters.insert("args".into(), args_value);
123
124 build_json_request(¶meters)
125 }
126
127 pub fn build_trade_subscription(orders: RequestType, recv_window: Option<u64>) -> String {
128 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
129 parameters.insert("reqId".into(), generate_random_uid(16).into());
130 let mut header_map: BTreeMap<String, String> = BTreeMap::new();
131 header_map.insert("X-BAPI-TIMESTAMP".into(), get_timestamp().to_string());
132 header_map.insert(
133 "X-BAPI-RECV-WINDOW".into(),
134 recv_window.unwrap_or(5000).to_string(),
135 );
136 parameters.insert("header".into(), json!(header_map).into());
137 match orders {
138 RequestType::Create(order) => {
139 parameters.insert("op".into(), "order.create".into());
140 parameters.insert(
141 "args".into(),
142 build_ws_orders(RequestType::Create(order)).into(),
143 );
144 }
145 RequestType::Cancel(order) => {
146 parameters.insert("op".into(), "order.cancel".into());
147 parameters.insert(
148 "args".into(),
149 build_ws_orders(RequestType::Cancel(order)).into(),
150 );
151 }
152
153 RequestType::Amend(order) => {
154 parameters.insert("op".into(), "order.amend".into());
155 parameters.insert(
156 "args".into(),
157 build_ws_orders(RequestType::Amend(order)).into(),
158 );
159 }
160 }
161 build_json_request(¶meters)
162 }
163
164 pub async fn ws_orderbook(
178 &self,
179 subs: Vec<(i32, &str)>,
180 category: Category,
181 sender: mpsc::UnboundedSender<OrderBookUpdate>,
182 ) -> Result<(), BybitError> {
183 let arr: Vec<String> = subs
184 .into_iter()
185 .map(|(num, sym)| format!("orderbook.{}.{}", num, sym.to_uppercase()))
186 .collect();
187 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
188 self.ws_subscribe(request, category, move |event| {
189 if let WebsocketEvents::OrderBookEvent(order_book) = event {
190 sender.send(order_book).unwrap();
191 }
192 Ok(())
193 })
194 .await
195 }
196
197 pub async fn ws_trades(
212 &self,
213 subs: Vec<&str>,
214 category: Category,
215 sender: mpsc::UnboundedSender<WsTrade>,
216 ) -> Result<(), BybitError> {
217 let arr: Vec<String> = subs
218 .iter()
219 .map(|&sub| format!("publicTrade.{}", sub.to_uppercase()))
220 .collect();
221 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
222 let handler = move |event| {
223 if let WebsocketEvents::TradeEvent(trades) = event {
224 for trade in trades.data {
225 sender.send(trade).unwrap();
226 }
227 }
228 Ok(())
229 };
230
231 self.ws_subscribe(request, category, handler).await
232 }
233
234 pub async fn ws_tickers(
251 &self,
252 subs: Vec<&str>,
253 category: Category,
254 sender: mpsc::UnboundedSender<Tickers>,
255 ) -> Result<(), BybitError> {
256 let arr: Vec<String> = subs
257 .into_iter()
258 .map(|sub| format!("tickers.{}", sub.to_uppercase()))
259 .collect();
260 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
261
262 let handler = move |event| {
263 if let WebsocketEvents::TickerEvent(tickers) = event {
264 match tickers.data {
265 Tickers::Linear(linear_ticker) => {
266 sender.send(Tickers::Linear(linear_ticker)).unwrap()
267 }
268 Tickers::Spot(spot_ticker) => sender.send(Tickers::Spot(spot_ticker)).unwrap(),
269 }
270 }
271 Ok(())
272 };
273
274 self.ws_subscribe(request, category, handler).await
275 }
276 pub async fn ws_liquidations(
277 &self,
278 subs: Vec<&str>,
279 category: Category,
280 sender: mpsc::UnboundedSender<LiquidationData>,
281 ) -> Result<(), BybitError> {
282 let arr: Vec<String> = subs
283 .into_iter()
284 .map(|sub| format!("liquidation.{}", sub.to_uppercase()))
285 .collect();
286 let request = Subscription::new("subscribe", arr.iter().map(String::as_str).collect());
287
288 let handler = move |event| {
289 if let WebsocketEvents::LiquidationEvent(liquidation) = event {
290 sender.send(liquidation.data).unwrap();
291 }
292 Ok(())
293 };
294
295 self.ws_subscribe(request, category, handler).await
296 }
297 pub async fn ws_klines(
298 &self,
299 subs: Vec<(&str, &str)>,
300 category: Category,
301 sender: mpsc::UnboundedSender<WsKline>,
302 ) -> Result<(), BybitError> {
303 let arr: Vec<String> = subs
304 .into_iter()
305 .map(|(interval, sym)| format!("kline.{}.{}", interval, sym.to_uppercase()))
306 .collect();
307 let request = Subscription::new("subscribe", arr.iter().map(AsRef::as_ref).collect());
308 self.ws_subscribe(request, category, move |event| {
309 if let WebsocketEvents::KlineEvent(kline) = event {
310 sender.send(kline).unwrap();
311 }
312 Ok(())
313 })
314 .await
315 }
316
317 pub async fn ws_position(
318 &self,
319 cat: Option<Category>,
320 sender: mpsc::UnboundedSender<PositionData>,
321 ) -> Result<(), BybitError> {
322 let sub_str = if let Some(v) = cat {
323 match v {
324 Category::Linear => "position.linear",
325 Category::Inverse => "position.inverse",
326 _ => "",
327 }
328 } else {
329 "position"
330 };
331
332 let request = Subscription::new("subscribe", vec![sub_str]);
333 self.ws_priv_subscribe(request, move |event| {
334 if let WebsocketEvents::PositionEvent(position) = event {
335 for v in position.data {
336 sender.send(v).unwrap();
337 }
338 }
339 Ok(())
340 })
341 .await
342 }
343
344 pub async fn ws_executions(
345 &self,
346 cat: Option<Category>,
347 sender: mpsc::UnboundedSender<ExecutionData>,
348 ) -> Result<(), BybitError> {
349 let sub_str = if let Some(v) = cat {
350 match v {
351 Category::Linear => "execution.linear",
352 Category::Inverse => "execution.inverse",
353 Category::Spot => "execution.spot",
354 Category::Option => "execution.option",
355 }
356 } else {
357 "execution"
358 };
359
360 let request = Subscription::new("subscribe", vec![sub_str]);
361 self.ws_priv_subscribe(request, move |event| {
362 if let WebsocketEvents::ExecutionEvent(execute) = event {
363 for v in execute.data {
364 sender.send(v).unwrap();
365 }
366 }
367 Ok(())
368 })
369 .await
370 }
371
372 pub async fn ws_fast_exec(
373 &self,
374 sender: mpsc::UnboundedSender<FastExecData>,
375 ) -> Result<(), BybitError> {
376 let sub_str = "execution.fast";
377 let request = Subscription::new("subscribe", vec![sub_str]);
378
379 self.ws_priv_subscribe(request, move |event| {
380 if let WebsocketEvents::FastExecEvent(execution) = event {
381 for v in execution.data {
382 sender.send(v).unwrap();
383 }
384 }
385 Ok(())
386 })
387 .await
388 }
389
390 pub async fn ws_orders(
391 &self,
392 cat: Option<Category>,
393 sender: mpsc::UnboundedSender<OrderData>,
394 ) -> Result<(), BybitError> {
395 let sub_str = if let Some(v) = cat {
396 match v {
397 Category::Linear => "order.linear",
398 Category::Inverse => "order.inverse",
399 Category::Spot => "order.spot",
400 Category::Option => "order.option",
401 }
402 } else {
403 "order"
404 };
405
406 let request = Subscription::new("subscribe", vec![sub_str]);
407 self.ws_priv_subscribe(request, move |event| {
408 if let WebsocketEvents::OrderEvent(order) = event {
409 for v in order.data {
410 sender.send(v).unwrap();
411 }
412 }
413 Ok(())
414 })
415 .await
416 }
417
418 pub async fn ws_wallet(
419 &self,
420 sender: mpsc::UnboundedSender<WalletData>,
421 ) -> Result<(), BybitError> {
422 let sub_str = "wallet";
423 let request = Subscription::new("subscribe", vec![sub_str]);
424 self.ws_priv_subscribe(request, move |event| {
425 if let WebsocketEvents::Wallet(wallet) = event {
426 for v in wallet.data {
427 sender.send(v).unwrap();
428 }
429 }
430 Ok(())
431 })
432 .await
433 }
434
435 pub async fn ws_trade_stream<'a, F>(
436 &self,
437 req: mpsc::UnboundedReceiver<RequestType<'a>>,
438 handler: F,
439 ) -> Result<(), BybitError>
440 where
441 F: FnMut(WebsocketEvents) -> Result<(), BybitError> + 'static + Send,
442 'a: 'static,
443 {
444 let response = self
445 .client
446 .wss_connect(WebsocketAPI::TradeStream, None, true, Some(10))
447 .await?;
448 Self::event_loop(response, handler, Some(req)).await?;
449
450 Ok(())
451 }
452
453 pub async fn event_loop<'a, H>(
454 mut stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
455 mut handler: H,
456 mut order_sender: Option<mpsc::UnboundedReceiver<RequestType<'_>>>,
457 ) -> Result<(), BybitError>
458 where
459 H: WebSocketHandler,
460 {
461 let mut interval = Instant::now();
462 loop {
463 let msg = stream.next().await;
464 match msg {
465 Some(Ok(WsMessage::Text(msg))) => {
466 if let Err(_) = handler.handle_msg(&msg) {
467 return Err(BybitError::Base(
468 "Error handling stream message".to_string(),
469 ));
470 }
471 }
472 Some(Err(e)) => {
473 return Err(BybitError::from(e.to_string()));
474 }
475 None => {
476 return Err(BybitError::Base("Stream was closed".to_string()));
477 }
478 _ => {}
479 }
480 if let Some(sender) = order_sender.as_mut() {
481 if let Some(v) = sender.recv().await {
482 let order_req = Self::build_trade_subscription(v, Some(3000));
483 stream.send(WsMessage::Text(order_req)).await?;
484 }
485 }
486
487 if interval.elapsed() > Duration::from_secs(300) {
488 let mut parameters: BTreeMap<String, Value> = BTreeMap::new();
489 if order_sender.is_none() {
490 parameters.insert("req_id".into(), generate_random_uid(8).into());
491 }
492 parameters.insert("op".into(), "ping".into());
493 let request = build_json_request(¶meters);
494 let _ = stream
495 .send(WsMessage::Text(request))
496 .await
497 .map_err(BybitError::from);
498 interval = Instant::now();
499 }
500 }
501 }
502}
503
/// Callback interface used by `Stream::event_loop` to process incoming text
/// frames.
pub trait WebSocketHandler {
    /// Event type associated with the handler.
    type Event;
    /// Handles one raw text message received from the stream.
    ///
    /// `Stream::event_loop` stops and returns an error as soon as this method
    /// returns `Err`.
    fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError>;
}
508
509impl<F> WebSocketHandler for F
510where
511 F: FnMut(WebsocketEvents) -> Result<(), BybitError>,
512{
513 type Event = WebsocketEvents;
514 fn handle_msg(&mut self, msg: &str) -> Result<(), BybitError> {
515 let update: Value = serde_json::from_str(msg)?;
516 if let Ok(event) = serde_json::from_value::<WebsocketEvents>(update.clone()) {
517 self(event)?;
518 }
519
520 Ok(())
521 }
522}