brk_mempool 0.3.0-beta.4

Bitcoin mempool monitor with fee estimation
Documentation
use std::{
    hash::{DefaultHasher, Hash, Hasher},
    mem,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    thread,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use bitcoin::hex::DisplayHex;
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{
    AddrBytes, BlockHash, MempoolEntryInfo, MempoolInfo, Timestamp, Transaction, TxIn, TxOut,
    TxStatus, Txid, TxidPrefix, VSize, Vout,
};
use derive_more::Deref;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::FxHashMap;
use tracing::error;

use crate::{
    addrs::AddrTracker,
    block_builder::build_projected_blocks,
    entry::Entry,
    entry_pool::EntryPool,
    projected_blocks::{BlockStats, RecommendedFees, Snapshot},
    tx_store::TxStore,
    types::TxWithHex,
};

/// Max new txs to fetch full data for per update cycle (for address tracking).
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;

/// Minimum interval between rebuilds (milliseconds).
const MIN_REBUILD_INTERVAL_MS: u64 = 1000;

/// Mempool monitor.
///
/// Thread-safe wrapper around `MempoolInner`. Free to clone.
#[derive(Clone, Deref)]
pub struct Mempool(Arc<MempoolInner>);

impl Mempool {
    pub fn new(client: &Client) -> Self {
        Self(Arc::new(MempoolInner::new(client.clone())))
    }
}

/// Inner mempool state and logic.
pub struct MempoolInner {
    client: Client,

    info: RwLock<MempoolInfo>,
    txs: RwLock<TxStore>,
    addrs: RwLock<AddrTracker>,
    entries: RwLock<EntryPool>,

    snapshot: RwLock<Snapshot>,

    dirty: AtomicBool,
    last_rebuild_ms: AtomicU64,
}

impl MempoolInner {
    pub fn new(client: Client) -> Self {
        Self {
            client,
            info: RwLock::new(MempoolInfo::default()),
            txs: RwLock::new(TxStore::default()),
            addrs: RwLock::new(AddrTracker::default()),
            entries: RwLock::new(EntryPool::default()),
            snapshot: RwLock::new(Snapshot::default()),
            dirty: AtomicBool::new(false),
            last_rebuild_ms: AtomicU64::new(0),
        }
    }

    pub fn get_info(&self) -> MempoolInfo {
        self.info.read().clone()
    }

    pub fn get_fees(&self) -> RecommendedFees {
        self.snapshot.read().fees.clone()
    }

    pub fn get_snapshot(&self) -> Snapshot {
        self.snapshot.read().clone()
    }

    pub fn get_block_stats(&self) -> Vec<BlockStats> {
        self.snapshot.read().block_stats.clone()
    }

    pub fn next_block_hash(&self) -> u64 {
        self.snapshot.read().next_block_hash()
    }

    pub fn addr_hash(&self, addr: &AddrBytes) -> u64 {
        let addrs = self.addrs.read();
        let Some((stats, _)) = addrs.get(addr) else {
            return 0;
        };
        let mut hasher = DefaultHasher::new();
        stats.hash(&mut hasher);
        hasher.finish()
    }

    pub fn get_txs(&self) -> RwLockReadGuard<'_, TxStore> {
        self.txs.read()
    }

    pub fn get_entries(&self) -> RwLockReadGuard<'_, EntryPool> {
        self.entries.read()
    }

    pub fn get_addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
        self.addrs.read()
    }

    /// Start an infinite update loop with a 1 second interval.
    pub fn start(&self) {
        loop {
            if let Err(e) = self.update() {
                error!("Error updating mempool: {}", e);
            }
            thread::sleep(Duration::from_secs(1));
        }
    }

    /// Sync with Bitcoin Core mempool and rebuild projections if needed.
    pub fn update(&self) -> Result<()> {
        let entries_info = self.client.get_raw_mempool_verbose()?;

        let new_txs = self.fetch_new_txs(&entries_info);
        let has_changes = self.apply_changes(&entries_info, new_txs);

        if has_changes {
            self.dirty.store(true, Ordering::Release);
        }

        self.rebuild_if_needed();

        Ok(())
    }

    /// Fetch full transaction data for new txids (needed for address tracking).
    fn fetch_new_txs(&self, entries_info: &[MempoolEntryInfo]) -> FxHashMap<Txid, TxWithHex> {
        let txs = self.txs.read();
        entries_info
            .iter()
            .filter(|e| !txs.contains(&e.txid))
            .take(MAX_TX_FETCHES_PER_CYCLE)
            .filter_map(|entry| {
                self.build_transaction(entry, &txs)
                    .ok()
                    .map(|tx| (entry.txid.clone(), tx))
            })
            .collect()
    }

    fn build_transaction(
        &self,
        entry: &MempoolEntryInfo,
        mempool_txs: &TxStore,
    ) -> Result<TxWithHex> {
        let (mut btc_tx, hex) = self.client.get_mempool_raw_tx(&entry.txid)?;

        let total_size = hex.len() / 2;
        let total_sigop_cost = btc_tx.total_sigop_cost(|_| None);

        // Collect unique parent txids not in the mempool store,
        // fetch each once instead of one get_tx_out per input
        let mut parent_cache: FxHashMap<Txid, Vec<bitcoin::TxOut>> = FxHashMap::default();
        for txin in &btc_tx.input {
            let prev_txid: Txid = txin.previous_output.txid.into();
            if !mempool_txs.contains_key(&prev_txid)
                && !parent_cache.contains_key(&prev_txid)
                && let Ok(prev) = self
                    .client
                    .get_raw_transaction(&prev_txid, None as Option<&BlockHash>)
            {
                parent_cache.insert(prev_txid, prev.output);
            }
        }

        let input = mem::take(&mut btc_tx.input)
            .into_iter()
            .map(|txin| {
                let prev_txid: Txid = txin.previous_output.txid.into();
                let prev_vout = usize::from(Vout::from(txin.previous_output.vout));

                let prevout = if let Some(prev) = mempool_txs.get(&prev_txid) {
                    prev.tx()
                        .output
                        .get(prev_vout)
                        .map(|o| TxOut::from((o.script_pubkey.clone(), o.value)))
                } else if let Some(outputs) = parent_cache.get(&prev_txid) {
                    outputs
                        .get(prev_vout)
                        .map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into())))
                } else {
                    None
                };

                TxIn {
                    is_coinbase: prevout.is_none(),
                    prevout,
                    txid: prev_txid,
                    vout: txin.previous_output.vout.into(),
                    script_sig: txin.script_sig,
                    script_sig_asm: (),
                    witness: txin
                        .witness
                        .iter()
                        .map(|w| w.to_lower_hex_string())
                        .collect(),
                    sequence: txin.sequence.into(),
                    inner_redeem_script_asm: (),
                    inner_witness_script_asm: (),
                }
            })
            .collect();

        let tx = Transaction {
            index: None,
            txid: entry.txid.clone(),
            version: btc_tx.version.into(),
            total_sigop_cost,
            weight: entry.weight.into(),
            lock_time: btc_tx.lock_time.into(),
            total_size,
            fee: entry.fee,
            input,
            output: btc_tx.output.into_iter().map(TxOut::from).collect(),
            status: TxStatus::UNCONFIRMED,
        };

        Ok(TxWithHex::new(tx, hex))
    }

    /// Apply transaction additions and removals. Returns true if there were changes.
    fn apply_changes(
        &self,
        entries_info: &[MempoolEntryInfo],
        new_txs: FxHashMap<Txid, TxWithHex>,
    ) -> bool {
        let entries_by_prefix: FxHashMap<TxidPrefix, &MempoolEntryInfo> = entries_info
            .iter()
            .map(|e| (TxidPrefix::from(&e.txid), e))
            .collect();

        let mut info = self.info.write();
        let mut txs = self.txs.write();
        let mut addrs = self.addrs.write();
        let mut entries = self.entries.write();

        let mut had_removals = false;
        let had_additions = !new_txs.is_empty();

        // Remove transactions no longer in mempool
        txs.retain_or_remove(
            |txid| entries_by_prefix.contains_key(&TxidPrefix::from(txid)),
            |txid, tx_with_hex| {
                had_removals = true;
                let tx = tx_with_hex.tx();
                let prefix = TxidPrefix::from(txid);

                // Get fee from entries (before removing) - this is the authoritative fee from Bitcoin Core
                let fee = entries.get(&prefix).map(|e| e.fee).unwrap_or_default();
                info.remove(tx, fee);
                addrs.remove_tx(tx, txid);
                entries.remove(&prefix);
            },
        );

        // Add new transactions
        for (txid, tx_with_hex) in &new_txs {
            let tx = tx_with_hex.tx();
            let prefix = TxidPrefix::from(txid);

            let Some(entry_info) = entries_by_prefix.get(&prefix) else {
                continue;
            };

            info.add(tx, entry_info.fee);
            addrs.add_tx(tx, txid);
            entries.insert(
                prefix,
                Entry {
                    txid: entry_info.txid.clone(),
                    fee: entry_info.fee,
                    vsize: VSize::from(entry_info.vsize),
                    size: tx.total_size as u64,
                    ancestor_fee: entry_info.ancestor_fee,
                    ancestor_vsize: VSize::from(entry_info.ancestor_size),
                    depends: entry_info.depends.iter().map(TxidPrefix::from).collect(),
                    first_seen: Timestamp::now(),
                },
            );
        }
        txs.extend(new_txs);

        had_removals || had_additions
    }

    /// Rebuild projected blocks if dirty and enough time has passed.
    fn rebuild_if_needed(&self) {
        if !self.dirty.load(Ordering::Acquire) {
            return;
        }

        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        let last = self.last_rebuild_ms.load(Ordering::Acquire);
        if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS {
            return;
        }

        if self
            .last_rebuild_ms
            .compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        self.dirty.store(false, Ordering::Release);

        // let i = Instant::now();
        self.rebuild_projected_blocks();
        // debug!("mempool: rebuild_projected_blocks in {:?}", i.elapsed());
    }

    /// Rebuild projected blocks snapshot.
    fn rebuild_projected_blocks(&self) {
        let entries = self.entries.read();
        let entries_slice = entries.entries();

        let blocks = build_projected_blocks(entries_slice);
        let snapshot = Snapshot::build(blocks, entries_slice);

        *self.snapshot.write() = snapshot;
    }
}