binance_stream_handler/ob_manager/
order_book.rs1use ordered_float::OrderedFloat as OF;
2use reqwest::Client;
3use serde::Deserialize;
4use std::collections::BTreeMap;
5use tracing::{debug, error, info, trace, warn};
6
7type Price = OF<f64>;
8type Qty = f64;
9
10#[allow(non_snake_case)]
11#[derive(Debug, Deserialize, PartialEq)]
12pub struct DepthUpdate {
13 pub e: String, pub E: u64, pub T: u64, pub s: String, pub U: u64, pub u: u64, pub pu: u64, pub b: Vec<[String; 2]>, pub a: Vec<[String; 2]>, pub channel_load: Option<usize>,
23}
24
25#[derive(Debug, Deserialize, PartialEq)]
26pub struct CombinedDepthUpdate {
27 pub stream: String,
29 pub data: DepthUpdate,
30}
31
32#[derive(Debug, Clone)]
33pub struct ResyncNeeded {
34 pub symbol: String,
35 pub expected_pu: Option<u64>, pub got_pu: u64, pub got_u: u64, }
39
40#[allow(non_snake_case)]
41#[derive(Debug, Deserialize)]
42pub struct DepthSnapshot {
43 #[serde(rename = "lastUpdateId")]
44 last_update_id: u64,
45 E: u64, T: u64, bids: Vec<[String; 2]>, asks: Vec<[String; 2]>,
49}
50
51#[derive(Debug)]
52pub enum UpdateDecision<'a> {
53 Drop, Apply(&'a DepthUpdate), Resync(ResyncNeeded), }
57
58#[derive(Debug, Clone)]
66pub struct OrderBook {
67 pub symbol: String,
68 pub bids: BTreeMap<Price, Qty>,
70 pub asks: BTreeMap<Price, Qty>,
71 pub last_u: Option<u64>,
72 pub snapshot_id: Option<u64>,
73 pub depth: u16,
74}
75
76impl OrderBook {
77 pub fn new(symbol: &str) -> Self {
78 Self {
79 symbol: symbol.to_ascii_uppercase(),
80 bids: BTreeMap::new(),
81 asks: BTreeMap::new(),
82 last_u: None,
83 snapshot_id: None,
84 depth: 1000,
85 }
86 }
87
88 pub async fn init_ob(symbol: &str) -> Result<OrderBook, Box<dyn std::error::Error>> {
89 let mut ob = OrderBook::new(symbol);
90 let snapshot = ob.get_depth_snapshot(ob.depth).await?;
91 ob.from_snapshot(&snapshot);
92 Ok(ob)
93 }
94
95 pub async fn resync_ob(&mut self) -> Result<(), Box<dyn std::error::Error>> {
96 let new_snapshot = self.get_depth_snapshot(self.depth).await?;
97 self.from_snapshot(&new_snapshot);
98 Ok(())
99 }
100
101 pub async fn get_depth_snapshot(
102 &self,
103 limit: u16,
104 ) -> Result<DepthSnapshot, Box<dyn std::error::Error>> {
105 let sym = self.symbol.to_ascii_uppercase();
106 let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={sym}&limit={limit}");
107
108 let client = Client::builder()
109 .user_agent("binance-stream-handler/0.1")
110 .build()?;
111
112 let resp = client.get(&url).send().await?;
113 if !resp.status().is_success() {
114 return Err(format!("Snapshot HTTP error: {}", resp.status()).into());
115 }
116
117 let snapshot: DepthSnapshot = resp.json().await?;
118
119 Ok(snapshot)
120 }
121
122 pub fn from_snapshot(&mut self, snap: &DepthSnapshot) {
124 self.bids.clear();
125 self.asks.clear();
126
127 self.snapshot_id = Some(snap.last_update_id);
128
129 for [p, q] in &snap.bids {
130 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
131 if q != 0.0 {
132 self.bids.insert(OF(p), q);
133 }
134 }
135 for [p, q] in &snap.asks {
136 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
137 if q != 0.0 {
138 self.asks.insert(OF(p), q);
139 }
140 }
141 }
142
143 pub fn apply_update(&mut self, ev: &DepthUpdate) {
145 for [p, q] in &ev.b {
147 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
148 if q == 0.0 {
149 self.bids.remove(&OF(p));
150 } else {
151 self.bids.insert(OF(p), q);
152 }
153 }
154 for [p, q] in &ev.a {
156 let (p, q) = (Self::parse_f64(p), Self::parse_f64(q));
157 if q == 0.0 {
158 self.asks.remove(&OF(p));
159 } else {
160 self.asks.insert(OF(p), q);
161 }
162 }
163 self.last_u = Some(ev.u);
164 }
165
166 pub fn continuity_check<'a>(&mut self, du: &'a DepthUpdate) -> UpdateDecision<'a> {
167 let snapshot_id = match self.snapshot_id {
168 None => {
169 self.last_u = None;
170 return UpdateDecision::Resync(ResyncNeeded {
171 symbol: self.symbol.clone(),
172 expected_pu: None,
173 got_pu: du.pu,
174 got_u: du.u,
175 });
176 }
177 Some(s) => s + 1,
178 };
179
180 match self.last_u {
181 None => {
182 if du.U <= snapshot_id && snapshot_id <= du.u {
183 self.last_u = Some(du.u);
184 return UpdateDecision::Apply(&du);
185 } else if snapshot_id <= du.U {
186 debug!(
187 "Missed updates after initialization for {}, snap_id: {:?} U: {} u: {}",
188 du.s, snapshot_id, du.U, du.u,
189 );
190 self.last_u = None;
191 return UpdateDecision::Resync(ResyncNeeded {
192 symbol: self.symbol.clone(),
193 expected_pu: None,
194 got_pu: du.pu,
195 got_u: du.u,
196 });
197 } else {
198 return UpdateDecision::Drop;
199 }
200 }
201
202 Some(pu) => {
203 if pu == du.pu {
204 self.last_u = Some(du.u);
205 return UpdateDecision::Apply(&du);
206 } else if pu > du.pu {
207 return UpdateDecision::Drop;
208 } else {
209 self.last_u = None;
210 return UpdateDecision::Resync(ResyncNeeded {
211 symbol: self.symbol.clone(),
212 expected_pu: Some(pu),
213 got_pu: du.pu,
214 got_u: du.u,
215 });
216 }
217 }
218 }
219 }
220
221 fn parse_f64(s: &str) -> f64 {
222 s.parse::<f64>().unwrap()
224 }
225}