tesser_bybit/
ws.rs

1use std::str::FromStr;
2use std::sync::{
3    atomic::{AtomicBool, Ordering},
4    Arc,
5};
6use std::time::Duration;
7
8use chrono::{DateTime, TimeZone, Utc};
9use futures::{SinkExt, StreamExt};
10use hmac::{Hmac, Mac};
11use rust_decimal::Decimal;
12use serde::{Deserialize, Serialize};
13use serde_json::{self, json, Value};
14use sha2::Sha256;
15use tokio::net::TcpStream;
16use tokio::sync::mpsc::error::TryRecvError;
17use tokio::sync::{mpsc, Mutex};
18use tokio::time::{interval, MissedTickBehavior};
19use tokio_tungstenite::{
20    connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
21};
22use tracing::{debug, error, info, warn};
23
24use crate::{millis_to_datetime as parse_millis, BybitCredentials};
25
/// HMAC-SHA256 instance used to sign the private websocket auth payload.
type HmacSha256 = Hmac<Sha256>;
27
28use tesser_broker::{BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult, MarketStream};
29use tesser_core::{Candle, Fill, Interval, Order, OrderRequest, OrderType, Side, Tick};
30
/// Market categories exposed by the Bybit v5 public websocket.
///
/// Each variant corresponds to the trailing path segment of the public
/// endpoint (`<base>/v5/public/<segment>`).
#[derive(Clone, Copy, Debug)]
pub enum PublicChannel {
    /// The `linear` category.
    Linear,
    /// The `inverse` category.
    Inverse,
    /// The `spot` category.
    Spot,
    /// The `option` category.
    Option,
    /// The `spread` category.
    Spread,
}

impl PublicChannel {
    /// Returns the lowercase URL path segment for this channel.
    pub fn as_path(&self) -> &'static str {
        match *self {
            PublicChannel::Linear => "linear",
            PublicChannel::Inverse => "inverse",
            PublicChannel::Spot => "spot",
            PublicChannel::Option => "option",
            PublicChannel::Spread => "spread",
        }
    }
}
51
52impl FromStr for PublicChannel {
53    type Err = BrokerError;
54
55    fn from_str(value: &str) -> Result<Self, Self::Err> {
56        match value.to_lowercase().as_str() {
57            "linear" => Ok(Self::Linear),
58            "inverse" => Ok(Self::Inverse),
59            "spot" => Ok(Self::Spot),
60            "option" => Ok(Self::Option),
61            "spread" => Ok(Self::Spread),
62            other => Err(BrokerError::InvalidRequest(format!(
63                "unsupported Bybit public channel '{other}'"
64            ))),
65        }
66    }
67}
68
/// A public topic that can be requested on a [`BybitMarketStream`].
#[derive(Clone, Debug, Serialize)]
pub enum BybitSubscription {
    /// Public trades for `symbol` (topic `publicTrade.<symbol>`).
    Trades { symbol: String },
    /// Klines for `symbol` at `interval` (topic `kline.<interval>.<symbol>`).
    Kline { symbol: String, interval: Interval },
    /// Order book for `symbol` at `depth` (topic `orderbook.<depth>.<symbol>`).
    OrderBook { symbol: String, depth: usize },
}
75
76impl BybitSubscription {
77    fn topic(&self) -> String {
78        match self {
79            Self::Kline { symbol, interval } => {
80                format!("kline.{}.{}", interval.to_bybit(), symbol)
81            }
82            Self::Trades { symbol } => format!("publicTrade.{symbol}"),
83            Self::OrderBook { symbol, depth } => {
84                format!("orderbook.{depth}.{symbol}")
85            }
86        }
87    }
88}
89
/// Commands sent from a [`BybitMarketStream`] handle to its websocket loop.
enum WsCommand {
    /// Ask the loop to send a `subscribe` request for the given topic string.
    Subscribe(String),
    /// Ask the loop to send a close frame and stop.
    Shutdown,
}
94
/// Handle to a Bybit public market-data websocket driven by a spawned task.
pub struct BybitMarketStream {
    /// Broker metadata assembled in `connect_public`.
    info: BrokerInfo,
    /// Sends subscribe/shutdown commands to the websocket loop.
    command_tx: mpsc::UnboundedSender<WsCommand>,
    /// Receives ticks forwarded by the websocket loop.
    tick_rx: Mutex<mpsc::Receiver<Tick>>,
    /// Receives candles forwarded by the websocket loop.
    candle_rx: Mutex<mpsc::Receiver<Candle>>,
    /// Receives order books forwarded by the websocket loop.
    order_book_rx: Mutex<mpsc::Receiver<tesser_core::OrderBook>>,
    /// Optional caller-supplied flag that the connection code sets/clears.
    connection_status: Option<Arc<AtomicBool>>,
}
103
104impl BybitMarketStream {
105    pub async fn connect_public(
106        base_url: &str,
107        channel: PublicChannel,
108        connection_status: Option<Arc<AtomicBool>>,
109    ) -> BrokerResult<Self> {
110        let endpoint = format!(
111            "{}/v5/public/{}",
112            base_url.trim_end_matches('/'),
113            channel.as_path()
114        );
115        let (ws, _) = connect_async(&endpoint)
116            .await
117            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
118        if let Some(flag) = &connection_status {
119            flag.store(true, Ordering::SeqCst);
120        }
121        let (command_tx, command_rx) = mpsc::unbounded_channel();
122        let (tick_tx, tick_rx) = mpsc::channel(2048);
123        let (candle_tx, candle_rx) = mpsc::channel(1024);
124        let (order_book_tx, order_book_rx) = mpsc::channel(256);
125        let status_for_loop = connection_status.clone();
126        tokio::spawn(async move {
127            if let Err(err) = run_ws_loop(
128                ws,
129                command_rx,
130                tick_tx,
131                candle_tx,
132                order_book_tx,
133                status_for_loop,
134            )
135            .await
136            {
137                error!(error = %err, "bybit ws loop exited unexpectedly");
138            }
139        });
140        Ok(Self {
141            info: BrokerInfo {
142                name: format!("bybit-{}", channel.as_path()),
143                markets: vec![channel.as_path().to_string()],
144                supports_testnet: endpoint.contains("testnet"),
145            },
146            command_tx,
147            tick_rx: Mutex::new(tick_rx),
148            candle_rx: Mutex::new(candle_rx),
149            order_book_rx: Mutex::new(order_book_rx),
150            connection_status,
151        })
152    }
153
154    pub fn connection_status(&self) -> Option<Arc<AtomicBool>> {
155        self.connection_status.clone()
156    }
157}
158
159pub async fn connect_private(
160    base_url: &str,
161    creds: &BybitCredentials,
162    connection_status: Option<Arc<AtomicBool>>,
163) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BrokerError> {
164    let endpoint = format!("{}/v5/private", base_url.trim_end_matches('/'));
165    let (mut socket, _) = match connect_async(&endpoint).await {
166        Ok(value) => {
167            if let Some(flag) = &connection_status {
168                flag.store(true, Ordering::SeqCst);
169            }
170            value
171        }
172        Err(err) => {
173            if let Some(flag) = &connection_status {
174                flag.store(false, Ordering::SeqCst);
175            }
176            return Err(BrokerError::Transport(err.to_string()));
177        }
178    };
179
180    let expires = (Utc::now() + chrono::Duration::seconds(10)).timestamp_millis();
181    let payload = format!("GET/realtime{expires}");
182    let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
183        .map_err(|e| BrokerError::Other(format!("failed to init signer: {e}")))?;
184    mac.update(payload.as_bytes());
185    let signature = hex::encode(mac.finalize().into_bytes());
186
187    let auth_payload = json!({
188        "op": "auth",
189        "args": [creds.api_key.clone(), expires, signature],
190    });
191
192    socket
193        .send(Message::Text(auth_payload.to_string()))
194        .await
195        .map_err(|e| BrokerError::Transport(e.to_string()))?;
196
197    if let Some(Ok(Message::Text(text))) = socket.next().await {
198        if let Ok(value) = serde_json::from_str::<Value>(&text) {
199            if value
200                .get("success")
201                .and_then(|v| v.as_bool())
202                .unwrap_or(false)
203            {
204                info!("Private websocket authenticated");
205            } else {
206                warn!(payload = text, "Private websocket auth failed");
207                return Err(BrokerError::Authentication(
208                    "private websocket auth failed".into(),
209                ));
210            }
211        }
212    }
213
214    let sub_payload = json!({
215        "op": "subscribe",
216        "args": ["order", "execution"],
217    });
218    socket
219        .send(Message::Text(sub_payload.to_string()))
220        .await
221        .map_err(|e| BrokerError::Transport(e.to_string()))?;
222
223    info!("Subscribed to private order/execution channels");
224
225    Ok(socket)
226}
227
228#[async_trait::async_trait]
229impl MarketStream for BybitMarketStream {
230    type Subscription = BybitSubscription;
231
232    fn name(&self) -> &str {
233        &self.info.name
234    }
235
236    fn info(&self) -> Option<&BrokerInfo> {
237        Some(&self.info)
238    }
239
240    async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
241        let topic = subscription.topic();
242        self.command_tx
243            .send(WsCommand::Subscribe(topic.clone()))
244            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
245        info!(topic, "subscribed to Bybit stream");
246        Ok(())
247    }
248
249    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
250        let mut rx = self.tick_rx.lock().await;
251        match rx.try_recv() {
252            Ok(tick) => Ok(Some(tick)),
253            Err(TryRecvError::Empty) => Ok(None),
254            Err(TryRecvError::Disconnected) => Ok(None),
255        }
256    }
257
258    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
259        let mut rx = self.candle_rx.lock().await;
260        match rx.try_recv() {
261            Ok(candle) => Ok(Some(candle)),
262            Err(TryRecvError::Empty) => Ok(None),
263            Err(TryRecvError::Disconnected) => Ok(None),
264        }
265    }
266
267    async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
268        let mut rx = self.order_book_rx.lock().await;
269        match rx.try_recv() {
270            Ok(book) => Ok(Some(book)),
271            Err(TryRecvError::Empty) => Ok(None),
272            Err(TryRecvError::Disconnected) => Ok(None),
273        }
274    }
275}
276
/// Concrete websocket stream type produced by `connect_async`.
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
278
279async fn run_ws_loop(
280    mut socket: WsStream,
281    mut commands: mpsc::UnboundedReceiver<WsCommand>,
282    tick_tx: mpsc::Sender<Tick>,
283    candle_tx: mpsc::Sender<Candle>,
284    order_book_tx: mpsc::Sender<tesser_core::OrderBook>,
285    connection_status: Option<Arc<AtomicBool>>,
286) -> BrokerResult<()> {
287    let mut heartbeat = interval(Duration::from_secs(20));
288    heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
289
290    if let Some(flag) = &connection_status {
291        flag.store(true, Ordering::SeqCst);
292    }
293
294    loop {
295        tokio::select! {
296            cmd = commands.recv() => {
297                match cmd {
298                    Some(WsCommand::Subscribe(topic)) => send_subscribe(&mut socket, &topic).await?,
299                    Some(WsCommand::Shutdown) => {
300                        let _ = socket.send(Message::Close(None)).await;
301                        break;
302                    }
303                    None => break,
304                }
305            }
306            msg = socket.next() => {
307                match msg {
308                    Some(Ok(Message::Ping(payload))) => {
309                        socket
310                            .send(Message::Pong(payload))
311                            .await
312                            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
313                    }
314                    Some(Ok(message)) => handle_message(message, &tick_tx, &candle_tx, &order_book_tx).await?,
315                    Some(Err(err)) => return Err(BrokerError::from_display(err, BrokerErrorKind::Transport)),
316                    None => break,
317                }
318            }
319            _ = heartbeat.tick() => {
320                send_ping(&mut socket).await?;
321            }
322        }
323    }
324
325    if let Some(flag) = connection_status {
326        flag.store(false, Ordering::SeqCst);
327    }
328
329    Ok(())
330}
331
332async fn send_subscribe(socket: &mut WsStream, topic: &str) -> BrokerResult<()> {
333    let payload = json!({
334        "op": "subscribe",
335        "args": [topic],
336    });
337    socket
338        .send(Message::Text(payload.to_string()))
339        .await
340        .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
341}
342
343async fn send_ping(socket: &mut WsStream) -> BrokerResult<()> {
344    let payload = json!({ "op": "ping" });
345    socket
346        .send(Message::Text(payload.to_string()))
347        .await
348        .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
349}
350
351async fn handle_message(
352    message: Message,
353    tick_tx: &mpsc::Sender<Tick>,
354    candle_tx: &mpsc::Sender<Candle>,
355    order_book_tx: &mpsc::Sender<tesser_core::OrderBook>,
356) -> BrokerResult<()> {
357    match message {
358        Message::Text(text) => {
359            process_text_message(&text, tick_tx, candle_tx, order_book_tx).await;
360        }
361        Message::Binary(bytes) => {
362            if let Ok(text) = String::from_utf8(bytes) {
363                process_text_message(&text, tick_tx, candle_tx, order_book_tx).await;
364            } else {
365                warn!("received non UTF-8 binary payload from Bybit ws");
366            }
367        }
368        Message::Ping(payload) => {
369            debug!(size = payload.len(), "received ping from Bybit");
370        }
371        Message::Pong(_) => {
372            debug!("received pong from Bybit");
373        }
374        Message::Close(frame) => {
375            debug!(?frame, "bybit stream closed");
376            return Ok(());
377        }
378        Message::Frame(_) => {}
379    }
380    Ok(())
381}
382
383async fn process_text_message(
384    text: &str,
385    tick_tx: &mpsc::Sender<Tick>,
386    candle_tx: &mpsc::Sender<Candle>,
387    order_book_tx: &mpsc::Sender<tesser_core::OrderBook>,
388) {
389    if let Ok(value) = serde_json::from_str::<Value>(text) {
390        if let Some(topic) = value.get("topic").and_then(|t| t.as_str()) {
391            if topic.starts_with("publicTrade") {
392                if let Ok(payload) = serde_json::from_value::<TradeMessage>(value.clone()) {
393                    forward_trades(payload, tick_tx).await;
394                }
395            } else if topic.starts_with("kline") {
396                if let Ok(payload) = serde_json::from_value::<KlineMessage>(value.clone()) {
397                    forward_klines(payload, candle_tx).await;
398                }
399            } else if topic.starts_with("orderbook") {
400                if let Ok(payload) = serde_json::from_value::<OrderbookMessage>(value.clone()) {
401                    forward_order_books(payload, order_book_tx).await;
402                }
403            } else if topic == "order" {
404                if let Ok(payload) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
405                    for order in payload.data {
406                        debug!(
407                            order_id = %order.order_id,
408                            status = %order.order_status,
409                            "received ws order update"
410                        );
411                    }
412                }
413            } else if topic == "execution" {
414                if let Ok(payload) =
415                    serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
416                {
417                    for exec in payload.data {
418                        debug!(exec_id = %exec.exec_id, "received ws execution");
419                    }
420                }
421            } else {
422                debug!(topic, "ignoring unsupported topic from Bybit");
423            }
424            return;
425        }
426
427        if let Some(op) = value.get("op").and_then(|v| v.as_str()) {
428            match op {
429                "subscribe" => {
430                    let success = value
431                        .get("success")
432                        .and_then(|v| v.as_bool())
433                        .unwrap_or(true);
434                    if success {
435                        debug!("subscription acknowledged by Bybit");
436                    } else {
437                        let msg = value
438                            .get("ret_msg")
439                            .and_then(|v| v.as_str())
440                            .unwrap_or("unknown error");
441                        warn!(message = msg, "Bybit rejected subscription request");
442                    }
443                }
444                "ping" | "pong" => {
445                    debug!(payload = ?value, "heartbeat ack from Bybit");
446                }
447                _ => {
448                    debug!(payload = ?value, "command response from Bybit");
449                }
450            }
451        }
452    } else {
453        warn!(payload = text, "failed to parse Bybit ws payload");
454    }
455}
456
457#[derive(Deserialize, Debug)]
458struct TradeMessage {
459    _topic: String,
460    data: Vec<TradeEntry>,
461}
462
/// One trade inside a `publicTrade.*` message, using Bybit's short JSON keys.
#[derive(Deserialize, Debug)]
struct TradeEntry {
    /// Trade time in epoch milliseconds (`T`); converted via `millis_to_datetime`.
    #[serde(rename = "T")]
    timestamp: i64,
    /// Symbol (`s`).
    #[serde(rename = "s")]
    symbol: String,
    /// Side (`S`); `build_tick` accepts `"Buy"` or `"Sell"`.
    #[serde(rename = "S")]
    side: String,
    /// Trade size as a decimal string (`v`).
    #[serde(rename = "v")]
    size: String,
    /// Trade price as a decimal string (`p`).
    #[serde(rename = "p")]
    price: String,
}
476
/// Envelope of a `kline.*` push message.
#[derive(Deserialize, Debug)]
struct KlineMessage {
    /// Full topic string; its last `.`-separated segment is used as the symbol.
    topic: String,
    /// Kline entries carried by this message.
    data: Vec<KlineEntry>,
}
482
483#[derive(Deserialize, Debug)]
484struct KlineEntry {
485    _start: i64,
486    _end: i64,
487    interval: String,
488    open: String,
489    high: String,
490    low: String,
491    close: String,
492    volume: String,
493    confirm: bool,
494    timestamp: i64,
495}
496
/// Envelope for private-topic push messages (decoded in this file for the
/// `order` and `execution` topics).
#[derive(Deserialize, Debug)]
pub struct PrivateMessage<T> {
    /// Topic the message was published on.
    pub topic: String,
    /// Topic-specific entries.
    pub data: Vec<T>,
}
502
503#[derive(Deserialize, Debug)]
504pub struct BybitWsOrder {
505    #[serde(rename = "orderId")]
506    pub order_id: String,
507    #[serde(rename = "symbol")]
508    pub symbol: String,
509    #[serde(rename = "side")]
510    pub side: String,
511    #[serde(rename = "orderStatus")]
512    pub order_status: String,
513}
514
515async fn forward_trades(payload: TradeMessage, tick_tx: &mpsc::Sender<Tick>) {
516    for trade in payload.data {
517        if let Some(tick) = build_tick(&trade) {
518            if tick_tx.send(tick).await.is_err() {
519                warn!("dropping trade tick; downstream receiver closed");
520                break;
521            }
522        }
523    }
524}
525
526fn build_tick(entry: &TradeEntry) -> Option<Tick> {
527    let price = entry.price.parse().ok()?;
528    let size = entry.size.parse().ok()?;
529    let side = match entry.side.as_str() {
530        "Buy" => Side::Buy,
531        "Sell" => Side::Sell,
532        _ => return None,
533    };
534    let exchange_timestamp = millis_to_datetime(entry.timestamp)?;
535    Some(Tick {
536        symbol: entry.symbol.clone(),
537        price,
538        size,
539        side,
540        exchange_timestamp,
541        received_at: Utc::now(),
542    })
543}
544
545async fn forward_klines(payload: KlineMessage, candle_tx: &mpsc::Sender<Candle>) {
546    for kline in payload.data {
547        if !kline.confirm {
548            continue;
549        }
550        if let Some(candle) = build_candle(&payload.topic, &kline) {
551            if candle_tx.send(candle).await.is_err() {
552                warn!("dropping kline; downstream receiver closed");
553                break;
554            }
555        }
556    }
557}
558
559fn build_candle(topic: &str, entry: &KlineEntry) -> Option<Candle> {
560    let interval = parse_interval(&entry.interval)?;
561    let symbol = topic.split('.').next_back()?.to_string();
562    Some(Candle {
563        symbol,
564        interval,
565        open: entry.open.parse().ok()?,
566        high: entry.high.parse().ok()?,
567        low: entry.low.parse().ok()?,
568        close: entry.close.parse().ok()?,
569        volume: entry.volume.parse().ok()?,
570        timestamp: millis_to_datetime(entry.timestamp)?,
571    })
572}
573
574fn parse_interval(value: &str) -> Option<Interval> {
575    match value {
576        "1" => Some(Interval::OneMinute),
577        "5" => Some(Interval::FiveMinutes),
578        "15" => Some(Interval::FifteenMinutes),
579        "60" => Some(Interval::OneHour),
580        "240" => Some(Interval::FourHours),
581        "D" | "d" => Some(Interval::OneDay),
582        _ => None,
583    }
584}
585
/// Envelope of an `orderbook.*` push message.
#[derive(Deserialize, Debug)]
struct OrderbookMessage {
    /// Topic the message was published on; not read after dispatch.
    #[serde(rename = "topic")]
    _topic: String,
    /// Message kind; currently not inspected.
    #[serde(rename = "type")]
    _msg_type: String, // "snapshot" or "delta"
    /// Message timestamp in epoch milliseconds; becomes the book timestamp.
    ts: i64,
    /// Book levels carried by this message.
    data: OrderbookData,
}
595
/// Payload of an `orderbook.*` message, using Bybit's short JSON keys.
#[derive(Deserialize, Debug)]
struct OrderbookData {
    /// Symbol; copied into the resulting order book.
    s: String,
    /// `[price, size]` string pairs.
    b: Vec<[String; 2]>, // Bids
    /// `[price, size]` string pairs.
    a: Vec<[String; 2]>, // Asks
    /// Currently unused. NOTE(review): presumably Bybit's update id — confirm
    /// against the API docs.
    #[serde(rename = "u")]
    _u: i64,
}
604
605async fn forward_order_books(
606    payload: OrderbookMessage,
607    order_book_tx: &mpsc::Sender<tesser_core::OrderBook>,
608) {
609    // Note: For a production system, you'd want to maintain a local book and apply deltas.
610    // For simplicity here, we'll treat both snapshots and deltas as full snapshots,
611    // which is sufficient for many microstructure strategies that just need the latest state.
612    if let Some(book) = build_order_book(payload) {
613        if order_book_tx.send(book).await.is_err() {
614            warn!("dropping order book; downstream receiver closed");
615        }
616    }
617}
618
619fn build_order_book(msg: OrderbookMessage) -> Option<tesser_core::OrderBook> {
620    let to_levels = |entries: &[[String; 2]]| -> Vec<tesser_core::OrderBookLevel> {
621        entries
622            .iter()
623            .filter_map(|entry| {
624                Some(tesser_core::OrderBookLevel {
625                    price: entry.first()?.parse().ok()?,
626                    size: entry.get(1)?.parse().ok()?,
627                })
628            })
629            .collect()
630    };
631
632    Some(tesser_core::OrderBook {
633        symbol: msg.data.s,
634        bids: to_levels(&msg.data.b),
635        asks: to_levels(&msg.data.a),
636        timestamp: millis_to_datetime(msg.ts)?,
637    })
638}
639
640fn millis_to_datetime(value: i64) -> Option<DateTime<Utc>> {
641    Utc.timestamp_millis_opt(value).single()
642}
643
644impl Drop for BybitMarketStream {
645    fn drop(&mut self) {
646        let _ = self.command_tx.send(WsCommand::Shutdown);
647    }
648}
649
650#[derive(Deserialize, Debug)]
651pub struct BybitWsExecution {
652    #[serde(rename = "execId")]
653    pub exec_id: String,
654    #[serde(rename = "orderId")]
655    pub order_id: String,
656    #[serde(rename = "symbol")]
657    pub symbol: String,
658    #[serde(rename = "execPrice")]
659    pub exec_price: String,
660    #[serde(rename = "execQty")]
661    pub exec_qty: String,
662    #[serde(rename = "side")]
663    pub side: String,
664    #[serde(rename = "execFee")]
665    pub exec_fee: String,
666    #[serde(rename = "execTime")]
667    pub exec_time: String,
668    #[serde(rename = "cumExecQty")]
669    pub cum_exec_qty: String,
670    #[serde(rename = "avgPrice")]
671    pub avg_price: String,
672}
673
674impl BybitWsOrder {
675    pub fn to_tesser_order(&self, existing: Option<&Order>) -> Result<Order, BrokerError> {
676        Ok(Order {
677            id: self.order_id.clone(),
678            request: existing
679                .map(|o| o.request.clone())
680                .unwrap_or_else(|| OrderRequest {
681                    symbol: self.symbol.clone(),
682                    side: if self.side == "Buy" {
683                        Side::Buy
684                    } else {
685                        Side::Sell
686                    },
687                    order_type: OrderType::Market,
688                    quantity: Decimal::ZERO,
689                    price: None,
690                    trigger_price: None,
691                    time_in_force: None,
692                    client_order_id: None,
693                    take_profit: None,
694                    stop_loss: None,
695                    display_quantity: None,
696                }),
697            status: crate::BybitClient::map_order_status(&self.order_status),
698            filled_quantity: existing.map(|o| o.filled_quantity).unwrap_or(Decimal::ZERO),
699            avg_fill_price: existing.and_then(|o| o.avg_fill_price),
700            created_at: existing.map(|o| o.created_at).unwrap_or_else(Utc::now),
701            updated_at: Utc::now(),
702        })
703    }
704}
705
706impl BybitWsExecution {
707    pub fn to_tesser_fill(&self) -> Result<Fill, BrokerError> {
708        let fill_price = self.exec_price.parse::<Decimal>().map_err(|e| {
709            BrokerError::Serialization(format!(
710                "failed to parse exec price {}: {e}",
711                self.exec_price
712            ))
713        })?;
714        let fill_quantity = self.exec_qty.parse::<Decimal>().map_err(|e| {
715            BrokerError::Serialization(format!("failed to parse exec qty {}: {e}", self.exec_qty))
716        })?;
717        let fee = self.exec_fee.parse::<Decimal>().ok();
718        let timestamp = parse_millis(&self.exec_time);
719        let side = match self.side.as_str() {
720            "Buy" => Side::Buy,
721            "Sell" => Side::Sell,
722            other => {
723                return Err(BrokerError::Serialization(format!(
724                    "unhandled execution side: {other}"
725                )))
726            }
727        };
728
729        Ok(Fill {
730            order_id: self.order_id.clone(),
731            symbol: self.symbol.clone(),
732            side,
733            fill_price,
734            fill_quantity,
735            fee,
736            timestamp,
737        })
738    }
739}