use std::sync::{Arc, RwLock};
use log::debug;
use tari_common_types::types::{CompressedSignature, FixedHash, HashOutput, PrivateKey};
use tari_node_components::blocks::Block;
use tari_transaction_components::{rpc::models::FeePerGramStat, transaction_components::Transaction};
use tokio::task;
use crate::{
consensus::BaseNodeConsensusManager,
mempool::{
MempoolConfig,
StateResponse,
StatsResponse,
TxStorageResponse,
error::MempoolError,
mempool_storage::MempoolStorage,
},
validation::TransactionValidator,
};
/// Log target attached to the `debug!` records emitted from this file.
pub const LOG_TARGET: &str = "c::mp::mempool";
/// Handle to a [`MempoolStorage`] held behind an `Arc<RwLock<_>>`.
///
/// Cloning the handle clones the `Arc`, so every clone operates on the same underlying storage.
#[derive(Clone)]
pub struct Mempool {
// Storage shared by all clones of this handle; accessed through the lock's read or write guard.
pool_storage: Arc<RwLock<MempoolStorage>>,
}
impl Mempool {
/// Builds a mempool around a freshly constructed [`MempoolStorage`].
///
/// `config`, `rules` and `validator` are passed straight through to `MempoolStorage::new`; the
/// resulting storage is wrapped in an `Arc<RwLock<_>>` so that clones of the handle share it.
pub fn new(
    config: MempoolConfig,
    rules: BaseNodeConsensusManager,
    validator: Box<dyn TransactionValidator>,
) -> Self {
    let storage = MempoolStorage::new(config, rules, validator);
    let pool_storage = Arc::new(RwLock::new(storage));
    Self { pool_storage }
}
pub async fn insert(&self, tx: Arc<Transaction>) -> Result<TxStorageResponse, MempoolError> {
self.with_write_access(|storage| {
storage
.insert(tx)
.map_err(|e| MempoolError::InternalError(e.to_string()))
})
.await
}
/// Inserts every transaction in `transactions`, in order, under a single write lock.
///
/// The individual storage responses are discarded.
///
/// # Errors
/// Stops at the first transaction the storage fails on and returns that failure as
/// `MempoolError::InternalError` (carrying its display text); later transactions are not
/// attempted. Errors raised by `with_write_access` itself are passed through.
pub async fn insert_all(&self, transactions: Vec<Arc<Transaction>>) -> Result<(), MempoolError> {
    self.with_write_access(move |pool| {
        transactions.into_iter().try_for_each(|tx| {
            pool.insert(tx)
                .map(|_response| ())
                .map_err(|cause| MempoolError::InternalError(cause.to_string()))
        })
    })
    .await
}
pub async fn process_published_block(&self, published_block: Arc<Block>) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_published_block(&published_block))
.await
}
/// Hands `failed_block` to `MempoolStorage::clear_transactions_for_failed_block` under the write
/// lock.
///
/// # Errors
/// Returns whatever error the storage call or `with_write_access` produces.
pub async fn clear_transactions_for_failed_block(&self, failed_block: Arc<Block>) -> Result<(), MempoolError> {
    let pending = self.with_write_access(move |pool| pool.clear_transactions_for_failed_block(&failed_block));
    pending.await
}
pub async fn process_reorg(
&self,
removed_blocks: Vec<Arc<Block>>,
new_blocks: Vec<Arc<Block>>,
) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_reorg(&removed_blocks, &new_blocks))
.await
}
pub async fn process_sync(&self) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_sync()).await
}
/// Returns the result of `MempoolStorage::snapshot`, taken under the read lock.
///
/// # Errors
/// Only errors raised by `with_read_access` itself can occur; the storage call is infallible.
pub async fn snapshot(&self) -> Result<Vec<Arc<Transaction>>, MempoolError> {
    self.with_read_access(|pool| {
        let transactions = pool.snapshot();
        Ok(transactions)
    })
    .await
}
/// Retrieves transactions from the mempool for the given `total_weight` and returns them.
///
/// The lookup runs under the read lock via `MempoolStorage::retrieve`. If the lookup also reports
/// transactions that need re-evaluation, they are handed to
/// `MempoolStorage::remove_and_reinsert_transactions` under a write lock that is acquired only
/// after the read lock has been released.
///
/// NOTE(review): `total_weight` is presumably the maximum combined weight of the returned
/// transactions — confirm against `MempoolStorage::retrieve`.
///
/// # Errors
/// Propagates any error from the lookup or from the re-insertion step.
pub async fn retrieve(&self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, MempoolError> {
    let start = std::time::Instant::now();
    let retrieved = self
        .with_read_access(move |storage| storage.retrieve(total_weight))
        .await?;
    // `Duration`'s `Debug` output already ends with its own unit (e.g. `3ms`, `120µs`), so the
    // message must not append a fixed "ms" after it.
    debug!(
        target: LOG_TARGET,
        "Retrieved {} highest priority transaction(s) from the mempool in {:.0?}",
        retrieved.retrieved_transactions.len(),
        start.elapsed()
    );
    if !retrieved.transactions_to_remove_and_insert.is_empty() {
        debug!(
            target: LOG_TARGET,
            "Removing {} transaction(s) from unconfirmed pool because they need re-evaluation",
            retrieved.transactions_to_remove_and_insert.len()
        );
        // The write closure must be `'static`, so it receives its own copy of the list.
        let transactions_to_remove_and_insert = retrieved.transactions_to_remove_and_insert.clone();
        self.with_write_access(move |storage| {
            storage.remove_and_reinsert_transactions(transactions_to_remove_and_insert)
        })
        .await?;
    }
    Ok(retrieved.retrieved_transactions)
}
pub async fn retrieve_by_excess_sigs(
&self,
excess_sigs: Vec<PrivateKey>,
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
self.with_read_access(move |storage| storage.retrieve_by_excess_sigs(&excess_sigs))
.await
}
/// Passes `output_hashes` to `MempoolStorage::filter_outputs_in_mempool` under the read lock and
/// returns the hashes it yields.
///
/// # Errors
/// Only errors raised by `with_read_access` itself can occur; the storage call is infallible.
pub async fn filter_outputs_in_mempool(
    &self,
    output_hashes: Vec<HashOutput>,
) -> Result<Vec<HashOutput>, MempoolError> {
    self.with_read_access(move |pool| {
        let filtered = pool.filter_outputs_in_mempool(&output_hashes);
        Ok(filtered)
    })
    .await
}
/// Asks the storage, under the read lock, for the `TxStorageResponse` associated with
/// `excess_sig`.
///
/// # Errors
/// Only errors raised by `with_read_access` itself can occur; the storage call is infallible.
pub async fn has_tx_with_excess_sig(
    &self,
    excess_sig: CompressedSignature,
) -> Result<TxStorageResponse, MempoolError> {
    self.with_read_access(move |pool| {
        let response = pool.has_tx_with_excess_sig(&excess_sig);
        Ok(response)
    })
    .await
}
pub async fn has_transaction(&self, tx: Arc<Transaction>) -> Result<TxStorageResponse, MempoolError> {
self.with_read_access(move |storage| storage.has_transaction(&tx)).await
}
pub async fn stats(&self) -> Result<StatsResponse, MempoolError> {
self.with_read_access(|storage| storage.stats().map_err(|e| MempoolError::InternalError(e.to_string())))
.await
}
/// Returns the result of `MempoolStorage::state`, read under the read lock.
///
/// # Errors
/// Only errors raised by `with_read_access` itself can occur; the storage call is infallible.
pub async fn state(&self) -> Result<StateResponse, MempoolError> {
    self.with_read_access(|pool| {
        let current = pool.state();
        Ok(current)
    })
    .await
}
pub async fn get_fee_per_gram_stats(
&self,
count: usize,
tip_height: u64,
) -> Result<Vec<FeePerGramStat>, MempoolError> {
self.with_read_access(move |storage| storage.get_fee_per_gram_stats(count, tip_height))
.await
}
async fn with_read_access<F, T>(&self, callback: F) -> Result<T, MempoolError>
where
F: FnOnce(&MempoolStorage) -> Result<T, MempoolError> + Send + 'static,
T: Send + 'static,
{
let storage = self.pool_storage.clone();
task::spawn_blocking(move || {
let lock = storage.read().map_err(|_| MempoolError::RwLockPoisonError)?;
callback(&lock)
})
.await?
}
async fn with_write_access<F, T>(&self, callback: F) -> Result<T, MempoolError>
where
F: FnOnce(&mut MempoolStorage) -> Result<T, MempoolError> + Send + 'static,
T: Send + 'static,
{
let storage = self.pool_storage.clone();
task::spawn_blocking(move || {
let mut lock = storage.write().map_err(|_| MempoolError::RwLockPoisonError)?;
callback(&mut lock)
})
.await?
}
/// Returns a copy of the storage's `last_seen_hash` field, read under the read lock.
///
/// # Errors
/// Only errors raised by `with_read_access` itself can occur.
pub async fn get_last_seen_hash(&self) -> Result<FixedHash, MempoolError> {
    self.with_read_access(|pool| {
        let hash = pool.last_seen_hash;
        Ok(hash)
    })
    .await
}
}