1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6};
7use std::time::Duration;
8
9use chrono::{DateTime, TimeZone, Utc};
10use futures::{SinkExt, StreamExt};
11use hmac::{Hmac, Mac};
12use rust_decimal::Decimal;
13use serde::{Deserialize, Serialize};
14use serde_json::{self, json, Value};
15use sha2::Sha256;
16use tokio::net::TcpStream;
17use tokio::sync::mpsc::error::TryRecvError;
18use tokio::sync::{mpsc, Mutex};
19use tokio::time::{interval, MissedTickBehavior};
20use tokio_tungstenite::{
21 connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
22};
23use tracing::{debug, error, info, warn};
24
25use crate::{millis_to_datetime as parse_millis, BybitCredentials};
26
27type HmacSha256 = Hmac<Sha256>;
28
29use tesser_broker::{BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult, MarketStream};
30use tesser_core::{
31 Candle, Fill, Interval, LocalOrderBook, Order, OrderBook, OrderBookLevel, OrderRequest,
32 OrderType, Side, Tick,
33};
34
35#[derive(Clone, Copy, Debug)]
36pub enum PublicChannel {
37 Linear,
38 Inverse,
39 Spot,
40 Option,
41 Spread,
42}
43
44impl PublicChannel {
45 pub fn as_path(&self) -> &'static str {
46 match self {
47 Self::Linear => "linear",
48 Self::Inverse => "inverse",
49 Self::Spot => "spot",
50 Self::Option => "option",
51 Self::Spread => "spread",
52 }
53 }
54}
55
56impl FromStr for PublicChannel {
57 type Err = BrokerError;
58
59 fn from_str(value: &str) -> Result<Self, Self::Err> {
60 match value.to_lowercase().as_str() {
61 "linear" => Ok(Self::Linear),
62 "inverse" => Ok(Self::Inverse),
63 "spot" => Ok(Self::Spot),
64 "option" => Ok(Self::Option),
65 "spread" => Ok(Self::Spread),
66 other => Err(BrokerError::InvalidRequest(format!(
67 "unsupported Bybit public channel '{other}'"
68 ))),
69 }
70 }
71}
72
73#[derive(Clone, Debug, Serialize)]
74pub enum BybitSubscription {
75 Trades { symbol: String },
76 Kline { symbol: String, interval: Interval },
77 OrderBook { symbol: String, depth: usize },
78}
79
80impl BybitSubscription {
81 fn topic(&self) -> String {
82 match self {
83 Self::Kline { symbol, interval } => {
84 format!("kline.{}.{}", interval.to_bybit(), symbol)
85 }
86 Self::Trades { symbol } => format!("publicTrade.{symbol}"),
87 Self::OrderBook { symbol, depth } => {
88 format!("orderbook.{depth}.{symbol}")
89 }
90 }
91 }
92}
93
94#[derive(Clone, Debug)]
95enum WsCommand {
96 Subscribe(String),
97 Shutdown,
98}
99
100pub struct BybitMarketStream {
101 info: BrokerInfo,
102 command_tx: mpsc::UnboundedSender<WsCommand>,
103 tick_rx: Mutex<mpsc::Receiver<Tick>>,
104 candle_rx: Mutex<mpsc::Receiver<Candle>>,
105 order_book_rx: Mutex<mpsc::Receiver<tesser_core::OrderBook>>,
106 connection_status: Option<Arc<AtomicBool>>,
107}
108
109impl BybitMarketStream {
110 pub async fn connect_public(
111 base_url: &str,
112 channel: PublicChannel,
113 connection_status: Option<Arc<AtomicBool>>,
114 ) -> BrokerResult<Self> {
115 let endpoint = format!(
116 "{}/v5/public/{}",
117 base_url.trim_end_matches('/'),
118 channel.as_path()
119 );
120 let (ws, _) = connect_async(&endpoint)
121 .await
122 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
123 if let Some(flag) = &connection_status {
124 flag.store(true, Ordering::SeqCst);
125 }
126 let (command_tx, command_rx) = mpsc::unbounded_channel();
127 let command_loop = command_tx.clone();
128 let (tick_tx, tick_rx) = mpsc::channel(2048);
129 let (candle_tx, candle_rx) = mpsc::channel(1024);
130 let (order_book_tx, order_book_rx) = mpsc::channel(256);
131 let status_for_loop = connection_status.clone();
132 tokio::spawn(async move {
133 if let Err(err) = run_ws_loop(
134 ws,
135 command_rx,
136 command_loop,
137 tick_tx,
138 candle_tx,
139 order_book_tx,
140 status_for_loop,
141 )
142 .await
143 {
144 error!(error = %err, "bybit ws loop exited unexpectedly");
145 }
146 });
147 Ok(Self {
148 info: BrokerInfo {
149 name: format!("bybit-{}", channel.as_path()),
150 markets: vec![channel.as_path().to_string()],
151 supports_testnet: endpoint.contains("testnet"),
152 },
153 command_tx,
154 tick_rx: Mutex::new(tick_rx),
155 candle_rx: Mutex::new(candle_rx),
156 order_book_rx: Mutex::new(order_book_rx),
157 connection_status,
158 })
159 }
160
161 pub fn connection_status(&self) -> Option<Arc<AtomicBool>> {
162 self.connection_status.clone()
163 }
164}
165
166pub async fn connect_private(
167 base_url: &str,
168 creds: &BybitCredentials,
169 connection_status: Option<Arc<AtomicBool>>,
170) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BrokerError> {
171 let endpoint = format!("{}/v5/private", base_url.trim_end_matches('/'));
172 let (mut socket, _) = match connect_async(&endpoint).await {
173 Ok(value) => {
174 if let Some(flag) = &connection_status {
175 flag.store(true, Ordering::SeqCst);
176 }
177 value
178 }
179 Err(err) => {
180 if let Some(flag) = &connection_status {
181 flag.store(false, Ordering::SeqCst);
182 }
183 return Err(BrokerError::Transport(err.to_string()));
184 }
185 };
186
187 let expires = (Utc::now() + chrono::Duration::seconds(10)).timestamp_millis();
188 let payload = format!("GET/realtime{expires}");
189 let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
190 .map_err(|e| BrokerError::Other(format!("failed to init signer: {e}")))?;
191 mac.update(payload.as_bytes());
192 let signature = hex::encode(mac.finalize().into_bytes());
193
194 let auth_payload = json!({
195 "op": "auth",
196 "args": [creds.api_key.clone(), expires, signature],
197 });
198
199 socket
200 .send(Message::Text(auth_payload.to_string()))
201 .await
202 .map_err(|e| BrokerError::Transport(e.to_string()))?;
203
204 if let Some(Ok(Message::Text(text))) = socket.next().await {
205 if let Ok(value) = serde_json::from_str::<Value>(&text) {
206 if value
207 .get("success")
208 .and_then(|v| v.as_bool())
209 .unwrap_or(false)
210 {
211 info!("Private websocket authenticated");
212 } else {
213 warn!(payload = text, "Private websocket auth failed");
214 return Err(BrokerError::Authentication(
215 "private websocket auth failed".into(),
216 ));
217 }
218 }
219 }
220
221 let sub_payload = json!({
222 "op": "subscribe",
223 "args": ["order", "execution"],
224 });
225 socket
226 .send(Message::Text(sub_payload.to_string()))
227 .await
228 .map_err(|e| BrokerError::Transport(e.to_string()))?;
229
230 info!("Subscribed to private order/execution channels");
231
232 Ok(socket)
233}
234
235#[async_trait::async_trait]
236impl MarketStream for BybitMarketStream {
237 type Subscription = BybitSubscription;
238
239 fn name(&self) -> &str {
240 &self.info.name
241 }
242
243 fn info(&self) -> Option<&BrokerInfo> {
244 Some(&self.info)
245 }
246
247 async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
248 let topic = subscription.topic();
249 self.command_tx
250 .send(WsCommand::Subscribe(topic.clone()))
251 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
252 info!(topic, "subscribed to Bybit stream");
253 Ok(())
254 }
255
256 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
257 let mut rx = self.tick_rx.lock().await;
258 match rx.try_recv() {
259 Ok(tick) => Ok(Some(tick)),
260 Err(TryRecvError::Empty) => Ok(None),
261 Err(TryRecvError::Disconnected) => Ok(None),
262 }
263 }
264
265 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
266 let mut rx = self.candle_rx.lock().await;
267 match rx.try_recv() {
268 Ok(candle) => Ok(Some(candle)),
269 Err(TryRecvError::Empty) => Ok(None),
270 Err(TryRecvError::Disconnected) => Ok(None),
271 }
272 }
273
274 async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
275 let mut rx = self.order_book_rx.lock().await;
276 match rx.try_recv() {
277 Ok(book) => Ok(Some(book)),
278 Err(TryRecvError::Empty) => Ok(None),
279 Err(TryRecvError::Disconnected) => Ok(None),
280 }
281 }
282}
283
284type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
285
286async fn run_ws_loop(
287 mut socket: WsStream,
288 mut commands: mpsc::UnboundedReceiver<WsCommand>,
289 command_tx: mpsc::UnboundedSender<WsCommand>,
290 tick_tx: mpsc::Sender<Tick>,
291 candle_tx: mpsc::Sender<Candle>,
292 order_book_tx: mpsc::Sender<OrderBook>,
293 connection_status: Option<Arc<AtomicBool>>,
294) -> BrokerResult<()> {
295 let mut heartbeat = interval(Duration::from_secs(20));
296 heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
297
298 if let Some(flag) = &connection_status {
299 flag.store(true, Ordering::SeqCst);
300 }
301
302 let mut book_manager = BookManager::new(order_book_tx.clone(), command_tx);
303
304 loop {
305 tokio::select! {
306 cmd = commands.recv() => {
307 match cmd {
308 Some(WsCommand::Subscribe(topic)) => send_subscribe(&mut socket, &topic).await?,
309 Some(WsCommand::Shutdown) => {
310 let _ = socket.send(Message::Close(None)).await;
311 break;
312 }
313 None => break,
314 }
315 }
316 msg = socket.next() => {
317 match msg {
318 Some(Ok(Message::Ping(payload))) => {
319 socket
320 .send(Message::Pong(payload))
321 .await
322 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
323 }
324 Some(Ok(message)) => handle_message(message, &tick_tx, &candle_tx, &mut book_manager).await?,
325 Some(Err(err)) => return Err(BrokerError::from_display(err, BrokerErrorKind::Transport)),
326 None => break,
327 }
328 }
329 _ = heartbeat.tick() => {
330 send_ping(&mut socket).await?;
331 }
332 }
333 }
334
335 if let Some(flag) = connection_status {
336 flag.store(false, Ordering::SeqCst);
337 }
338
339 Ok(())
340}
341
342async fn send_subscribe(socket: &mut WsStream, topic: &str) -> BrokerResult<()> {
343 let payload = json!({
344 "op": "subscribe",
345 "args": [topic],
346 });
347 socket
348 .send(Message::Text(payload.to_string()))
349 .await
350 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
351}
352
353async fn send_ping(socket: &mut WsStream) -> BrokerResult<()> {
354 let payload = json!({ "op": "ping" });
355 socket
356 .send(Message::Text(payload.to_string()))
357 .await
358 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
359}
360
361async fn handle_message(
362 message: Message,
363 tick_tx: &mpsc::Sender<Tick>,
364 candle_tx: &mpsc::Sender<Candle>,
365 book_manager: &mut BookManager,
366) -> BrokerResult<()> {
367 match message {
368 Message::Text(text) => {
369 process_text_message(&text, tick_tx, candle_tx, book_manager).await;
370 }
371 Message::Binary(bytes) => {
372 if let Ok(text) = String::from_utf8(bytes) {
373 process_text_message(&text, tick_tx, candle_tx, book_manager).await;
374 } else {
375 warn!("received non UTF-8 binary payload from Bybit ws");
376 }
377 }
378 Message::Ping(payload) => {
379 debug!(size = payload.len(), "received ping from Bybit");
380 }
381 Message::Pong(_) => {
382 debug!("received pong from Bybit");
383 }
384 Message::Close(frame) => {
385 debug!(?frame, "bybit stream closed");
386 return Ok(());
387 }
388 Message::Frame(_) => {}
389 }
390 Ok(())
391}
392
393async fn process_text_message(
394 text: &str,
395 tick_tx: &mpsc::Sender<Tick>,
396 candle_tx: &mpsc::Sender<Candle>,
397 book_manager: &mut BookManager,
398) {
399 if let Ok(value) = serde_json::from_str::<Value>(text) {
400 if let Some(topic) = value.get("topic").and_then(|t| t.as_str()) {
401 if topic.starts_with("publicTrade") {
402 if let Ok(payload) = serde_json::from_value::<TradeMessage>(value.clone()) {
403 forward_trades(payload, tick_tx).await;
404 }
405 } else if topic.starts_with("kline") {
406 if let Ok(payload) = serde_json::from_value::<KlineMessage>(value.clone()) {
407 forward_klines(payload, candle_tx).await;
408 }
409 } else if topic.starts_with("orderbook") {
410 if let Ok(payload) = serde_json::from_value::<OrderbookMessage>(value.clone()) {
411 book_manager.handle(payload).await;
412 }
413 } else if topic == "order" {
414 if let Ok(payload) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
415 for order in payload.data {
416 debug!(
417 order_id = %order.order_id,
418 status = %order.order_status,
419 "received ws order update"
420 );
421 }
422 }
423 } else if topic == "execution" {
424 if let Ok(payload) =
425 serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
426 {
427 for exec in payload.data {
428 debug!(exec_id = %exec.exec_id, "received ws execution");
429 }
430 }
431 } else {
432 debug!(topic, "ignoring unsupported topic from Bybit");
433 }
434 return;
435 }
436
437 if let Some(op) = value.get("op").and_then(|v| v.as_str()) {
438 match op {
439 "subscribe" => {
440 let success = value
441 .get("success")
442 .and_then(|v| v.as_bool())
443 .unwrap_or(true);
444 if success {
445 debug!("subscription acknowledged by Bybit");
446 } else {
447 let msg = value
448 .get("ret_msg")
449 .and_then(|v| v.as_str())
450 .unwrap_or("unknown error");
451 warn!(message = msg, "Bybit rejected subscription request");
452 }
453 }
454 "ping" | "pong" => {
455 debug!(payload = ?value, "heartbeat ack from Bybit");
456 }
457 _ => {
458 debug!(payload = ?value, "command response from Bybit");
459 }
460 }
461 }
462 } else {
463 warn!(payload = text, "failed to parse Bybit ws payload");
464 }
465}
466
467#[derive(Deserialize, Debug)]
468struct TradeMessage {
469 _topic: String,
470 data: Vec<TradeEntry>,
471}
472
473#[derive(Deserialize, Debug)]
474struct TradeEntry {
475 #[serde(rename = "T")]
476 timestamp: i64,
477 #[serde(rename = "s")]
478 symbol: String,
479 #[serde(rename = "S")]
480 side: String,
481 #[serde(rename = "v")]
482 size: String,
483 #[serde(rename = "p")]
484 price: String,
485}
486
487#[derive(Deserialize, Debug)]
488struct KlineMessage {
489 topic: String,
490 data: Vec<KlineEntry>,
491}
492
493#[derive(Deserialize, Debug)]
494struct KlineEntry {
495 _start: i64,
496 _end: i64,
497 interval: String,
498 open: String,
499 high: String,
500 low: String,
501 close: String,
502 volume: String,
503 confirm: bool,
504 timestamp: i64,
505}
506
507#[derive(Deserialize, Debug)]
508pub struct PrivateMessage<T> {
509 pub topic: String,
510 pub data: Vec<T>,
511}
512
513#[derive(Deserialize, Debug)]
514pub struct BybitWsOrder {
515 #[serde(rename = "orderId")]
516 pub order_id: String,
517 #[serde(rename = "symbol")]
518 pub symbol: String,
519 #[serde(rename = "side")]
520 pub side: String,
521 #[serde(rename = "orderStatus")]
522 pub order_status: String,
523}
524
525async fn forward_trades(payload: TradeMessage, tick_tx: &mpsc::Sender<Tick>) {
526 for trade in payload.data {
527 if let Some(tick) = build_tick(&trade) {
528 if tick_tx.send(tick).await.is_err() {
529 warn!("dropping trade tick; downstream receiver closed");
530 break;
531 }
532 }
533 }
534}
535
536fn build_tick(entry: &TradeEntry) -> Option<Tick> {
537 let price = entry.price.parse().ok()?;
538 let size = entry.size.parse().ok()?;
539 let side = match entry.side.as_str() {
540 "Buy" => Side::Buy,
541 "Sell" => Side::Sell,
542 _ => return None,
543 };
544 let exchange_timestamp = millis_to_datetime(entry.timestamp)?;
545 Some(Tick {
546 symbol: entry.symbol.clone(),
547 price,
548 size,
549 side,
550 exchange_timestamp,
551 received_at: Utc::now(),
552 })
553}
554
555async fn forward_klines(payload: KlineMessage, candle_tx: &mpsc::Sender<Candle>) {
556 for kline in payload.data {
557 if !kline.confirm {
558 continue;
559 }
560 if let Some(candle) = build_candle(&payload.topic, &kline) {
561 if candle_tx.send(candle).await.is_err() {
562 warn!("dropping kline; downstream receiver closed");
563 break;
564 }
565 }
566 }
567}
568
569fn build_candle(topic: &str, entry: &KlineEntry) -> Option<Candle> {
570 let interval = parse_interval(&entry.interval)?;
571 let symbol = topic.split('.').next_back()?.to_string();
572 Some(Candle {
573 symbol,
574 interval,
575 open: entry.open.parse().ok()?,
576 high: entry.high.parse().ok()?,
577 low: entry.low.parse().ok()?,
578 close: entry.close.parse().ok()?,
579 volume: entry.volume.parse().ok()?,
580 timestamp: millis_to_datetime(entry.timestamp)?,
581 })
582}
583
584fn parse_interval(value: &str) -> Option<Interval> {
585 match value {
586 "1" => Some(Interval::OneMinute),
587 "5" => Some(Interval::FiveMinutes),
588 "15" => Some(Interval::FifteenMinutes),
589 "60" => Some(Interval::OneHour),
590 "240" => Some(Interval::FourHours),
591 "D" | "d" => Some(Interval::OneDay),
592 _ => None,
593 }
594}
595
596fn parse_levels(entries: &[[String; 2]]) -> Option<Vec<(Decimal, Decimal)>> {
597 let mut out = Vec::with_capacity(entries.len());
598 for entry in entries {
599 let price = entry.first()?.parse().ok()?;
600 let qty = entry.get(1)?.parse().ok()?;
601 out.push((price, qty));
602 }
603 Some(out)
604}
605
606fn parse_topic(topic: &str) -> Option<(usize, String)> {
607 let mut parts = topic.split('.');
608 let kind = parts.next()?;
609 if kind != "orderbook" {
610 return None;
611 }
612 let depth = parts.next()?.parse().ok()?;
613 let symbol = parts.next()?.to_string();
614 Some((depth, symbol))
615}
616
617#[derive(Deserialize, Debug)]
618struct OrderbookMessage {
619 topic: String,
620 #[serde(rename = "type")]
621 msg_type: String, ts: i64,
623 data: Vec<OrderbookData>,
624}
625
626#[derive(Clone, Deserialize, Debug)]
627struct OrderbookData {
628 s: String,
629 b: Vec<[String; 2]>, a: Vec<[String; 2]>, #[serde(rename = "u")]
632 update_id: i64,
633 #[serde(rename = "seq", default)]
634 seq: Option<i64>,
635 #[serde(rename = "prev_seq", default)]
636 prev_seq: Option<i64>,
637 #[serde(rename = "pu", default)]
638 prev_update_id: Option<i64>,
639 #[serde(rename = "checksum", default)]
640 checksum: Option<u32>,
641}
642
643impl OrderbookData {
644 fn sequence(&self) -> i64 {
645 self.seq.unwrap_or(self.update_id)
646 }
647
648 fn previous_sequence(&self) -> Option<i64> {
649 self.prev_seq.or(self.prev_update_id)
650 }
651}
652
653struct BookManager {
654 streams: HashMap<String, SymbolBook>,
655 order_book_tx: mpsc::Sender<OrderBook>,
656 command_tx: mpsc::UnboundedSender<WsCommand>,
657}
658
659impl BookManager {
660 fn new(
661 order_book_tx: mpsc::Sender<OrderBook>,
662 command_tx: mpsc::UnboundedSender<WsCommand>,
663 ) -> Self {
664 Self {
665 streams: HashMap::new(),
666 order_book_tx,
667 command_tx,
668 }
669 }
670
671 async fn handle(&mut self, payload: OrderbookMessage) {
672 let Some((depth, _)) = parse_topic(&payload.topic) else {
673 return;
674 };
675 let Some(data) = payload.data.into_iter().next() else {
676 return;
677 };
678 let symbol = data.s.clone();
679 let stream = self
680 .streams
681 .entry(payload.topic.clone())
682 .or_insert_with(|| SymbolBook::new(payload.topic.clone(), symbol, depth));
683
684 match stream.ingest(payload.msg_type.as_str(), data, payload.ts) {
685 BookUpdate::Pending => {}
686 BookUpdate::OutOfSync => {
687 warn!(topic = %payload.topic, "order book sequence gap detected; resubscribing");
688 let _ = self.command_tx.send(WsCommand::Subscribe(payload.topic));
689 }
690 BookUpdate::Updates(mut books) => {
691 for book in books.drain(..) {
692 if self.order_book_tx.send(book).await.is_err() {
693 warn!("dropping order book; downstream receiver closed");
694 break;
695 }
696 }
697 }
698 }
699 }
700}
701
702#[derive(Clone)]
703struct BookLevel {
704 price: Decimal,
705 quantity: Decimal,
706}
707
708struct PendingDelta {
709 bids: Vec<BookLevel>,
710 asks: Vec<BookLevel>,
711 seq: i64,
712 prev_seq: Option<i64>,
713 ts: i64,
714}
715
716impl PendingDelta {
717 fn from_data(data: OrderbookData, ts: i64) -> Option<Self> {
718 let bids = parse_levels(&data.b)?
719 .into_iter()
720 .map(|(price, quantity)| BookLevel { price, quantity })
721 .collect();
722 let asks = parse_levels(&data.a)?
723 .into_iter()
724 .map(|(price, quantity)| BookLevel { price, quantity })
725 .collect();
726 Some(Self {
727 bids,
728 asks,
729 seq: data.sequence(),
730 prev_seq: data.previous_sequence(),
731 ts,
732 })
733 }
734}
735
736struct SymbolBook {
737 symbol: String,
738 depth: usize,
739 book: LocalOrderBook,
740 last_seq: Option<i64>,
741 synced: bool,
742 pending: Vec<PendingDelta>,
743 last_checksum: Option<u32>,
744}
745
746impl SymbolBook {
747 fn new(_topic: String, symbol: String, depth: usize) -> Self {
748 Self {
749 symbol,
750 depth,
751 book: LocalOrderBook::new(),
752 last_seq: None,
753 synced: false,
754 pending: Vec::new(),
755 last_checksum: None,
756 }
757 }
758
759 fn ingest(&mut self, msg_type: &str, data: OrderbookData, ts: i64) -> BookUpdate {
760 match msg_type {
761 "snapshot" => self.apply_snapshot(data, ts),
762 "delta" => self.apply_delta(data, ts),
763 _ => BookUpdate::Pending,
764 }
765 }
766
767 fn apply_snapshot(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
768 self.last_checksum = data.checksum;
769 let Some(snapshot_bids) = parse_levels(&data.b) else {
770 return BookUpdate::Pending;
771 };
772 let Some(snapshot_asks) = parse_levels(&data.a) else {
773 return BookUpdate::Pending;
774 };
775 self.book.load_snapshot(&snapshot_bids, &snapshot_asks);
776 self.last_seq = Some(data.sequence());
777 self.synced = true;
778 let mut updates = Vec::new();
779 if let Some(book) = self.snapshot(ts) {
780 updates.push(book);
781 }
782 let pending = std::mem::take(&mut self.pending);
783 for delta in pending {
784 match self.apply_pending(delta) {
785 ApplyOutcome::Gap => return BookUpdate::OutOfSync,
786 ApplyOutcome::Updates(mut book_updates) => updates.append(&mut book_updates),
787 ApplyOutcome::Pending => {}
788 }
789 }
790 BookUpdate::Updates(updates)
791 }
792
793 fn apply_delta(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
794 self.last_checksum = data.checksum;
795 let Some(delta) = PendingDelta::from_data(data, ts) else {
796 return BookUpdate::Pending;
797 };
798 if !self.synced {
799 self.pending.push(delta);
800 return BookUpdate::Pending;
801 }
802 match self.apply_pending(delta) {
803 ApplyOutcome::Gap => BookUpdate::OutOfSync,
804 ApplyOutcome::Pending => BookUpdate::Pending,
805 ApplyOutcome::Updates(updates) => BookUpdate::Updates(updates),
806 }
807 }
808
809 fn apply_pending(&mut self, delta: PendingDelta) -> ApplyOutcome {
810 if let Some(last) = self.last_seq {
811 if let Some(prev) = delta.prev_seq {
812 if prev != last {
813 self.reset();
814 return ApplyOutcome::Gap;
815 }
816 } else if delta.seq - 1 != last {
817 self.reset();
818 return ApplyOutcome::Gap;
819 }
820 } else {
821 self.pending.push(delta);
822 return ApplyOutcome::Pending;
823 }
824
825 for level in &delta.bids {
826 self.book
827 .apply_delta(Side::Buy, level.price, level.quantity);
828 }
829 for level in &delta.asks {
830 self.book
831 .apply_delta(Side::Sell, level.price, level.quantity);
832 }
833 self.last_seq = Some(delta.seq);
834
835 if let Some(book) = self.snapshot(delta.ts) {
836 ApplyOutcome::Updates(vec![book])
837 } else {
838 ApplyOutcome::Updates(Vec::new())
839 }
840 }
841
842 fn snapshot(&self, ts: i64) -> Option<OrderBook> {
843 if self.book.is_empty() {
844 return None;
845 }
846 let timestamp = millis_to_datetime(ts)?;
847 let bids = self
848 .book
849 .bid_levels(self.depth)
850 .into_iter()
851 .map(|(price, size)| OrderBookLevel { price, size })
852 .collect::<Vec<_>>();
853 let asks = self
854 .book
855 .ask_levels(self.depth)
856 .into_iter()
857 .map(|(price, size)| OrderBookLevel { price, size })
858 .collect::<Vec<_>>();
859 Some(OrderBook {
860 symbol: self.symbol.clone(),
861 bids,
862 asks,
863 timestamp,
864 exchange_checksum: self.last_checksum,
865 local_checksum: Some(self.book.checksum(self.depth)),
866 })
867 }
868
869 fn reset(&mut self) {
870 self.synced = false;
871 self.last_seq = None;
872 self.pending.clear();
873 }
874}
875
876enum ApplyOutcome {
877 Updates(Vec<OrderBook>),
878 Pending,
879 Gap,
880}
881
882enum BookUpdate {
883 Updates(Vec<OrderBook>),
884 Pending,
885 OutOfSync,
886}
887
888fn millis_to_datetime(value: i64) -> Option<DateTime<Utc>> {
889 Utc.timestamp_millis_opt(value).single()
890}
891
892impl Drop for BybitMarketStream {
893 fn drop(&mut self) {
894 let _ = self.command_tx.send(WsCommand::Shutdown);
895 }
896}
897
898#[derive(Deserialize, Debug)]
899pub struct BybitWsExecution {
900 #[serde(rename = "execId")]
901 pub exec_id: String,
902 #[serde(rename = "orderId")]
903 pub order_id: String,
904 #[serde(rename = "symbol")]
905 pub symbol: String,
906 #[serde(rename = "execPrice")]
907 pub exec_price: String,
908 #[serde(rename = "execQty")]
909 pub exec_qty: String,
910 #[serde(rename = "side")]
911 pub side: String,
912 #[serde(rename = "execFee")]
913 pub exec_fee: String,
914 #[serde(rename = "execTime")]
915 pub exec_time: String,
916 #[serde(rename = "cumExecQty")]
917 pub cum_exec_qty: String,
918 #[serde(rename = "avgPrice")]
919 pub avg_price: String,
920}
921
922impl BybitWsOrder {
923 pub fn to_tesser_order(&self, existing: Option<&Order>) -> Result<Order, BrokerError> {
924 Ok(Order {
925 id: self.order_id.clone(),
926 request: existing
927 .map(|o| o.request.clone())
928 .unwrap_or_else(|| OrderRequest {
929 symbol: self.symbol.clone(),
930 side: if self.side == "Buy" {
931 Side::Buy
932 } else {
933 Side::Sell
934 },
935 order_type: OrderType::Market,
936 quantity: Decimal::ZERO,
937 price: None,
938 trigger_price: None,
939 time_in_force: None,
940 client_order_id: None,
941 take_profit: None,
942 stop_loss: None,
943 display_quantity: None,
944 }),
945 status: crate::BybitClient::map_order_status(&self.order_status),
946 filled_quantity: existing.map(|o| o.filled_quantity).unwrap_or(Decimal::ZERO),
947 avg_fill_price: existing.and_then(|o| o.avg_fill_price),
948 created_at: existing.map(|o| o.created_at).unwrap_or_else(Utc::now),
949 updated_at: Utc::now(),
950 })
951 }
952}
953
954impl BybitWsExecution {
955 pub fn to_tesser_fill(&self) -> Result<Fill, BrokerError> {
956 let fill_price = self.exec_price.parse::<Decimal>().map_err(|e| {
957 BrokerError::Serialization(format!(
958 "failed to parse exec price {}: {e}",
959 self.exec_price
960 ))
961 })?;
962 let fill_quantity = self.exec_qty.parse::<Decimal>().map_err(|e| {
963 BrokerError::Serialization(format!("failed to parse exec qty {}: {e}", self.exec_qty))
964 })?;
965 let fee = self.exec_fee.parse::<Decimal>().ok();
966 let timestamp = parse_millis(&self.exec_time);
967 let side = match self.side.as_str() {
968 "Buy" => Side::Buy,
969 "Sell" => Side::Sell,
970 other => {
971 return Err(BrokerError::Serialization(format!(
972 "unhandled execution side: {other}"
973 )))
974 }
975 };
976
977 Ok(Fill {
978 order_id: self.order_id.clone(),
979 symbol: self.symbol.clone(),
980 side,
981 fill_price,
982 fill_quantity,
983 fee,
984 timestamp,
985 })
986 }
987}
988
989#[cfg(test)]
990mod tests {
991 use super::*;
992
993 fn sample_levels(levels: &[(&str, &str)]) -> Vec<[String; 2]> {
994 levels
995 .iter()
996 .map(|(price, qty)| [price.to_string(), qty.to_string()])
997 .collect()
998 }
999
1000 fn sample_data(
1001 symbol: &str,
1002 bids: &[(&str, &str)],
1003 asks: &[(&str, &str)],
1004 seq: i64,
1005 prev_seq: Option<i64>,
1006 ) -> OrderbookData {
1007 OrderbookData {
1008 s: symbol.into(),
1009 b: sample_levels(bids),
1010 a: sample_levels(asks),
1011 update_id: seq,
1012 seq: Some(seq),
1013 prev_seq,
1014 prev_update_id: None,
1015 checksum: None,
1016 }
1017 }
1018
1019 #[tokio::test]
1020 async fn book_manager_applies_snapshot_and_deltas() {
1021 let (book_tx, mut book_rx) = mpsc::channel(8);
1022 let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
1023 let mut manager = BookManager::new(book_tx, cmd_tx);
1024
1025 let snapshot = OrderbookMessage {
1026 topic: "orderbook.2.BTCUSDT".into(),
1027 msg_type: "snapshot".into(),
1028 ts: 1,
1029 data: vec![sample_data(
1030 "BTCUSDT",
1031 &[("100", "1"), ("99", "2")],
1032 &[("101", "1"), ("102", "2")],
1033 10,
1034 Some(9),
1035 )],
1036 };
1037 manager.handle(snapshot).await;
1038 let first = book_rx.recv().await.expect("snapshot missing");
1039 assert_eq!(first.bids[0].price, Decimal::from(100));
1040 assert_eq!(first.asks[0].price, Decimal::from(101));
1041
1042 let delta = OrderbookMessage {
1043 topic: "orderbook.2.BTCUSDT".into(),
1044 msg_type: "delta".into(),
1045 ts: 2,
1046 data: vec![sample_data(
1047 "BTCUSDT",
1048 &[("100", "0"), ("98", "1")],
1049 &[("101", "2")],
1050 11,
1051 Some(10),
1052 )],
1053 };
1054 manager.handle(delta).await;
1055 let update = book_rx.recv().await.expect("delta missing");
1056 assert_eq!(update.bids.len(), 2);
1057 assert_eq!(update.bids[1].price, Decimal::from(98));
1058 assert_eq!(update.asks[0].size, Decimal::from(2));
1059 }
1060
1061 #[tokio::test]
1062 async fn book_manager_requests_resub_on_gap() {
1063 let (book_tx, mut book_rx) = mpsc::channel(8);
1064 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel();
1065 let mut manager = BookManager::new(book_tx, cmd_tx.clone());
1066
1067 let snapshot = OrderbookMessage {
1068 topic: "orderbook.1.BTCUSDT".into(),
1069 msg_type: "snapshot".into(),
1070 ts: 1,
1071 data: vec![sample_data(
1072 "BTCUSDT",
1073 &[("100", "1")],
1074 &[("101", "1")],
1075 5,
1076 Some(4),
1077 )],
1078 };
1079 manager.handle(snapshot).await;
1080 book_rx.recv().await.expect("snapshot missing");
1081
1082 let gap_delta = OrderbookMessage {
1083 topic: "orderbook.1.BTCUSDT".into(),
1084 msg_type: "delta".into(),
1085 ts: 2,
1086 data: vec![sample_data(
1087 "BTCUSDT",
1088 &[("100", "0")],
1089 &[("101", "2")],
1090 8,
1091 Some(6),
1092 )],
1093 };
1094 manager.handle(gap_delta).await;
1095
1096 let resub = cmd_rx.recv().await.expect("resubscribe missing");
1097 match resub {
1098 WsCommand::Subscribe(topic) => assert_eq!(topic, "orderbook.1.BTCUSDT"),
1099 _ => panic!("unexpected command {:?}", resub),
1100 }
1101 }
1102}