binance_stream_handler/ob_manager/
order_book.rs

1use ordered_float::OrderedFloat as OF;
2use reqwest::Client;
3use serde::Deserialize;
4use std::collections::BTreeMap;
5use tracing::{debug, error, info, trace, warn};
6
7type Price = OF<f64>;
8type Qty = f64;
9
10#[allow(non_snake_case)]
11#[derive(Debug, Deserialize, PartialEq)]
12pub struct DepthUpdate {
13    pub e: String,           // Event type: "depthUpdate"
14    pub E: u64,              // Event time
15    pub T: u64,              // Transaction time
16    pub s: String,           // Symbol
17    pub U: u64,              // First update ID in event
18    pub u: u64,              // Final update ID in event
19    pub pu: u64,             // Final update Id in last stream(ie `u` in last stream)
20    pub b: Vec<[String; 2]>, // bids updates [price, qty]
21    pub a: Vec<[String; 2]>, // asks updates
22    pub channel_load: Option<usize>,
23}
24
25#[derive(Debug, Deserialize, PartialEq)]
26pub struct CombinedDepthUpdate {
27    // e.g. "adausdt@depth@100ms"
28    pub stream: String,
29    pub data: DepthUpdate,
30}
31
32#[derive(Debug, Clone)]
33pub struct ResyncNeeded {
34    pub symbol: String,
35    pub expected_pu: Option<u64>, // what we expected (prev u)
36    pub got_pu: u64,              // the pu we received
37    pub got_u: u64,               // the u we received
38}
39
40#[allow(non_snake_case)]
41#[derive(Debug, Deserialize)]
42pub struct DepthSnapshot {
43    #[serde(rename = "lastUpdateId")]
44    last_update_id: u64,
45    E: u64,                 // event time (ms)
46    T: u64,                 // transaction time (ms)
47    bids: Vec<[String; 2]>, // [price, qty]
48    asks: Vec<[String; 2]>,
49}
50
51#[derive(Debug)]
52pub enum UpdateDecision<'a> {
53    Drop,                   // ignore this event
54    Apply(&'a DepthUpdate), // apply to book
55    Resync(ResyncNeeded),   // trigger re-snapshot
56}
57
58/// A sorted Binance order book (bids descending by price, asks ascending).
59///
60/// Values are **absolute quantities** (Binance-style). `last_u` and `snapshot_id`
61/// reflect the latest applied update and the initializing REST snapshot respectively.
62///
63/// Most users don't construct `OrderBook` directly—consume it via the
64/// `watch::Receiver<OrderBook>` returned by [`generate_orderbooks`].
65#[derive(Debug, Clone)]
66pub struct OrderBook {
67    pub symbol: String,
68    // Sorted by price ascending (wrapped so it implements Ord)
69    pub bids: BTreeMap<Price, Qty>,
70    pub asks: BTreeMap<Price, Qty>,
71    pub last_u: Option<u64>,
72    pub snapshot_id: Option<u64>,
73    pub depth: u16,
74}
75
76impl OrderBook {
77    pub fn new(symbol: &str) -> Self {
78        Self {
79            symbol: symbol.to_ascii_uppercase(),
80            bids: BTreeMap::new(),
81            asks: BTreeMap::new(),
82            last_u: None,
83            snapshot_id: None,
84            depth: 1000,
85        }
86    }
87
88    pub async fn init_ob(symbol: &str) -> Result<OrderBook, Box<dyn std::error::Error>> {
89        let mut ob = OrderBook::new(symbol);
90        let snapshot = ob.get_depth_snapshot(ob.depth).await?;
91        ob.from_snapshot(&snapshot);
92        Ok(ob)
93    }
94
95    pub async fn resync_ob(&mut self) -> Result<(), Box<dyn std::error::Error>> {
96        let new_snapshot = self.get_depth_snapshot(self.depth).await?;
97        self.from_snapshot(&new_snapshot);
98        Ok(())
99    }
100
101    pub async fn get_depth_snapshot(
102        &self,
103        limit: u16,
104    ) -> Result<DepthSnapshot, Box<dyn std::error::Error>> {
105        let sym = self.symbol.to_ascii_uppercase();
106        let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={sym}&limit={limit}");
107
108        let client = Client::builder()
109            .user_agent("binance-stream-handler/0.1")
110            .build()?;
111
112        let resp = client.get(&url).send().await?;
113        if !resp.status().is_success() {
114            return Err(format!("Snapshot HTTP error: {}", resp.status()).into());
115        }
116
117        let snapshot: DepthSnapshot = resp.json().await?;
118
119        Ok(snapshot)
120    }
121
122    /// Build a sorted book directly from a REST snapshot.
123    pub fn from_snapshot(&mut self, snap: &DepthSnapshot) {
124        self.bids.clear();
125        self.asks.clear();
126
127        self.snapshot_id = Some(snap.last_update_id);
128
129        for [p, q] in &snap.bids {
130            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
131            if q != 0.0 {
132                self.bids.insert(OF(p), q);
133            }
134        }
135        for [p, q] in &snap.asks {
136            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
137            if q != 0.0 {
138                self.asks.insert(OF(p), q);
139            }
140        }
141    }
142
143    /// Apply one WS depth update (absolute quantities)
144    pub fn apply_update(&mut self, ev: &DepthUpdate) {
145        // bids
146        for [p, q] in &ev.b {
147            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
148            if q == 0.0 {
149                self.bids.remove(&OF(p));
150            } else {
151                self.bids.insert(OF(p), q);
152            }
153        }
154        // asks
155        for [p, q] in &ev.a {
156            let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
157            if q == 0.0 {
158                self.asks.remove(&OF(p));
159            } else {
160                self.asks.insert(OF(p), q);
161            }
162        }
163        self.last_u = Some(ev.u);
164    }
165
166    pub fn continuity_check<'a>(&mut self, du: &'a DepthUpdate) -> UpdateDecision<'a> {
167        let snapshot_id = match self.snapshot_id {
168            None => {
169                self.last_u = None;
170                return UpdateDecision::Resync(ResyncNeeded {
171                    symbol: self.symbol.clone(),
172                    expected_pu: None,
173                    got_pu: du.pu,
174                    got_u: du.u,
175                });
176            }
177            Some(s) => s + 1,
178        };
179
180        match self.last_u {
181            None => {
182                if du.U <= snapshot_id && snapshot_id <= du.u {
183                    self.last_u = Some(du.u);
184                    return UpdateDecision::Apply(&du);
185                } else if snapshot_id <= du.U {
186                    debug!(
187                        "Missed updates after initialization for {}, snap_id: {:?} U: {} u: {}",
188                        du.s, snapshot_id, du.U, du.u,
189                    );
190                    self.last_u = None;
191                    return UpdateDecision::Resync(ResyncNeeded {
192                        symbol: self.symbol.clone(),
193                        expected_pu: None,
194                        got_pu: du.pu,
195                        got_u: du.u,
196                    });
197                } else {
198                    return UpdateDecision::Drop;
199                }
200            }
201
202            Some(pu) => {
203                if pu == du.pu {
204                    self.last_u = Some(du.u);
205                    return UpdateDecision::Apply(&du);
206                } else if pu > du.pu {
207                    return UpdateDecision::Drop;
208                } else {
209                    self.last_u = None;
210                    return UpdateDecision::Resync(ResyncNeeded {
211                        symbol: self.symbol.clone(),
212                        expected_pu: Some(pu),
213                        got_pu: du.pu,
214                        got_u: du.u,
215                    });
216                }
217            }
218        }
219    }
220
221    fn parse_f64(s: &str) -> f64 {
222        // Binance sends clean numeric strings
223        s.parse::<f64>().unwrap()
224    }
225}