binance_stream_handler/
ob_manager.rs1use futures_util::future::ready;
2use std::collections::HashMap;
3use tokio::sync::{mpsc, watch};
4use tracing::info_span;
5use tracing::Instrument;
6use tracing::{debug, error, info, trace, warn};
7
8pub mod order_book;
9
10use crate::ob_manager::order_book::{CombinedDepthUpdate, DepthUpdate, OrderBook, UpdateDecision};
11
12pub fn init_order_books(
13 currency_pairs: &'static [&'static str],
14 mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
15) -> HashMap<String, watch::Receiver<OrderBook>> {
16 let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
17
18 for &pair in currency_pairs {
19 let pair_up = pair.to_ascii_uppercase();
20 let mut rx = receivers
21 .remove(&pair_up)
22 .expect("router created a channel for every symbol");
23
24 let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
25 ob_streams.insert(pair_up.clone(), rx_ob);
26
27 tokio::spawn(
28 async move {
29 let ob = match OrderBook::init_ob(pair).await {
30 Ok(ob) => {
31 debug!(
32 "{} orderbook is initiated, last Id: {:?}",
33 ob.symbol, ob.snapshot_id
34 );
35 ob
36 }
37 Err(e) => {
38 eprintln!("[{pair}] snapshot init error: {e}");
39 return;
40 }
41 };
42 let mut need_resync = false;
43 let _ = tx_ob.send_replace(ob);
44
45 while let Some(du) = rx.recv().await {
46 if need_resync {
47 let fresh_ob = match OrderBook::init_ob(pair).await {
48 Ok(ob) => ob,
49 Err(e) => {
50 eprintln!("[{pair}] snapshot init error: {e}");
51 return;
52 }
53 };
54 let _ = tx_ob.send_replace(fresh_ob);
55 need_resync = false;
56 } else {
57 let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
58 UpdateDecision::Drop => {
59 info!("Update dropped");
60 }
61 UpdateDecision::Apply(du) => {
62 trace!("Update applied");
63 book.apply_update(du);
64 }
65 UpdateDecision::Resync(info) => {
66 trace!("Resync required");
67 eprintln!(
68 "[{pair}] RESYNC: expected pu={:?}, got pu={}, u={}",
69 info.expected_pu, info.got_pu, info.got_u
70 );
71 need_resync = true;
72 }
73 });
74 }
75 }
76 }
77 .instrument(info_span!("orderbook_task", symbol = %pair)),
78 );
79 }
80 ob_streams
81}