betfair_stream_api/cache/tracker/
mod.rs1mod market_stream_tracker;
2mod order_stream_tracker;
3
4use betfair_stream_types::response::market_change_message::MarketChangeMessage;
5use betfair_stream_types::response::order_change_message::OrderChangeMessage;
6use betfair_stream_types::response::{
7 ChangeType, Clock, DataChange, DatasetChangeMessage, InitialClock,
8};
9use serde::de::DeserializeOwned;
10
11use self::market_stream_tracker::MarketStreamTracker;
12use self::order_stream_tracker::OrderStreamTracker;
13use super::primitives::{MarketBookCache, OrderBookCache};
14
15#[derive(Debug, Clone)]
17pub struct StreamState {
18 pub stream_id: Option<u64>,
19 pub update_clk: Option<Clock>,
20 pub max_latency_ms: Option<u64>,
21 pub unique_id: Option<i32>,
22 pub initial_clock: Option<InitialClock>,
23 pub time_created: chrono::DateTime<chrono::Utc>,
24 pub time_updated: chrono::DateTime<chrono::Utc>,
25 pub market_stream_tracker: MarketStreamTracker,
26 pub order_stream_tracker: OrderStreamTracker,
27}
28
29pub enum Updates<'a> {
30 Market(Vec<&'a MarketBookCache>),
31 Order(Vec<&'a OrderBookCache>),
32}
33
34pub struct HasFullImage(pub bool);
35
36impl Default for StreamState {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl StreamState {
43 #[must_use]
44 pub fn new() -> Self {
45 Self {
46 stream_id: None,
47 update_clk: None,
48 max_latency_ms: None,
49 unique_id: None,
50 initial_clock: None,
51 time_created: chrono::Utc::now(),
52 time_updated: chrono::Utc::now(),
53 market_stream_tracker: MarketStreamTracker::new(),
54 order_stream_tracker: OrderStreamTracker::new(),
55 }
56 }
57
58 pub fn order_change_update(&mut self, msg: OrderChangeMessage) -> Option<Vec<&OrderBookCache>> {
59 match msg.change_type {
60 Some(ChangeType::SubImage) => {
61 self.update_clk(&msg);
62 self.order_stream_tracker.process(msg).0
63 }
64 Some(ChangeType::Heartbeat) => {
65 self.update_clk(&msg);
66 self.on_heartbeat(&msg);
67 None
68 }
69 None | Some(ChangeType::ResubDelta) => {
70 self.on_update(&msg);
71 self.order_stream_tracker.process(msg).0
72 }
73 }
74 }
75
76 pub fn market_change_update(
77 &mut self,
78 msg: MarketChangeMessage,
79 ) -> Option<Vec<&MarketBookCache>> {
80 match msg.change_type {
81 Some(ChangeType::SubImage) => {
82 self.update_clk(&msg);
83 self.market_stream_tracker.process(msg).0
84 }
85 Some(ChangeType::Heartbeat) => {
86 self.update_clk(&msg);
87 self.on_heartbeat(&msg);
88 None
89 }
90 None | Some(ChangeType::ResubDelta) => {
91 self.on_update(&msg);
92 self.market_stream_tracker.process(msg).0
93 }
94 }
95 }
96
97 fn on_update<T: DeserializeOwned + DataChange<T>>(&mut self, msg: &DatasetChangeMessage<T>) {
98 if self.update_clk.is_some() {
99 self.update_clk(msg);
100 }
101 if let (Some(publish_time), Some(max_latency_ms)) = (msg.publish_time, self.max_latency_ms)
102 {
103 let latency = chrono::Utc::now().signed_duration_since(publish_time);
104 if latency.num_milliseconds() > max_latency_ms.try_into().unwrap_or(0) {
105 tracing::warn!(
106 "High Latency! {:?}ms is greater than max_latency_ms of {:?}ms",
107 latency.num_milliseconds(),
108 max_latency_ms
109 );
110 }
111 }
112 }
113
114 fn on_heartbeat<T: DeserializeOwned + DataChange<T>>(&mut self, msg: &DatasetChangeMessage<T>) {
115 self.update_clk(msg);
116 }
117
118 #[allow(unused)]
119 pub(crate) fn clear_stale_cache(&mut self, publish_time: chrono::DateTime<chrono::Utc>) {
120 self.market_stream_tracker.clear_stale_cache(publish_time);
121 self.order_stream_tracker.clear_stale_cache(publish_time);
122 }
123
124 fn update_clk<T: DeserializeOwned + DataChange<T>>(&mut self, data: &DatasetChangeMessage<T>) {
125 if let Some(ref initial_clock) = data.initial_clock {
126 self.initial_clock = Some(initial_clock.clone());
127 }
128
129 if let Some(ref update_clk) = data.clock {
130 self.update_clk = Some(update_clk.clone());
131 }
132
133 self.time_updated = chrono::Utc::now();
134 }
135}