barter_data/exchange/bybit/book/
l2.rs

1use std::vec;
2
3use crate::{
4    Identifier,
5    error::DataError,
6    event::{MarketEvent, MarketIter},
7    exchange::{
8        Connector,
9        bybit::{Bybit, message::BybitPayloadKind, spot::BybitSpot},
10    },
11    subscription::{
12        Map,
13        book::{OrderBookEvent, OrderBooksL2},
14    },
15    transformer::ExchangeTransformer,
16};
17use async_trait::async_trait;
18use barter_integration::{Transformer, protocol::websocket::WsMessage};
19use derive_more::Constructor;
20use tokio::sync::mpsc::UnboundedSender;
21use tracing::debug;
22
23use super::BybitOrderBookMessage;
24
25#[derive(Debug, Constructor)]
26pub struct BybitOrderBookL2Meta<InstrumentKey, Sequencer> {
27    pub key: InstrumentKey,
28    pub sequencer: Option<Sequencer>,
29}
30
31#[derive(Debug)]
32pub struct BybitOrderBooksL2Transformer<InstrumentKey> {
33    instrument_map: Map<BybitOrderBookL2Meta<InstrumentKey, BybitOrderBookL2Sequencer>>,
34}
35
36#[async_trait]
37impl<InstrumentKey, Server> ExchangeTransformer<Bybit<Server>, InstrumentKey, OrderBooksL2>
38    for BybitOrderBooksL2Transformer<InstrumentKey>
39where
40    InstrumentKey: Clone + PartialEq + Send + Sync,
41{
42    async fn init(
43        instrument_map: Map<InstrumentKey>,
44        _: &[MarketEvent<InstrumentKey, OrderBookEvent>],
45        _: UnboundedSender<WsMessage>,
46    ) -> Result<Self, DataError> {
47        let instrument_map = instrument_map
48            .0
49            .into_iter()
50            .map(|(sub_id, instrument_key)| {
51                (sub_id, BybitOrderBookL2Meta::new(instrument_key, None))
52            })
53            .collect();
54
55        Ok(Self { instrument_map })
56    }
57}
58
59impl<InstrumentKey> Transformer for BybitOrderBooksL2Transformer<InstrumentKey>
60where
61    InstrumentKey: Clone,
62{
63    type Error = DataError;
64    type Input = BybitOrderBookMessage;
65    type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
66    type OutputIter = Vec<Result<Self::Output, Self::Error>>;
67
68    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
69        // Determine if the message has an identifiable SubscriptionId
70        let subscription_id = match input.id() {
71            Some(subscription_id) => subscription_id,
72            None => return vec![],
73        };
74
75        // Find Instrument associated with Input and transform
76        let instrument = match self.instrument_map.find_mut(&subscription_id) {
77            Ok(instrument) => instrument,
78            Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
79        };
80
81        // Initialise a sequencer when snapshot received from the exchange. We
82        // return immediately because the snapshot message is always valid.
83        if matches!(input.kind, BybitPayloadKind::Snapshot) {
84            instrument.sequencer.replace(BybitOrderBookL2Sequencer {
85                last_update_id: input.data.update_id,
86            });
87
88            return MarketIter::<InstrumentKey, OrderBookEvent>::from((
89                BybitSpot::ID,
90                instrument.key.clone(),
91                input,
92            ))
93            .0;
94        }
95
96        // Could happen if we receive an update message before the snapshot
97        let Some(sequencer) = &mut instrument.sequencer else {
98            debug!("Update message received before initial Snapshot");
99            return vec![];
100        };
101
102        // Drop any outdated updates & validate sequence for relevant updates
103        let valid_update = match sequencer.validate_sequence(input) {
104            Ok(Some(valid_update)) => valid_update,
105            Ok(None) => return vec![],
106            Err(error) => return vec![Err(error)],
107        };
108
109        MarketIter::<InstrumentKey, OrderBookEvent>::from((
110            BybitSpot::ID,
111            instrument.key.clone(),
112            valid_update,
113        ))
114        .0
115    }
116}
117
/// Sequencing state for one Bybit L2 order book stream.
///
/// Remembers the `update_id` of the last accepted message so the next update
/// can be checked for continuity.
#[derive(Debug)]
struct BybitOrderBookL2Sequencer {
    /// `update_id` of the most recent snapshot or validated update.
    last_update_id: u64,
}
122
123impl BybitOrderBookL2Sequencer {
124    pub fn validate_sequence(
125        &mut self,
126        update: BybitOrderBookMessage,
127    ) -> Result<Option<BybitOrderBookMessage>, DataError> {
128        // Each new update_id should be `last_update_id + 1`
129        if update.data.update_id != self.last_update_id + 1 {
130            return Err(DataError::InvalidSequence {
131                prev_last_update_id: self.last_update_id,
132                first_update_id: update.data.update_id,
133            });
134        }
135
136        // Update metadata
137        self.last_update_id = update.data.update_id;
138
139        Ok(Some(update))
140    }
141}