binance_stream_handler/ob_manager/
order_book.rs1use ordered_float::OrderedFloat as OF;
2use reqwest::Client;
3use serde::Deserialize;
4use std::collections::BTreeMap;
5use tracing::{debug, error, info, trace, warn};
6
7type Price = OF<f64>;
8type Qty = f64;
9
10#[allow(non_snake_case)]
11#[derive(Debug, Deserialize, PartialEq)]
12pub struct DepthUpdate {
13 pub e: String, pub E: u64, pub T: u64, pub s: String, pub U: u64, pub u: u64, pub pu: u64, pub b: Vec<[String; 2]>, pub a: Vec<[String; 2]>, pub channel_load: Option<usize>,
23}
24
25#[derive(Debug, Deserialize, PartialEq)]
26pub struct CombinedDepthUpdate {
27 pub stream: String,
29 pub data: DepthUpdate,
30}
31
/// Diagnostic payload describing why an order book has to be re-initialized
/// from a fresh snapshot.
///
/// `got_U` intentionally mirrors the upstream `U` key, so the
/// `non_snake_case` lint is silenced here just like on the payload structs.
#[allow(non_snake_case)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResyncNeeded {
    /// Symbol of the book that lost continuity.
    pub symbol: String,
    /// The `pu` value the book expected (its previous `last_u`), or `None`
    /// when no update had been applied since the last snapshot.
    pub expected_pu: Option<u64>,
    /// `pu` of the offending event.
    pub got_pu: u64,
    /// `U` (first update id) of the offending event.
    pub got_U: u64,
    /// `u` (final update id) of the offending event.
    pub got_u: u64,
}
40
41#[allow(non_snake_case)]
42#[derive(Debug, Deserialize)]
43pub struct DepthSnapshot {
44 #[serde(rename = "lastUpdateId")]
45 last_update_id: u64,
46 E: u64, T: u64, bids: Vec<[String; 2]>, asks: Vec<[String; 2]>,
50}
51
52#[derive(Debug)]
53pub enum UpdateDecision<'a> {
54 Drop, Apply(&'a DepthUpdate), Resync(ResyncNeeded), }
58
59#[derive(Debug, Clone)]
67pub struct OrderBook {
68 pub symbol: String,
69 pub bids: BTreeMap<Price, Qty>,
71 pub asks: BTreeMap<Price, Qty>,
72 pub last_u: Option<u64>,
73 pub snapshot_id: Option<u64>,
74 pub depth: u16,
75}
76
77impl OrderBook {
78 pub fn new(symbol: &str) -> Self {
79 Self {
80 symbol: symbol.to_ascii_uppercase(),
81 bids: BTreeMap::new(),
82 asks: BTreeMap::new(),
83 last_u: None,
84 snapshot_id: None,
85 depth: 1000,
86 }
87 }
88
89 pub async fn init_ob(symbol: &str) -> Result<OrderBook, Box<dyn std::error::Error>> {
90 let mut ob = OrderBook::new(symbol);
91 let snapshot = ob.get_depth_snapshot(ob.depth).await?;
92 ob.from_snapshot(&snapshot);
93 Ok(ob)
94 }
95
96 pub async fn resync_ob(&mut self) -> Result<(), Box<dyn std::error::Error>> {
97 let new_snapshot = self.get_depth_snapshot(self.depth).await?;
98 self.from_snapshot(&new_snapshot);
99 Ok(())
100 }
101
102 pub async fn get_depth_snapshot(
103 &self,
104 limit: u16,
105 ) -> Result<DepthSnapshot, Box<dyn std::error::Error>> {
106 let sym = self.symbol.to_ascii_uppercase();
107 let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={sym}&limit={limit}");
108
109 let client = Client::builder()
110 .user_agent("binance-stream-handler/0.1")
111 .build()?;
112
113 let resp = client.get(&url).send().await?;
114 if !resp.status().is_success() {
115 return Err(format!("Snapshot HTTP error: {}", resp.status()).into());
116 }
117
118 let snapshot: DepthSnapshot = resp.json().await?;
119
120 Ok(snapshot)
121 }
122
123 pub fn from_snapshot(&mut self, snap: &DepthSnapshot) {
125 self.bids.clear();
126 self.asks.clear();
127
128 self.snapshot_id = Some(snap.last_update_id);
129
130 for [p, q] in &snap.bids {
131 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
132 if q != 0.0 {
133 self.bids.insert(OF(p), q);
134 }
135 }
136 for [p, q] in &snap.asks {
137 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
138 if q != 0.0 {
139 self.asks.insert(OF(p), q);
140 }
141 }
142 }
143
144 pub fn apply_update(&mut self, ev: &DepthUpdate) {
146 for [p, q] in &ev.b {
148 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
149 if q == 0.0 {
150 self.bids.remove(&OF(p));
151 } else {
152 self.bids.insert(OF(p), q);
153 }
154 }
155 for [p, q] in &ev.a {
157 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
158 if q == 0.0 {
159 self.asks.remove(&OF(p));
160 } else {
161 self.asks.insert(OF(p), q);
162 }
163 }
164 self.last_u = Some(ev.u);
165 }
166
167 pub fn continuity_check<'a>(&mut self, du: &'a DepthUpdate) -> UpdateDecision<'a> {
168 let snapshot_id = match self.snapshot_id {
169 None => {
170 self.last_u = None;
171 return UpdateDecision::Resync(ResyncNeeded {
172 symbol: self.symbol.clone(),
173 expected_pu: None,
174 got_pu: du.pu,
175 got_U: du.U,
176 got_u: du.u,
177 });
178 }
179 Some(s) => s,
180 };
181
182 match self.last_u {
183 None => {
184 if du.u < snapshot_id {
185 return UpdateDecision::Drop;
186 }
187
188 if du.U <= snapshot_id && snapshot_id <= du.u {
189 self.last_u = Some(du.u);
190 return UpdateDecision::Apply(du);
191 }
192
193 debug!(
195 "Missed updates after initialization for {}, snap_id: {} U: {} u: {}",
196 du.s, snapshot_id, du.U, du.u,
197 );
198 self.last_u = None;
199 return UpdateDecision::Resync(ResyncNeeded {
200 symbol: self.symbol.clone(),
201 expected_pu: None,
202 got_pu: du.pu,
203 got_U: du.U,
204 got_u: du.u,
205 });
206 }
207
208 Some(pu) => {
209 if pu == du.pu {
210 self.last_u = Some(du.u);
211 return UpdateDecision::Apply(&du);
212 } else if pu > du.pu {
213 return UpdateDecision::Drop;
214 } else {
215 self.last_u = None;
216 return UpdateDecision::Resync(ResyncNeeded {
217 symbol: self.symbol.clone(),
218 expected_pu: Some(pu),
219 got_pu: du.pu,
220 got_U: du.U,
221 got_u: du.u,
222 });
223 }
224 }
225 }
226 }
227
228 fn parse_f64(s: &str) -> f64 {
229 s.parse::<f64>().unwrap()
231 }
232}