Skip to main content

binance/spot/
order_book_state.rs

//! Local L2 order book for Binance Spot.
//!
//! [`OrderBookState`] encapsulates the full synchronization protocol from the
//! Binance Spot docs (snapshot + `<symbol>@depth` diff stream) behind a state
//! machine that accepts a REST snapshot and live diff events in any order.
//! Callers feed events as they arrive and inspect the returned
//! [`ApplyOutcome`] — on [`ApplyOutcome::ResyncRequired`], fetch a new
//! snapshot.
//!
//! **Spot vs. futures sync rules.** The futures stream carries a `pu` field
//! ("previous final update id") that lets the chain be verified directly.
//! Spot has no such field, so the chain is verified arithmetically:
//!
//! - Drop buffered events where `u <= lastUpdateId` (note: `<=`, not `<`).
//! - First processed event: `U <= lastUpdateId+1 AND u >= lastUpdateId+1`.
//! - Subsequent events must satisfy `next.U == prev.u + 1`.
17
18use rust_decimal::Decimal;
19use std::collections::{BTreeMap, VecDeque};
20
21use crate::spot::{http::OrderBook, ws::DepthUpdateMsg};
22
/// Result reported by [`OrderBookState::apply_snapshot`] and
/// [`OrderBookState::apply_diff`] for every piece of input they are given.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyOutcome {
    /// The input was kept, but the book is still not synchronized and needs
    /// more data: either diffs arrived while no snapshot is held, or a
    /// snapshot is held and no diff that bridges it has been seen yet.
    Buffered,
    /// The held snapshot and the buffered diffs were just joined
    /// successfully; the local book went live with this call.
    Synced,
    /// The input was applied to a book that was already live. This is the
    /// normal result for an in-sequence diff, and is also returned when a
    /// newer snapshot replaces a live book.
    Applied,
    /// The input was older than what the state machine already holds and was
    /// discarded without changing anything.
    Ignored,
    /// The current data cannot produce a consistent book: the snapshot is too
    /// old to connect to the diff stream, or a gap was found in the diff
    /// chain. The caller has to fetch a new snapshot and pass it to
    /// [`OrderBookState::apply_snapshot`]. Diffs that have not been consumed
    /// are kept so that the next snapshot may still bridge to them.
    ResyncRequired,
}
43
44/// Local L2 order book for a Spot symbol.
45///
46/// Built from a REST depth snapshot plus the `<symbol>@depth` diff stream.
47/// Snapshots and diffs may be fed in any order; the state machine handles
48/// buffering, draining stale events, bridging, and live-chain verification
49/// internally.
50#[derive(Debug, Default)]
51pub struct OrderBookState {
52    inner: Inner,
53}
54
55#[derive(Debug)]
56enum Inner {
57    /// No snapshot yet. Diffs accumulate in the buffer.
58    NoSnapshot { buffered: VecDeque<DepthUpdateMsg> },
59    /// Snapshot received but not yet bridged. Diffs continue to accumulate.
60    Pending {
61        snapshot_last_id: i64,
62        bids: BTreeMap<Decimal, Decimal>,
63        asks: BTreeMap<Decimal, Decimal>,
64        buffered: VecDeque<DepthUpdateMsg>,
65    },
66    /// Fully synchronized live book.
67    Synced {
68        last_update_id: i64,
69        bids: BTreeMap<Decimal, Decimal>,
70        asks: BTreeMap<Decimal, Decimal>,
71    },
72}
73
74impl Default for Inner {
75    fn default() -> Self {
76        Self::NoSnapshot {
77            buffered: VecDeque::new(),
78        }
79    }
80}
81
82impl OrderBookState {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Feed a REST depth snapshot into the state machine.
88    pub fn apply_snapshot(&mut self, snapshot: OrderBook) -> ApplyOutcome {
89        let new_id = snapshot.last_update_id;
90
91        match &self.inner {
92            Inner::Synced { last_update_id, .. } => {
93                if new_id <= *last_update_id {
94                    return ApplyOutcome::Ignored;
95                }
96                let (bids, asks) = sides_from_snapshot(snapshot);
97                self.inner = Inner::Synced {
98                    last_update_id: new_id,
99                    bids,
100                    asks,
101                };
102                return ApplyOutcome::Applied;
103            }
104            Inner::Pending {
105                snapshot_last_id, ..
106            } if new_id <= *snapshot_last_id => {
107                return ApplyOutcome::Ignored;
108            }
109            _ => {}
110        }
111
112        let buffered = match std::mem::take(&mut self.inner) {
113            Inner::NoSnapshot { buffered } | Inner::Pending { buffered, .. } => buffered,
114            Inner::Synced { .. } => unreachable!("handled above"),
115        };
116        let (bids, asks) = sides_from_snapshot(snapshot);
117        self.inner = Inner::Pending {
118            snapshot_last_id: new_id,
119            bids,
120            asks,
121            buffered,
122        };
123        self.try_bridge()
124    }
125
126    /// Feed a live diff event into the state machine.
127    pub fn apply_diff(&mut self, diff: DepthUpdateMsg) -> ApplyOutcome {
128        match &mut self.inner {
129            Inner::NoSnapshot { buffered } => {
130                buffered.push_back(diff);
131                ApplyOutcome::Buffered
132            }
133            Inner::Pending { buffered, .. } => {
134                buffered.push_back(diff);
135                self.try_bridge()
136            }
137            Inner::Synced {
138                last_update_id,
139                bids,
140                asks,
141            } => {
142                if diff.final_update_id <= *last_update_id {
143                    return ApplyOutcome::Ignored;
144                }
145                if diff.first_update_id != *last_update_id + 1 {
146                    let mut buffered = VecDeque::new();
147                    buffered.push_back(diff);
148                    self.inner = Inner::NoSnapshot { buffered };
149                    return ApplyOutcome::ResyncRequired;
150                }
151                apply_diff_to_sides(bids, asks, &diff);
152                *last_update_id = diff.final_update_id;
153                ApplyOutcome::Applied
154            }
155        }
156    }
157
158    pub fn is_synced(&self) -> bool {
159        matches!(self.inner, Inner::Synced { .. })
160    }
161
162    pub fn last_update_id(&self) -> Option<i64> {
163        match &self.inner {
164            Inner::Synced { last_update_id, .. } => Some(*last_update_id),
165            _ => None,
166        }
167    }
168
169    pub fn bids(&self) -> Option<&BTreeMap<Decimal, Decimal>> {
170        match &self.inner {
171            Inner::Synced { bids, .. } => Some(bids),
172            _ => None,
173        }
174    }
175
176    pub fn asks(&self) -> Option<&BTreeMap<Decimal, Decimal>> {
177        match &self.inner {
178            Inner::Synced { asks, .. } => Some(asks),
179            _ => None,
180        }
181    }
182
183    pub fn best_bid(&self) -> Option<(Decimal, Decimal)> {
184        self.bids()
185            .and_then(|b| b.iter().next_back().map(|(p, q)| (*p, *q)))
186    }
187
188    pub fn best_ask(&self) -> Option<(Decimal, Decimal)> {
189        self.asks()
190            .and_then(|a| a.iter().next().map(|(p, q)| (*p, *q)))
191    }
192
193    /// Attempt to bridge the held snapshot with buffered diffs.
194    ///
195    /// Per Binance Spot docs:
196    /// - Drop buffered events with `u <= lastUpdateId` (stale).
197    /// - First processed event must satisfy `U <= lastUpdateId+1 AND u >= lastUpdateId+1`.
198    /// - Subsequent events must chain: `next.U == prev.u + 1`.
199    fn try_bridge(&mut self) -> ApplyOutcome {
200        if !matches!(self.inner, Inner::Pending { .. }) {
201            return ApplyOutcome::Buffered;
202        }
203
204        let Inner::Pending {
205            snapshot_last_id,
206            bids,
207            asks,
208            mut buffered,
209        } = std::mem::take(&mut self.inner)
210        else {
211            unreachable!("checked above");
212        };
213
214        while let Some(front) = buffered.front() {
215            if front.final_update_id <= snapshot_last_id {
216                buffered.pop_front();
217            } else {
218                break;
219            }
220        }
221
222        let Some(first) = buffered.front() else {
223            self.inner = Inner::Pending {
224                snapshot_last_id,
225                bids,
226                asks,
227                buffered,
228            };
229            return ApplyOutcome::Buffered;
230        };
231
232        if first.first_update_id > snapshot_last_id + 1 {
233            // Snapshot is older than the diff stream — gap. Drop the
234            // snapshot, keep buffered diffs for the next snapshot attempt.
235            self.inner = Inner::NoSnapshot { buffered };
236            return ApplyOutcome::ResyncRequired;
237        }
238
239        let mut bids = bids;
240        let mut asks = asks;
241        let mut last_id = snapshot_last_id;
242        let mut prev_u: Option<i64> = None;
243        while let Some(diff) = buffered.pop_front() {
244            if let Some(p) = prev_u
245                && diff.first_update_id != p + 1
246            {
247                let mut remaining = VecDeque::with_capacity(buffered.len() + 1);
248                remaining.push_back(diff);
249                remaining.extend(buffered);
250                self.inner = Inner::NoSnapshot {
251                    buffered: remaining,
252                };
253                return ApplyOutcome::ResyncRequired;
254            }
255            apply_diff_to_sides(&mut bids, &mut asks, &diff);
256            last_id = diff.final_update_id;
257            prev_u = Some(diff.final_update_id);
258        }
259
260        self.inner = Inner::Synced {
261            last_update_id: last_id,
262            bids,
263            asks,
264        };
265        ApplyOutcome::Synced
266    }
267}
268
269fn sides_from_snapshot(
270    snapshot: OrderBook,
271) -> (BTreeMap<Decimal, Decimal>, BTreeMap<Decimal, Decimal>) {
272    let bids = snapshot
273        .bids
274        .into_iter()
275        .map(|l| (l.price(), l.qty()))
276        .collect();
277    let asks = snapshot
278        .asks
279        .into_iter()
280        .map(|l| (l.price(), l.qty()))
281        .collect();
282    (bids, asks)
283}
284
285fn apply_diff_to_sides(
286    bids: &mut BTreeMap<Decimal, Decimal>,
287    asks: &mut BTreeMap<Decimal, Decimal>,
288    diff: &DepthUpdateMsg,
289) {
290    for level in &diff.bids {
291        apply_side(bids, level.price(), level.qty());
292    }
293    for level in &diff.asks {
294        apply_side(asks, level.price(), level.qty());
295    }
296}
297
298fn apply_side(side: &mut BTreeMap<Decimal, Decimal>, price: Decimal, qty: Decimal) {
299    if qty.is_zero() {
300        side.remove(&price);
301    } else {
302        side.insert(price, qty);
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spot::http::OrderLevel;

    /// Parses a `["price", "qty"]` JSON pair into an [`OrderLevel`].
    fn level(price: &str, qty: &str) -> OrderLevel {
        let json = format!("[\"{price}\", \"{qty}\"]");
        serde_json::from_str(&json).unwrap()
    }

    /// Snapshot with one bid (100 @ 1) and one ask (101 @ 2) at `last_id`.
    fn snapshot(last_id: i64) -> OrderBook {
        let bids = vec![level("100", "1")];
        let asks = vec![level("101", "2")];
        OrderBook {
            last_update_id: last_id,
            bids,
            asks,
        }
    }

    /// Diff event covering update ids `u_first..=u_last` with no levels.
    fn diff(u_first: i64, u_last: i64) -> DepthUpdateMsg {
        DepthUpdateMsg {
            event_time: 0,
            symbol: "BTCUSDT".into(),
            first_update_id: u_first,
            final_update_id: u_last,
            bids: Vec::new(),
            asks: Vec::new(),
        }
    }

    /// Book fed with snapshot L=100 followed by the diff 95..=105.
    fn book_after_snapshot_and_bridge() -> OrderBookState {
        let mut book = OrderBookState::new();
        book.apply_snapshot(snapshot(100));
        book.apply_diff(diff(95, 105));
        book
    }

    #[test]
    fn snapshot_then_bridging_diff() {
        let mut book = OrderBookState::new();
        let after_snapshot = book.apply_snapshot(snapshot(100));
        assert_eq!(after_snapshot, ApplyOutcome::Buffered);
        // Bridge rule for Spot: U <= L+1 (101) and u >= L+1 (101).
        let after_diff = book.apply_diff(diff(95, 105));
        assert_eq!(after_diff, ApplyOutcome::Synced);
        assert_eq!(book.last_update_id(), Some(105));
    }

    #[test]
    fn diffs_then_snapshot() {
        let mut book = OrderBookState::new();
        // 90..=95 ends at or before L=100, so it is discarded on bridging.
        assert_eq!(book.apply_diff(diff(90, 95)), ApplyOutcome::Buffered);
        // 96..=105 straddles L+1=101 and therefore bridges the snapshot.
        assert_eq!(book.apply_diff(diff(96, 105)), ApplyOutcome::Buffered);
        assert_eq!(book.apply_snapshot(snapshot(100)), ApplyOutcome::Synced);
        assert_eq!(book.last_update_id(), Some(105));
    }

    #[test]
    fn snapshot_older_than_buffered_stream_triggers_resync() {
        let mut book = OrderBookState::new();
        // The only buffered diff starts at U=200, past L+1=101.
        assert_eq!(book.apply_diff(diff(200, 210)), ApplyOutcome::Buffered);
        let stale_attempt = book.apply_snapshot(snapshot(100));
        assert_eq!(stale_attempt, ApplyOutcome::ResyncRequired);
        assert!(!book.is_synced());
        // With L=199 the kept diff starts exactly at L+1=200 and bridges.
        assert_eq!(book.apply_snapshot(snapshot(199)), ApplyOutcome::Synced);
        assert_eq!(book.last_update_id(), Some(210));
    }

    #[test]
    fn live_chain_break_triggers_resync() {
        let mut book = book_after_snapshot_and_bridge();
        assert!(book.is_synced());
        // Expected U is 105 + 1 = 106; U=107 leaves a gap.
        let outcome = book.apply_diff(diff(107, 115));
        assert_eq!(outcome, ApplyOutcome::ResyncRequired);
        assert!(!book.is_synced());
    }

    #[test]
    fn stale_diff_after_sync_is_ignored() {
        let mut book = book_after_snapshot_and_bridge();
        // u=95 is not newer than the live id 105.
        assert_eq!(book.apply_diff(diff(80, 95)), ApplyOutcome::Ignored);
        assert_eq!(book.last_update_id(), Some(105));
    }

    #[test]
    fn duplicate_snapshot_is_ignored() {
        let mut book = OrderBookState::new();
        book.apply_snapshot(snapshot(100));
        // Same id and older id are both rejected while a snapshot is pending.
        assert_eq!(book.apply_snapshot(snapshot(100)), ApplyOutcome::Ignored);
        assert_eq!(book.apply_snapshot(snapshot(50)), ApplyOutcome::Ignored);
    }

    #[test]
    fn newer_snapshot_replaces_synced_book() {
        let mut book = book_after_snapshot_and_bridge();
        assert!(book.is_synced());
        assert_eq!(book.apply_snapshot(snapshot(200)), ApplyOutcome::Applied);
        assert_eq!(book.last_update_id(), Some(200));
    }

    #[test]
    fn live_continuous_chain() {
        let mut book = book_after_snapshot_and_bridge();
        // U=106 follows u=105 directly.
        assert_eq!(book.apply_diff(diff(106, 110)), ApplyOutcome::Applied);
        // U=111 follows u=110 directly.
        assert_eq!(book.apply_diff(diff(111, 115)), ApplyOutcome::Applied);
        assert_eq!(book.last_update_id(), Some(115));
    }
}