mod fetched;
pub use fetched::Fetched;
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{MempoolEntryInfo, Timestamp, Txid, VSize};
use parking_lot::RwLock;
use rustc_hash::FxHashSet;
use tracing::warn;
use crate::State;
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
pub struct Fetcher;
impl Fetcher {
pub fn fetch(client: &Client, lock: &RwLock<State>) -> Result<Fetched> {
let (mut state, block_template) = client.fetch_mempool_state()?;
let (new_txids, gbt_synth_set) = {
let mempool = lock.read();
let mut gbt_txids: FxHashSet<Txid> =
FxHashSet::with_capacity_and_hasher(block_template.len(), Default::default());
let mut gbt_synth_set: FxHashSet<Txid> = FxHashSet::default();
for g in &block_template {
gbt_txids.insert(g.txid);
if !mempool.txs.contains(&g.txid) {
gbt_synth_set.insert(g.txid);
}
}
let new_txids: Vec<Txid> = state
.live_txids
.iter()
.filter(|t| !mempool.txs.contains(t) && !gbt_txids.contains(t))
.take(MAX_TX_FETCHES_PER_CYCLE)
.copied()
.collect();
if new_txids.len() == MAX_TX_FETCHES_PER_CYCLE {
warn!(
cap = MAX_TX_FETCHES_PER_CYCLE,
"Fetcher: new-tx batch hit the per-cycle cap; remainder defers to the next cycle"
);
}
(new_txids, gbt_synth_set)
};
let (mut new_entries, mut new_txs) = client.fetch_new_pool_data(&new_txids)?;
new_entries.reserve(gbt_synth_set.len());
new_txs.reserve(gbt_synth_set.len());
let now = Timestamp::now();
let block_template_txids: Vec<Txid> = block_template
.into_iter()
.map(|g| {
let txid = g.txid;
if gbt_synth_set.contains(&txid) {
new_entries.push(MempoolEntryInfo {
txid,
vsize: VSize::from(g.weight),
weight: g.weight,
fee: g.fee,
first_seen: now,
depends: g.depends,
});
new_txs.insert(txid, g.tx);
}
txid
})
.collect();
state.live_txids.extend(block_template_txids.iter().copied());
Ok(Fetched {
state,
new_entries,
new_txs,
block_template_txids,
})
}
}