barter_data/exchange/bybit/book/
l2.rs1use std::vec;
2
3use crate::{
4 Identifier,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 bybit::{Bybit, message::BybitPayloadKind, spot::BybitSpot},
10 },
11 subscription::{
12 Map,
13 book::{OrderBookEvent, OrderBooksL2},
14 },
15 transformer::ExchangeTransformer,
16};
17use async_trait::async_trait;
18use barter_integration::{Transformer, protocol::websocket::WsMessage};
19use derive_more::Constructor;
20use tokio::sync::mpsc::UnboundedSender;
21use tracing::debug;
22
23use super::BybitOrderBookMessage;
24
25#[derive(Debug, Constructor)]
26pub struct BybitOrderBookL2Meta<InstrumentKey, Sequencer> {
27 pub key: InstrumentKey,
28 pub sequencer: Option<Sequencer>,
29}
30
31#[derive(Debug)]
32pub struct BybitOrderBooksL2Transformer<InstrumentKey> {
33 instrument_map: Map<BybitOrderBookL2Meta<InstrumentKey, BybitOrderBookL2Sequencer>>,
34}
35
36#[async_trait]
37impl<InstrumentKey, Server> ExchangeTransformer<Bybit<Server>, InstrumentKey, OrderBooksL2>
38 for BybitOrderBooksL2Transformer<InstrumentKey>
39where
40 InstrumentKey: Clone + PartialEq + Send + Sync,
41{
42 async fn init(
43 instrument_map: Map<InstrumentKey>,
44 _: &[MarketEvent<InstrumentKey, OrderBookEvent>],
45 _: UnboundedSender<WsMessage>,
46 ) -> Result<Self, DataError> {
47 let instrument_map = instrument_map
48 .0
49 .into_iter()
50 .map(|(sub_id, instrument_key)| {
51 (sub_id, BybitOrderBookL2Meta::new(instrument_key, None))
52 })
53 .collect();
54
55 Ok(Self { instrument_map })
56 }
57}
58
59impl<InstrumentKey> Transformer for BybitOrderBooksL2Transformer<InstrumentKey>
60where
61 InstrumentKey: Clone,
62{
63 type Error = DataError;
64 type Input = BybitOrderBookMessage;
65 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
66 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
67
68 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
69 let subscription_id = match input.id() {
71 Some(subscription_id) => subscription_id,
72 None => return vec![],
73 };
74
75 let instrument = match self.instrument_map.find_mut(&subscription_id) {
77 Ok(instrument) => instrument,
78 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
79 };
80
81 if matches!(input.kind, BybitPayloadKind::Snapshot) {
84 instrument.sequencer.replace(BybitOrderBookL2Sequencer {
85 last_update_id: input.data.update_id,
86 });
87
88 return MarketIter::<InstrumentKey, OrderBookEvent>::from((
89 BybitSpot::ID,
90 instrument.key.clone(),
91 input,
92 ))
93 .0;
94 }
95
96 let Some(sequencer) = &mut instrument.sequencer else {
98 debug!("Update message received before initial Snapshot");
99 return vec![];
100 };
101
102 let valid_update = match sequencer.validate_sequence(input) {
104 Ok(Some(valid_update)) => valid_update,
105 Ok(None) => return vec![],
106 Err(error) => return vec![Err(error)],
107 };
108
109 MarketIter::<InstrumentKey, OrderBookEvent>::from((
110 BybitSpot::ID,
111 instrument.key.clone(),
112 valid_update,
113 ))
114 .0
115 }
116}
117
/// Sequencing state for one instrument's L2 order book stream.
#[derive(Debug)]
struct BybitOrderBookL2Sequencer {
    /// `update_id` of the most recently accepted order book message.
    last_update_id: u64,
}
122
123impl BybitOrderBookL2Sequencer {
124 pub fn validate_sequence(
125 &mut self,
126 update: BybitOrderBookMessage,
127 ) -> Result<Option<BybitOrderBookMessage>, DataError> {
128 if update.data.update_id != self.last_update_id + 1 {
130 return Err(DataError::InvalidSequence {
131 prev_last_update_id: self.last_update_id,
132 first_update_id: update.data.update_id,
133 });
134 }
135
136 self.last_update_id = update.data.update_id;
138
139 Ok(Some(update))
140 }
141}