binance_stream_handler/
ob_manager.rs1use std::collections::HashMap;
2use tokio::sync::{mpsc, watch};
3use tracing::info_span;
4use tracing::Instrument;
5use tracing::{debug, error, info, trace, warn};
6
7pub mod order_book;
8
9use crate::ob_manager::order_book::{DepthUpdate, OrderBook, UpdateDecision};
10
11pub fn init_order_books(
12 currency_pairs: &'static [&'static str],
13 mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
14) -> HashMap<String, watch::Receiver<OrderBook>> {
15 let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
16
17 for &pair in currency_pairs {
18 let pair_up = pair.to_ascii_uppercase();
19 let mut rx = receivers
20 .remove(&pair_up)
21 .expect("router created a channel for every symbol");
22
23 let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
24 ob_streams.insert(pair_up.clone(), rx_ob);
25
26 tokio::spawn(
27 async move {
28 let ob = match OrderBook::init_ob(pair).await {
29 Ok(ob) => {
30 debug!(
31 "{} orderbook is initiated, last Id: {:?}",
32 ob.symbol, ob.snapshot_id
33 );
34 ob
35 }
36 Err(e) => {
37 error!(symbole=%pair, error=%e, "Snapshot init failed; stopping orderbook task");
38 return;
39 }
40 };
41 let mut need_resync = false;
42 let _ = tx_ob.send_replace(ob);
43
44 while let Some(du) = rx.recv().await {
45 if need_resync {
46 let fresh_ob = match OrderBook::init_ob(pair).await {
47 Ok(ob) => ob,
48 Err(e) => {
49 error!(symbole=%pair, error=%e, "Snapshot re-init failed during resync; stopping orderbook task");
50 return;
51 }
52 };
53 let _ = tx_ob.send_replace(fresh_ob);
54 need_resync = false;
55 } else {
56 let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
57 UpdateDecision::Drop => {
58 info!(
59 symbol=%pair,
60 U=du.U, u=du.u, pu=du.pu,
61 snap_id=?book.snapshot_id.map(|x| x+1),
62 last_u=?book.last_u,
63 "Update dropped"
64 );
65 }
66 UpdateDecision::Apply(du) => {
67 trace!("Update applied");
68 book.apply_update(du);
69 }
70 UpdateDecision::Resync(info) => {
71
72 warn!(
73 expected_pu=?info.expected_pu,
74 got_pu=info.got_pu,
75 got_U=info.got_U,
76 got_u=info.got_u,
77 "Resync required"
78 );
79
80 need_resync = true;
81 }
82 });
83 }
84 }
85 }
86 .instrument(info_span!("orderbook_task", symbol = %pair)),
87 );
88 }
89 ob_streams
90}