use super::*;
pub(in crate::tui::runtime) fn resolve_levels(
cache: Option<&crate::tui::data::GtiCache>,
bids_raw: &[L2Level],
asks_raw: &[L2Level],
) -> (Vec<ClobLevel>, Vec<ClobLevel>, bool) {
let mut had_miss = false;
let mut resolve_side = |raw: &[L2Level]| -> Vec<ClobLevel> {
raw.iter()
.filter_map(|lvl| match cache.and_then(|c| c.resolve(lvl.trader_id)) {
Some(pk) => Some((lvl.price, lvl.qty, pubkey_trader_prefix(&pk))),
None => {
had_miss = true;
None
}
})
.collect()
};
let bids = resolve_side(bids_raw);
let asks = resolve_side(asks_raw);
(bids, asks, had_miss)
}
const L2_POLL_INTERVAL: Duration = Duration::from_millis(500);
pub(in crate::tui::runtime) fn spawn_phoenix_l2_book_rpc(
mut cfg_rx: watch::Receiver<SplineConfig>,
l2_tx: tokio::sync::mpsc::UnboundedSender<L2BookStreamMsg>,
gti_cache: GtiHandle,
gti_refresh: Arc<tokio::sync::Notify>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let rpc =
RpcClient::new_with_commitment(rpc_http_url_from_env(), CommitmentConfig::processed());
let mut cfg = cfg_rx.borrow().clone();
let mut market_pk = match Pubkey::from_str(&cfg.market_pubkey) {
Ok(pk) => pk,
Err(e) => {
warn!(
market = %cfg.market_pubkey,
error = %e,
"invalid market pubkey for L2 book"
);
return;
}
};
let mut last_data_hash: u64 = 0;
let mut poll_ticker = tokio::time::interval(L2_POLL_INTERVAL);
poll_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
r = cfg_rx.changed() => {
if r.is_err() {
return;
}
let new_cfg = cfg_rx.borrow().clone();
if new_cfg.market_pubkey != cfg.market_pubkey {
match Pubkey::from_str(&new_cfg.market_pubkey) {
Ok(pk) => {
market_pk = pk;
last_data_hash = 0;
}
Err(e) => {
warn!(
market = %new_cfg.market_pubkey,
error = %e,
"invalid market pubkey for L2 book"
);
continue;
}
}
}
cfg = new_cfg;
}
_ = poll_ticker.tick() => {
let resp = match rpc
.get_account_with_commitment(&market_pk, CommitmentConfig::processed())
.await
{
Ok(r) => r,
Err(e) => {
warn!(
symbol = %cfg.symbol,
error = %e,
"L2 market getAccount failed"
);
continue;
}
};
let Some(account) = resp.value else {
continue;
};
let hash = {
use std::hash::Hasher;
let mut h = std::collections::hash_map::DefaultHasher::new();
h.write(&account.data);
h.finish()
};
if hash == last_data_hash {
continue;
}
last_data_hash = hash;
let Some((bids_raw, asks_raw)) = parse_l2_book_from_market_account(
account.data,
cfg.tick_size,
cfg.base_lot_decimals,
L2_SNAPSHOT_DEPTH,
) else {
continue;
};
let (bids, asks, had_miss) = {
let cache = gti_cache.read().await;
resolve_levels(cache.as_ref(), &bids_raw, &asks_raw)
};
if had_miss {
gti_refresh.notify_one();
}
if l2_tx
.send(L2BookStreamMsg {
symbol: cfg.symbol.clone(),
bids,
asks,
})
.is_err()
{
return;
}
}
}
}
})
}