rustrade_data/exchange/bybit/book/
l2.rs1use std::vec;
2
3use crate::{
4 Identifier,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 bybit::{Bybit, message::BybitPayloadKind, spot::BybitSpot},
10 },
11 subscription::{
12 Map,
13 book::{OrderBookEvent, OrderBooksL2},
14 },
15 transformer::ExchangeTransformer,
16};
17use derive_more::Constructor;
18use rustrade_integration::{Transformer, protocol::websocket::WsMessage};
19use tokio::sync::mpsc::UnboundedSender;
20use tracing::debug;
21
22use super::BybitOrderBookMessage;
23
24#[derive(Debug, Constructor)]
25pub struct BybitOrderBookL2Meta<InstrumentKey, Sequencer> {
26 pub key: InstrumentKey,
27 pub sequencer: Option<Sequencer>,
28}
29
30#[derive(Debug)]
31pub struct BybitOrderBooksL2Transformer<InstrumentKey> {
32 instrument_map: Map<BybitOrderBookL2Meta<InstrumentKey, BybitOrderBookL2Sequencer>>,
33}
34
35impl<InstrumentKey, Server> ExchangeTransformer<Bybit<Server>, InstrumentKey, OrderBooksL2>
36 for BybitOrderBooksL2Transformer<InstrumentKey>
37where
38 InstrumentKey: Clone + PartialEq + Send + Sync,
39 Server: Send,
40{
41 async fn init(
42 instrument_map: Map<InstrumentKey>,
43 _: &[MarketEvent<InstrumentKey, OrderBookEvent>],
44 _: UnboundedSender<WsMessage>,
45 ) -> Result<Self, DataError> {
46 let instrument_map = instrument_map
47 .0
48 .into_iter()
49 .map(|(sub_id, instrument_key)| {
50 (sub_id, BybitOrderBookL2Meta::new(instrument_key, None))
51 })
52 .collect();
53
54 Ok(Self { instrument_map })
55 }
56}
57
58impl<InstrumentKey> Transformer for BybitOrderBooksL2Transformer<InstrumentKey>
59where
60 InstrumentKey: Clone,
61{
62 type Error = DataError;
63 type Input = BybitOrderBookMessage;
64 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
65 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
66
67 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
68 let subscription_id = match input.id() {
70 Some(subscription_id) => subscription_id,
71 None => return vec![],
72 };
73
74 let instrument = match self.instrument_map.find_mut(&subscription_id) {
76 Ok(instrument) => instrument,
77 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
78 };
79
80 if matches!(input.kind, BybitPayloadKind::Snapshot) {
83 instrument.sequencer.replace(BybitOrderBookL2Sequencer {
84 last_update_id: input.data.update_id,
85 });
86
87 return MarketIter::<InstrumentKey, OrderBookEvent>::from((
88 BybitSpot::ID,
89 instrument.key.clone(),
90 input,
91 ))
92 .0;
93 }
94
95 let Some(sequencer) = &mut instrument.sequencer else {
97 debug!("Update message received before initial Snapshot");
98 return vec![];
99 };
100
101 let valid_update = match sequencer.validate_sequence(input) {
103 Ok(Some(valid_update)) => valid_update,
104 Ok(None) => return vec![],
105 Err(error) => return vec![Err(error)],
106 };
107
108 MarketIter::<InstrumentKey, OrderBookEvent>::from((
109 BybitSpot::ID,
110 instrument.key.clone(),
111 valid_update,
112 ))
113 .0
114 }
115}
116
/// Sequencing state for a single instrument's L2 order book stream.
#[derive(Debug)]
struct BybitOrderBookL2Sequencer {
    /// `update_id` of the most recently accepted snapshot or update.
    last_update_id: u64,
}
121
122impl BybitOrderBookL2Sequencer {
123 pub fn validate_sequence(
124 &mut self,
125 update: BybitOrderBookMessage,
126 ) -> Result<Option<BybitOrderBookMessage>, DataError> {
127 if update.data.update_id != self.last_update_id + 1 {
129 return Err(DataError::InvalidSequence {
130 prev_last_update_id: self.last_update_id,
131 first_update_id: update.data.update_id,
132 });
133 }
134
135 self.last_update_id = update.data.update_id;
137
138 Ok(Some(update))
139 }
140}