1use rust_decimal::Decimal;
19use std::collections::{BTreeMap, VecDeque};
20
21use crate::spot::{http::OrderBook, ws::DepthUpdateMsg};
22
/// Result of feeding one input (a snapshot or a diff) into [`OrderBookState`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The input was stored, but the book is not live yet: either no snapshot
    /// has arrived, or no buffered diff reaches past the pending snapshot.
    Buffered,
    /// A pending snapshot was joined with the buffered diffs and the book has
    /// just become live.
    Synced,
    /// The live book was updated: a contiguous diff was applied, or a newer
    /// snapshot replaced the book.
    Applied,
    /// The input was stale (not newer than what is already held) and was
    /// dropped without changing any state.
    Ignored,
    /// A gap in the update-id chain was detected. The book contents were
    /// discarded and the state machine is waiting for a snapshot again; the
    /// diffs from the gap onwards stay buffered.
    ResyncRequired,
}
43
44#[derive(Debug, Default)]
51pub struct OrderBookState {
52 inner: Inner,
53}
54
55#[derive(Debug)]
56enum Inner {
57 NoSnapshot { buffered: VecDeque<DepthUpdateMsg> },
59 Pending {
61 snapshot_last_id: i64,
62 bids: BTreeMap<Decimal, Decimal>,
63 asks: BTreeMap<Decimal, Decimal>,
64 buffered: VecDeque<DepthUpdateMsg>,
65 },
66 Synced {
68 last_update_id: i64,
69 bids: BTreeMap<Decimal, Decimal>,
70 asks: BTreeMap<Decimal, Decimal>,
71 },
72}
73
74impl Default for Inner {
75 fn default() -> Self {
76 Self::NoSnapshot {
77 buffered: VecDeque::new(),
78 }
79 }
80}
81
82impl OrderBookState {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn apply_snapshot(&mut self, snapshot: OrderBook) -> ApplyOutcome {
89 let new_id = snapshot.last_update_id;
90
91 match &self.inner {
92 Inner::Synced { last_update_id, .. } => {
93 if new_id <= *last_update_id {
94 return ApplyOutcome::Ignored;
95 }
96 let (bids, asks) = sides_from_snapshot(snapshot);
97 self.inner = Inner::Synced {
98 last_update_id: new_id,
99 bids,
100 asks,
101 };
102 return ApplyOutcome::Applied;
103 }
104 Inner::Pending {
105 snapshot_last_id, ..
106 } if new_id <= *snapshot_last_id => {
107 return ApplyOutcome::Ignored;
108 }
109 _ => {}
110 }
111
112 let buffered = match std::mem::take(&mut self.inner) {
113 Inner::NoSnapshot { buffered } | Inner::Pending { buffered, .. } => buffered,
114 Inner::Synced { .. } => unreachable!("handled above"),
115 };
116 let (bids, asks) = sides_from_snapshot(snapshot);
117 self.inner = Inner::Pending {
118 snapshot_last_id: new_id,
119 bids,
120 asks,
121 buffered,
122 };
123 self.try_bridge()
124 }
125
126 pub fn apply_diff(&mut self, diff: DepthUpdateMsg) -> ApplyOutcome {
128 match &mut self.inner {
129 Inner::NoSnapshot { buffered } => {
130 buffered.push_back(diff);
131 ApplyOutcome::Buffered
132 }
133 Inner::Pending { buffered, .. } => {
134 buffered.push_back(diff);
135 self.try_bridge()
136 }
137 Inner::Synced {
138 last_update_id,
139 bids,
140 asks,
141 } => {
142 if diff.final_update_id <= *last_update_id {
143 return ApplyOutcome::Ignored;
144 }
145 if diff.first_update_id != *last_update_id + 1 {
146 let mut buffered = VecDeque::new();
147 buffered.push_back(diff);
148 self.inner = Inner::NoSnapshot { buffered };
149 return ApplyOutcome::ResyncRequired;
150 }
151 apply_diff_to_sides(bids, asks, &diff);
152 *last_update_id = diff.final_update_id;
153 ApplyOutcome::Applied
154 }
155 }
156 }
157
158 pub fn is_synced(&self) -> bool {
159 matches!(self.inner, Inner::Synced { .. })
160 }
161
162 pub fn last_update_id(&self) -> Option<i64> {
163 match &self.inner {
164 Inner::Synced { last_update_id, .. } => Some(*last_update_id),
165 _ => None,
166 }
167 }
168
169 pub fn bids(&self) -> Option<&BTreeMap<Decimal, Decimal>> {
170 match &self.inner {
171 Inner::Synced { bids, .. } => Some(bids),
172 _ => None,
173 }
174 }
175
176 pub fn asks(&self) -> Option<&BTreeMap<Decimal, Decimal>> {
177 match &self.inner {
178 Inner::Synced { asks, .. } => Some(asks),
179 _ => None,
180 }
181 }
182
183 pub fn best_bid(&self) -> Option<(Decimal, Decimal)> {
184 self.bids()
185 .and_then(|b| b.iter().next_back().map(|(p, q)| (*p, *q)))
186 }
187
188 pub fn best_ask(&self) -> Option<(Decimal, Decimal)> {
189 self.asks()
190 .and_then(|a| a.iter().next().map(|(p, q)| (*p, *q)))
191 }
192
193 fn try_bridge(&mut self) -> ApplyOutcome {
200 if !matches!(self.inner, Inner::Pending { .. }) {
201 return ApplyOutcome::Buffered;
202 }
203
204 let Inner::Pending {
205 snapshot_last_id,
206 bids,
207 asks,
208 mut buffered,
209 } = std::mem::take(&mut self.inner)
210 else {
211 unreachable!("checked above");
212 };
213
214 while let Some(front) = buffered.front() {
215 if front.final_update_id <= snapshot_last_id {
216 buffered.pop_front();
217 } else {
218 break;
219 }
220 }
221
222 let Some(first) = buffered.front() else {
223 self.inner = Inner::Pending {
224 snapshot_last_id,
225 bids,
226 asks,
227 buffered,
228 };
229 return ApplyOutcome::Buffered;
230 };
231
232 if first.first_update_id > snapshot_last_id + 1 {
233 self.inner = Inner::NoSnapshot { buffered };
236 return ApplyOutcome::ResyncRequired;
237 }
238
239 let mut bids = bids;
240 let mut asks = asks;
241 let mut last_id = snapshot_last_id;
242 let mut prev_u: Option<i64> = None;
243 while let Some(diff) = buffered.pop_front() {
244 if let Some(p) = prev_u
245 && diff.first_update_id != p + 1
246 {
247 let mut remaining = VecDeque::with_capacity(buffered.len() + 1);
248 remaining.push_back(diff);
249 remaining.extend(buffered);
250 self.inner = Inner::NoSnapshot {
251 buffered: remaining,
252 };
253 return ApplyOutcome::ResyncRequired;
254 }
255 apply_diff_to_sides(&mut bids, &mut asks, &diff);
256 last_id = diff.final_update_id;
257 prev_u = Some(diff.final_update_id);
258 }
259
260 self.inner = Inner::Synced {
261 last_update_id: last_id,
262 bids,
263 asks,
264 };
265 ApplyOutcome::Synced
266 }
267}
268
269fn sides_from_snapshot(
270 snapshot: OrderBook,
271) -> (BTreeMap<Decimal, Decimal>, BTreeMap<Decimal, Decimal>) {
272 let bids = snapshot
273 .bids
274 .into_iter()
275 .map(|l| (l.price(), l.qty()))
276 .collect();
277 let asks = snapshot
278 .asks
279 .into_iter()
280 .map(|l| (l.price(), l.qty()))
281 .collect();
282 (bids, asks)
283}
284
285fn apply_diff_to_sides(
286 bids: &mut BTreeMap<Decimal, Decimal>,
287 asks: &mut BTreeMap<Decimal, Decimal>,
288 diff: &DepthUpdateMsg,
289) {
290 for level in &diff.bids {
291 apply_side(bids, level.price(), level.qty());
292 }
293 for level in &diff.asks {
294 apply_side(asks, level.price(), level.qty());
295 }
296}
297
298fn apply_side(side: &mut BTreeMap<Decimal, Decimal>, price: Decimal, qty: Decimal) {
299 if qty.is_zero() {
300 side.remove(&price);
301 } else {
302 side.insert(price, qty);
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spot::http::OrderLevel;

    /// Builds an `OrderLevel` by deserializing a `["price", "qty"]` JSON pair.
    fn level(price: &str, qty: &str) -> OrderLevel {
        let json = format!("[\"{price}\", \"{qty}\"]");
        serde_json::from_str(&json).unwrap()
    }

    /// Snapshot with one bid (100 @ 1) and one ask (101 @ 2) ending at `last_id`.
    fn snapshot(last_id: i64) -> OrderBook {
        OrderBook {
            last_update_id: last_id,
            bids: vec![level("100", "1")],
            asks: vec![level("101", "2")],
        }
    }

    /// Diff covering update ids `u_first..=u_last` that changes no levels.
    fn diff(u_first: i64, u_last: i64) -> DepthUpdateMsg {
        DepthUpdateMsg {
            event_time: 0,
            symbol: "BTCUSDT".into(),
            first_update_id: u_first,
            final_update_id: u_last,
            bids: vec![],
            asks: vec![],
        }
    }

    /// Book fed with snapshot 100 and then the bridging diff 95..=105.
    fn book_after_snapshot_and_bridge() -> OrderBookState {
        let mut book = OrderBookState::new();
        book.apply_snapshot(snapshot(100));
        book.apply_diff(diff(95, 105));
        book
    }

    #[test]
    fn snapshot_then_bridging_diff() {
        let mut book = OrderBookState::new();
        // Nothing is buffered yet, so the snapshot can only wait.
        assert_eq!(book.apply_snapshot(snapshot(100)), ApplyOutcome::Buffered);
        // 95..=105 straddles the snapshot id and completes the bridge.
        assert_eq!(book.apply_diff(diff(95, 105)), ApplyOutcome::Synced);
        assert_eq!(book.last_update_id(), Some(105));
    }

    #[test]
    fn diffs_then_snapshot() {
        let mut book = OrderBookState::new();
        assert_eq!(book.apply_diff(diff(90, 95)), ApplyOutcome::Buffered);
        assert_eq!(book.apply_diff(diff(96, 105)), ApplyOutcome::Buffered);
        // 90..=95 is covered by the snapshot; 96..=105 bridges it.
        assert_eq!(book.apply_snapshot(snapshot(100)), ApplyOutcome::Synced);
        assert_eq!(book.last_update_id(), Some(105));
    }

    #[test]
    fn snapshot_older_than_buffered_stream_triggers_resync() {
        let mut book = OrderBookState::new();
        assert_eq!(book.apply_diff(diff(200, 210)), ApplyOutcome::Buffered);
        // Updates 101..=199 are missing between snapshot and stream.
        assert_eq!(
            book.apply_snapshot(snapshot(100)),
            ApplyOutcome::ResyncRequired
        );
        assert!(!book.is_synced());
        // The buffered diff survived the failed attempt and bridges snapshot 199.
        assert_eq!(book.apply_snapshot(snapshot(199)), ApplyOutcome::Synced);
        assert_eq!(book.last_update_id(), Some(210));
    }

    #[test]
    fn live_chain_break_triggers_resync() {
        let mut book = book_after_snapshot_and_bridge();
        assert!(book.is_synced());
        // Update 106 is skipped.
        assert_eq!(
            book.apply_diff(diff(107, 115)),
            ApplyOutcome::ResyncRequired
        );
        assert!(!book.is_synced());
    }

    #[test]
    fn stale_diff_after_sync_is_ignored() {
        let mut book = book_after_snapshot_and_bridge();
        assert_eq!(book.apply_diff(diff(80, 95)), ApplyOutcome::Ignored);
        assert_eq!(book.last_update_id(), Some(105));
    }

    #[test]
    fn duplicate_snapshot_is_ignored() {
        let mut book = OrderBookState::new();
        book.apply_snapshot(snapshot(100));
        // Neither the same id nor an older id replaces the pending snapshot.
        assert_eq!(book.apply_snapshot(snapshot(100)), ApplyOutcome::Ignored);
        assert_eq!(book.apply_snapshot(snapshot(50)), ApplyOutcome::Ignored);
    }

    #[test]
    fn newer_snapshot_replaces_synced_book() {
        let mut book = book_after_snapshot_and_bridge();
        assert!(book.is_synced());
        assert_eq!(book.apply_snapshot(snapshot(200)), ApplyOutcome::Applied);
        assert_eq!(book.last_update_id(), Some(200));
    }

    #[test]
    fn live_continuous_chain() {
        let mut book = book_after_snapshot_and_bridge();
        assert_eq!(book.apply_diff(diff(106, 110)), ApplyOutcome::Applied);
        assert_eq!(book.apply_diff(diff(111, 115)), ApplyOutcome::Applied);
        assert_eq!(book.last_update_id(), Some(115));
    }
}