binance_stream_handler/
ob_manager.rs1use std::collections::HashMap;
2use tokio::sync::{mpsc, watch};
3use tracing::info_span;
4use tracing::Instrument;
5use tracing::{debug, error, info, trace, warn};
6
7pub mod order_book;
8
9use crate::ob_manager::order_book::{DepthUpdate, OrderBook, UpdateDecision};
10
11pub fn init_order_books(
12 currency_pairs: &'static [&'static str],
13 mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
14) -> HashMap<String, watch::Receiver<OrderBook>> {
15 let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
16
17 for &pair in currency_pairs {
18 let pair_up = pair.to_ascii_uppercase();
19 let mut rx = receivers
20 .remove(&pair_up)
21 .expect("router created a channel for every symbol");
22
23 let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
24 ob_streams.insert(pair_up.clone(), rx_ob);
25
26 tokio::spawn(
27 async move {
28 let ob = match OrderBook::init_ob(pair).await {
29 Ok(ob) => {
30 debug!(
31 "{} orderbook is initiated, last Id: {:?}",
32 ob.symbol, ob.snapshot_id
33 );
34 ob
35 }
36 Err(e) => {
37 eprintln!("[{pair}] snapshot init error: {e}");
38 return;
39 }
40 };
41 let mut need_resync = false;
42 let _ = tx_ob.send_replace(ob);
43
44 while let Some(du) = rx.recv().await {
45 if need_resync {
46 let fresh_ob = match OrderBook::init_ob(pair).await {
47 Ok(ob) => ob,
48 Err(e) => {
49 eprintln!("[{pair}] snapshot init error: {e}");
50 return;
51 }
52 };
53 let _ = tx_ob.send_replace(fresh_ob);
54 need_resync = false;
55 } else {
56 let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
57 UpdateDecision::Drop => {
58 info!("Update dropped");
59 }
60 UpdateDecision::Apply(du) => {
61 trace!("Update applied");
62 book.apply_update(du);
63 }
64 UpdateDecision::Resync(info) => {
65 trace!("Resync required");
66 eprintln!(
67 "[{pair}] RESYNC: expected pu={:?}, got pu={}, u={}",
68 info.expected_pu, info.got_pu, info.got_u
69 );
70 need_resync = true;
71 }
72 });
73 }
74 }
75 }
76 .instrument(info_span!("orderbook_task", symbol = %pair)),
77 );
78 }
79 ob_streams
80}