1use std::str::FromStr;
2use std::sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5};
6use std::time::Duration;
7
8use chrono::{DateTime, TimeZone, Utc};
9use futures::{SinkExt, StreamExt};
10use hmac::{Hmac, Mac};
11use rust_decimal::Decimal;
12use serde::{Deserialize, Serialize};
13use serde_json::{self, json, Value};
14use sha2::Sha256;
15use tokio::net::TcpStream;
16use tokio::sync::mpsc::error::TryRecvError;
17use tokio::sync::{mpsc, Mutex};
18use tokio::time::{interval, MissedTickBehavior};
19use tokio_tungstenite::{
20 connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
21};
22use tracing::{debug, error, info, warn};
23
24use crate::{millis_to_datetime as parse_millis, BybitCredentials};
25
26type HmacSha256 = Hmac<Sha256>;
27
28use tesser_broker::{BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult, MarketStream};
29use tesser_core::{Candle, Fill, Interval, Order, OrderRequest, OrderType, Side, Tick};
30
31#[derive(Clone, Copy, Debug)]
32pub enum PublicChannel {
33 Linear,
34 Inverse,
35 Spot,
36 Option,
37 Spread,
38}
39
40impl PublicChannel {
41 pub fn as_path(&self) -> &'static str {
42 match self {
43 Self::Linear => "linear",
44 Self::Inverse => "inverse",
45 Self::Spot => "spot",
46 Self::Option => "option",
47 Self::Spread => "spread",
48 }
49 }
50}
51
52impl FromStr for PublicChannel {
53 type Err = BrokerError;
54
55 fn from_str(value: &str) -> Result<Self, Self::Err> {
56 match value.to_lowercase().as_str() {
57 "linear" => Ok(Self::Linear),
58 "inverse" => Ok(Self::Inverse),
59 "spot" => Ok(Self::Spot),
60 "option" => Ok(Self::Option),
61 "spread" => Ok(Self::Spread),
62 other => Err(BrokerError::InvalidRequest(format!(
63 "unsupported Bybit public channel '{other}'"
64 ))),
65 }
66 }
67}
68
69#[derive(Clone, Debug, Serialize)]
70pub enum BybitSubscription {
71 Trades { symbol: String },
72 Kline { symbol: String, interval: Interval },
73 OrderBook { symbol: String, depth: usize },
74}
75
76impl BybitSubscription {
77 fn topic(&self) -> String {
78 match self {
79 Self::Kline { symbol, interval } => {
80 format!("kline.{}.{}", interval.to_bybit(), symbol)
81 }
82 Self::Trades { symbol } => format!("publicTrade.{symbol}"),
83 Self::OrderBook { symbol, depth } => {
84 format!("orderbook.{depth}.{symbol}")
85 }
86 }
87 }
88}
89
90enum WsCommand {
91 Subscribe(String),
92 Shutdown,
93}
94
95pub struct BybitMarketStream {
96 info: BrokerInfo,
97 command_tx: mpsc::UnboundedSender<WsCommand>,
98 tick_rx: Mutex<mpsc::Receiver<Tick>>,
99 candle_rx: Mutex<mpsc::Receiver<Candle>>,
100 order_book_rx: Mutex<mpsc::Receiver<tesser_core::OrderBook>>,
101 connection_status: Option<Arc<AtomicBool>>,
102}
103
104impl BybitMarketStream {
105 pub async fn connect_public(
106 base_url: &str,
107 channel: PublicChannel,
108 connection_status: Option<Arc<AtomicBool>>,
109 ) -> BrokerResult<Self> {
110 let endpoint = format!(
111 "{}/v5/public/{}",
112 base_url.trim_end_matches('/'),
113 channel.as_path()
114 );
115 let (ws, _) = connect_async(&endpoint)
116 .await
117 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
118 if let Some(flag) = &connection_status {
119 flag.store(true, Ordering::SeqCst);
120 }
121 let (command_tx, command_rx) = mpsc::unbounded_channel();
122 let (tick_tx, tick_rx) = mpsc::channel(2048);
123 let (candle_tx, candle_rx) = mpsc::channel(1024);
124 let (order_book_tx, order_book_rx) = mpsc::channel(256);
125 let status_for_loop = connection_status.clone();
126 tokio::spawn(async move {
127 if let Err(err) = run_ws_loop(
128 ws,
129 command_rx,
130 tick_tx,
131 candle_tx,
132 order_book_tx,
133 status_for_loop,
134 )
135 .await
136 {
137 error!(error = %err, "bybit ws loop exited unexpectedly");
138 }
139 });
140 Ok(Self {
141 info: BrokerInfo {
142 name: format!("bybit-{}", channel.as_path()),
143 markets: vec![channel.as_path().to_string()],
144 supports_testnet: endpoint.contains("testnet"),
145 },
146 command_tx,
147 tick_rx: Mutex::new(tick_rx),
148 candle_rx: Mutex::new(candle_rx),
149 order_book_rx: Mutex::new(order_book_rx),
150 connection_status,
151 })
152 }
153
154 pub fn connection_status(&self) -> Option<Arc<AtomicBool>> {
155 self.connection_status.clone()
156 }
157}
158
159pub async fn connect_private(
160 base_url: &str,
161 creds: &BybitCredentials,
162 connection_status: Option<Arc<AtomicBool>>,
163) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BrokerError> {
164 let endpoint = format!("{}/v5/private", base_url.trim_end_matches('/'));
165 let (mut socket, _) = match connect_async(&endpoint).await {
166 Ok(value) => {
167 if let Some(flag) = &connection_status {
168 flag.store(true, Ordering::SeqCst);
169 }
170 value
171 }
172 Err(err) => {
173 if let Some(flag) = &connection_status {
174 flag.store(false, Ordering::SeqCst);
175 }
176 return Err(BrokerError::Transport(err.to_string()));
177 }
178 };
179
180 let expires = (Utc::now() + chrono::Duration::seconds(10)).timestamp_millis();
181 let payload = format!("GET/realtime{expires}");
182 let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
183 .map_err(|e| BrokerError::Other(format!("failed to init signer: {e}")))?;
184 mac.update(payload.as_bytes());
185 let signature = hex::encode(mac.finalize().into_bytes());
186
187 let auth_payload = json!({
188 "op": "auth",
189 "args": [creds.api_key.clone(), expires, signature],
190 });
191
192 socket
193 .send(Message::Text(auth_payload.to_string()))
194 .await
195 .map_err(|e| BrokerError::Transport(e.to_string()))?;
196
197 if let Some(Ok(Message::Text(text))) = socket.next().await {
198 if let Ok(value) = serde_json::from_str::<Value>(&text) {
199 if value
200 .get("success")
201 .and_then(|v| v.as_bool())
202 .unwrap_or(false)
203 {
204 info!("Private websocket authenticated");
205 } else {
206 warn!(payload = text, "Private websocket auth failed");
207 return Err(BrokerError::Authentication(
208 "private websocket auth failed".into(),
209 ));
210 }
211 }
212 }
213
214 let sub_payload = json!({
215 "op": "subscribe",
216 "args": ["order", "execution"],
217 });
218 socket
219 .send(Message::Text(sub_payload.to_string()))
220 .await
221 .map_err(|e| BrokerError::Transport(e.to_string()))?;
222
223 info!("Subscribed to private order/execution channels");
224
225 Ok(socket)
226}
227
228#[async_trait::async_trait]
229impl MarketStream for BybitMarketStream {
230 type Subscription = BybitSubscription;
231
232 fn name(&self) -> &str {
233 &self.info.name
234 }
235
236 fn info(&self) -> Option<&BrokerInfo> {
237 Some(&self.info)
238 }
239
240 async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
241 let topic = subscription.topic();
242 self.command_tx
243 .send(WsCommand::Subscribe(topic.clone()))
244 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
245 info!(topic, "subscribed to Bybit stream");
246 Ok(())
247 }
248
249 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
250 let mut rx = self.tick_rx.lock().await;
251 match rx.try_recv() {
252 Ok(tick) => Ok(Some(tick)),
253 Err(TryRecvError::Empty) => Ok(None),
254 Err(TryRecvError::Disconnected) => Ok(None),
255 }
256 }
257
258 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
259 let mut rx = self.candle_rx.lock().await;
260 match rx.try_recv() {
261 Ok(candle) => Ok(Some(candle)),
262 Err(TryRecvError::Empty) => Ok(None),
263 Err(TryRecvError::Disconnected) => Ok(None),
264 }
265 }
266
267 async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
268 let mut rx = self.order_book_rx.lock().await;
269 match rx.try_recv() {
270 Ok(book) => Ok(Some(book)),
271 Err(TryRecvError::Empty) => Ok(None),
272 Err(TryRecvError::Disconnected) => Ok(None),
273 }
274 }
275}
276
277type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
278
279async fn run_ws_loop(
280 mut socket: WsStream,
281 mut commands: mpsc::UnboundedReceiver<WsCommand>,
282 tick_tx: mpsc::Sender<Tick>,
283 candle_tx: mpsc::Sender<Candle>,
284 order_book_tx: mpsc::Sender<tesser_core::OrderBook>,
285 connection_status: Option<Arc<AtomicBool>>,
286) -> BrokerResult<()> {
287 let mut heartbeat = interval(Duration::from_secs(20));
288 heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
289
290 if let Some(flag) = &connection_status {
291 flag.store(true, Ordering::SeqCst);
292 }
293
294 loop {
295 tokio::select! {
296 cmd = commands.recv() => {
297 match cmd {
298 Some(WsCommand::Subscribe(topic)) => send_subscribe(&mut socket, &topic).await?,
299 Some(WsCommand::Shutdown) => {
300 let _ = socket.send(Message::Close(None)).await;
301 break;
302 }
303 None => break,
304 }
305 }
306 msg = socket.next() => {
307 match msg {
308 Some(Ok(Message::Ping(payload))) => {
309 socket
310 .send(Message::Pong(payload))
311 .await
312 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
313 }
314 Some(Ok(message)) => handle_message(message, &tick_tx, &candle_tx, &order_book_tx).await?,
315 Some(Err(err)) => return Err(BrokerError::from_display(err, BrokerErrorKind::Transport)),
316 None => break,
317 }
318 }
319 _ = heartbeat.tick() => {
320 send_ping(&mut socket).await?;
321 }
322 }
323 }
324
325 if let Some(flag) = connection_status {
326 flag.store(false, Ordering::SeqCst);
327 }
328
329 Ok(())
330}
331
332async fn send_subscribe(socket: &mut WsStream, topic: &str) -> BrokerResult<()> {
333 let payload = json!({
334 "op": "subscribe",
335 "args": [topic],
336 });
337 socket
338 .send(Message::Text(payload.to_string()))
339 .await
340 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
341}
342
343async fn send_ping(socket: &mut WsStream) -> BrokerResult<()> {
344 let payload = json!({ "op": "ping" });
345 socket
346 .send(Message::Text(payload.to_string()))
347 .await
348 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
349}
350
351async fn handle_message(
352 message: Message,
353 tick_tx: &mpsc::Sender<Tick>,
354 candle_tx: &mpsc::Sender<Candle>,
355 order_book_tx: &mpsc::Sender<tesser_core::OrderBook>,
356) -> BrokerResult<()> {
357 match message {
358 Message::Text(text) => {
359 process_text_message(&text, tick_tx, candle_tx, order_book_tx).await;
360 }
361 Message::Binary(bytes) => {
362 if let Ok(text) = String::from_utf8(bytes) {
363 process_text_message(&text, tick_tx, candle_tx, order_book_tx).await;
364 } else {
365 warn!("received non UTF-8 binary payload from Bybit ws");
366 }
367 }
368 Message::Ping(payload) => {
369 debug!(size = payload.len(), "received ping from Bybit");
370 }
371 Message::Pong(_) => {
372 debug!("received pong from Bybit");
373 }
374 Message::Close(frame) => {
375 debug!(?frame, "bybit stream closed");
376 return Ok(());
377 }
378 Message::Frame(_) => {}
379 }
380 Ok(())
381}
382
383async fn process_text_message(
384 text: &str,
385 tick_tx: &mpsc::Sender<Tick>,
386 candle_tx: &mpsc::Sender<Candle>,
387 order_book_tx: &mpsc::Sender<tesser_core::OrderBook>,
388) {
389 if let Ok(value) = serde_json::from_str::<Value>(text) {
390 if let Some(topic) = value.get("topic").and_then(|t| t.as_str()) {
391 if topic.starts_with("publicTrade") {
392 if let Ok(payload) = serde_json::from_value::<TradeMessage>(value.clone()) {
393 forward_trades(payload, tick_tx).await;
394 }
395 } else if topic.starts_with("kline") {
396 if let Ok(payload) = serde_json::from_value::<KlineMessage>(value.clone()) {
397 forward_klines(payload, candle_tx).await;
398 }
399 } else if topic.starts_with("orderbook") {
400 if let Ok(payload) = serde_json::from_value::<OrderbookMessage>(value.clone()) {
401 forward_order_books(payload, order_book_tx).await;
402 }
403 } else if topic == "order" {
404 if let Ok(payload) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
405 for order in payload.data {
406 debug!(
407 order_id = %order.order_id,
408 status = %order.order_status,
409 "received ws order update"
410 );
411 }
412 }
413 } else if topic == "execution" {
414 if let Ok(payload) =
415 serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
416 {
417 for exec in payload.data {
418 debug!(exec_id = %exec.exec_id, "received ws execution");
419 }
420 }
421 } else {
422 debug!(topic, "ignoring unsupported topic from Bybit");
423 }
424 return;
425 }
426
427 if let Some(op) = value.get("op").and_then(|v| v.as_str()) {
428 match op {
429 "subscribe" => {
430 let success = value
431 .get("success")
432 .and_then(|v| v.as_bool())
433 .unwrap_or(true);
434 if success {
435 debug!("subscription acknowledged by Bybit");
436 } else {
437 let msg = value
438 .get("ret_msg")
439 .and_then(|v| v.as_str())
440 .unwrap_or("unknown error");
441 warn!(message = msg, "Bybit rejected subscription request");
442 }
443 }
444 "ping" | "pong" => {
445 debug!(payload = ?value, "heartbeat ack from Bybit");
446 }
447 _ => {
448 debug!(payload = ?value, "command response from Bybit");
449 }
450 }
451 }
452 } else {
453 warn!(payload = text, "failed to parse Bybit ws payload");
454 }
455}
456
457#[derive(Deserialize, Debug)]
458struct TradeMessage {
459 _topic: String,
460 data: Vec<TradeEntry>,
461}
462
463#[derive(Deserialize, Debug)]
464struct TradeEntry {
465 #[serde(rename = "T")]
466 timestamp: i64,
467 #[serde(rename = "s")]
468 symbol: String,
469 #[serde(rename = "S")]
470 side: String,
471 #[serde(rename = "v")]
472 size: String,
473 #[serde(rename = "p")]
474 price: String,
475}
476
477#[derive(Deserialize, Debug)]
478struct KlineMessage {
479 topic: String,
480 data: Vec<KlineEntry>,
481}
482
483#[derive(Deserialize, Debug)]
484struct KlineEntry {
485 _start: i64,
486 _end: i64,
487 interval: String,
488 open: String,
489 high: String,
490 low: String,
491 close: String,
492 volume: String,
493 confirm: bool,
494 timestamp: i64,
495}
496
497#[derive(Deserialize, Debug)]
498pub struct PrivateMessage<T> {
499 pub topic: String,
500 pub data: Vec<T>,
501}
502
503#[derive(Deserialize, Debug)]
504pub struct BybitWsOrder {
505 #[serde(rename = "orderId")]
506 pub order_id: String,
507 #[serde(rename = "symbol")]
508 pub symbol: String,
509 #[serde(rename = "side")]
510 pub side: String,
511 #[serde(rename = "orderStatus")]
512 pub order_status: String,
513}
514
515async fn forward_trades(payload: TradeMessage, tick_tx: &mpsc::Sender<Tick>) {
516 for trade in payload.data {
517 if let Some(tick) = build_tick(&trade) {
518 if tick_tx.send(tick).await.is_err() {
519 warn!("dropping trade tick; downstream receiver closed");
520 break;
521 }
522 }
523 }
524}
525
526fn build_tick(entry: &TradeEntry) -> Option<Tick> {
527 let price = entry.price.parse().ok()?;
528 let size = entry.size.parse().ok()?;
529 let side = match entry.side.as_str() {
530 "Buy" => Side::Buy,
531 "Sell" => Side::Sell,
532 _ => return None,
533 };
534 let exchange_timestamp = millis_to_datetime(entry.timestamp)?;
535 Some(Tick {
536 symbol: entry.symbol.clone(),
537 price,
538 size,
539 side,
540 exchange_timestamp,
541 received_at: Utc::now(),
542 })
543}
544
545async fn forward_klines(payload: KlineMessage, candle_tx: &mpsc::Sender<Candle>) {
546 for kline in payload.data {
547 if !kline.confirm {
548 continue;
549 }
550 if let Some(candle) = build_candle(&payload.topic, &kline) {
551 if candle_tx.send(candle).await.is_err() {
552 warn!("dropping kline; downstream receiver closed");
553 break;
554 }
555 }
556 }
557}
558
559fn build_candle(topic: &str, entry: &KlineEntry) -> Option<Candle> {
560 let interval = parse_interval(&entry.interval)?;
561 let symbol = topic.split('.').next_back()?.to_string();
562 Some(Candle {
563 symbol,
564 interval,
565 open: entry.open.parse().ok()?,
566 high: entry.high.parse().ok()?,
567 low: entry.low.parse().ok()?,
568 close: entry.close.parse().ok()?,
569 volume: entry.volume.parse().ok()?,
570 timestamp: millis_to_datetime(entry.timestamp)?,
571 })
572}
573
574fn parse_interval(value: &str) -> Option<Interval> {
575 match value {
576 "1" => Some(Interval::OneMinute),
577 "5" => Some(Interval::FiveMinutes),
578 "15" => Some(Interval::FifteenMinutes),
579 "60" => Some(Interval::OneHour),
580 "240" => Some(Interval::FourHours),
581 "D" | "d" => Some(Interval::OneDay),
582 _ => None,
583 }
584}
585
586#[derive(Deserialize, Debug)]
587struct OrderbookMessage {
588 #[serde(rename = "topic")]
589 _topic: String,
590 #[serde(rename = "type")]
591 _msg_type: String, ts: i64,
593 data: OrderbookData,
594}
595
596#[derive(Deserialize, Debug)]
597struct OrderbookData {
598 s: String,
599 b: Vec<[String; 2]>, a: Vec<[String; 2]>, #[serde(rename = "u")]
602 _u: i64,
603}
604
605async fn forward_order_books(
606 payload: OrderbookMessage,
607 order_book_tx: &mpsc::Sender<tesser_core::OrderBook>,
608) {
609 if let Some(book) = build_order_book(payload) {
613 if order_book_tx.send(book).await.is_err() {
614 warn!("dropping order book; downstream receiver closed");
615 }
616 }
617}
618
619fn build_order_book(msg: OrderbookMessage) -> Option<tesser_core::OrderBook> {
620 let to_levels = |entries: &[[String; 2]]| -> Vec<tesser_core::OrderBookLevel> {
621 entries
622 .iter()
623 .filter_map(|entry| {
624 Some(tesser_core::OrderBookLevel {
625 price: entry.first()?.parse().ok()?,
626 size: entry.get(1)?.parse().ok()?,
627 })
628 })
629 .collect()
630 };
631
632 Some(tesser_core::OrderBook {
633 symbol: msg.data.s,
634 bids: to_levels(&msg.data.b),
635 asks: to_levels(&msg.data.a),
636 timestamp: millis_to_datetime(msg.ts)?,
637 })
638}
639
640fn millis_to_datetime(value: i64) -> Option<DateTime<Utc>> {
641 Utc.timestamp_millis_opt(value).single()
642}
643
644impl Drop for BybitMarketStream {
645 fn drop(&mut self) {
646 let _ = self.command_tx.send(WsCommand::Shutdown);
647 }
648}
649
650#[derive(Deserialize, Debug)]
651pub struct BybitWsExecution {
652 #[serde(rename = "execId")]
653 pub exec_id: String,
654 #[serde(rename = "orderId")]
655 pub order_id: String,
656 #[serde(rename = "symbol")]
657 pub symbol: String,
658 #[serde(rename = "execPrice")]
659 pub exec_price: String,
660 #[serde(rename = "execQty")]
661 pub exec_qty: String,
662 #[serde(rename = "side")]
663 pub side: String,
664 #[serde(rename = "execFee")]
665 pub exec_fee: String,
666 #[serde(rename = "execTime")]
667 pub exec_time: String,
668 #[serde(rename = "cumExecQty")]
669 pub cum_exec_qty: String,
670 #[serde(rename = "avgPrice")]
671 pub avg_price: String,
672}
673
674impl BybitWsOrder {
675 pub fn to_tesser_order(&self, existing: Option<&Order>) -> Result<Order, BrokerError> {
676 Ok(Order {
677 id: self.order_id.clone(),
678 request: existing
679 .map(|o| o.request.clone())
680 .unwrap_or_else(|| OrderRequest {
681 symbol: self.symbol.clone(),
682 side: if self.side == "Buy" {
683 Side::Buy
684 } else {
685 Side::Sell
686 },
687 order_type: OrderType::Market,
688 quantity: Decimal::ZERO,
689 price: None,
690 trigger_price: None,
691 time_in_force: None,
692 client_order_id: None,
693 take_profit: None,
694 stop_loss: None,
695 display_quantity: None,
696 }),
697 status: crate::BybitClient::map_order_status(&self.order_status),
698 filled_quantity: existing.map(|o| o.filled_quantity).unwrap_or(Decimal::ZERO),
699 avg_fill_price: existing.and_then(|o| o.avg_fill_price),
700 created_at: existing.map(|o| o.created_at).unwrap_or_else(Utc::now),
701 updated_at: Utc::now(),
702 })
703 }
704}
705
706impl BybitWsExecution {
707 pub fn to_tesser_fill(&self) -> Result<Fill, BrokerError> {
708 let fill_price = self.exec_price.parse::<Decimal>().map_err(|e| {
709 BrokerError::Serialization(format!(
710 "failed to parse exec price {}: {e}",
711 self.exec_price
712 ))
713 })?;
714 let fill_quantity = self.exec_qty.parse::<Decimal>().map_err(|e| {
715 BrokerError::Serialization(format!("failed to parse exec qty {}: {e}", self.exec_qty))
716 })?;
717 let fee = self.exec_fee.parse::<Decimal>().ok();
718 let timestamp = parse_millis(&self.exec_time);
719 let side = match self.side.as_str() {
720 "Buy" => Side::Buy,
721 "Sell" => Side::Sell,
722 other => {
723 return Err(BrokerError::Serialization(format!(
724 "unhandled execution side: {other}"
725 )))
726 }
727 };
728
729 Ok(Fill {
730 order_id: self.order_id.clone(),
731 symbol: self.symbol.clone(),
732 side,
733 fill_price,
734 fill_quantity,
735 fee,
736 timestamp,
737 })
738 }
739}