binance_stream_handler/
ob_manager.rs

1use futures_util::future::ready;
2use std::collections::HashMap;
3use tokio::sync::{mpsc, watch};
4use tracing::info_span;
5use tracing::Instrument;
6use tracing::{debug, error, info, trace, warn};
7
8pub mod order_book;
9
10use crate::ob_manager::order_book::{CombinedDepthUpdate, DepthUpdate, OrderBook, UpdateDecision};
11
12pub fn init_order_books(
13    currency_pairs: &'static [&'static str],
14    mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
15) -> HashMap<String, watch::Receiver<OrderBook>> {
16    let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
17
18    for &pair in currency_pairs {
19        let pair_up = pair.to_ascii_uppercase();
20        let mut rx = receivers
21            .remove(&pair_up)
22            .expect("router created a channel for every symbol");
23
24        let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
25        ob_streams.insert(pair_up.clone(), rx_ob);
26
27        tokio::spawn(
28            async move {
29                let ob = match OrderBook::init_ob(pair).await {
30                    Ok(ob) => {
31                        debug!(
32                            "{} orderbook is initiated, last Id: {:?}",
33                            ob.symbol, ob.snapshot_id
34                        );
35                        ob
36                    }
37                    Err(e) => {
38                        eprintln!("[{pair}] snapshot init error: {e}");
39                        return;
40                    }
41                };
42                let mut need_resync = false;
43                let _ = tx_ob.send_replace(ob);
44
45                while let Some(du) = rx.recv().await {
46                    if need_resync {
47                        let fresh_ob = match OrderBook::init_ob(pair).await {
48                            Ok(ob) => ob,
49                            Err(e) => {
50                                eprintln!("[{pair}] snapshot init error: {e}");
51                                return;
52                            }
53                        };
54                        let _ = tx_ob.send_replace(fresh_ob);
55                        need_resync = false;
56                    } else {
57                        let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
58                            UpdateDecision::Drop => {
59                                info!("Update dropped");
60                            }
61                            UpdateDecision::Apply(du) => {
62                                trace!("Update applied");
63                                book.apply_update(du);
64                            }
65                            UpdateDecision::Resync(info) => {
66                                trace!("Resync required");
67                                eprintln!(
68                                    "[{pair}] RESYNC: expected pu={:?}, got pu={}, u={}",
69                                    info.expected_pu, info.got_pu, info.got_u
70                                );
71                                need_resync = true;
72                            }
73                        });
74                    }
75                }
76            }
77            .instrument(info_span!("orderbook_task", symbol = %pair)),
78        );
79    }
80    ob_streams
81}