tesser_bybit/
ws.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::{
4    atomic::{AtomicBool, Ordering},
5    Arc,
6};
7use std::time::Duration;
8
9use chrono::{DateTime, TimeZone, Utc};
10use futures::{SinkExt, StreamExt};
11use hmac::{Hmac, Mac};
12use rust_decimal::Decimal;
13use serde::{Deserialize, Serialize};
14use serde_json::{self, json, Value};
15use sha2::Sha256;
16use tokio::net::TcpStream;
17use tokio::sync::mpsc::error::TryRecvError;
18use tokio::sync::{mpsc, Mutex};
19use tokio::time::{interval, MissedTickBehavior};
20use tokio_tungstenite::{
21    connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
22};
23use tracing::{debug, error, info, warn};
24
25use crate::{millis_to_datetime as parse_millis, BybitCredentials};
26
27type HmacSha256 = Hmac<Sha256>;
28
29use tesser_broker::{BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult, MarketStream};
30use tesser_core::{
31    Candle, Fill, Interval, LocalOrderBook, Order, OrderBook, OrderBookLevel, OrderRequest,
32    OrderType, Side, Tick,
33};
34
/// Category of the Bybit public websocket feed.
///
/// Each variant corresponds to one path segment appended to the public
/// endpoint URL (see [`PublicChannel::as_path`]).
#[derive(Debug, Clone, Copy)]
pub enum PublicChannel {
    /// Served under the `linear` path segment.
    Linear,
    /// Served under the `inverse` path segment.
    Inverse,
    /// Served under the `spot` path segment.
    Spot,
    /// Served under the `option` path segment.
    Option,
    /// Served under the `spread` path segment.
    Spread,
}
43
44impl PublicChannel {
45    pub fn as_path(&self) -> &'static str {
46        match self {
47            Self::Linear => "linear",
48            Self::Inverse => "inverse",
49            Self::Spot => "spot",
50            Self::Option => "option",
51            Self::Spread => "spread",
52        }
53    }
54}
55
56impl FromStr for PublicChannel {
57    type Err = BrokerError;
58
59    fn from_str(value: &str) -> Result<Self, Self::Err> {
60        match value.to_lowercase().as_str() {
61            "linear" => Ok(Self::Linear),
62            "inverse" => Ok(Self::Inverse),
63            "spot" => Ok(Self::Spot),
64            "option" => Ok(Self::Option),
65            "spread" => Ok(Self::Spread),
66            other => Err(BrokerError::InvalidRequest(format!(
67                "unsupported Bybit public channel '{other}'"
68            ))),
69        }
70    }
71}
72
73#[derive(Clone, Debug, Serialize)]
74pub enum BybitSubscription {
75    Trades { symbol: String },
76    Kline { symbol: String, interval: Interval },
77    OrderBook { symbol: String, depth: usize },
78}
79
80impl BybitSubscription {
81    fn topic(&self) -> String {
82        match self {
83            Self::Kline { symbol, interval } => {
84                format!("kline.{}.{}", interval.to_bybit(), symbol)
85            }
86            Self::Trades { symbol } => format!("publicTrade.{symbol}"),
87            Self::OrderBook { symbol, depth } => {
88                format!("orderbook.{depth}.{symbol}")
89            }
90        }
91    }
92}
93
/// Control messages sent from the stream handle (and the book manager) to the
/// background websocket task.
#[derive(Debug, Clone)]
enum WsCommand {
    /// Ask the task to send a `subscribe` request for the given topic string.
    Subscribe(String),
    /// Ask the task to send a close frame and stop.
    Shutdown,
}
99
100pub struct BybitMarketStream {
101    info: BrokerInfo,
102    command_tx: mpsc::UnboundedSender<WsCommand>,
103    tick_rx: Mutex<mpsc::Receiver<Tick>>,
104    candle_rx: Mutex<mpsc::Receiver<Candle>>,
105    order_book_rx: Mutex<mpsc::Receiver<tesser_core::OrderBook>>,
106    connection_status: Option<Arc<AtomicBool>>,
107}
108
109impl BybitMarketStream {
110    pub async fn connect_public(
111        base_url: &str,
112        channel: PublicChannel,
113        connection_status: Option<Arc<AtomicBool>>,
114    ) -> BrokerResult<Self> {
115        let endpoint = format!(
116            "{}/v5/public/{}",
117            base_url.trim_end_matches('/'),
118            channel.as_path()
119        );
120        let (ws, _) = connect_async(&endpoint)
121            .await
122            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
123        if let Some(flag) = &connection_status {
124            flag.store(true, Ordering::SeqCst);
125        }
126        let (command_tx, command_rx) = mpsc::unbounded_channel();
127        let command_loop = command_tx.clone();
128        let (tick_tx, tick_rx) = mpsc::channel(2048);
129        let (candle_tx, candle_rx) = mpsc::channel(1024);
130        let (order_book_tx, order_book_rx) = mpsc::channel(256);
131        let status_for_loop = connection_status.clone();
132        tokio::spawn(async move {
133            if let Err(err) = run_ws_loop(
134                ws,
135                command_rx,
136                command_loop,
137                tick_tx,
138                candle_tx,
139                order_book_tx,
140                status_for_loop,
141            )
142            .await
143            {
144                error!(error = %err, "bybit ws loop exited unexpectedly");
145            }
146        });
147        Ok(Self {
148            info: BrokerInfo {
149                name: format!("bybit-{}", channel.as_path()),
150                markets: vec![channel.as_path().to_string()],
151                supports_testnet: endpoint.contains("testnet"),
152            },
153            command_tx,
154            tick_rx: Mutex::new(tick_rx),
155            candle_rx: Mutex::new(candle_rx),
156            order_book_rx: Mutex::new(order_book_rx),
157            connection_status,
158        })
159    }
160
161    pub fn connection_status(&self) -> Option<Arc<AtomicBool>> {
162        self.connection_status.clone()
163    }
164}
165
166pub async fn connect_private(
167    base_url: &str,
168    creds: &BybitCredentials,
169    connection_status: Option<Arc<AtomicBool>>,
170) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BrokerError> {
171    let endpoint = format!("{}/v5/private", base_url.trim_end_matches('/'));
172    let (mut socket, _) = match connect_async(&endpoint).await {
173        Ok(value) => {
174            if let Some(flag) = &connection_status {
175                flag.store(true, Ordering::SeqCst);
176            }
177            value
178        }
179        Err(err) => {
180            if let Some(flag) = &connection_status {
181                flag.store(false, Ordering::SeqCst);
182            }
183            return Err(BrokerError::Transport(err.to_string()));
184        }
185    };
186
187    let expires = (Utc::now() + chrono::Duration::seconds(10)).timestamp_millis();
188    let payload = format!("GET/realtime{expires}");
189    let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
190        .map_err(|e| BrokerError::Other(format!("failed to init signer: {e}")))?;
191    mac.update(payload.as_bytes());
192    let signature = hex::encode(mac.finalize().into_bytes());
193
194    let auth_payload = json!({
195        "op": "auth",
196        "args": [creds.api_key.clone(), expires, signature],
197    });
198
199    socket
200        .send(Message::Text(auth_payload.to_string()))
201        .await
202        .map_err(|e| BrokerError::Transport(e.to_string()))?;
203
204    if let Some(Ok(Message::Text(text))) = socket.next().await {
205        if let Ok(value) = serde_json::from_str::<Value>(&text) {
206            if value
207                .get("success")
208                .and_then(|v| v.as_bool())
209                .unwrap_or(false)
210            {
211                info!("Private websocket authenticated");
212            } else {
213                warn!(payload = text, "Private websocket auth failed");
214                return Err(BrokerError::Authentication(
215                    "private websocket auth failed".into(),
216                ));
217            }
218        }
219    }
220
221    let sub_payload = json!({
222        "op": "subscribe",
223        "args": ["order", "execution"],
224    });
225    socket
226        .send(Message::Text(sub_payload.to_string()))
227        .await
228        .map_err(|e| BrokerError::Transport(e.to_string()))?;
229
230    info!("Subscribed to private order/execution channels");
231
232    Ok(socket)
233}
234
235#[async_trait::async_trait]
236impl MarketStream for BybitMarketStream {
237    type Subscription = BybitSubscription;
238
239    fn name(&self) -> &str {
240        &self.info.name
241    }
242
243    fn info(&self) -> Option<&BrokerInfo> {
244        Some(&self.info)
245    }
246
247    async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
248        let topic = subscription.topic();
249        self.command_tx
250            .send(WsCommand::Subscribe(topic.clone()))
251            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
252        info!(topic, "subscribed to Bybit stream");
253        Ok(())
254    }
255
256    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
257        let mut rx = self.tick_rx.lock().await;
258        match rx.try_recv() {
259            Ok(tick) => Ok(Some(tick)),
260            Err(TryRecvError::Empty) => Ok(None),
261            Err(TryRecvError::Disconnected) => Ok(None),
262        }
263    }
264
265    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
266        let mut rx = self.candle_rx.lock().await;
267        match rx.try_recv() {
268            Ok(candle) => Ok(Some(candle)),
269            Err(TryRecvError::Empty) => Ok(None),
270            Err(TryRecvError::Disconnected) => Ok(None),
271        }
272    }
273
274    async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
275        let mut rx = self.order_book_rx.lock().await;
276        match rx.try_recv() {
277            Ok(book) => Ok(Some(book)),
278            Err(TryRecvError::Empty) => Ok(None),
279            Err(TryRecvError::Disconnected) => Ok(None),
280        }
281    }
282}
283
284type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
285
286async fn run_ws_loop(
287    mut socket: WsStream,
288    mut commands: mpsc::UnboundedReceiver<WsCommand>,
289    command_tx: mpsc::UnboundedSender<WsCommand>,
290    tick_tx: mpsc::Sender<Tick>,
291    candle_tx: mpsc::Sender<Candle>,
292    order_book_tx: mpsc::Sender<OrderBook>,
293    connection_status: Option<Arc<AtomicBool>>,
294) -> BrokerResult<()> {
295    let mut heartbeat = interval(Duration::from_secs(20));
296    heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
297
298    if let Some(flag) = &connection_status {
299        flag.store(true, Ordering::SeqCst);
300    }
301
302    let mut book_manager = BookManager::new(order_book_tx.clone(), command_tx);
303
304    loop {
305        tokio::select! {
306            cmd = commands.recv() => {
307                match cmd {
308                    Some(WsCommand::Subscribe(topic)) => send_subscribe(&mut socket, &topic).await?,
309                    Some(WsCommand::Shutdown) => {
310                        let _ = socket.send(Message::Close(None)).await;
311                        break;
312                    }
313                    None => break,
314                }
315            }
316            msg = socket.next() => {
317                match msg {
318                    Some(Ok(Message::Ping(payload))) => {
319                        socket
320                            .send(Message::Pong(payload))
321                            .await
322                            .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
323                    }
324                    Some(Ok(message)) => handle_message(message, &tick_tx, &candle_tx, &mut book_manager).await?,
325                    Some(Err(err)) => return Err(BrokerError::from_display(err, BrokerErrorKind::Transport)),
326                    None => break,
327                }
328            }
329            _ = heartbeat.tick() => {
330                send_ping(&mut socket).await?;
331            }
332        }
333    }
334
335    if let Some(flag) = connection_status {
336        flag.store(false, Ordering::SeqCst);
337    }
338
339    Ok(())
340}
341
342async fn send_subscribe(socket: &mut WsStream, topic: &str) -> BrokerResult<()> {
343    let payload = json!({
344        "op": "subscribe",
345        "args": [topic],
346    });
347    socket
348        .send(Message::Text(payload.to_string()))
349        .await
350        .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
351}
352
353async fn send_ping(socket: &mut WsStream) -> BrokerResult<()> {
354    let payload = json!({ "op": "ping" });
355    socket
356        .send(Message::Text(payload.to_string()))
357        .await
358        .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
359}
360
361async fn handle_message(
362    message: Message,
363    tick_tx: &mpsc::Sender<Tick>,
364    candle_tx: &mpsc::Sender<Candle>,
365    book_manager: &mut BookManager,
366) -> BrokerResult<()> {
367    match message {
368        Message::Text(text) => {
369            process_text_message(&text, tick_tx, candle_tx, book_manager).await;
370        }
371        Message::Binary(bytes) => {
372            if let Ok(text) = String::from_utf8(bytes) {
373                process_text_message(&text, tick_tx, candle_tx, book_manager).await;
374            } else {
375                warn!("received non UTF-8 binary payload from Bybit ws");
376            }
377        }
378        Message::Ping(payload) => {
379            debug!(size = payload.len(), "received ping from Bybit");
380        }
381        Message::Pong(_) => {
382            debug!("received pong from Bybit");
383        }
384        Message::Close(frame) => {
385            debug!(?frame, "bybit stream closed");
386            return Ok(());
387        }
388        Message::Frame(_) => {}
389    }
390    Ok(())
391}
392
393async fn process_text_message(
394    text: &str,
395    tick_tx: &mpsc::Sender<Tick>,
396    candle_tx: &mpsc::Sender<Candle>,
397    book_manager: &mut BookManager,
398) {
399    if let Ok(value) = serde_json::from_str::<Value>(text) {
400        if let Some(topic) = value.get("topic").and_then(|t| t.as_str()) {
401            if topic.starts_with("publicTrade") {
402                if let Ok(payload) = serde_json::from_value::<TradeMessage>(value.clone()) {
403                    forward_trades(payload, tick_tx).await;
404                }
405            } else if topic.starts_with("kline") {
406                if let Ok(payload) = serde_json::from_value::<KlineMessage>(value.clone()) {
407                    forward_klines(payload, candle_tx).await;
408                }
409            } else if topic.starts_with("orderbook") {
410                if let Ok(payload) = serde_json::from_value::<OrderbookMessage>(value.clone()) {
411                    book_manager.handle(payload).await;
412                }
413            } else if topic == "order" {
414                if let Ok(payload) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
415                    for order in payload.data {
416                        debug!(
417                            order_id = %order.order_id,
418                            status = %order.order_status,
419                            "received ws order update"
420                        );
421                    }
422                }
423            } else if topic == "execution" {
424                if let Ok(payload) =
425                    serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
426                {
427                    for exec in payload.data {
428                        debug!(exec_id = %exec.exec_id, "received ws execution");
429                    }
430                }
431            } else {
432                debug!(topic, "ignoring unsupported topic from Bybit");
433            }
434            return;
435        }
436
437        if let Some(op) = value.get("op").and_then(|v| v.as_str()) {
438            match op {
439                "subscribe" => {
440                    let success = value
441                        .get("success")
442                        .and_then(|v| v.as_bool())
443                        .unwrap_or(true);
444                    if success {
445                        debug!("subscription acknowledged by Bybit");
446                    } else {
447                        let msg = value
448                            .get("ret_msg")
449                            .and_then(|v| v.as_str())
450                            .unwrap_or("unknown error");
451                        warn!(message = msg, "Bybit rejected subscription request");
452                    }
453                }
454                "ping" | "pong" => {
455                    debug!(payload = ?value, "heartbeat ack from Bybit");
456                }
457                _ => {
458                    debug!(payload = ?value, "command response from Bybit");
459                }
460            }
461        }
462    } else {
463        warn!(payload = text, "failed to parse Bybit ws payload");
464    }
465}
466
467#[derive(Deserialize, Debug)]
468struct TradeMessage {
469    _topic: String,
470    data: Vec<TradeEntry>,
471}
472
473#[derive(Deserialize, Debug)]
474struct TradeEntry {
475    #[serde(rename = "T")]
476    timestamp: i64,
477    #[serde(rename = "s")]
478    symbol: String,
479    #[serde(rename = "S")]
480    side: String,
481    #[serde(rename = "v")]
482    size: String,
483    #[serde(rename = "p")]
484    price: String,
485}
486
487#[derive(Deserialize, Debug)]
488struct KlineMessage {
489    topic: String,
490    data: Vec<KlineEntry>,
491}
492
493#[derive(Deserialize, Debug)]
494struct KlineEntry {
495    _start: i64,
496    _end: i64,
497    interval: String,
498    open: String,
499    high: String,
500    low: String,
501    close: String,
502    volume: String,
503    confirm: bool,
504    timestamp: i64,
505}
506
507#[derive(Deserialize, Debug)]
508pub struct PrivateMessage<T> {
509    pub topic: String,
510    pub data: Vec<T>,
511}
512
513#[derive(Deserialize, Debug)]
514pub struct BybitWsOrder {
515    #[serde(rename = "orderId")]
516    pub order_id: String,
517    #[serde(rename = "symbol")]
518    pub symbol: String,
519    #[serde(rename = "side")]
520    pub side: String,
521    #[serde(rename = "orderStatus")]
522    pub order_status: String,
523}
524
525async fn forward_trades(payload: TradeMessage, tick_tx: &mpsc::Sender<Tick>) {
526    for trade in payload.data {
527        if let Some(tick) = build_tick(&trade) {
528            if tick_tx.send(tick).await.is_err() {
529                warn!("dropping trade tick; downstream receiver closed");
530                break;
531            }
532        }
533    }
534}
535
536fn build_tick(entry: &TradeEntry) -> Option<Tick> {
537    let price = entry.price.parse().ok()?;
538    let size = entry.size.parse().ok()?;
539    let side = match entry.side.as_str() {
540        "Buy" => Side::Buy,
541        "Sell" => Side::Sell,
542        _ => return None,
543    };
544    let exchange_timestamp = millis_to_datetime(entry.timestamp)?;
545    Some(Tick {
546        symbol: entry.symbol.clone(),
547        price,
548        size,
549        side,
550        exchange_timestamp,
551        received_at: Utc::now(),
552    })
553}
554
555async fn forward_klines(payload: KlineMessage, candle_tx: &mpsc::Sender<Candle>) {
556    for kline in payload.data {
557        if !kline.confirm {
558            continue;
559        }
560        if let Some(candle) = build_candle(&payload.topic, &kline) {
561            if candle_tx.send(candle).await.is_err() {
562                warn!("dropping kline; downstream receiver closed");
563                break;
564            }
565        }
566    }
567}
568
569fn build_candle(topic: &str, entry: &KlineEntry) -> Option<Candle> {
570    let interval = parse_interval(&entry.interval)?;
571    let symbol = topic.split('.').next_back()?.to_string();
572    Some(Candle {
573        symbol,
574        interval,
575        open: entry.open.parse().ok()?,
576        high: entry.high.parse().ok()?,
577        low: entry.low.parse().ok()?,
578        close: entry.close.parse().ok()?,
579        volume: entry.volume.parse().ok()?,
580        timestamp: millis_to_datetime(entry.timestamp)?,
581    })
582}
583
584fn parse_interval(value: &str) -> Option<Interval> {
585    match value {
586        "1" => Some(Interval::OneMinute),
587        "5" => Some(Interval::FiveMinutes),
588        "15" => Some(Interval::FifteenMinutes),
589        "60" => Some(Interval::OneHour),
590        "240" => Some(Interval::FourHours),
591        "D" | "d" => Some(Interval::OneDay),
592        _ => None,
593    }
594}
595
596fn parse_levels(entries: &[[String; 2]]) -> Option<Vec<(Decimal, Decimal)>> {
597    let mut out = Vec::with_capacity(entries.len());
598    for entry in entries {
599        let price = entry.first()?.parse().ok()?;
600        let qty = entry.get(1)?.parse().ok()?;
601        out.push((price, qty));
602    }
603    Some(out)
604}
605
/// Splits an `orderbook.<depth>.<symbol>` topic into `(depth, symbol)`.
///
/// Returns `None` when the first segment is not `orderbook`, a segment is
/// missing, or the depth is not a valid `usize`. Segments after the third are
/// ignored.
fn parse_topic(topic: &str) -> Option<(usize, String)> {
    let mut segments = topic.split('.');
    match (segments.next(), segments.next(), segments.next()) {
        (Some("orderbook"), Some(raw_depth), Some(symbol)) => raw_depth
            .parse()
            .ok()
            .map(|depth| (depth, symbol.to_string())),
        _ => None,
    }
}
616
617#[derive(Deserialize, Debug)]
618struct OrderbookMessage {
619    topic: String,
620    #[serde(rename = "type")]
621    msg_type: String, // "snapshot" or "delta"
622    ts: i64,
623    data: Vec<OrderbookData>,
624}
625
626#[derive(Clone, Deserialize, Debug)]
627struct OrderbookData {
628    s: String,
629    b: Vec<[String; 2]>, // Bids
630    a: Vec<[String; 2]>, // Asks
631    #[serde(rename = "u")]
632    update_id: i64,
633    #[serde(rename = "seq", default)]
634    seq: Option<i64>,
635    #[serde(rename = "prev_seq", default)]
636    prev_seq: Option<i64>,
637    #[serde(rename = "pu", default)]
638    prev_update_id: Option<i64>,
639    #[serde(rename = "checksum", default)]
640    checksum: Option<u32>,
641}
642
643impl OrderbookData {
644    fn sequence(&self) -> i64 {
645        self.seq.unwrap_or(self.update_id)
646    }
647
648    fn previous_sequence(&self) -> Option<i64> {
649        self.prev_seq.or(self.prev_update_id)
650    }
651}
652
653struct BookManager {
654    streams: HashMap<String, SymbolBook>,
655    order_book_tx: mpsc::Sender<OrderBook>,
656    command_tx: mpsc::UnboundedSender<WsCommand>,
657}
658
659impl BookManager {
660    fn new(
661        order_book_tx: mpsc::Sender<OrderBook>,
662        command_tx: mpsc::UnboundedSender<WsCommand>,
663    ) -> Self {
664        Self {
665            streams: HashMap::new(),
666            order_book_tx,
667            command_tx,
668        }
669    }
670
671    async fn handle(&mut self, payload: OrderbookMessage) {
672        let Some((depth, _)) = parse_topic(&payload.topic) else {
673            return;
674        };
675        let Some(data) = payload.data.into_iter().next() else {
676            return;
677        };
678        let symbol = data.s.clone();
679        let stream = self
680            .streams
681            .entry(payload.topic.clone())
682            .or_insert_with(|| SymbolBook::new(payload.topic.clone(), symbol, depth));
683
684        match stream.ingest(payload.msg_type.as_str(), data, payload.ts) {
685            BookUpdate::Pending => {}
686            BookUpdate::OutOfSync => {
687                warn!(topic = %payload.topic, "order book sequence gap detected; resubscribing");
688                let _ = self.command_tx.send(WsCommand::Subscribe(payload.topic));
689            }
690            BookUpdate::Updates(mut books) => {
691                for book in books.drain(..) {
692                    if self.order_book_tx.send(book).await.is_err() {
693                        warn!("dropping order book; downstream receiver closed");
694                        break;
695                    }
696                }
697            }
698        }
699    }
700}
701
702#[derive(Clone)]
703struct BookLevel {
704    price: Decimal,
705    quantity: Decimal,
706}
707
708struct PendingDelta {
709    bids: Vec<BookLevel>,
710    asks: Vec<BookLevel>,
711    seq: i64,
712    prev_seq: Option<i64>,
713    ts: i64,
714}
715
716impl PendingDelta {
717    fn from_data(data: OrderbookData, ts: i64) -> Option<Self> {
718        let bids = parse_levels(&data.b)?
719            .into_iter()
720            .map(|(price, quantity)| BookLevel { price, quantity })
721            .collect();
722        let asks = parse_levels(&data.a)?
723            .into_iter()
724            .map(|(price, quantity)| BookLevel { price, quantity })
725            .collect();
726        Some(Self {
727            bids,
728            asks,
729            seq: data.sequence(),
730            prev_seq: data.previous_sequence(),
731            ts,
732        })
733    }
734}
735
736struct SymbolBook {
737    symbol: String,
738    depth: usize,
739    book: LocalOrderBook,
740    last_seq: Option<i64>,
741    synced: bool,
742    pending: Vec<PendingDelta>,
743    last_checksum: Option<u32>,
744}
745
746impl SymbolBook {
747    fn new(_topic: String, symbol: String, depth: usize) -> Self {
748        Self {
749            symbol,
750            depth,
751            book: LocalOrderBook::new(),
752            last_seq: None,
753            synced: false,
754            pending: Vec::new(),
755            last_checksum: None,
756        }
757    }
758
759    fn ingest(&mut self, msg_type: &str, data: OrderbookData, ts: i64) -> BookUpdate {
760        match msg_type {
761            "snapshot" => self.apply_snapshot(data, ts),
762            "delta" => self.apply_delta(data, ts),
763            _ => BookUpdate::Pending,
764        }
765    }
766
767    fn apply_snapshot(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
768        self.last_checksum = data.checksum;
769        let Some(snapshot_bids) = parse_levels(&data.b) else {
770            return BookUpdate::Pending;
771        };
772        let Some(snapshot_asks) = parse_levels(&data.a) else {
773            return BookUpdate::Pending;
774        };
775        self.book.load_snapshot(&snapshot_bids, &snapshot_asks);
776        self.last_seq = Some(data.sequence());
777        self.synced = true;
778        let mut updates = Vec::new();
779        if let Some(book) = self.snapshot(ts) {
780            updates.push(book);
781        }
782        let pending = std::mem::take(&mut self.pending);
783        for delta in pending {
784            match self.apply_pending(delta) {
785                ApplyOutcome::Gap => return BookUpdate::OutOfSync,
786                ApplyOutcome::Updates(mut book_updates) => updates.append(&mut book_updates),
787                ApplyOutcome::Pending => {}
788            }
789        }
790        BookUpdate::Updates(updates)
791    }
792
793    fn apply_delta(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
794        self.last_checksum = data.checksum;
795        let Some(delta) = PendingDelta::from_data(data, ts) else {
796            return BookUpdate::Pending;
797        };
798        if !self.synced {
799            self.pending.push(delta);
800            return BookUpdate::Pending;
801        }
802        match self.apply_pending(delta) {
803            ApplyOutcome::Gap => BookUpdate::OutOfSync,
804            ApplyOutcome::Pending => BookUpdate::Pending,
805            ApplyOutcome::Updates(updates) => BookUpdate::Updates(updates),
806        }
807    }
808
809    fn apply_pending(&mut self, delta: PendingDelta) -> ApplyOutcome {
810        if let Some(last) = self.last_seq {
811            if let Some(prev) = delta.prev_seq {
812                if prev != last {
813                    self.reset();
814                    return ApplyOutcome::Gap;
815                }
816            } else if delta.seq - 1 != last {
817                self.reset();
818                return ApplyOutcome::Gap;
819            }
820        } else {
821            self.pending.push(delta);
822            return ApplyOutcome::Pending;
823        }
824
825        for level in &delta.bids {
826            self.book
827                .apply_delta(Side::Buy, level.price, level.quantity);
828        }
829        for level in &delta.asks {
830            self.book
831                .apply_delta(Side::Sell, level.price, level.quantity);
832        }
833        self.last_seq = Some(delta.seq);
834
835        if let Some(book) = self.snapshot(delta.ts) {
836            ApplyOutcome::Updates(vec![book])
837        } else {
838            ApplyOutcome::Updates(Vec::new())
839        }
840    }
841
842    fn snapshot(&self, ts: i64) -> Option<OrderBook> {
843        if self.book.is_empty() {
844            return None;
845        }
846        let timestamp = millis_to_datetime(ts)?;
847        let bids = self
848            .book
849            .bid_levels(self.depth)
850            .into_iter()
851            .map(|(price, size)| OrderBookLevel { price, size })
852            .collect::<Vec<_>>();
853        let asks = self
854            .book
855            .ask_levels(self.depth)
856            .into_iter()
857            .map(|(price, size)| OrderBookLevel { price, size })
858            .collect::<Vec<_>>();
859        Some(OrderBook {
860            symbol: self.symbol.clone(),
861            bids,
862            asks,
863            timestamp,
864            exchange_checksum: self.last_checksum,
865            local_checksum: Some(self.book.checksum(self.depth)),
866        })
867    }
868
869    fn reset(&mut self) {
870        self.synced = false;
871        self.last_seq = None;
872        self.pending.clear();
873    }
874}
875
876enum ApplyOutcome {
877    Updates(Vec<OrderBook>),
878    Pending,
879    Gap,
880}
881
882enum BookUpdate {
883    Updates(Vec<OrderBook>),
884    Pending,
885    OutOfSync,
886}
887
888fn millis_to_datetime(value: i64) -> Option<DateTime<Utc>> {
889    Utc.timestamp_millis_opt(value).single()
890}
891
892impl Drop for BybitMarketStream {
893    fn drop(&mut self) {
894        let _ = self.command_tx.send(WsCommand::Shutdown);
895    }
896}
897
898#[derive(Deserialize, Debug)]
899pub struct BybitWsExecution {
900    #[serde(rename = "execId")]
901    pub exec_id: String,
902    #[serde(rename = "orderId")]
903    pub order_id: String,
904    #[serde(rename = "symbol")]
905    pub symbol: String,
906    #[serde(rename = "execPrice")]
907    pub exec_price: String,
908    #[serde(rename = "execQty")]
909    pub exec_qty: String,
910    #[serde(rename = "side")]
911    pub side: String,
912    #[serde(rename = "execFee")]
913    pub exec_fee: String,
914    #[serde(rename = "execTime")]
915    pub exec_time: String,
916    #[serde(rename = "cumExecQty")]
917    pub cum_exec_qty: String,
918    #[serde(rename = "avgPrice")]
919    pub avg_price: String,
920}
921
922impl BybitWsOrder {
923    pub fn to_tesser_order(&self, existing: Option<&Order>) -> Result<Order, BrokerError> {
924        Ok(Order {
925            id: self.order_id.clone(),
926            request: existing
927                .map(|o| o.request.clone())
928                .unwrap_or_else(|| OrderRequest {
929                    symbol: self.symbol.clone(),
930                    side: if self.side == "Buy" {
931                        Side::Buy
932                    } else {
933                        Side::Sell
934                    },
935                    order_type: OrderType::Market,
936                    quantity: Decimal::ZERO,
937                    price: None,
938                    trigger_price: None,
939                    time_in_force: None,
940                    client_order_id: None,
941                    take_profit: None,
942                    stop_loss: None,
943                    display_quantity: None,
944                }),
945            status: crate::BybitClient::map_order_status(&self.order_status),
946            filled_quantity: existing.map(|o| o.filled_quantity).unwrap_or(Decimal::ZERO),
947            avg_fill_price: existing.and_then(|o| o.avg_fill_price),
948            created_at: existing.map(|o| o.created_at).unwrap_or_else(Utc::now),
949            updated_at: Utc::now(),
950        })
951    }
952}
953
954impl BybitWsExecution {
955    pub fn to_tesser_fill(&self) -> Result<Fill, BrokerError> {
956        let fill_price = self.exec_price.parse::<Decimal>().map_err(|e| {
957            BrokerError::Serialization(format!(
958                "failed to parse exec price {}: {e}",
959                self.exec_price
960            ))
961        })?;
962        let fill_quantity = self.exec_qty.parse::<Decimal>().map_err(|e| {
963            BrokerError::Serialization(format!("failed to parse exec qty {}: {e}", self.exec_qty))
964        })?;
965        let fee = self.exec_fee.parse::<Decimal>().ok();
966        let timestamp = parse_millis(&self.exec_time);
967        let side = match self.side.as_str() {
968            "Buy" => Side::Buy,
969            "Sell" => Side::Sell,
970            other => {
971                return Err(BrokerError::Serialization(format!(
972                    "unhandled execution side: {other}"
973                )))
974            }
975        };
976
977        Ok(Fill {
978            order_id: self.order_id.clone(),
979            symbol: self.symbol.clone(),
980            side,
981            fill_price,
982            fill_quantity,
983            fee,
984            timestamp,
985        })
986    }
987}
988
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts borrowed `(price, qty)` pairs into owned two-element string
    /// arrays, the shape used for the bid/ask fields of `OrderbookData`.
    fn sample_levels(levels: &[(&str, &str)]) -> Vec<[String; 2]> {
        let mut rows = Vec::with_capacity(levels.len());
        for (price, qty) in levels {
            rows.push([price.to_string(), qty.to_string()]);
        }
        rows
    }

    /// Builds one `OrderbookData` entry for `symbol`.
    ///
    /// `seq` is used both as `update_id` and as `seq`; `prev_update_id` and
    /// `checksum` are left unset.
    fn sample_data(
        symbol: &str,
        bids: &[(&str, &str)],
        asks: &[(&str, &str)],
        seq: i64,
        prev_seq: Option<i64>,
    ) -> OrderbookData {
        let bid_levels = sample_levels(bids);
        let ask_levels = sample_levels(asks);
        OrderbookData {
            s: symbol.into(),
            b: bid_levels,
            a: ask_levels,
            update_id: seq,
            seq: Some(seq),
            prev_seq,
            prev_update_id: None,
            checksum: None,
        }
    }

    /// A snapshot followed by a contiguous delta should each publish a book,
    /// with the delta's changes reflected in the second one.
    #[tokio::test]
    async fn book_manager_applies_snapshot_and_deltas() {
        const TOPIC: &str = "orderbook.2.BTCUSDT";

        let (book_tx, mut book_rx) = mpsc::channel(8);
        // The command receiver is bound so it lives until the test ends.
        let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
        let mut manager = BookManager::new(book_tx, cmd_tx);

        let snapshot_entry = sample_data(
            "BTCUSDT",
            &[("100", "1"), ("99", "2")],
            &[("101", "1"), ("102", "2")],
            10,
            Some(9),
        );
        manager
            .handle(OrderbookMessage {
                topic: TOPIC.into(),
                msg_type: "snapshot".into(),
                ts: 1,
                data: vec![snapshot_entry],
            })
            .await;

        let after_snapshot = book_rx.recv().await.expect("snapshot missing");
        assert_eq!(after_snapshot.bids[0].price, Decimal::from(100));
        assert_eq!(after_snapshot.asks[0].price, Decimal::from(101));

        // The delta continues from seq 10: it sends size "0" for the 100 bid,
        // adds a 98 bid and resizes the 101 ask.
        let delta_entry = sample_data(
            "BTCUSDT",
            &[("100", "0"), ("98", "1")],
            &[("101", "2")],
            11,
            Some(10),
        );
        manager
            .handle(OrderbookMessage {
                topic: TOPIC.into(),
                msg_type: "delta".into(),
                ts: 2,
                data: vec![delta_entry],
            })
            .await;

        let after_delta = book_rx.recv().await.expect("delta missing");
        assert_eq!(after_delta.bids.len(), 2);
        assert_eq!(after_delta.bids[1].price, Decimal::from(98));
        assert_eq!(after_delta.asks[0].size, Decimal::from(2));
    }

    /// A delta whose `prev_seq` does not line up with the last applied
    /// sequence is expected to make the manager resubscribe to the topic.
    #[tokio::test]
    async fn book_manager_requests_resub_on_gap() {
        const TOPIC: &str = "orderbook.1.BTCUSDT";

        let (book_tx, mut book_rx) = mpsc::channel(8);
        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel();
        let mut manager = BookManager::new(book_tx, cmd_tx.clone());

        let snapshot_entry = sample_data("BTCUSDT", &[("100", "1")], &[("101", "1")], 5, Some(4));
        manager
            .handle(OrderbookMessage {
                topic: TOPIC.into(),
                msg_type: "snapshot".into(),
                ts: 1,
                data: vec![snapshot_entry],
            })
            .await;
        book_rx.recv().await.expect("snapshot missing");

        // prev_seq 6 does not follow the snapshot's seq 5.
        let gap_entry = sample_data("BTCUSDT", &[("100", "0")], &[("101", "2")], 8, Some(6));
        manager
            .handle(OrderbookMessage {
                topic: TOPIC.into(),
                msg_type: "delta".into(),
                ts: 2,
                data: vec![gap_entry],
            })
            .await;

        match cmd_rx.recv().await.expect("resubscribe missing") {
            WsCommand::Subscribe(topic) => assert_eq!(topic, TOPIC),
            other => panic!("unexpected command {:?}", other),
        }
    }
}