Skip to main content

rustrade_data/exchange/bybit/book/
l2.rs

1use std::vec;
2
3use crate::{
4    Identifier,
5    error::DataError,
6    event::{MarketEvent, MarketIter},
7    exchange::{
8        Connector,
9        bybit::{Bybit, message::BybitPayloadKind, spot::BybitSpot},
10    },
11    subscription::{
12        Map,
13        book::{OrderBookEvent, OrderBooksL2},
14    },
15    transformer::ExchangeTransformer,
16};
17use derive_more::Constructor;
18use rustrade_integration::{Transformer, protocol::websocket::WsMessage};
19use tokio::sync::mpsc::UnboundedSender;
20use tracing::debug;
21
22use super::BybitOrderBookMessage;
23
24#[derive(Debug, Constructor)]
25pub struct BybitOrderBookL2Meta<InstrumentKey, Sequencer> {
26    pub key: InstrumentKey,
27    pub sequencer: Option<Sequencer>,
28}
29
30#[derive(Debug)]
31pub struct BybitOrderBooksL2Transformer<InstrumentKey> {
32    instrument_map: Map<BybitOrderBookL2Meta<InstrumentKey, BybitOrderBookL2Sequencer>>,
33}
34
35impl<InstrumentKey, Server> ExchangeTransformer<Bybit<Server>, InstrumentKey, OrderBooksL2>
36    for BybitOrderBooksL2Transformer<InstrumentKey>
37where
38    InstrumentKey: Clone + PartialEq + Send + Sync,
39    Server: Send,
40{
41    async fn init(
42        instrument_map: Map<InstrumentKey>,
43        _: &[MarketEvent<InstrumentKey, OrderBookEvent>],
44        _: UnboundedSender<WsMessage>,
45    ) -> Result<Self, DataError> {
46        let instrument_map = instrument_map
47            .0
48            .into_iter()
49            .map(|(sub_id, instrument_key)| {
50                (sub_id, BybitOrderBookL2Meta::new(instrument_key, None))
51            })
52            .collect();
53
54        Ok(Self { instrument_map })
55    }
56}
57
58impl<InstrumentKey> Transformer for BybitOrderBooksL2Transformer<InstrumentKey>
59where
60    InstrumentKey: Clone,
61{
62    type Error = DataError;
63    type Input = BybitOrderBookMessage;
64    type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
65    type OutputIter = Vec<Result<Self::Output, Self::Error>>;
66
67    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
68        // Determine if the message has an identifiable SubscriptionId
69        let subscription_id = match input.id() {
70            Some(subscription_id) => subscription_id,
71            None => return vec![],
72        };
73
74        // Find Instrument associated with Input and transform
75        let instrument = match self.instrument_map.find_mut(&subscription_id) {
76            Ok(instrument) => instrument,
77            Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
78        };
79
80        // Initialise a sequencer when snapshot received from the exchange. We
81        // return immediately because the snapshot message is always valid.
82        if matches!(input.kind, BybitPayloadKind::Snapshot) {
83            instrument.sequencer.replace(BybitOrderBookL2Sequencer {
84                last_update_id: input.data.update_id,
85            });
86
87            return MarketIter::<InstrumentKey, OrderBookEvent>::from((
88                BybitSpot::ID,
89                instrument.key.clone(),
90                input,
91            ))
92            .0;
93        }
94
95        // Could happen if we receive an update message before the snapshot
96        let Some(sequencer) = &mut instrument.sequencer else {
97            debug!("Update message received before initial Snapshot");
98            return vec![];
99        };
100
101        // Drop any outdated updates & validate sequence for relevant updates
102        let valid_update = match sequencer.validate_sequence(input) {
103            Ok(Some(valid_update)) => valid_update,
104            Ok(None) => return vec![],
105            Err(error) => return vec![Err(error)],
106        };
107
108        MarketIter::<InstrumentKey, OrderBookEvent>::from((
109            BybitSpot::ID,
110            instrument.key.clone(),
111            valid_update,
112        ))
113        .0
114    }
115}
116
117#[derive(Debug)]
118struct BybitOrderBookL2Sequencer {
119    last_update_id: u64,
120}
121
122impl BybitOrderBookL2Sequencer {
123    pub fn validate_sequence(
124        &mut self,
125        update: BybitOrderBookMessage,
126    ) -> Result<Option<BybitOrderBookMessage>, DataError> {
127        // Each new update_id should be `last_update_id + 1`
128        if update.data.update_id != self.last_update_id + 1 {
129            return Err(DataError::InvalidSequence {
130                prev_last_update_id: self.last_update_id,
131                first_update_id: update.data.update_id,
132            });
133        }
134
135        // Update metadata
136        self.last_update_id = update.data.update_id;
137
138        Ok(Some(update))
139    }
140}