binance-stream-handler 0.1.3

Async WebSocket/HTTP app for Binance order book streams.
Documentation
use std::collections::HashMap;
use tokio::sync::{mpsc, watch};
use tracing::info_span;
use tracing::Instrument;
use tracing::{debug, error, info, trace, warn};

/// Order book submodule; it provides the `DepthUpdate`, `OrderBook` and
/// `UpdateDecision` types that this module imports and drives.
pub mod order_book;

use crate::ob_manager::order_book::{DepthUpdate, OrderBook, UpdateDecision};

pub fn init_order_books(
    currency_pairs: &'static [&'static str],
    mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
) -> HashMap<String, watch::Receiver<OrderBook>> {
    let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();

    for &pair in currency_pairs {
        let pair_up = pair.to_ascii_uppercase();
        let mut rx = receivers
            .remove(&pair_up)
            .expect("router created a channel for every symbol");

        let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
        ob_streams.insert(pair_up.clone(), rx_ob);

        tokio::spawn(
            async move {
                let ob = match OrderBook::init_ob(pair).await {
                    Ok(ob) => {
                        debug!(
                            "{} orderbook is initiated, last Id: {:?}",
                            ob.symbol, ob.snapshot_id
                        );
                        ob
                    }
                    Err(e) => {
                        error!(symbole=%pair, error=%e, "Snapshot init failed; stopping orderbook task");
                        return;
                    }
                };
                let mut need_resync = false;
                let _ = tx_ob.send_replace(ob);

                while let Some(du) = rx.recv().await {
                    if need_resync {
                        let fresh_ob = match OrderBook::init_ob(pair).await {
                            Ok(ob) => ob,
                            Err(e) => {
                                error!(symbole=%pair, error=%e, "Snapshot re-init failed during resync; stopping orderbook task");
                                return;
                            }
                        };
                        let _ = tx_ob.send_replace(fresh_ob);
                        need_resync = false;
                    } else {
                        let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
                            UpdateDecision::Drop => {
                                info!(
                                    symbol=%pair,
                                    U=du.U, u=du.u, pu=du.pu,
                                    snap_id=?book.snapshot_id.map(|x| x+1),
                                    last_u=?book.last_u,
                                    "Update dropped"
                                );
                            }
                            UpdateDecision::Apply(du) => {
                                trace!("Update applied");
                                book.apply_update(du);
                            }
                            UpdateDecision::Resync(info) => {
                                
                                warn!(
                                    expected_pu=?info.expected_pu,
                                    got_pu=info.got_pu,
                                    got_U=info.got_U,
                                    got_u=info.got_u,
                                    "Resync required"
                                );

                                need_resync = true;
                            }
                        });
                    }
                }
            }
            .instrument(info_span!("orderbook_task", symbol = %pair)),
        );
    }
    ob_streams
}