use std::collections::HashMap;
use tokio::sync::{mpsc, watch};
use tracing::info_span;
use tracing::Instrument;
use tracing::{debug, error, info, trace, warn};
pub mod order_book;
use crate::ob_manager::order_book::{DepthUpdate, OrderBook, UpdateDecision};
pub fn init_order_books(
currency_pairs: &'static [&'static str],
mut receivers: HashMap<String, mpsc::Receiver<DepthUpdate>>,
) -> HashMap<String, watch::Receiver<OrderBook>> {
let mut ob_streams: HashMap<String, watch::Receiver<OrderBook>> = HashMap::new();
for &pair in currency_pairs {
let pair_up = pair.to_ascii_uppercase();
let mut rx = receivers
.remove(&pair_up)
.expect("router created a channel for every symbol");
let (tx_ob, rx_ob) = watch::channel(OrderBook::new(pair));
ob_streams.insert(pair_up.clone(), rx_ob);
tokio::spawn(
async move {
let ob = match OrderBook::init_ob(pair).await {
Ok(ob) => {
debug!(
"{} orderbook is initiated, last Id: {:?}",
ob.symbol, ob.snapshot_id
);
ob
}
Err(e) => {
error!(symbole=%pair, error=%e, "Snapshot init failed; stopping orderbook task");
return;
}
};
let mut need_resync = false;
let _ = tx_ob.send_replace(ob);
while let Some(du) = rx.recv().await {
if need_resync {
let fresh_ob = match OrderBook::init_ob(pair).await {
Ok(ob) => ob,
Err(e) => {
error!(symbole=%pair, error=%e, "Snapshot re-init failed during resync; stopping orderbook task");
return;
}
};
let _ = tx_ob.send_replace(fresh_ob);
need_resync = false;
} else {
let _ = tx_ob.send_modify(|book| match book.continuity_check(&du) {
UpdateDecision::Drop => {
info!(
symbol=%pair,
U=du.U, u=du.u, pu=du.pu,
snap_id=?book.snapshot_id.map(|x| x+1),
last_u=?book.last_u,
"Update dropped"
);
}
UpdateDecision::Apply(du) => {
trace!("Update applied");
book.apply_update(du);
}
UpdateDecision::Resync(info) => {
warn!(
expected_pu=?info.expected_pu,
got_pu=info.got_pu,
got_U=info.got_U,
got_u=info.got_u,
"Resync required"
);
need_resync = true;
}
});
}
}
}
.instrument(info_span!("orderbook_task", symbol = %pair)),
);
}
ob_streams
}