mod steps;
mod stores;
pub use steps::preparer::Removal;
pub use steps::rebuilder::projected_blocks::{BlockStats, RecommendedFees, Snapshot};
pub use stores::{Entry, EntryPool, Tombstone, TxGraveyard, TxStore};
use std::{sync::Arc, thread, time::Duration};
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{AddrBytes, MempoolInfo, TxOut, Txid, Vout};
use parking_lot::RwLockReadGuard;
use tracing::error;
use crate::{
steps::{fetcher::Fetcher, preparer::Preparer, rebuilder::Rebuilder, resolver::Resolver},
stores::{AddrTracker, MempoolState},
};
/// Handle to the mempool.
///
/// The handle wraps its shared inner state in an [`Arc`], so cloning it only
/// produces another handle to the same state rather than copying the state.
#[derive(Clone)]
pub struct Mempool(Arc<Inner>);
/// State shared by every clone of a `Mempool` handle.
struct Inner {
/// `brk_rpc` client; passed to `Fetcher::fetch` and `Rebuilder::tick` during an update.
client: Client,
/// Mempool stores (`info`, `txs`, `entries`, `addrs`, `graveyard`) read through
/// `read()` guards by the accessors and modified via `apply` during an update.
state: MempoolState,
/// Backs the snapshot / fees / block-stats accessors; marked dirty when
/// `state.apply` returns `true` and ticked on every update.
rebuilder: Rebuilder,
}
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(Inner {
client: client.clone(),
state: MempoolState::default(),
rebuilder: Rebuilder::default(),
}))
}
pub fn info(&self) -> MempoolInfo {
self.0.state.info.read().clone()
}
pub fn snapshot(&self) -> Arc<Snapshot> {
self.0.rebuilder.snapshot()
}
pub fn fees(&self) -> RecommendedFees {
self.0.rebuilder.fees()
}
pub fn block_stats(&self) -> Vec<BlockStats> {
self.0.rebuilder.block_stats()
}
pub fn next_block_hash(&self) -> u64 {
self.0.rebuilder.next_block_hash()
}
pub fn addr_state_hash(&self, addr: &AddrBytes) -> u64 {
self.0.state.addrs.read().stats_hash(addr)
}
pub fn txs(&self) -> RwLockReadGuard<'_, TxStore> {
self.0.state.txs.read()
}
pub fn entries(&self) -> RwLockReadGuard<'_, EntryPool> {
self.0.state.entries.read()
}
pub fn addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
self.0.state.addrs.read()
}
pub fn graveyard(&self) -> RwLockReadGuard<'_, TxGraveyard> {
self.0.state.graveyard.read()
}
pub fn start(&self) {
self.start_with(|| {});
}
pub fn start_with(&self, mut after_update: impl FnMut()) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
after_update();
thread::sleep(Duration::from_secs(1));
}
}
pub fn fill_prevouts<F>(&self, resolver: F) -> bool
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
Resolver::resolve_external(&self.0.state, resolver)
}
pub fn update(&self) -> Result<()> {
let inner = &*self.0;
let fetched = Fetcher::fetch(
&inner.client,
&inner.state.txs.read(),
&inner.state.graveyard.read(),
)?;
let pulled = Preparer::prepare(
fetched,
&inner.state.txs.read(),
&inner.state.graveyard.read(),
);
if inner.state.apply(pulled) {
Resolver::resolve_in_mempool(&inner.state);
inner.rebuilder.mark_dirty();
}
inner.rebuilder.tick(&inner.client, &inner.state.entries);
Ok(())
}
}