use std::{
hash::{DefaultHasher, Hash, Hasher},
mem,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use bitcoin::hex::DisplayHex;
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{
AddrBytes, BlockHash, MempoolEntryInfo, MempoolInfo, Timestamp, Transaction, TxIn, TxOut,
TxStatus, Txid, TxidPrefix, VSize, Vout,
};
use derive_more::Deref;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::FxHashMap;
use tracing::error;
use crate::{
addrs::AddrTracker,
block_builder::build_projected_blocks,
entry::Entry,
entry_pool::EntryPool,
projected_blocks::{BlockStats, RecommendedFees, Snapshot},
tx_store::TxStore,
types::TxWithHex,
};
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
const MIN_REBUILD_INTERVAL_MS: u64 = 1000;
#[derive(Clone, Deref)]
pub struct Mempool(Arc<MempoolInner>);
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(MempoolInner::new(client.clone())))
}
}
pub struct MempoolInner {
client: Client,
info: RwLock<MempoolInfo>,
txs: RwLock<TxStore>,
addrs: RwLock<AddrTracker>,
entries: RwLock<EntryPool>,
snapshot: RwLock<Snapshot>,
dirty: AtomicBool,
last_rebuild_ms: AtomicU64,
}
impl MempoolInner {
pub fn new(client: Client) -> Self {
Self {
client,
info: RwLock::new(MempoolInfo::default()),
txs: RwLock::new(TxStore::default()),
addrs: RwLock::new(AddrTracker::default()),
entries: RwLock::new(EntryPool::default()),
snapshot: RwLock::new(Snapshot::default()),
dirty: AtomicBool::new(false),
last_rebuild_ms: AtomicU64::new(0),
}
}
pub fn get_info(&self) -> MempoolInfo {
self.info.read().clone()
}
pub fn get_fees(&self) -> RecommendedFees {
self.snapshot.read().fees.clone()
}
pub fn get_snapshot(&self) -> Snapshot {
self.snapshot.read().clone()
}
pub fn get_block_stats(&self) -> Vec<BlockStats> {
self.snapshot.read().block_stats.clone()
}
pub fn next_block_hash(&self) -> u64 {
self.snapshot.read().next_block_hash()
}
pub fn addr_hash(&self, addr: &AddrBytes) -> u64 {
let addrs = self.addrs.read();
let Some((stats, _)) = addrs.get(addr) else {
return 0;
};
let mut hasher = DefaultHasher::new();
stats.hash(&mut hasher);
hasher.finish()
}
pub fn get_txs(&self) -> RwLockReadGuard<'_, TxStore> {
self.txs.read()
}
pub fn get_entries(&self) -> RwLockReadGuard<'_, EntryPool> {
self.entries.read()
}
pub fn get_addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
self.addrs.read()
}
pub fn start(&self) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
thread::sleep(Duration::from_secs(1));
}
}
pub fn update(&self) -> Result<()> {
let entries_info = self.client.get_raw_mempool_verbose()?;
let new_txs = self.fetch_new_txs(&entries_info);
let has_changes = self.apply_changes(&entries_info, new_txs);
if has_changes {
self.dirty.store(true, Ordering::Release);
}
self.rebuild_if_needed();
Ok(())
}
fn fetch_new_txs(&self, entries_info: &[MempoolEntryInfo]) -> FxHashMap<Txid, TxWithHex> {
let txs = self.txs.read();
entries_info
.iter()
.filter(|e| !txs.contains(&e.txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.filter_map(|entry| {
self.build_transaction(entry, &txs)
.ok()
.map(|tx| (entry.txid.clone(), tx))
})
.collect()
}
fn build_transaction(
&self,
entry: &MempoolEntryInfo,
mempool_txs: &TxStore,
) -> Result<TxWithHex> {
let (mut btc_tx, hex) = self.client.get_mempool_raw_tx(&entry.txid)?;
let total_size = hex.len() / 2;
let total_sigop_cost = btc_tx.total_sigop_cost(|_| None);
let mut parent_cache: FxHashMap<Txid, Vec<bitcoin::TxOut>> = FxHashMap::default();
for txin in &btc_tx.input {
let prev_txid: Txid = txin.previous_output.txid.into();
if !mempool_txs.contains_key(&prev_txid)
&& !parent_cache.contains_key(&prev_txid)
&& let Ok(prev) = self
.client
.get_raw_transaction(&prev_txid, None as Option<&BlockHash>)
{
parent_cache.insert(prev_txid, prev.output);
}
}
let input = mem::take(&mut btc_tx.input)
.into_iter()
.map(|txin| {
let prev_txid: Txid = txin.previous_output.txid.into();
let prev_vout = usize::from(Vout::from(txin.previous_output.vout));
let prevout = if let Some(prev) = mempool_txs.get(&prev_txid) {
prev.tx()
.output
.get(prev_vout)
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value)))
} else if let Some(outputs) = parent_cache.get(&prev_txid) {
outputs
.get(prev_vout)
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into())))
} else {
None
};
TxIn {
is_coinbase: prevout.is_none(),
prevout,
txid: prev_txid,
vout: txin.previous_output.vout.into(),
script_sig: txin.script_sig,
script_sig_asm: (),
witness: txin
.witness
.iter()
.map(|w| w.to_lower_hex_string())
.collect(),
sequence: txin.sequence.into(),
inner_redeem_script_asm: (),
inner_witness_script_asm: (),
}
})
.collect();
let tx = Transaction {
index: None,
txid: entry.txid.clone(),
version: btc_tx.version.into(),
total_sigop_cost,
weight: entry.weight.into(),
lock_time: btc_tx.lock_time.into(),
total_size,
fee: entry.fee,
input,
output: btc_tx.output.into_iter().map(TxOut::from).collect(),
status: TxStatus::UNCONFIRMED,
};
Ok(TxWithHex::new(tx, hex))
}
fn apply_changes(
&self,
entries_info: &[MempoolEntryInfo],
new_txs: FxHashMap<Txid, TxWithHex>,
) -> bool {
let entries_by_prefix: FxHashMap<TxidPrefix, &MempoolEntryInfo> = entries_info
.iter()
.map(|e| (TxidPrefix::from(&e.txid), e))
.collect();
let mut info = self.info.write();
let mut txs = self.txs.write();
let mut addrs = self.addrs.write();
let mut entries = self.entries.write();
let mut had_removals = false;
let had_additions = !new_txs.is_empty();
txs.retain_or_remove(
|txid| entries_by_prefix.contains_key(&TxidPrefix::from(txid)),
|txid, tx_with_hex| {
had_removals = true;
let tx = tx_with_hex.tx();
let prefix = TxidPrefix::from(txid);
let fee = entries.get(&prefix).map(|e| e.fee).unwrap_or_default();
info.remove(tx, fee);
addrs.remove_tx(tx, txid);
entries.remove(&prefix);
},
);
for (txid, tx_with_hex) in &new_txs {
let tx = tx_with_hex.tx();
let prefix = TxidPrefix::from(txid);
let Some(entry_info) = entries_by_prefix.get(&prefix) else {
continue;
};
info.add(tx, entry_info.fee);
addrs.add_tx(tx, txid);
entries.insert(
prefix,
Entry {
txid: entry_info.txid.clone(),
fee: entry_info.fee,
vsize: VSize::from(entry_info.vsize),
size: tx.total_size as u64,
ancestor_fee: entry_info.ancestor_fee,
ancestor_vsize: VSize::from(entry_info.ancestor_size),
depends: entry_info.depends.iter().map(TxidPrefix::from).collect(),
first_seen: Timestamp::now(),
},
);
}
txs.extend(new_txs);
had_removals || had_additions
}
fn rebuild_if_needed(&self) {
if !self.dirty.load(Ordering::Acquire) {
return;
}
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let last = self.last_rebuild_ms.load(Ordering::Acquire);
if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS {
return;
}
if self
.last_rebuild_ms
.compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
}
self.dirty.store(false, Ordering::Release);
self.rebuild_projected_blocks();
}
fn rebuild_projected_blocks(&self) {
let entries = self.entries.read();
let entries_slice = entries.entries();
let blocks = build_projected_blocks(entries_slice);
let snapshot = Snapshot::build(blocks, entries_slice);
*self.snapshot.write() = snapshot;
}
}