binance_stream_handler/ob_manager/
order_book.rs

1use ordered_float::OrderedFloat as OF;
2use reqwest::Client;
3use serde::Deserialize;
4use std::collections::BTreeMap;
5use tracing::{debug, error, info, trace, warn};
6
/// Price-level key: the `OrderedFloat` wrapper gives an `f64` price the `Ord`
/// implementation that a `BTreeMap` key requires.
type Price = OF<f64>;
/// Quantity stored for a single price level.
type Qty = f64;
9
10#[allow(non_snake_case)]
11#[derive(Debug, Deserialize, PartialEq)]
12pub struct DepthUpdate {
13    pub e: String,           // Event type: "depthUpdate"
14    pub E: u64,              // Event time
15    pub T: u64,              // Transaction time
16    pub s: String,           // Symbol
17    pub U: u64,              // First update ID in event
18    pub u: u64,              // Final update ID in event
19    pub pu: u64,             // Final update Id in last stream(ie `u` in last stream)
20    pub b: Vec<[String; 2]>, // bids updates [price, qty]
21    pub a: Vec<[String; 2]>, // asks updates
22    pub channel_load: Option<usize>,
23}
24
25#[derive(Debug, Deserialize, PartialEq)]
26pub struct CombinedDepthUpdate {
27    // e.g. "adausdt@depth@100ms"
28    pub stream: String,
29    pub data: DepthUpdate,
30}
31
/// Describes the depth event that made a fresh snapshot necessary.
///
/// Carries the symbol, the previous-update ID the book expected (if it had one),
/// and the sequence IDs of the event that was actually received.
// `got_U` deliberately mirrors the upper-case `U` key of the depth event, so the
// snake-case lint is silenced here just as it is on the other wire-format structs.
#[allow(non_snake_case)]
#[derive(Debug, Clone)]
pub struct ResyncNeeded {
    /// Symbol of the affected book.
    pub symbol: String,
    /// The `pu` the book expected (its previous `u`); `None` when it had no
    /// applied update to compare against.
    pub expected_pu: Option<u64>,
    /// The `pu` of the received event.
    pub got_pu: u64,
    /// The `U` (first update ID) of the received event.
    pub got_U: u64,
    /// The `u` (final update ID) of the received event.
    pub got_u: u64,
}
40
41#[allow(non_snake_case)]
42#[derive(Debug, Deserialize)]
43pub struct DepthSnapshot {
44    #[serde(rename = "lastUpdateId")]
45    last_update_id: u64,
46    E: u64,                 // event time (ms)
47    T: u64,                 // transaction time (ms)
48    bids: Vec<[String; 2]>, // [price, qty]
49    asks: Vec<[String; 2]>,
50}
51
52#[derive(Debug)]
53pub enum UpdateDecision<'a> {
54    Drop,                   // ignore this event
55    Apply(&'a DepthUpdate), // apply to book
56    Resync(ResyncNeeded),   // trigger re-snapshot
57}
58
59/// A sorted Binance order book (bids descending by price, asks ascending).
60///
61/// Values are **absolute quantities** (Binance-style). `last_u` and `snapshot_id`
62/// reflect the latest applied update and the initializing REST snapshot respectively.
63///
64/// Most users don't construct `OrderBook` directly—consume it via the
65/// `watch::Receiver<OrderBook>` returned by [`generate_orderbooks`].
66#[derive(Debug, Clone)]
67pub struct OrderBook {
68    pub symbol: String,
69    // Sorted by price ascending (wrapped so it implements Ord)
70    pub bids: BTreeMap<Price, Qty>,
71    pub asks: BTreeMap<Price, Qty>,
72    pub last_u: Option<u64>,
73    pub snapshot_id: Option<u64>,
74    pub depth: u16,
75}
76
77impl OrderBook {
78    pub fn new(symbol: &str) -> Self {
79        Self {
80            symbol: symbol.to_ascii_uppercase(),
81            bids: BTreeMap::new(),
82            asks: BTreeMap::new(),
83            last_u: None,
84            snapshot_id: None,
85            depth: 1000,
86        }
87    }
88
89    pub async fn init_ob(symbol: &str) -> Result<OrderBook, Box<dyn std::error::Error>> {
90        let mut ob = OrderBook::new(symbol);
91        let snapshot = ob.get_depth_snapshot(ob.depth).await?;
92        ob.from_snapshot(&snapshot);
93        Ok(ob)
94    }
95
96    pub async fn resync_ob(&mut self) -> Result<(), Box<dyn std::error::Error>> {
97        let new_snapshot = self.get_depth_snapshot(self.depth).await?;
98        self.from_snapshot(&new_snapshot);
99        Ok(())
100    }
101
102    pub async fn get_depth_snapshot(
103        &self,
104        limit: u16,
105    ) -> Result<DepthSnapshot, Box<dyn std::error::Error>> {
106        let sym = self.symbol.to_ascii_uppercase();
107        let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={sym}&limit={limit}");
108
109        let client = Client::builder()
110            .user_agent("binance-stream-handler/0.1")
111            .build()?;
112
113        let resp = client.get(&url).send().await?;
114        if !resp.status().is_success() {
115            return Err(format!("Snapshot HTTP error: {}", resp.status()).into());
116        }
117
118        let snapshot: DepthSnapshot = resp.json().await?;
119
120        Ok(snapshot)
121    }
122
123    /// Build a sorted book directly from a REST snapshot.
124    pub fn from_snapshot(&mut self, snap: &DepthSnapshot) {
125        self.bids.clear();
126        self.asks.clear();
127
128        self.snapshot_id = Some(snap.last_update_id);
129
130        for [p, q] in &snap.bids {
131            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
132            if q != 0.0 {
133                self.bids.insert(OF(p), q);
134            }
135        }
136        for [p, q] in &snap.asks {
137            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
138            if q != 0.0 {
139                self.asks.insert(OF(p), q);
140            }
141        }
142    }
143
144    /// Apply one WS depth update (absolute quantities)
145    pub fn apply_update(&mut self, ev: &DepthUpdate) {
146        // bids
147        for [p, q] in &ev.b {
148            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
149            if q == 0.0 {
150                self.bids.remove(&OF(p));
151            } else {
152                self.bids.insert(OF(p), q);
153            }
154        }
155        // asks
156        for [p, q] in &ev.a {
157            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
158            if q == 0.0 {
159                self.asks.remove(&OF(p));
160            } else {
161                self.asks.insert(OF(p), q);
162            }
163        }
164        self.last_u = Some(ev.u);
165    }
166
167    pub fn continuity_check<'a>(&mut self, du: &'a DepthUpdate) -> UpdateDecision<'a> {
168        let snapshot_id = match self.snapshot_id {
169            None => {
170                self.last_u = None;
171                return UpdateDecision::Resync(ResyncNeeded {
172                    symbol: self.symbol.clone(),
173                    expected_pu: None,
174                    got_pu: du.pu,
175                    got_U: du.U,
176                    got_u: du.u,
177                });
178            }
179            Some(s) => s,
180        };
181
182        match self.last_u {
183            None => {
184                if du.u < snapshot_id {
185                    return UpdateDecision::Drop;
186                }
187            
188                if du.U <= snapshot_id && snapshot_id <= du.u {
189                    self.last_u = Some(du.u);
190                    return UpdateDecision::Apply(du);
191                }
192            
193                // du.U > snapshot_id => we missed the bridging update
194                debug!(
195                    "Missed updates after initialization for {}, snap_id: {} U: {} u: {}",
196                    du.s, snapshot_id, du.U, du.u,
197                );
198                self.last_u = None;
199                return UpdateDecision::Resync(ResyncNeeded {
200                    symbol: self.symbol.clone(),
201                    expected_pu: None,
202                    got_pu: du.pu,
203                    got_U: du.U,
204                    got_u: du.u,
205                });
206            }
207
208            Some(pu) => {
209                if pu == du.pu {
210                    self.last_u = Some(du.u);
211                    return UpdateDecision::Apply(&du);
212                } else if pu > du.pu {
213                    return UpdateDecision::Drop;
214                } else {
215                    self.last_u = None;
216                    return UpdateDecision::Resync(ResyncNeeded {
217                        symbol: self.symbol.clone(),
218                        expected_pu: Some(pu),
219                        got_pu: du.pu,
220                        got_U: du.U,
221                        got_u: du.u,
222                    });
223                }
224            }
225        }
226    }
227
228    fn parse_f64(s: &str) -> f64 {
229        // Binance sends clean numeric strings
230        s.parse::<f64>().unwrap()
231    }
232}