tesser_bybit/
ws.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::{
4    atomic::{AtomicBool, Ordering},
5    Arc,
6};
7use std::time::Duration;
8
9use chrono::{DateTime, TimeZone, Utc};
10use futures::{SinkExt, StreamExt};
11use hmac::{Hmac, Mac};
12use rust_decimal::Decimal;
13use serde::{Deserialize, Serialize};
14use serde_json::{self, json, Value};
15use sha2::Sha256;
16use tokio::net::TcpStream;
17use tokio::sync::mpsc::error::TryRecvError;
18use tokio::sync::{mpsc, Mutex};
19use tokio::time::{interval, MissedTickBehavior};
20use tokio_tungstenite::{
21    connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
22};
23use tracing::{debug, error, info, warn};
24
25use crate::{millis_to_datetime as parse_millis, BybitCredentials};
26
27type HmacSha256 = Hmac<Sha256>;
28
29use tesser_broker::{BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult, MarketStream};
30use tesser_core::{
31    AssetId, Candle, ExchangeId, Fill, Interval, LocalOrderBook, Order, OrderBook, OrderBookLevel,
32    OrderRequest, OrderType, Side, Symbol, Tick,
33};
34
/// Market category selecting which Bybit v5 public websocket path to connect to.
#[derive(Clone, Copy, Debug)]
pub enum PublicChannel {
    Linear,
    Inverse,
    Spot,
    Option,
    Spread,
}

impl PublicChannel {
    /// Returns the lowercase URL path segment for this channel
    /// (the last component of `/v5/public/<segment>`).
    pub fn as_path(&self) -> &'static str {
        match *self {
            PublicChannel::Linear => "linear",
            PublicChannel::Inverse => "inverse",
            PublicChannel::Spot => "spot",
            PublicChannel::Option => "option",
            PublicChannel::Spread => "spread",
        }
    }
}
55
56impl FromStr for PublicChannel {
57    type Err = BrokerError;
58
59    fn from_str(value: &str) -> Result<Self, Self::Err> {
60        match value.to_lowercase().as_str() {
61            "linear" => Ok(Self::Linear),
62            "inverse" => Ok(Self::Inverse),
63            "spot" => Ok(Self::Spot),
64            "option" => Ok(Self::Option),
65            "spread" => Ok(Self::Spread),
66            other => Err(BrokerError::InvalidRequest(format!(
67                "unsupported Bybit public channel '{other}'"
68            ))),
69        }
70    }
71}
72
73#[derive(Clone, Debug, Serialize)]
74pub enum BybitSubscription {
75    Trades { symbol: String },
76    Kline { symbol: String, interval: Interval },
77    OrderBook { symbol: String, depth: usize },
78}
79
80impl BybitSubscription {
81    fn topic(&self) -> String {
82        match self {
83            Self::Kline { symbol, interval } => {
84                format!("kline.{}.{}", interval.to_bybit(), symbol)
85            }
86            Self::Trades { symbol } => format!("publicTrade.{symbol}"),
87            Self::OrderBook { symbol, depth } => {
88                format!("orderbook.{depth}.{symbol}")
89            }
90        }
91    }
92}
93
/// Control messages sent from the stream handle to the background websocket task.
#[derive(Clone, Debug)]
enum WsCommand {
    /// Send a subscribe request for the given topic string.
    Subscribe(String),
    /// Send a close frame and stop the websocket loop.
    Shutdown,
}
99
100pub struct BybitMarketStream {
101    info: BrokerInfo,
102    command_tx: mpsc::UnboundedSender<WsCommand>,
103    tick_rx: Mutex<mpsc::Receiver<Tick>>,
104    candle_rx: Mutex<mpsc::Receiver<Candle>>,
105    order_book_rx: Mutex<mpsc::Receiver<tesser_core::OrderBook>>,
106    connection_status: Option<Arc<AtomicBool>>,
107}
108
109impl BybitMarketStream {
110    pub async fn connect_public(
111        base_url: &str,
112        channel: PublicChannel,
113        connection_status: Option<Arc<AtomicBool>>,
114        exchange: ExchangeId,
115    ) -> BrokerResult<Self> {
116        let endpoint = format!(
117            "{}/v5/public/{}",
118            base_url.trim_end_matches('/'),
119            channel.as_path()
120        );
121        let (ws, _) = connect_async(&endpoint)
122            .await
123            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
124        if let Some(flag) = &connection_status {
125            flag.store(true, Ordering::SeqCst);
126        }
127        let (command_tx, command_rx) = mpsc::unbounded_channel();
128        let command_loop = command_tx.clone();
129        let (tick_tx, tick_rx) = mpsc::channel(2048);
130        let (candle_tx, candle_rx) = mpsc::channel(1024);
131        let (order_book_tx, order_book_rx) = mpsc::channel(256);
132        let status_for_loop = connection_status.clone();
133        let exchange_id = exchange;
134        tokio::spawn(async move {
135            if let Err(err) = run_ws_loop(
136                ws,
137                command_rx,
138                command_loop,
139                tick_tx,
140                candle_tx,
141                order_book_tx,
142                status_for_loop,
143                exchange_id,
144            )
145            .await
146            {
147                error!(error = %err, "bybit ws loop exited unexpectedly");
148            }
149        });
150        Ok(Self {
151            info: BrokerInfo {
152                name: format!("bybit-{}", channel.as_path()),
153                markets: vec![channel.as_path().to_string()],
154                supports_testnet: endpoint.contains("testnet"),
155            },
156            command_tx,
157            tick_rx: Mutex::new(tick_rx),
158            candle_rx: Mutex::new(candle_rx),
159            order_book_rx: Mutex::new(order_book_rx),
160            connection_status,
161        })
162    }
163
164    pub fn connection_status(&self) -> Option<Arc<AtomicBool>> {
165        self.connection_status.clone()
166    }
167}
168
169pub async fn connect_private(
170    base_url: &str,
171    creds: &BybitCredentials,
172    connection_status: Option<Arc<AtomicBool>>,
173) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BrokerError> {
174    let endpoint = format!("{}/v5/private", base_url.trim_end_matches('/'));
175    let (mut socket, _) = match connect_async(&endpoint).await {
176        Ok(value) => {
177            if let Some(flag) = &connection_status {
178                flag.store(true, Ordering::SeqCst);
179            }
180            value
181        }
182        Err(err) => {
183            if let Some(flag) = &connection_status {
184                flag.store(false, Ordering::SeqCst);
185            }
186            return Err(BrokerError::Transport(err.to_string()));
187        }
188    };
189
190    let expires = (Utc::now() + chrono::Duration::seconds(10)).timestamp_millis();
191    let payload = format!("GET/realtime{expires}");
192    let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
193        .map_err(|e| BrokerError::Other(format!("failed to init signer: {e}")))?;
194    mac.update(payload.as_bytes());
195    let signature = hex::encode(mac.finalize().into_bytes());
196
197    let auth_payload = json!({
198        "op": "auth",
199        "args": [creds.api_key.clone(), expires, signature],
200    });
201
202    socket
203        .send(Message::Text(auth_payload.to_string()))
204        .await
205        .map_err(|e| BrokerError::Transport(e.to_string()))?;
206
207    if let Some(Ok(Message::Text(text))) = socket.next().await {
208        if let Ok(value) = serde_json::from_str::<Value>(&text) {
209            if value
210                .get("success")
211                .and_then(|v| v.as_bool())
212                .unwrap_or(false)
213            {
214                info!("Private websocket authenticated");
215            } else {
216                warn!(payload = text, "Private websocket auth failed");
217                return Err(BrokerError::Authentication(
218                    "private websocket auth failed".into(),
219                ));
220            }
221        }
222    }
223
224    let sub_payload = json!({
225        "op": "subscribe",
226        "args": ["order", "execution"],
227    });
228    socket
229        .send(Message::Text(sub_payload.to_string()))
230        .await
231        .map_err(|e| BrokerError::Transport(e.to_string()))?;
232
233    info!("Subscribed to private order/execution channels");
234
235    Ok(socket)
236}
237
238#[async_trait::async_trait]
239impl MarketStream for BybitMarketStream {
240    type Subscription = BybitSubscription;
241
242    fn name(&self) -> &str {
243        &self.info.name
244    }
245
246    fn info(&self) -> Option<&BrokerInfo> {
247        Some(&self.info)
248    }
249
250    async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
251        let topic = subscription.topic();
252        self.command_tx
253            .send(WsCommand::Subscribe(topic.clone()))
254            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
255        info!(topic, "subscribed to Bybit stream");
256        Ok(())
257    }
258
259    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
260        let mut rx = self.tick_rx.lock().await;
261        match rx.try_recv() {
262            Ok(tick) => Ok(Some(tick)),
263            Err(TryRecvError::Empty) => Ok(None),
264            Err(TryRecvError::Disconnected) => Ok(None),
265        }
266    }
267
268    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
269        let mut rx = self.candle_rx.lock().await;
270        match rx.try_recv() {
271            Ok(candle) => Ok(Some(candle)),
272            Err(TryRecvError::Empty) => Ok(None),
273            Err(TryRecvError::Disconnected) => Ok(None),
274        }
275    }
276
277    async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
278        let mut rx = self.order_book_rx.lock().await;
279        match rx.try_recv() {
280            Ok(book) => Ok(Some(book)),
281            Err(TryRecvError::Empty) => Ok(None),
282            Err(TryRecvError::Disconnected) => Ok(None),
283        }
284    }
285}
286
287type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
288
289#[allow(clippy::too_many_arguments)]
290async fn run_ws_loop(
291    mut socket: WsStream,
292    mut commands: mpsc::UnboundedReceiver<WsCommand>,
293    command_tx: mpsc::UnboundedSender<WsCommand>,
294    tick_tx: mpsc::Sender<Tick>,
295    candle_tx: mpsc::Sender<Candle>,
296    order_book_tx: mpsc::Sender<OrderBook>,
297    connection_status: Option<Arc<AtomicBool>>,
298    exchange: ExchangeId,
299) -> BrokerResult<()> {
300    let mut heartbeat = interval(Duration::from_secs(20));
301    heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
302
303    if let Some(flag) = &connection_status {
304        flag.store(true, Ordering::SeqCst);
305    }
306
307    let mut book_manager = BookManager::new(exchange, order_book_tx.clone(), command_tx);
308
309    loop {
310        tokio::select! {
311            cmd = commands.recv() => {
312                match cmd {
313                    Some(WsCommand::Subscribe(topic)) => send_subscribe(&mut socket, &topic).await?,
314                    Some(WsCommand::Shutdown) => {
315                        let _ = socket.send(Message::Close(None)).await;
316                        break;
317                    }
318                    None => break,
319                }
320            }
321            msg = socket.next() => {
322                match msg {
323                    Some(Ok(Message::Ping(payload))) => {
324                        socket
325                            .send(Message::Pong(payload))
326                            .await
327                            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
328                    }
329                    Some(Ok(message)) => {
330                        handle_message(
331                            message,
332                            &tick_tx,
333                            &candle_tx,
334                            &mut book_manager,
335                            exchange,
336                        )
337                        .await?
338                    }
339                    Some(Err(err)) => return Err(BrokerError::from_display(err, BrokerErrorKind::Transport)),
340                    None => break,
341                }
342            }
343            _ = heartbeat.tick() => {
344                send_ping(&mut socket).await?;
345            }
346        }
347    }
348
349    if let Some(flag) = connection_status {
350        flag.store(false, Ordering::SeqCst);
351    }
352
353    Ok(())
354}
355
356async fn send_subscribe(socket: &mut WsStream, topic: &str) -> BrokerResult<()> {
357    let payload = json!({
358        "op": "subscribe",
359        "args": [topic],
360    });
361    socket
362        .send(Message::Text(payload.to_string()))
363        .await
364        .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
365}
366
367async fn send_ping(socket: &mut WsStream) -> BrokerResult<()> {
368    let payload = json!({ "op": "ping" });
369    socket
370        .send(Message::Text(payload.to_string()))
371        .await
372        .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
373}
374
375async fn handle_message(
376    message: Message,
377    tick_tx: &mpsc::Sender<Tick>,
378    candle_tx: &mpsc::Sender<Candle>,
379    book_manager: &mut BookManager,
380    exchange: ExchangeId,
381) -> BrokerResult<()> {
382    match message {
383        Message::Text(text) => {
384            process_text_message(&text, tick_tx, candle_tx, book_manager, exchange).await;
385        }
386        Message::Binary(bytes) => {
387            if let Ok(text) = String::from_utf8(bytes) {
388                process_text_message(&text, tick_tx, candle_tx, book_manager, exchange).await;
389            } else {
390                warn!("received non UTF-8 binary payload from Bybit ws");
391            }
392        }
393        Message::Ping(payload) => {
394            debug!(size = payload.len(), "received ping from Bybit");
395        }
396        Message::Pong(_) => {
397            debug!("received pong from Bybit");
398        }
399        Message::Close(frame) => {
400            debug!(?frame, "bybit stream closed");
401            return Ok(());
402        }
403        Message::Frame(_) => {}
404    }
405    Ok(())
406}
407
408async fn process_text_message(
409    text: &str,
410    tick_tx: &mpsc::Sender<Tick>,
411    candle_tx: &mpsc::Sender<Candle>,
412    book_manager: &mut BookManager,
413    exchange: ExchangeId,
414) {
415    if let Ok(value) = serde_json::from_str::<Value>(text) {
416        if let Some(topic) = value.get("topic").and_then(|t| t.as_str()) {
417            if topic.starts_with("publicTrade") {
418                if let Ok(payload) = serde_json::from_value::<TradeMessage>(value.clone()) {
419                    forward_trades(exchange, payload, tick_tx).await;
420                }
421            } else if topic.starts_with("kline") {
422                if let Ok(payload) = serde_json::from_value::<KlineMessage>(value.clone()) {
423                    forward_klines(exchange, payload, candle_tx).await;
424                }
425            } else if topic.starts_with("orderbook") {
426                if let Ok(payload) = serde_json::from_value::<OrderbookMessage>(value.clone()) {
427                    book_manager.handle(payload).await;
428                }
429            } else if topic == "order" {
430                if let Ok(payload) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
431                    for order in payload.data {
432                        debug!(
433                            order_id = %order.order_id,
434                            status = %order.order_status,
435                            "received ws order update"
436                        );
437                    }
438                }
439            } else if topic == "execution" {
440                if let Ok(payload) =
441                    serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
442                {
443                    for exec in payload.data {
444                        debug!(exec_id = %exec.exec_id, "received ws execution");
445                    }
446                }
447            } else {
448                debug!(topic, "ignoring unsupported topic from Bybit");
449            }
450            return;
451        }
452
453        if let Some(op) = value.get("op").and_then(|v| v.as_str()) {
454            match op {
455                "subscribe" => {
456                    let success = value
457                        .get("success")
458                        .and_then(|v| v.as_bool())
459                        .unwrap_or(true);
460                    if success {
461                        debug!("subscription acknowledged by Bybit");
462                    } else {
463                        let msg = value
464                            .get("ret_msg")
465                            .and_then(|v| v.as_str())
466                            .unwrap_or("unknown error");
467                        warn!(message = msg, "Bybit rejected subscription request");
468                    }
469                }
470                "ping" | "pong" => {
471                    debug!(payload = ?value, "heartbeat ack from Bybit");
472                }
473                _ => {
474                    debug!(payload = ?value, "command response from Bybit");
475                }
476            }
477        }
478    } else {
479        warn!(payload = text, "failed to parse Bybit ws payload");
480    }
481}
482
483#[derive(Deserialize, Debug)]
484struct TradeMessage {
485    _topic: String,
486    data: Vec<TradeEntry>,
487}
488
489#[derive(Deserialize, Debug)]
490struct TradeEntry {
491    #[serde(rename = "T")]
492    timestamp: i64,
493    #[serde(rename = "s")]
494    symbol: String,
495    #[serde(rename = "S")]
496    side: String,
497    #[serde(rename = "v")]
498    size: String,
499    #[serde(rename = "p")]
500    price: String,
501}
502
503#[derive(Deserialize, Debug)]
504struct KlineMessage {
505    topic: String,
506    data: Vec<KlineEntry>,
507}
508
509#[derive(Deserialize, Debug)]
510struct KlineEntry {
511    _start: i64,
512    _end: i64,
513    interval: String,
514    open: String,
515    high: String,
516    low: String,
517    close: String,
518    volume: String,
519    confirm: bool,
520    timestamp: i64,
521}
522
523#[derive(Deserialize, Debug)]
524pub struct PrivateMessage<T> {
525    pub topic: String,
526    pub data: Vec<T>,
527}
528
529#[derive(Deserialize, Debug)]
530pub struct BybitWsOrder {
531    #[serde(rename = "orderId")]
532    pub order_id: String,
533    #[serde(rename = "symbol")]
534    pub symbol: String,
535    #[serde(rename = "side")]
536    pub side: String,
537    #[serde(rename = "orderStatus")]
538    pub order_status: String,
539}
540
541async fn forward_trades(exchange: ExchangeId, payload: TradeMessage, tick_tx: &mpsc::Sender<Tick>) {
542    for trade in payload.data {
543        if let Some(tick) = build_tick(exchange, &trade) {
544            if tick_tx.send(tick).await.is_err() {
545                warn!("dropping trade tick; downstream receiver closed");
546                break;
547            }
548        }
549    }
550}
551
552fn build_tick(exchange: ExchangeId, entry: &TradeEntry) -> Option<Tick> {
553    let price = entry.price.parse().ok()?;
554    let size = entry.size.parse().ok()?;
555    let side = match entry.side.as_str() {
556        "Buy" => Side::Buy,
557        "Sell" => Side::Sell,
558        _ => return None,
559    };
560    let exchange_timestamp = millis_to_datetime(entry.timestamp)?;
561    Some(Tick {
562        symbol: Symbol::from_code(exchange, &entry.symbol),
563        price,
564        size,
565        side,
566        exchange_timestamp,
567        received_at: Utc::now(),
568    })
569}
570
571async fn forward_klines(
572    exchange: ExchangeId,
573    payload: KlineMessage,
574    candle_tx: &mpsc::Sender<Candle>,
575) {
576    for kline in payload.data {
577        if !kline.confirm {
578            continue;
579        }
580        if let Some(candle) = build_candle(exchange, &payload.topic, &kline) {
581            if candle_tx.send(candle).await.is_err() {
582                warn!("dropping kline; downstream receiver closed");
583                break;
584            }
585        }
586    }
587}
588
589fn build_candle(exchange: ExchangeId, topic: &str, entry: &KlineEntry) -> Option<Candle> {
590    let interval = parse_interval(&entry.interval)?;
591    let symbol = topic.split('.').next_back()?.to_string();
592    Some(Candle {
593        symbol: Symbol::from_code(exchange, &symbol),
594        interval,
595        open: entry.open.parse().ok()?,
596        high: entry.high.parse().ok()?,
597        low: entry.low.parse().ok()?,
598        close: entry.close.parse().ok()?,
599        volume: entry.volume.parse().ok()?,
600        timestamp: millis_to_datetime(entry.timestamp)?,
601    })
602}
603
604fn parse_interval(value: &str) -> Option<Interval> {
605    match value {
606        "1" => Some(Interval::OneMinute),
607        "5" => Some(Interval::FiveMinutes),
608        "15" => Some(Interval::FifteenMinutes),
609        "60" => Some(Interval::OneHour),
610        "240" => Some(Interval::FourHours),
611        "D" | "d" => Some(Interval::OneDay),
612        _ => None,
613    }
614}
615
616fn parse_levels(entries: &[[String; 2]]) -> Option<Vec<(Decimal, Decimal)>> {
617    let mut out = Vec::with_capacity(entries.len());
618    for entry in entries {
619        let price = entry.first()?.parse().ok()?;
620        let qty = entry.get(1)?.parse().ok()?;
621        out.push((price, qty));
622    }
623    Some(out)
624}
625
/// Splits an `orderbook.<depth>.<symbol>` topic into `(depth, symbol)`.
///
/// Returns `None` when the first segment is not exactly `orderbook`, the depth is not a valid
/// `usize`, or the symbol segment is missing. Segments after the symbol are ignored.
fn parse_topic(topic: &str) -> Option<(usize, String)> {
    let rest = topic.strip_prefix("orderbook.")?;
    let mut segments = rest.split('.');
    let depth: usize = segments.next()?.parse().ok()?;
    let symbol = segments.next()?;
    Some((depth, symbol.to_string()))
}
636
637#[derive(Deserialize, Debug)]
638struct OrderbookMessage {
639    topic: String,
640    #[serde(rename = "type")]
641    msg_type: String, // "snapshot" or "delta"
642    ts: i64,
643    data: Vec<OrderbookData>,
644}
645
646#[derive(Clone, Deserialize, Debug)]
647struct OrderbookData {
648    s: String,
649    b: Vec<[String; 2]>, // Bids
650    a: Vec<[String; 2]>, // Asks
651    #[serde(rename = "u")]
652    update_id: i64,
653    #[serde(rename = "seq", default)]
654    seq: Option<i64>,
655    #[serde(rename = "prev_seq", default)]
656    prev_seq: Option<i64>,
657    #[serde(rename = "pu", default)]
658    prev_update_id: Option<i64>,
659    #[serde(rename = "checksum", default)]
660    checksum: Option<u32>,
661}
662
663impl OrderbookData {
664    fn sequence(&self) -> i64 {
665        self.seq.unwrap_or(self.update_id)
666    }
667
668    fn previous_sequence(&self) -> Option<i64> {
669        self.prev_seq.or(self.prev_update_id)
670    }
671}
672
673struct BookManager {
674    streams: HashMap<String, SymbolBook>,
675    order_book_tx: mpsc::Sender<OrderBook>,
676    command_tx: mpsc::UnboundedSender<WsCommand>,
677    exchange: ExchangeId,
678}
679
680impl BookManager {
681    fn new(
682        exchange: ExchangeId,
683        order_book_tx: mpsc::Sender<OrderBook>,
684        command_tx: mpsc::UnboundedSender<WsCommand>,
685    ) -> Self {
686        Self {
687            streams: HashMap::new(),
688            order_book_tx,
689            command_tx,
690            exchange,
691        }
692    }
693
694    async fn handle(&mut self, payload: OrderbookMessage) {
695        let Some((depth, _)) = parse_topic(&payload.topic) else {
696            return;
697        };
698        let Some(data) = payload.data.into_iter().next() else {
699            return;
700        };
701        let symbol = data.s.clone();
702        let stream = self
703            .streams
704            .entry(payload.topic.clone())
705            .or_insert_with(|| {
706                SymbolBook::new(self.exchange, payload.topic.clone(), symbol, depth)
707            });
708
709        match stream.ingest(payload.msg_type.as_str(), data, payload.ts) {
710            BookUpdate::Pending => {}
711            BookUpdate::OutOfSync => {
712                warn!(topic = %payload.topic, "order book sequence gap detected; resubscribing");
713                let _ = self.command_tx.send(WsCommand::Subscribe(payload.topic));
714            }
715            BookUpdate::Updates(mut books) => {
716                for book in books.drain(..) {
717                    if self.order_book_tx.send(book).await.is_err() {
718                        warn!("dropping order book; downstream receiver closed");
719                        break;
720                    }
721                }
722            }
723        }
724    }
725}
726
727#[derive(Clone)]
728struct BookLevel {
729    price: Decimal,
730    quantity: Decimal,
731}
732
733struct PendingDelta {
734    bids: Vec<BookLevel>,
735    asks: Vec<BookLevel>,
736    seq: i64,
737    prev_seq: Option<i64>,
738    ts: i64,
739}
740
741impl PendingDelta {
742    fn from_data(data: OrderbookData, ts: i64) -> Option<Self> {
743        let bids = parse_levels(&data.b)?
744            .into_iter()
745            .map(|(price, quantity)| BookLevel { price, quantity })
746            .collect();
747        let asks = parse_levels(&data.a)?
748            .into_iter()
749            .map(|(price, quantity)| BookLevel { price, quantity })
750            .collect();
751        Some(Self {
752            bids,
753            asks,
754            seq: data.sequence(),
755            prev_seq: data.previous_sequence(),
756            ts,
757        })
758    }
759}
760
761struct SymbolBook {
762    exchange: ExchangeId,
763    symbol: String,
764    depth: usize,
765    book: LocalOrderBook,
766    last_seq: Option<i64>,
767    synced: bool,
768    pending: Vec<PendingDelta>,
769    last_checksum: Option<u32>,
770}
771
772impl SymbolBook {
773    fn new(exchange: ExchangeId, _topic: String, symbol: String, depth: usize) -> Self {
774        Self {
775            exchange,
776            symbol,
777            depth,
778            book: LocalOrderBook::new(),
779            last_seq: None,
780            synced: false,
781            pending: Vec::new(),
782            last_checksum: None,
783        }
784    }
785
786    fn ingest(&mut self, msg_type: &str, data: OrderbookData, ts: i64) -> BookUpdate {
787        match msg_type {
788            "snapshot" => self.apply_snapshot(data, ts),
789            "delta" => self.apply_delta(data, ts),
790            _ => BookUpdate::Pending,
791        }
792    }
793
794    fn apply_snapshot(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
795        self.last_checksum = data.checksum;
796        let Some(snapshot_bids) = parse_levels(&data.b) else {
797            return BookUpdate::Pending;
798        };
799        let Some(snapshot_asks) = parse_levels(&data.a) else {
800            return BookUpdate::Pending;
801        };
802        self.book.load_snapshot(&snapshot_bids, &snapshot_asks);
803        self.last_seq = Some(data.sequence());
804        self.synced = true;
805        let mut updates = Vec::new();
806        if let Some(book) = self.snapshot(ts) {
807            updates.push(book);
808        }
809        let pending = std::mem::take(&mut self.pending);
810        for delta in pending {
811            match self.apply_pending(delta) {
812                ApplyOutcome::Gap => return BookUpdate::OutOfSync,
813                ApplyOutcome::Updates(mut book_updates) => updates.append(&mut book_updates),
814                ApplyOutcome::Pending => {}
815            }
816        }
817        BookUpdate::Updates(updates)
818    }
819
820    fn apply_delta(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
821        self.last_checksum = data.checksum;
822        let Some(delta) = PendingDelta::from_data(data, ts) else {
823            return BookUpdate::Pending;
824        };
825        if !self.synced {
826            self.pending.push(delta);
827            return BookUpdate::Pending;
828        }
829        match self.apply_pending(delta) {
830            ApplyOutcome::Gap => BookUpdate::OutOfSync,
831            ApplyOutcome::Pending => BookUpdate::Pending,
832            ApplyOutcome::Updates(updates) => BookUpdate::Updates(updates),
833        }
834    }
835
836    fn apply_pending(&mut self, delta: PendingDelta) -> ApplyOutcome {
837        if let Some(last) = self.last_seq {
838            if let Some(prev) = delta.prev_seq {
839                if prev != last {
840                    self.reset();
841                    return ApplyOutcome::Gap;
842                }
843            } else if delta.seq - 1 != last {
844                self.reset();
845                return ApplyOutcome::Gap;
846            }
847        } else {
848            self.pending.push(delta);
849            return ApplyOutcome::Pending;
850        }
851
852        for level in &delta.bids {
853            self.book
854                .apply_delta(Side::Buy, level.price, level.quantity);
855        }
856        for level in &delta.asks {
857            self.book
858                .apply_delta(Side::Sell, level.price, level.quantity);
859        }
860        self.last_seq = Some(delta.seq);
861
862        if let Some(book) = self.snapshot(delta.ts) {
863            ApplyOutcome::Updates(vec![book])
864        } else {
865            ApplyOutcome::Updates(Vec::new())
866        }
867    }
868
869    fn snapshot(&self, ts: i64) -> Option<OrderBook> {
870        if self.book.is_empty() {
871            return None;
872        }
873        let timestamp = millis_to_datetime(ts)?;
874        let bids = self
875            .book
876            .bid_levels(self.depth)
877            .into_iter()
878            .map(|(price, size)| OrderBookLevel { price, size })
879            .collect::<Vec<_>>();
880        let asks = self
881            .book
882            .ask_levels(self.depth)
883            .into_iter()
884            .map(|(price, size)| OrderBookLevel { price, size })
885            .collect::<Vec<_>>();
886        Some(OrderBook {
887            symbol: Symbol::from_code(self.exchange, &self.symbol),
888            bids,
889            asks,
890            timestamp,
891            exchange_checksum: self.last_checksum,
892            local_checksum: Some(self.book.checksum(self.depth)),
893        })
894    }
895
896    fn reset(&mut self) {
897        self.synced = false;
898        self.last_seq = None;
899        self.pending.clear();
900    }
901}
902
903enum ApplyOutcome {
904    Updates(Vec<OrderBook>),
905    Pending,
906    Gap,
907}
908
909enum BookUpdate {
910    Updates(Vec<OrderBook>),
911    Pending,
912    OutOfSync,
913}
914
915fn millis_to_datetime(value: i64) -> Option<DateTime<Utc>> {
916    Utc.timestamp_millis_opt(value).single()
917}
918
919impl Drop for BybitMarketStream {
920    fn drop(&mut self) {
921        let _ = self.command_tx.send(WsCommand::Shutdown);
922    }
923}
924
925#[derive(Deserialize, Debug)]
926pub struct BybitWsExecution {
927    #[serde(rename = "execId")]
928    pub exec_id: String,
929    #[serde(rename = "orderId")]
930    pub order_id: String,
931    #[serde(rename = "symbol")]
932    pub symbol: String,
933    #[serde(rename = "execPrice")]
934    pub exec_price: String,
935    #[serde(rename = "execQty")]
936    pub exec_qty: String,
937    #[serde(rename = "side")]
938    pub side: String,
939    #[serde(rename = "execFee")]
940    pub exec_fee: String,
941    #[serde(rename = "feeCurrency")]
942    pub fee_currency: Option<String>,
943    #[serde(rename = "execTime")]
944    pub exec_time: String,
945    #[serde(rename = "cumExecQty")]
946    pub cum_exec_qty: String,
947    #[serde(rename = "avgPrice")]
948    pub avg_price: String,
949}
950
951impl BybitWsOrder {
952    pub fn to_tesser_order(
953        &self,
954        exchange: ExchangeId,
955        existing: Option<&Order>,
956    ) -> Result<Order, BrokerError> {
957        Ok(Order {
958            id: self.order_id.clone(),
959            request: existing
960                .map(|o| o.request.clone())
961                .unwrap_or_else(|| OrderRequest {
962                    symbol: Symbol::from_code(exchange, &self.symbol),
963                    side: if self.side == "Buy" {
964                        Side::Buy
965                    } else {
966                        Side::Sell
967                    },
968                    order_type: OrderType::Market,
969                    quantity: Decimal::ZERO,
970                    price: None,
971                    trigger_price: None,
972                    time_in_force: None,
973                    client_order_id: None,
974                    take_profit: None,
975                    stop_loss: None,
976                    display_quantity: None,
977                }),
978            status: crate::BybitClient::map_order_status(&self.order_status),
979            filled_quantity: existing.map(|o| o.filled_quantity).unwrap_or(Decimal::ZERO),
980            avg_fill_price: existing.and_then(|o| o.avg_fill_price),
981            created_at: existing.map(|o| o.created_at).unwrap_or_else(Utc::now),
982            updated_at: Utc::now(),
983        })
984    }
985}
986
987impl BybitWsExecution {
988    pub fn to_tesser_fill(&self, exchange: ExchangeId) -> Result<Fill, BrokerError> {
989        let fill_price = self.exec_price.parse::<Decimal>().map_err(|e| {
990            BrokerError::Serialization(format!(
991                "failed to parse exec price {}: {e}",
992                self.exec_price
993            ))
994        })?;
995        let fill_quantity = self.exec_qty.parse::<Decimal>().map_err(|e| {
996            BrokerError::Serialization(format!("failed to parse exec qty {}: {e}", self.exec_qty))
997        })?;
998        let fee = self.exec_fee.parse::<Decimal>().ok();
999        let timestamp = parse_millis(&self.exec_time);
1000        let side = match self.side.as_str() {
1001            "Buy" => Side::Buy,
1002            "Sell" => Side::Sell,
1003            other => {
1004                return Err(BrokerError::Serialization(format!(
1005                    "unhandled execution side: {other}"
1006                )))
1007            }
1008        };
1009
1010        let fee_asset = self
1011            .fee_currency
1012            .as_deref()
1013            .filter(|code| !code.is_empty())
1014            .map(|code| AssetId::from_code(exchange, code));
1015        Ok(Fill {
1016            order_id: self.order_id.clone(),
1017            symbol: Symbol::from_code(exchange, &self.symbol),
1018            side,
1019            fill_price,
1020            fill_quantity,
1021            fee,
1022            fee_asset,
1023            timestamp,
1024        })
1025    }
1026}
1027
// Unit tests for `BookManager`: snapshot/delta application and the resubscribe request
// issued when a delta does not follow on from the current book.
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns `(price, qty)` string pairs into the `[price, qty]` string arrays used by
    /// `OrderbookData`.
    fn sample_levels(levels: &[(&str, &str)]) -> Vec<[String; 2]> {
        levels
            .iter()
            .map(|(price, qty)| [price.to_string(), qty.to_string()])
            .collect()
    }

    /// Builds an `OrderbookData` entry for `symbol` with the given levels.
    ///
    /// `seq` is used for both `update_id` and `seq`; `prev_update_id` and `checksum`
    /// are left unset.
    fn sample_data(
        symbol: &str,
        bids: &[(&str, &str)],
        asks: &[(&str, &str)],
        seq: i64,
        prev_seq: Option<i64>,
    ) -> OrderbookData {
        OrderbookData {
            s: symbol.into(),
            b: sample_levels(bids),
            a: sample_levels(asks),
            update_id: seq,
            seq: Some(seq),
            prev_seq,
            prev_update_id: None,
            checksum: None,
        }
    }

    /// A snapshot followed by a contiguous delta (prev_seq 10 after seq 10) must each
    /// publish a book on the book channel.
    #[tokio::test]
    async fn book_manager_applies_snapshot_and_deltas() {
        let (book_tx, mut book_rx) = mpsc::channel(8);
        // The command receiver is bound (not dropped) for the duration of the test.
        let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
        let exchange = ExchangeId::from("bybit_linear");
        let mut manager = BookManager::new(exchange, book_tx, cmd_tx);

        let snapshot = OrderbookMessage {
            topic: "orderbook.2.BTCUSDT".into(),
            msg_type: "snapshot".into(),
            ts: 1,
            data: vec![sample_data(
                "BTCUSDT",
                &[("100", "1"), ("99", "2")],
                &[("101", "1"), ("102", "2")],
                10,
                Some(9),
            )],
        };
        manager.handle(snapshot).await;
        let first = book_rx.recv().await.expect("snapshot missing");
        // Best bid and best ask are expected at index 0.
        assert_eq!(first.bids[0].price, Decimal::from(100));
        assert_eq!(first.asks[0].price, Decimal::from(101));

        // Delta: bid 100 arrives with size 0, bid 98 is new, ask 101 changes size to 2.
        let delta = OrderbookMessage {
            topic: "orderbook.2.BTCUSDT".into(),
            msg_type: "delta".into(),
            ts: 2,
            data: vec![sample_data(
                "BTCUSDT",
                &[("100", "0"), ("98", "1")],
                &[("101", "2")],
                11,
                Some(10),
            )],
        };
        manager.handle(delta).await;
        let update = book_rx.recv().await.expect("delta missing");
        // The assertions expect the zero-size level at 100 to be gone, leaving 99 and 98.
        assert_eq!(update.bids.len(), 2);
        assert_eq!(update.bids[1].price, Decimal::from(98));
        assert_eq!(update.asks[0].size, Decimal::from(2));
    }

    /// A delta whose `prev_seq` (6) does not match the snapshot's `seq` (5) must make the
    /// manager send a `Subscribe` command for the same topic.
    #[tokio::test]
    async fn book_manager_requests_resub_on_gap() {
        let (book_tx, mut book_rx) = mpsc::channel(8);
        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel();
        let exchange = ExchangeId::from("bybit_linear");
        let mut manager = BookManager::new(exchange, book_tx, cmd_tx.clone());

        let snapshot = OrderbookMessage {
            topic: "orderbook.1.BTCUSDT".into(),
            msg_type: "snapshot".into(),
            ts: 1,
            data: vec![sample_data(
                "BTCUSDT",
                &[("100", "1")],
                &[("101", "1")],
                5,
                Some(4),
            )],
        };
        manager.handle(snapshot).await;
        // Drain the snapshot's book so only the gap handling remains to be observed.
        book_rx.recv().await.expect("snapshot missing");

        let gap_delta = OrderbookMessage {
            topic: "orderbook.1.BTCUSDT".into(),
            msg_type: "delta".into(),
            ts: 2,
            data: vec![sample_data(
                "BTCUSDT",
                &[("100", "0")],
                &[("101", "2")],
                8,
                Some(6),
            )],
        };
        manager.handle(gap_delta).await;

        let resub = cmd_rx.recv().await.expect("resubscribe missing");
        match resub {
            WsCommand::Subscribe(topic) => assert_eq!(topic, "orderbook.1.BTCUSDT"),
            _ => panic!("unexpected command {:?}", resub),
        }
    }
}