Skip to main content

betfair_stream_api/cache/tracker/
mod.rs

1mod market_stream_tracker;
2mod order_stream_tracker;
3
4use betfair_stream_types::response::market_change_message::MarketChangeMessage;
5use betfair_stream_types::response::order_change_message::OrderChangeMessage;
6use betfair_stream_types::response::{
7    ChangeType, Clock, DataChange, DatasetChangeMessage, InitialClock,
8};
9use serde::de::DeserializeOwned;
10
11use self::market_stream_tracker::MarketStreamTracker;
12use self::order_stream_tracker::OrderStreamTracker;
13use super::primitives::{MarketBookCache, OrderBookCache};
14
15/// Separate stream struct to hold market/order caches
16#[derive(Debug, Clone)]
17pub struct StreamState {
18    pub stream_id: Option<u64>,
19    pub update_clk: Option<Clock>,
20    pub max_latency_ms: Option<u64>,
21    pub unique_id: Option<i32>,
22    pub initial_clock: Option<InitialClock>,
23    pub time_created: chrono::DateTime<chrono::Utc>,
24    pub time_updated: chrono::DateTime<chrono::Utc>,
25    pub market_stream_tracker: MarketStreamTracker,
26    pub order_stream_tracker: OrderStreamTracker,
27}
28
29pub enum Updates<'a> {
30    Market(Vec<&'a MarketBookCache>),
31    Order(Vec<&'a OrderBookCache>),
32}
33
/// Newtype flag wrapping a `bool`.
///
/// NOTE(review): presumably reports whether a processed change message carried
/// a full image of the cached book — confirm against the tracker modules that
/// produce it.
///
/// The standard traits are derived so the flag can be logged, copied and
/// compared like the plain `bool` it wraps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HasFullImage(pub bool);
35
36impl Default for StreamState {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl StreamState {
43    #[must_use]
44    pub fn new() -> Self {
45        Self {
46            stream_id: None,
47            update_clk: None,
48            max_latency_ms: None,
49            unique_id: None,
50            initial_clock: None,
51            time_created: chrono::Utc::now(),
52            time_updated: chrono::Utc::now(),
53            market_stream_tracker: MarketStreamTracker::new(),
54            order_stream_tracker: OrderStreamTracker::new(),
55        }
56    }
57
58    pub fn order_change_update(&mut self, msg: OrderChangeMessage) -> Option<Vec<&OrderBookCache>> {
59        match msg.change_type {
60            Some(ChangeType::SubImage) => {
61                self.update_clk(&msg);
62                self.order_stream_tracker.process(msg).0
63            }
64            Some(ChangeType::Heartbeat) => {
65                self.update_clk(&msg);
66                self.on_heartbeat(&msg);
67                None
68            }
69            None | Some(ChangeType::ResubDelta) => {
70                self.on_update(&msg);
71                self.order_stream_tracker.process(msg).0
72            }
73        }
74    }
75
76    pub fn market_change_update(
77        &mut self,
78        msg: MarketChangeMessage,
79    ) -> Option<Vec<&MarketBookCache>> {
80        match msg.change_type {
81            Some(ChangeType::SubImage) => {
82                self.update_clk(&msg);
83                self.market_stream_tracker.process(msg).0
84            }
85            Some(ChangeType::Heartbeat) => {
86                self.update_clk(&msg);
87                self.on_heartbeat(&msg);
88                None
89            }
90            None | Some(ChangeType::ResubDelta) => {
91                self.on_update(&msg);
92                self.market_stream_tracker.process(msg).0
93            }
94        }
95    }
96
97    fn on_update<T: DeserializeOwned + DataChange<T>>(&mut self, msg: &DatasetChangeMessage<T>) {
98        if self.update_clk.is_some() {
99            self.update_clk(msg);
100        }
101        if let (Some(publish_time), Some(max_latency_ms)) = (msg.publish_time, self.max_latency_ms)
102        {
103            let latency = chrono::Utc::now().signed_duration_since(publish_time);
104            if latency.num_milliseconds() > max_latency_ms.try_into().unwrap_or(0) {
105                tracing::warn!(
106                    "High Latency! {:?}ms is greater than max_latency_ms of {:?}ms",
107                    latency.num_milliseconds(),
108                    max_latency_ms
109                );
110            }
111        }
112    }
113
114    fn on_heartbeat<T: DeserializeOwned + DataChange<T>>(&mut self, msg: &DatasetChangeMessage<T>) {
115        self.update_clk(msg);
116    }
117
118    #[allow(unused)]
119    pub(crate) fn clear_stale_cache(&mut self, publish_time: chrono::DateTime<chrono::Utc>) {
120        self.market_stream_tracker.clear_stale_cache(publish_time);
121        self.order_stream_tracker.clear_stale_cache(publish_time);
122    }
123
124    fn update_clk<T: DeserializeOwned + DataChange<T>>(&mut self, data: &DatasetChangeMessage<T>) {
125        if let Some(ref initial_clock) = data.initial_clock {
126            self.initial_clock = Some(initial_clock.clone());
127        }
128
129        if let Some(ref update_clk) = data.clock {
130            self.update_clk = Some(update_clk.clone());
131        }
132
133        self.time_updated = chrono::Utc::now();
134    }
135}