binance_stream_handler/
ob_manager.rs

1use std::collections::HashMap;
2use tokio::sync::{mpsc, watch};
3use tracing::info_span;
4use tracing::Instrument;
5use tracing::{debug, error, info, trace, warn};
6
7pub mod order_book;
8
9use crate::ob_manager::order_book::{DepthUpdate, OrderBook, UpdateDecision};
10
11pub fn init_order_books(
12    currency_pairs: &'static [&'static str],
13    mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
14) -> HashMap<String, watch::Receiver<OrderBook>> {
15    let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
16
17    for &pair in currency_pairs {
18        let pair_up = pair.to_ascii_uppercase();
19        let mut rx = receivers
20            .remove(&pair_up)
21            .expect("router created a channel for every symbol");
22
23        let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
24        ob_streams.insert(pair_up.clone(), rx_ob);
25
26        tokio::spawn(
27            async move {
28                let ob = match OrderBook::init_ob(pair).await {
29                    Ok(ob) => {
30                        debug!(
31                            "{} orderbook is initiated, last Id: {:?}",
32                            ob.symbol, ob.snapshot_id
33                        );
34                        ob
35                    }
36                    Err(e) => {
37                        error!(symbole=%pair, error=%e, "Snapshot init failed; stopping orderbook task");
38                        return;
39                    }
40                };
41                let mut need_resync = false;
42                let _ = tx_ob.send_replace(ob);
43
44                while let Some(du) = rx.recv().await {
45                    if need_resync {
46                        let fresh_ob = match OrderBook::init_ob(pair).await {
47                            Ok(ob) => ob,
48                            Err(e) => {
49                                error!(symbole=%pair, error=%e, "Snapshot re-init failed during resync; stopping orderbook task");
50                                return;
51                            }
52                        };
53                        let _ = tx_ob.send_replace(fresh_ob);
54                        need_resync = false;
55                    } else {
56                        let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
57                            UpdateDecision::Drop => {
58                                info!(
59                                    symbol=%pair,
60                                    U=du.U, u=du.u, pu=du.pu,
61                                    snap_id=?book.snapshot_id.map(|x| x+1),
62                                    last_u=?book.last_u,
63                                    "Update dropped"
64                                );
65                            }
66                            UpdateDecision::Apply(du) => {
67                                trace!("Update applied");
68                                book.apply_update(du);
69                            }
70                            UpdateDecision::Resync(info) => {
71                                
72                                warn!(
73                                    expected_pu=?info.expected_pu,
74                                    got_pu=info.got_pu,
75                                    got_U=info.got_U,
76                                    got_u=info.got_u,
77                                    "Resync required"
78                                );
79
80                                need_resync = true;
81                            }
82                        });
83                    }
84                }
85            }
86            .instrument(info_span!("orderbook_task", symbol = %pair)),
87        );
88    }
89    ob_streams
90}