1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6};
7use std::time::Duration;
8
9use chrono::{DateTime, TimeZone, Utc};
10use futures::{SinkExt, StreamExt};
11use hmac::{Hmac, Mac};
12use rust_decimal::Decimal;
13use serde::{Deserialize, Serialize};
14use serde_json::{self, json, Value};
15use sha2::Sha256;
16use tokio::net::TcpStream;
17use tokio::sync::mpsc::error::TryRecvError;
18use tokio::sync::{mpsc, Mutex};
19use tokio::time::{interval, MissedTickBehavior};
20use tokio_tungstenite::{
21 connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
22};
23use tracing::{debug, error, info, warn};
24
25use crate::{millis_to_datetime as parse_millis, BybitCredentials};
26
27type HmacSha256 = Hmac<Sha256>;
28
29use tesser_broker::{BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult, MarketStream};
30use tesser_core::{
31 AssetId, Candle, ExchangeId, Fill, Interval, LocalOrderBook, Order, OrderBook, OrderBookLevel,
32 OrderRequest, OrderType, Side, Symbol, Tick,
33};
34
35#[derive(Clone, Copy, Debug)]
36pub enum PublicChannel {
37 Linear,
38 Inverse,
39 Spot,
40 Option,
41 Spread,
42}
43
44impl PublicChannel {
45 pub fn as_path(&self) -> &'static str {
46 match self {
47 Self::Linear => "linear",
48 Self::Inverse => "inverse",
49 Self::Spot => "spot",
50 Self::Option => "option",
51 Self::Spread => "spread",
52 }
53 }
54}
55
56impl FromStr for PublicChannel {
57 type Err = BrokerError;
58
59 fn from_str(value: &str) -> Result<Self, Self::Err> {
60 match value.to_lowercase().as_str() {
61 "linear" => Ok(Self::Linear),
62 "inverse" => Ok(Self::Inverse),
63 "spot" => Ok(Self::Spot),
64 "option" => Ok(Self::Option),
65 "spread" => Ok(Self::Spread),
66 other => Err(BrokerError::InvalidRequest(format!(
67 "unsupported Bybit public channel '{other}'"
68 ))),
69 }
70 }
71}
72
73#[derive(Clone, Debug, Serialize)]
74pub enum BybitSubscription {
75 Trades { symbol: String },
76 Kline { symbol: String, interval: Interval },
77 OrderBook { symbol: String, depth: usize },
78}
79
80impl BybitSubscription {
81 fn topic(&self) -> String {
82 match self {
83 Self::Kline { symbol, interval } => {
84 format!("kline.{}.{}", interval.to_bybit(), symbol)
85 }
86 Self::Trades { symbol } => format!("publicTrade.{symbol}"),
87 Self::OrderBook { symbol, depth } => {
88 format!("orderbook.{depth}.{symbol}")
89 }
90 }
91 }
92}
93
94#[derive(Clone, Debug)]
95enum WsCommand {
96 Subscribe(String),
97 Shutdown,
98}
99
100pub struct BybitMarketStream {
101 info: BrokerInfo,
102 command_tx: mpsc::UnboundedSender<WsCommand>,
103 tick_rx: Mutex<mpsc::Receiver<Tick>>,
104 candle_rx: Mutex<mpsc::Receiver<Candle>>,
105 order_book_rx: Mutex<mpsc::Receiver<tesser_core::OrderBook>>,
106 connection_status: Option<Arc<AtomicBool>>,
107}
108
109impl BybitMarketStream {
110 pub async fn connect_public(
111 base_url: &str,
112 channel: PublicChannel,
113 connection_status: Option<Arc<AtomicBool>>,
114 exchange: ExchangeId,
115 ) -> BrokerResult<Self> {
116 let endpoint = format!(
117 "{}/v5/public/{}",
118 base_url.trim_end_matches('/'),
119 channel.as_path()
120 );
121 let (ws, _) = connect_async(&endpoint)
122 .await
123 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
124 if let Some(flag) = &connection_status {
125 flag.store(true, Ordering::SeqCst);
126 }
127 let (command_tx, command_rx) = mpsc::unbounded_channel();
128 let command_loop = command_tx.clone();
129 let (tick_tx, tick_rx) = mpsc::channel(2048);
130 let (candle_tx, candle_rx) = mpsc::channel(1024);
131 let (order_book_tx, order_book_rx) = mpsc::channel(256);
132 let status_for_loop = connection_status.clone();
133 let exchange_id = exchange;
134 tokio::spawn(async move {
135 if let Err(err) = run_ws_loop(
136 ws,
137 command_rx,
138 command_loop,
139 tick_tx,
140 candle_tx,
141 order_book_tx,
142 status_for_loop,
143 exchange_id,
144 )
145 .await
146 {
147 error!(error = %err, "bybit ws loop exited unexpectedly");
148 }
149 });
150 Ok(Self {
151 info: BrokerInfo {
152 name: format!("bybit-{}", channel.as_path()),
153 markets: vec![channel.as_path().to_string()],
154 supports_testnet: endpoint.contains("testnet"),
155 },
156 command_tx,
157 tick_rx: Mutex::new(tick_rx),
158 candle_rx: Mutex::new(candle_rx),
159 order_book_rx: Mutex::new(order_book_rx),
160 connection_status,
161 })
162 }
163
164 pub fn connection_status(&self) -> Option<Arc<AtomicBool>> {
165 self.connection_status.clone()
166 }
167}
168
169pub async fn connect_private(
170 base_url: &str,
171 creds: &BybitCredentials,
172 connection_status: Option<Arc<AtomicBool>>,
173) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BrokerError> {
174 let endpoint = format!("{}/v5/private", base_url.trim_end_matches('/'));
175 let (mut socket, _) = match connect_async(&endpoint).await {
176 Ok(value) => {
177 if let Some(flag) = &connection_status {
178 flag.store(true, Ordering::SeqCst);
179 }
180 value
181 }
182 Err(err) => {
183 if let Some(flag) = &connection_status {
184 flag.store(false, Ordering::SeqCst);
185 }
186 return Err(BrokerError::Transport(err.to_string()));
187 }
188 };
189
190 let expires = (Utc::now() + chrono::Duration::seconds(10)).timestamp_millis();
191 let payload = format!("GET/realtime{expires}");
192 let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
193 .map_err(|e| BrokerError::Other(format!("failed to init signer: {e}")))?;
194 mac.update(payload.as_bytes());
195 let signature = hex::encode(mac.finalize().into_bytes());
196
197 let auth_payload = json!({
198 "op": "auth",
199 "args": [creds.api_key.clone(), expires, signature],
200 });
201
202 socket
203 .send(Message::Text(auth_payload.to_string()))
204 .await
205 .map_err(|e| BrokerError::Transport(e.to_string()))?;
206
207 if let Some(Ok(Message::Text(text))) = socket.next().await {
208 if let Ok(value) = serde_json::from_str::<Value>(&text) {
209 if value
210 .get("success")
211 .and_then(|v| v.as_bool())
212 .unwrap_or(false)
213 {
214 info!("Private websocket authenticated");
215 } else {
216 warn!(payload = text, "Private websocket auth failed");
217 return Err(BrokerError::Authentication(
218 "private websocket auth failed".into(),
219 ));
220 }
221 }
222 }
223
224 let sub_payload = json!({
225 "op": "subscribe",
226 "args": ["order", "execution"],
227 });
228 socket
229 .send(Message::Text(sub_payload.to_string()))
230 .await
231 .map_err(|e| BrokerError::Transport(e.to_string()))?;
232
233 info!("Subscribed to private order/execution channels");
234
235 Ok(socket)
236}
237
238#[async_trait::async_trait]
239impl MarketStream for BybitMarketStream {
240 type Subscription = BybitSubscription;
241
242 fn name(&self) -> &str {
243 &self.info.name
244 }
245
246 fn info(&self) -> Option<&BrokerInfo> {
247 Some(&self.info)
248 }
249
250 async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
251 let topic = subscription.topic();
252 self.command_tx
253 .send(WsCommand::Subscribe(topic.clone()))
254 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
255 info!(topic, "subscribed to Bybit stream");
256 Ok(())
257 }
258
259 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
260 let mut rx = self.tick_rx.lock().await;
261 match rx.try_recv() {
262 Ok(tick) => Ok(Some(tick)),
263 Err(TryRecvError::Empty) => Ok(None),
264 Err(TryRecvError::Disconnected) => Ok(None),
265 }
266 }
267
268 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
269 let mut rx = self.candle_rx.lock().await;
270 match rx.try_recv() {
271 Ok(candle) => Ok(Some(candle)),
272 Err(TryRecvError::Empty) => Ok(None),
273 Err(TryRecvError::Disconnected) => Ok(None),
274 }
275 }
276
277 async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
278 let mut rx = self.order_book_rx.lock().await;
279 match rx.try_recv() {
280 Ok(book) => Ok(Some(book)),
281 Err(TryRecvError::Empty) => Ok(None),
282 Err(TryRecvError::Disconnected) => Ok(None),
283 }
284 }
285}
286
287type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
288
289#[allow(clippy::too_many_arguments)]
290async fn run_ws_loop(
291 mut socket: WsStream,
292 mut commands: mpsc::UnboundedReceiver<WsCommand>,
293 command_tx: mpsc::UnboundedSender<WsCommand>,
294 tick_tx: mpsc::Sender<Tick>,
295 candle_tx: mpsc::Sender<Candle>,
296 order_book_tx: mpsc::Sender<OrderBook>,
297 connection_status: Option<Arc<AtomicBool>>,
298 exchange: ExchangeId,
299) -> BrokerResult<()> {
300 let mut heartbeat = interval(Duration::from_secs(20));
301 heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
302
303 if let Some(flag) = &connection_status {
304 flag.store(true, Ordering::SeqCst);
305 }
306
307 let mut book_manager = BookManager::new(exchange, order_book_tx.clone(), command_tx);
308
309 loop {
310 tokio::select! {
311 cmd = commands.recv() => {
312 match cmd {
313 Some(WsCommand::Subscribe(topic)) => send_subscribe(&mut socket, &topic).await?,
314 Some(WsCommand::Shutdown) => {
315 let _ = socket.send(Message::Close(None)).await;
316 break;
317 }
318 None => break,
319 }
320 }
321 msg = socket.next() => {
322 match msg {
323 Some(Ok(Message::Ping(payload))) => {
324 socket
325 .send(Message::Pong(payload))
326 .await
327 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))?;
328 }
329 Some(Ok(message)) => {
330 handle_message(
331 message,
332 &tick_tx,
333 &candle_tx,
334 &mut book_manager,
335 exchange,
336 )
337 .await?
338 }
339 Some(Err(err)) => return Err(BrokerError::from_display(err, BrokerErrorKind::Transport)),
340 None => break,
341 }
342 }
343 _ = heartbeat.tick() => {
344 send_ping(&mut socket).await?;
345 }
346 }
347 }
348
349 if let Some(flag) = connection_status {
350 flag.store(false, Ordering::SeqCst);
351 }
352
353 Ok(())
354}
355
356async fn send_subscribe(socket: &mut WsStream, topic: &str) -> BrokerResult<()> {
357 let payload = json!({
358 "op": "subscribe",
359 "args": [topic],
360 });
361 socket
362 .send(Message::Text(payload.to_string()))
363 .await
364 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
365}
366
367async fn send_ping(socket: &mut WsStream) -> BrokerResult<()> {
368 let payload = json!({ "op": "ping" });
369 socket
370 .send(Message::Text(payload.to_string()))
371 .await
372 .map_err(|err| BrokerError::from_display(err, BrokerErrorKind::Transport))
373}
374
375async fn handle_message(
376 message: Message,
377 tick_tx: &mpsc::Sender<Tick>,
378 candle_tx: &mpsc::Sender<Candle>,
379 book_manager: &mut BookManager,
380 exchange: ExchangeId,
381) -> BrokerResult<()> {
382 match message {
383 Message::Text(text) => {
384 process_text_message(&text, tick_tx, candle_tx, book_manager, exchange).await;
385 }
386 Message::Binary(bytes) => {
387 if let Ok(text) = String::from_utf8(bytes) {
388 process_text_message(&text, tick_tx, candle_tx, book_manager, exchange).await;
389 } else {
390 warn!("received non UTF-8 binary payload from Bybit ws");
391 }
392 }
393 Message::Ping(payload) => {
394 debug!(size = payload.len(), "received ping from Bybit");
395 }
396 Message::Pong(_) => {
397 debug!("received pong from Bybit");
398 }
399 Message::Close(frame) => {
400 debug!(?frame, "bybit stream closed");
401 return Ok(());
402 }
403 Message::Frame(_) => {}
404 }
405 Ok(())
406}
407
408async fn process_text_message(
409 text: &str,
410 tick_tx: &mpsc::Sender<Tick>,
411 candle_tx: &mpsc::Sender<Candle>,
412 book_manager: &mut BookManager,
413 exchange: ExchangeId,
414) {
415 if let Ok(value) = serde_json::from_str::<Value>(text) {
416 if let Some(topic) = value.get("topic").and_then(|t| t.as_str()) {
417 if topic.starts_with("publicTrade") {
418 if let Ok(payload) = serde_json::from_value::<TradeMessage>(value.clone()) {
419 forward_trades(exchange, payload, tick_tx).await;
420 }
421 } else if topic.starts_with("kline") {
422 if let Ok(payload) = serde_json::from_value::<KlineMessage>(value.clone()) {
423 forward_klines(exchange, payload, candle_tx).await;
424 }
425 } else if topic.starts_with("orderbook") {
426 if let Ok(payload) = serde_json::from_value::<OrderbookMessage>(value.clone()) {
427 book_manager.handle(payload).await;
428 }
429 } else if topic == "order" {
430 if let Ok(payload) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
431 for order in payload.data {
432 debug!(
433 order_id = %order.order_id,
434 status = %order.order_status,
435 "received ws order update"
436 );
437 }
438 }
439 } else if topic == "execution" {
440 if let Ok(payload) =
441 serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
442 {
443 for exec in payload.data {
444 debug!(exec_id = %exec.exec_id, "received ws execution");
445 }
446 }
447 } else {
448 debug!(topic, "ignoring unsupported topic from Bybit");
449 }
450 return;
451 }
452
453 if let Some(op) = value.get("op").and_then(|v| v.as_str()) {
454 match op {
455 "subscribe" => {
456 let success = value
457 .get("success")
458 .and_then(|v| v.as_bool())
459 .unwrap_or(true);
460 if success {
461 debug!("subscription acknowledged by Bybit");
462 } else {
463 let msg = value
464 .get("ret_msg")
465 .and_then(|v| v.as_str())
466 .unwrap_or("unknown error");
467 warn!(message = msg, "Bybit rejected subscription request");
468 }
469 }
470 "ping" | "pong" => {
471 debug!(payload = ?value, "heartbeat ack from Bybit");
472 }
473 _ => {
474 debug!(payload = ?value, "command response from Bybit");
475 }
476 }
477 }
478 } else {
479 warn!(payload = text, "failed to parse Bybit ws payload");
480 }
481}
482
483#[derive(Deserialize, Debug)]
484struct TradeMessage {
485 _topic: String,
486 data: Vec<TradeEntry>,
487}
488
489#[derive(Deserialize, Debug)]
490struct TradeEntry {
491 #[serde(rename = "T")]
492 timestamp: i64,
493 #[serde(rename = "s")]
494 symbol: String,
495 #[serde(rename = "S")]
496 side: String,
497 #[serde(rename = "v")]
498 size: String,
499 #[serde(rename = "p")]
500 price: String,
501}
502
503#[derive(Deserialize, Debug)]
504struct KlineMessage {
505 topic: String,
506 data: Vec<KlineEntry>,
507}
508
509#[derive(Deserialize, Debug)]
510struct KlineEntry {
511 _start: i64,
512 _end: i64,
513 interval: String,
514 open: String,
515 high: String,
516 low: String,
517 close: String,
518 volume: String,
519 confirm: bool,
520 timestamp: i64,
521}
522
523#[derive(Deserialize, Debug)]
524pub struct PrivateMessage<T> {
525 pub topic: String,
526 pub data: Vec<T>,
527}
528
529#[derive(Deserialize, Debug)]
530pub struct BybitWsOrder {
531 #[serde(rename = "orderId")]
532 pub order_id: String,
533 #[serde(rename = "symbol")]
534 pub symbol: String,
535 #[serde(rename = "side")]
536 pub side: String,
537 #[serde(rename = "orderStatus")]
538 pub order_status: String,
539}
540
541async fn forward_trades(exchange: ExchangeId, payload: TradeMessage, tick_tx: &mpsc::Sender<Tick>) {
542 for trade in payload.data {
543 if let Some(tick) = build_tick(exchange, &trade) {
544 if tick_tx.send(tick).await.is_err() {
545 warn!("dropping trade tick; downstream receiver closed");
546 break;
547 }
548 }
549 }
550}
551
552fn build_tick(exchange: ExchangeId, entry: &TradeEntry) -> Option<Tick> {
553 let price = entry.price.parse().ok()?;
554 let size = entry.size.parse().ok()?;
555 let side = match entry.side.as_str() {
556 "Buy" => Side::Buy,
557 "Sell" => Side::Sell,
558 _ => return None,
559 };
560 let exchange_timestamp = millis_to_datetime(entry.timestamp)?;
561 Some(Tick {
562 symbol: Symbol::from_code(exchange, &entry.symbol),
563 price,
564 size,
565 side,
566 exchange_timestamp,
567 received_at: Utc::now(),
568 })
569}
570
571async fn forward_klines(
572 exchange: ExchangeId,
573 payload: KlineMessage,
574 candle_tx: &mpsc::Sender<Candle>,
575) {
576 for kline in payload.data {
577 if !kline.confirm {
578 continue;
579 }
580 if let Some(candle) = build_candle(exchange, &payload.topic, &kline) {
581 if candle_tx.send(candle).await.is_err() {
582 warn!("dropping kline; downstream receiver closed");
583 break;
584 }
585 }
586 }
587}
588
589fn build_candle(exchange: ExchangeId, topic: &str, entry: &KlineEntry) -> Option<Candle> {
590 let interval = parse_interval(&entry.interval)?;
591 let symbol = topic.split('.').next_back()?.to_string();
592 Some(Candle {
593 symbol: Symbol::from_code(exchange, &symbol),
594 interval,
595 open: entry.open.parse().ok()?,
596 high: entry.high.parse().ok()?,
597 low: entry.low.parse().ok()?,
598 close: entry.close.parse().ok()?,
599 volume: entry.volume.parse().ok()?,
600 timestamp: millis_to_datetime(entry.timestamp)?,
601 })
602}
603
604fn parse_interval(value: &str) -> Option<Interval> {
605 match value {
606 "1" => Some(Interval::OneMinute),
607 "5" => Some(Interval::FiveMinutes),
608 "15" => Some(Interval::FifteenMinutes),
609 "60" => Some(Interval::OneHour),
610 "240" => Some(Interval::FourHours),
611 "D" | "d" => Some(Interval::OneDay),
612 _ => None,
613 }
614}
615
616fn parse_levels(entries: &[[String; 2]]) -> Option<Vec<(Decimal, Decimal)>> {
617 let mut out = Vec::with_capacity(entries.len());
618 for entry in entries {
619 let price = entry.first()?.parse().ok()?;
620 let qty = entry.get(1)?.parse().ok()?;
621 out.push((price, qty));
622 }
623 Some(out)
624}
625
626fn parse_topic(topic: &str) -> Option<(usize, String)> {
627 let mut parts = topic.split('.');
628 let kind = parts.next()?;
629 if kind != "orderbook" {
630 return None;
631 }
632 let depth = parts.next()?.parse().ok()?;
633 let symbol = parts.next()?.to_string();
634 Some((depth, symbol))
635}
636
637#[derive(Deserialize, Debug)]
638struct OrderbookMessage {
639 topic: String,
640 #[serde(rename = "type")]
641 msg_type: String, ts: i64,
643 data: Vec<OrderbookData>,
644}
645
646#[derive(Clone, Deserialize, Debug)]
647struct OrderbookData {
648 s: String,
649 b: Vec<[String; 2]>, a: Vec<[String; 2]>, #[serde(rename = "u")]
652 update_id: i64,
653 #[serde(rename = "seq", default)]
654 seq: Option<i64>,
655 #[serde(rename = "prev_seq", default)]
656 prev_seq: Option<i64>,
657 #[serde(rename = "pu", default)]
658 prev_update_id: Option<i64>,
659 #[serde(rename = "checksum", default)]
660 checksum: Option<u32>,
661}
662
663impl OrderbookData {
664 fn sequence(&self) -> i64 {
665 self.seq.unwrap_or(self.update_id)
666 }
667
668 fn previous_sequence(&self) -> Option<i64> {
669 self.prev_seq.or(self.prev_update_id)
670 }
671}
672
673struct BookManager {
674 streams: HashMap<String, SymbolBook>,
675 order_book_tx: mpsc::Sender<OrderBook>,
676 command_tx: mpsc::UnboundedSender<WsCommand>,
677 exchange: ExchangeId,
678}
679
680impl BookManager {
681 fn new(
682 exchange: ExchangeId,
683 order_book_tx: mpsc::Sender<OrderBook>,
684 command_tx: mpsc::UnboundedSender<WsCommand>,
685 ) -> Self {
686 Self {
687 streams: HashMap::new(),
688 order_book_tx,
689 command_tx,
690 exchange,
691 }
692 }
693
694 async fn handle(&mut self, payload: OrderbookMessage) {
695 let Some((depth, _)) = parse_topic(&payload.topic) else {
696 return;
697 };
698 let Some(data) = payload.data.into_iter().next() else {
699 return;
700 };
701 let symbol = data.s.clone();
702 let stream = self
703 .streams
704 .entry(payload.topic.clone())
705 .or_insert_with(|| {
706 SymbolBook::new(self.exchange, payload.topic.clone(), symbol, depth)
707 });
708
709 match stream.ingest(payload.msg_type.as_str(), data, payload.ts) {
710 BookUpdate::Pending => {}
711 BookUpdate::OutOfSync => {
712 warn!(topic = %payload.topic, "order book sequence gap detected; resubscribing");
713 let _ = self.command_tx.send(WsCommand::Subscribe(payload.topic));
714 }
715 BookUpdate::Updates(mut books) => {
716 for book in books.drain(..) {
717 if self.order_book_tx.send(book).await.is_err() {
718 warn!("dropping order book; downstream receiver closed");
719 break;
720 }
721 }
722 }
723 }
724 }
725}
726
727#[derive(Clone)]
728struct BookLevel {
729 price: Decimal,
730 quantity: Decimal,
731}
732
733struct PendingDelta {
734 bids: Vec<BookLevel>,
735 asks: Vec<BookLevel>,
736 seq: i64,
737 prev_seq: Option<i64>,
738 ts: i64,
739}
740
741impl PendingDelta {
742 fn from_data(data: OrderbookData, ts: i64) -> Option<Self> {
743 let bids = parse_levels(&data.b)?
744 .into_iter()
745 .map(|(price, quantity)| BookLevel { price, quantity })
746 .collect();
747 let asks = parse_levels(&data.a)?
748 .into_iter()
749 .map(|(price, quantity)| BookLevel { price, quantity })
750 .collect();
751 Some(Self {
752 bids,
753 asks,
754 seq: data.sequence(),
755 prev_seq: data.previous_sequence(),
756 ts,
757 })
758 }
759}
760
761struct SymbolBook {
762 exchange: ExchangeId,
763 symbol: String,
764 depth: usize,
765 book: LocalOrderBook,
766 last_seq: Option<i64>,
767 synced: bool,
768 pending: Vec<PendingDelta>,
769 last_checksum: Option<u32>,
770}
771
772impl SymbolBook {
773 fn new(exchange: ExchangeId, _topic: String, symbol: String, depth: usize) -> Self {
774 Self {
775 exchange,
776 symbol,
777 depth,
778 book: LocalOrderBook::new(),
779 last_seq: None,
780 synced: false,
781 pending: Vec::new(),
782 last_checksum: None,
783 }
784 }
785
786 fn ingest(&mut self, msg_type: &str, data: OrderbookData, ts: i64) -> BookUpdate {
787 match msg_type {
788 "snapshot" => self.apply_snapshot(data, ts),
789 "delta" => self.apply_delta(data, ts),
790 _ => BookUpdate::Pending,
791 }
792 }
793
794 fn apply_snapshot(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
795 self.last_checksum = data.checksum;
796 let Some(snapshot_bids) = parse_levels(&data.b) else {
797 return BookUpdate::Pending;
798 };
799 let Some(snapshot_asks) = parse_levels(&data.a) else {
800 return BookUpdate::Pending;
801 };
802 self.book.load_snapshot(&snapshot_bids, &snapshot_asks);
803 self.last_seq = Some(data.sequence());
804 self.synced = true;
805 let mut updates = Vec::new();
806 if let Some(book) = self.snapshot(ts) {
807 updates.push(book);
808 }
809 let pending = std::mem::take(&mut self.pending);
810 for delta in pending {
811 match self.apply_pending(delta) {
812 ApplyOutcome::Gap => return BookUpdate::OutOfSync,
813 ApplyOutcome::Updates(mut book_updates) => updates.append(&mut book_updates),
814 ApplyOutcome::Pending => {}
815 }
816 }
817 BookUpdate::Updates(updates)
818 }
819
820 fn apply_delta(&mut self, data: OrderbookData, ts: i64) -> BookUpdate {
821 self.last_checksum = data.checksum;
822 let Some(delta) = PendingDelta::from_data(data, ts) else {
823 return BookUpdate::Pending;
824 };
825 if !self.synced {
826 self.pending.push(delta);
827 return BookUpdate::Pending;
828 }
829 match self.apply_pending(delta) {
830 ApplyOutcome::Gap => BookUpdate::OutOfSync,
831 ApplyOutcome::Pending => BookUpdate::Pending,
832 ApplyOutcome::Updates(updates) => BookUpdate::Updates(updates),
833 }
834 }
835
836 fn apply_pending(&mut self, delta: PendingDelta) -> ApplyOutcome {
837 if let Some(last) = self.last_seq {
838 if let Some(prev) = delta.prev_seq {
839 if prev != last {
840 self.reset();
841 return ApplyOutcome::Gap;
842 }
843 } else if delta.seq - 1 != last {
844 self.reset();
845 return ApplyOutcome::Gap;
846 }
847 } else {
848 self.pending.push(delta);
849 return ApplyOutcome::Pending;
850 }
851
852 for level in &delta.bids {
853 self.book
854 .apply_delta(Side::Buy, level.price, level.quantity);
855 }
856 for level in &delta.asks {
857 self.book
858 .apply_delta(Side::Sell, level.price, level.quantity);
859 }
860 self.last_seq = Some(delta.seq);
861
862 if let Some(book) = self.snapshot(delta.ts) {
863 ApplyOutcome::Updates(vec![book])
864 } else {
865 ApplyOutcome::Updates(Vec::new())
866 }
867 }
868
869 fn snapshot(&self, ts: i64) -> Option<OrderBook> {
870 if self.book.is_empty() {
871 return None;
872 }
873 let timestamp = millis_to_datetime(ts)?;
874 let bids = self
875 .book
876 .bid_levels(self.depth)
877 .into_iter()
878 .map(|(price, size)| OrderBookLevel { price, size })
879 .collect::<Vec<_>>();
880 let asks = self
881 .book
882 .ask_levels(self.depth)
883 .into_iter()
884 .map(|(price, size)| OrderBookLevel { price, size })
885 .collect::<Vec<_>>();
886 Some(OrderBook {
887 symbol: Symbol::from_code(self.exchange, &self.symbol),
888 bids,
889 asks,
890 timestamp,
891 exchange_checksum: self.last_checksum,
892 local_checksum: Some(self.book.checksum(self.depth)),
893 })
894 }
895
896 fn reset(&mut self) {
897 self.synced = false;
898 self.last_seq = None;
899 self.pending.clear();
900 }
901}
902
903enum ApplyOutcome {
904 Updates(Vec<OrderBook>),
905 Pending,
906 Gap,
907}
908
909enum BookUpdate {
910 Updates(Vec<OrderBook>),
911 Pending,
912 OutOfSync,
913}
914
915fn millis_to_datetime(value: i64) -> Option<DateTime<Utc>> {
916 Utc.timestamp_millis_opt(value).single()
917}
918
919impl Drop for BybitMarketStream {
920 fn drop(&mut self) {
921 let _ = self.command_tx.send(WsCommand::Shutdown);
922 }
923}
924
925#[derive(Deserialize, Debug)]
926pub struct BybitWsExecution {
927 #[serde(rename = "execId")]
928 pub exec_id: String,
929 #[serde(rename = "orderId")]
930 pub order_id: String,
931 #[serde(rename = "symbol")]
932 pub symbol: String,
933 #[serde(rename = "execPrice")]
934 pub exec_price: String,
935 #[serde(rename = "execQty")]
936 pub exec_qty: String,
937 #[serde(rename = "side")]
938 pub side: String,
939 #[serde(rename = "execFee")]
940 pub exec_fee: String,
941 #[serde(rename = "feeCurrency")]
942 pub fee_currency: Option<String>,
943 #[serde(rename = "execTime")]
944 pub exec_time: String,
945 #[serde(rename = "cumExecQty")]
946 pub cum_exec_qty: String,
947 #[serde(rename = "avgPrice")]
948 pub avg_price: String,
949}
950
951impl BybitWsOrder {
952 pub fn to_tesser_order(
953 &self,
954 exchange: ExchangeId,
955 existing: Option<&Order>,
956 ) -> Result<Order, BrokerError> {
957 Ok(Order {
958 id: self.order_id.clone(),
959 request: existing
960 .map(|o| o.request.clone())
961 .unwrap_or_else(|| OrderRequest {
962 symbol: Symbol::from_code(exchange, &self.symbol),
963 side: if self.side == "Buy" {
964 Side::Buy
965 } else {
966 Side::Sell
967 },
968 order_type: OrderType::Market,
969 quantity: Decimal::ZERO,
970 price: None,
971 trigger_price: None,
972 time_in_force: None,
973 client_order_id: None,
974 take_profit: None,
975 stop_loss: None,
976 display_quantity: None,
977 }),
978 status: crate::BybitClient::map_order_status(&self.order_status),
979 filled_quantity: existing.map(|o| o.filled_quantity).unwrap_or(Decimal::ZERO),
980 avg_fill_price: existing.and_then(|o| o.avg_fill_price),
981 created_at: existing.map(|o| o.created_at).unwrap_or_else(Utc::now),
982 updated_at: Utc::now(),
983 })
984 }
985}
986
987impl BybitWsExecution {
988 pub fn to_tesser_fill(&self, exchange: ExchangeId) -> Result<Fill, BrokerError> {
989 let fill_price = self.exec_price.parse::<Decimal>().map_err(|e| {
990 BrokerError::Serialization(format!(
991 "failed to parse exec price {}: {e}",
992 self.exec_price
993 ))
994 })?;
995 let fill_quantity = self.exec_qty.parse::<Decimal>().map_err(|e| {
996 BrokerError::Serialization(format!("failed to parse exec qty {}: {e}", self.exec_qty))
997 })?;
998 let fee = self.exec_fee.parse::<Decimal>().ok();
999 let timestamp = parse_millis(&self.exec_time);
1000 let side = match self.side.as_str() {
1001 "Buy" => Side::Buy,
1002 "Sell" => Side::Sell,
1003 other => {
1004 return Err(BrokerError::Serialization(format!(
1005 "unhandled execution side: {other}"
1006 )))
1007 }
1008 };
1009
1010 let fee_asset = self
1011 .fee_currency
1012 .as_deref()
1013 .filter(|code| !code.is_empty())
1014 .map(|code| AssetId::from_code(exchange, code));
1015 Ok(Fill {
1016 order_id: self.order_id.clone(),
1017 symbol: Symbol::from_code(exchange, &self.symbol),
1018 side,
1019 fill_price,
1020 fill_quantity,
1021 fee,
1022 fee_asset,
1023 timestamp,
1024 })
1025 }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use super::*;
1031
1032 fn sample_levels(levels: &[(&str, &str)]) -> Vec<[String; 2]> {
1033 levels
1034 .iter()
1035 .map(|(price, qty)| [price.to_string(), qty.to_string()])
1036 .collect()
1037 }
1038
1039 fn sample_data(
1040 symbol: &str,
1041 bids: &[(&str, &str)],
1042 asks: &[(&str, &str)],
1043 seq: i64,
1044 prev_seq: Option<i64>,
1045 ) -> OrderbookData {
1046 OrderbookData {
1047 s: symbol.into(),
1048 b: sample_levels(bids),
1049 a: sample_levels(asks),
1050 update_id: seq,
1051 seq: Some(seq),
1052 prev_seq,
1053 prev_update_id: None,
1054 checksum: None,
1055 }
1056 }
1057
1058 #[tokio::test]
1059 async fn book_manager_applies_snapshot_and_deltas() {
1060 let (book_tx, mut book_rx) = mpsc::channel(8);
1061 let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
1062 let exchange = ExchangeId::from("bybit_linear");
1063 let mut manager = BookManager::new(exchange, book_tx, cmd_tx);
1064
1065 let snapshot = OrderbookMessage {
1066 topic: "orderbook.2.BTCUSDT".into(),
1067 msg_type: "snapshot".into(),
1068 ts: 1,
1069 data: vec![sample_data(
1070 "BTCUSDT",
1071 &[("100", "1"), ("99", "2")],
1072 &[("101", "1"), ("102", "2")],
1073 10,
1074 Some(9),
1075 )],
1076 };
1077 manager.handle(snapshot).await;
1078 let first = book_rx.recv().await.expect("snapshot missing");
1079 assert_eq!(first.bids[0].price, Decimal::from(100));
1080 assert_eq!(first.asks[0].price, Decimal::from(101));
1081
1082 let delta = OrderbookMessage {
1083 topic: "orderbook.2.BTCUSDT".into(),
1084 msg_type: "delta".into(),
1085 ts: 2,
1086 data: vec![sample_data(
1087 "BTCUSDT",
1088 &[("100", "0"), ("98", "1")],
1089 &[("101", "2")],
1090 11,
1091 Some(10),
1092 )],
1093 };
1094 manager.handle(delta).await;
1095 let update = book_rx.recv().await.expect("delta missing");
1096 assert_eq!(update.bids.len(), 2);
1097 assert_eq!(update.bids[1].price, Decimal::from(98));
1098 assert_eq!(update.asks[0].size, Decimal::from(2));
1099 }
1100
1101 #[tokio::test]
1102 async fn book_manager_requests_resub_on_gap() {
1103 let (book_tx, mut book_rx) = mpsc::channel(8);
1104 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel();
1105 let exchange = ExchangeId::from("bybit_linear");
1106 let mut manager = BookManager::new(exchange, book_tx, cmd_tx.clone());
1107
1108 let snapshot = OrderbookMessage {
1109 topic: "orderbook.1.BTCUSDT".into(),
1110 msg_type: "snapshot".into(),
1111 ts: 1,
1112 data: vec![sample_data(
1113 "BTCUSDT",
1114 &[("100", "1")],
1115 &[("101", "1")],
1116 5,
1117 Some(4),
1118 )],
1119 };
1120 manager.handle(snapshot).await;
1121 book_rx.recv().await.expect("snapshot missing");
1122
1123 let gap_delta = OrderbookMessage {
1124 topic: "orderbook.1.BTCUSDT".into(),
1125 msg_type: "delta".into(),
1126 ts: 2,
1127 data: vec![sample_data(
1128 "BTCUSDT",
1129 &[("100", "0")],
1130 &[("101", "2")],
1131 8,
1132 Some(6),
1133 )],
1134 };
1135 manager.handle(gap_delta).await;
1136
1137 let resub = cmd_rx.recv().await.expect("resubscribe missing");
1138 match resub {
1139 WsCommand::Subscribe(topic) => assert_eq!(topic, "orderbook.1.BTCUSDT"),
1140 _ => panic!("unexpected command {:?}", resub),
1141 }
1142 }
1143}