binance_stream_handler/
ob_manager.rs

1use std::collections::HashMap;
2use tokio::sync::{mpsc, watch};
3use tracing::info_span;
4use tracing::Instrument;
5use tracing::{debug, error, info, trace, warn};
6
7pub mod order_book;
8
9use crate::ob_manager::order_book::{DepthUpdate, OrderBook, UpdateDecision};
10
11pub fn init_order_books(
12    currency_pairs: &'static [&'static str],
13    mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
14) -> HashMap<String, watch::Receiver<OrderBook>> {
15    let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
16
17    for &pair in currency_pairs {
18        let pair_up = pair.to_ascii_uppercase();
19        let mut rx = receivers
20            .remove(&pair_up)
21            .expect("router created a channel for every symbol");
22
23        let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
24        ob_streams.insert(pair_up.clone(), rx_ob);
25
26        tokio::spawn(
27            async move {
28                let ob = match OrderBook::init_ob(pair).await {
29                    Ok(ob) => {
30                        debug!(
31                            "{} orderbook is initiated, last Id: {:?}",
32                            ob.symbol, ob.snapshot_id
33                        );
34                        ob
35                    }
36                    Err(e) => {
37                        eprintln!("[{pair}] snapshot init error: {e}");
38                        return;
39                    }
40                };
41                let mut need_resync = false;
42                let _ = tx_ob.send_replace(ob);
43
44                while let Some(du) = rx.recv().await {
45                    if need_resync {
46                        let fresh_ob = match OrderBook::init_ob(pair).await {
47                            Ok(ob) => ob,
48                            Err(e) => {
49                                eprintln!("[{pair}] snapshot init error: {e}");
50                                return;
51                            }
52                        };
53                        let _ = tx_ob.send_replace(fresh_ob);
54                        need_resync = false;
55                    } else {
56                        let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
57                            UpdateDecision::Drop => {
58                                info!("Update dropped");
59                            }
60                            UpdateDecision::Apply(du) => {
61                                trace!("Update applied");
62                                book.apply_update(du);
63                            }
64                            UpdateDecision::Resync(info) => {
65                                trace!("Resync required");
66                                eprintln!(
67                                    "[{pair}] RESYNC: expected pu={:?}, got pu={}, u={}",
68                                    info.expected_pu, info.got_pu, info.got_u
69                                );
70                                need_resync = true;
71                            }
72                        });
73                    }
74                }
75            }
76            .instrument(info_span!("orderbook_task", symbol = %pair)),
77        );
78    }
79    ob_streams
80}