use std::{sync::Arc, time::Instant};
use log::*;
use tari_common_types::types::{CompressedSignature, FixedHash, HashOutput, PrivateKey};
use tari_node_components::blocks::Block;
use tari_transaction_components::{
rpc::models::FeePerGramStat,
transaction_components::{Transaction, TransactionError},
weight::TransactionWeight,
};
use tari_utilities::hex::Hex;
#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
consensus::BaseNodeConsensusManager,
mempool::{
MempoolConfig,
StateResponse,
StatsResponse,
TxStorageResponse,
error::MempoolError,
reorg_pool::ReorgPool,
unconfirmed_pool::{RetrieveResults, TransactionKey, UnconfirmedPool, UnconfirmedPoolError},
},
validation::{TransactionValidator, ValidationError},
};
/// Log target passed to every `log` macro invocation in this file.
pub const LOG_TARGET: &str = "c::mp::mempool_storage";
/// Owns the mempool's two transaction pools together with the validator and consensus rules that
/// decide whether a transaction may be stored.
pub struct MempoolStorage {
/// Pool that receives transactions accepted by `insert`.
pub(crate) unconfirmed_pool: UnconfirmedPool,
/// Pool that receives the transactions removed from the unconfirmed pool when a published block is
/// processed; it is consulted again when a reorg or sync is processed.
reorg_pool: ReorgPool,
/// Validator run against every transaction passed to `insert`.
validator: Box<dyn TransactionValidator>,
/// Consensus manager used to look up the consensus constants for a given height.
rules: BaseNodeConsensusManager,
/// Height of the last block processed by `process_published_block` / `process_reorg` (starts at 0).
/// Used to select the consensus constants that provide the transaction weight parameters.
last_seen_height: u64,
/// Header hash of the block recorded in `last_seen_height` (starts as the default hash).
pub(crate) last_seen_hash: FixedHash,
}
impl MempoolStorage {
pub fn new(
config: MempoolConfig,
rules: BaseNodeConsensusManager,
validator: Box<dyn TransactionValidator>,
) -> Self {
Self {
unconfirmed_pool: UnconfirmedPool::new(config.unconfirmed_pool),
reorg_pool: ReorgPool::new(config.reorg_pool),
validator,
rules,
last_seen_height: 0,
last_seen_hash: Default::default(),
}
}
/// Validates `tx` and, when it is acceptable, stores it in the unconfirmed pool.
///
/// The outcome is reported through the returned [`TxStorageResponse`]:
/// - `UnconfirmedPool`: the transaction was stored.
/// - `NotStoredConsensus`: the total fee could not be computed, or the validator reported a consensus
///   error.
/// - `NotStoredFeeTooLow`: the total fee is below the unconfirmed pool's configured `min_fee`.
/// - `NotStoredOrphan`: the validator reported unknown inputs that are not all outputs held by the
///   unconfirmed pool.
/// - `NotStoredAlreadySpent`, `NotStoredTimeLocked`, `NotStoredAlreadyMined`: the corresponding
///   validation errors.
/// - `NotStored`: any other validation error.
///
/// # Errors
/// Returns an `UnconfirmedPoolError` only when the unconfirmed pool itself fails to store the
/// transaction; validation failures are never surfaced as `Err`.
pub fn insert(&mut self, tx: Arc<Transaction>) -> Result<TxStorageResponse, UnconfirmedPoolError> {
    // Log identifier: hex of the first kernel's excess signature, or a placeholder when the
    // transaction has no kernels.
    let tx_id = tx
        .body
        .kernels()
        .first()
        .map(|k| k.excess_sig.get_signature().to_hex())
        .unwrap_or_else(|| "None?!".into());
    let timer = Instant::now();
    debug!(target: LOG_TARGET, "Inserting tx into mempool: {tx_id}");

    let tx_fee = match tx.body.get_total_fee() {
        Ok(fee) => fee,
        Err(e) => {
            warn!(target: LOG_TARGET, "Invalid transaction: {e}");
            return Ok(TxStorageResponse::NotStoredConsensus);
        },
    };
    // The fee floor is checked first so that under-paying transactions are rejected without running
    // the validator at all.
    if tx_fee.as_u64() < self.unconfirmed_pool.config.min_fee {
        debug!(target: LOG_TARGET, "Tx: ({tx_id}) fee too low, rejecting");
        return Ok(TxStorageResponse::NotStoredFeeTooLow);
    }

    match self.validator.validate(&tx) {
        Ok(()) => {
            debug!(
                target: LOG_TARGET,
                "Transaction {} is VALID ({:.2?}), inserting in unconfirmed pool",
                tx_id,
                timer.elapsed()
            );
            // Fresh timer so the second message reports only the pool insertion time.
            let insert_timer = Instant::now();
            let weight = self.get_transaction_weighting();
            self.unconfirmed_pool.insert(tx, None, &weight)?;
            debug!(
                target: LOG_TARGET,
                "Transaction {} inserted in {:.2?}",
                tx_id,
                insert_timer.elapsed()
            );
            Ok(TxStorageResponse::UnconfirmedPool)
        },
        Err(ValidationError::UnknownInputs(dependent_outputs)) => {
            // The validator does not know these inputs. The transaction is still accepted when the
            // unconfirmed pool contains all of them as outputs; they are passed along as its
            // dependent outputs.
            if self.unconfirmed_pool.contains_all_outputs(&dependent_outputs) {
                let weight = self.get_transaction_weighting();
                self.unconfirmed_pool.insert(tx, Some(dependent_outputs), &weight)?;
                Ok(TxStorageResponse::UnconfirmedPool)
            } else {
                Ok(TxStorageResponse::NotStoredOrphan)
            }
        },
        Err(ValidationError::ContainsSTxO) => {
            info!(target: LOG_TARGET, "Validation failed due to already spent input");
            Ok(TxStorageResponse::NotStoredAlreadySpent)
        },
        Err(ValidationError::MaturityError) => Ok(TxStorageResponse::NotStoredTimeLocked),
        Err(ValidationError::ConsensusError(msg)) => {
            warn!(target: LOG_TARGET, "Validation failed due to consensus rule: {msg}");
            Ok(TxStorageResponse::NotStoredConsensus)
        },
        Err(ValidationError::DuplicateKernelError(msg)) => {
            debug!(
                target: LOG_TARGET,
                "Validation failed due to already mined kernel: {msg}"
            );
            Ok(TxStorageResponse::NotStoredAlreadyMined)
        },
        Err(e) => {
            info!(target: LOG_TARGET, "Validation failed due to error: {e}");
            Ok(TxStorageResponse::NotStored)
        },
    }
}
/// Returns a copy of the transaction weight parameters from the consensus constants in force at
/// `last_seen_height`.
fn get_transaction_weighting(&self) -> TransactionWeight {
    let constants = self.rules.consensus_constants(self.last_seen_height);
    *constants.transaction_weight_params()
}
/// Removes every listed transaction from the unconfirmed pool and then passes each one through
/// `insert` again, so it is validated afresh.
///
/// All removals happen before the first re-insertion.
///
/// # Errors
/// Any failure from the unconfirmed pool (during removal or re-insertion) is returned as
/// `MempoolError::InternalError` carrying the underlying error's message. Processing stops at the
/// first failure.
pub(crate) fn remove_and_reinsert_transactions(
    &mut self,
    transactions: Vec<(TransactionKey, Arc<Transaction>)>,
) -> Result<(), MempoolError> {
    for (tx_key, _) in &transactions {
        self.unconfirmed_pool
            .remove_transaction(*tx_key)
            .map_err(|e| MempoolError::InternalError(e.to_string()))?;
    }
    // The vector is owned, so the `Arc`s are moved out rather than cloned one by one.
    let txs = transactions.into_iter().map(|(_, tx)| tx).collect();
    self.insert_txs(txs)
        .map_err(|e| MempoolError::InternalError(e.to_string()))
}
/// Passes each transaction to `insert` in order, discarding the storage responses.
///
/// Stops at, and returns, the first `UnconfirmedPoolError`; transactions that are merely rejected by
/// validation do not stop the loop.
fn insert_txs(&mut self, txs: Vec<Arc<Transaction>>) -> Result<(), UnconfirmedPoolError> {
    txs.into_iter()
        .try_for_each(|tx| self.insert(tx).map(|_response| ()))
}
/// Updates the mempool after `published_block` has been added to the chain.
///
/// Steps, in order:
/// 1. `remove_published_and_discard_deprecated_transactions` is run on the unconfirmed pool for this
///    block.
/// 2. The transactions it returns are inserted into the reorg pool under the block's height.
/// 3. Both pools are compacted and the last seen height/hash are set to this block's header.
/// 4. Pool statistics are logged; a failure to compute them is only logged as a warning.
///
/// With the `metrics` feature enabled, the `reorg_invalid_transactions` metric is reset to `0`.
///
/// # Errors
/// Propagates the error from step 1, converted into `MempoolError`.
pub fn process_published_block(&mut self, published_block: &Block) -> Result<(), MempoolError> {
    let header = &published_block.header;
    debug!(
        target: LOG_TARGET,
        "Mempool processing new block: #{} ({}) {}",
        header.height,
        header.hash().to_hex(),
        published_block.body.to_counts_string()
    );

    let removal_timer = Instant::now();
    let mined_txs = self
        .unconfirmed_pool
        .remove_published_and_discard_deprecated_transactions(published_block)?;
    debug!(
        target: LOG_TARGET,
        "{} transactions removed from unconfirmed pool in {:.2?}, moving them to reorg pool for block #{} ({}) {}",
        mined_txs.len(),
        removal_timer.elapsed(),
        header.height,
        header.hash().to_hex(),
        published_block.body.to_counts_string()
    );

    let reorg_insert_timer = Instant::now();
    self.reorg_pool.insert_all(header.height, mined_txs);
    debug!(
        target: LOG_TARGET,
        "Transactions added to reorg pool in {:.2?} for block #{} ({}) {}",
        reorg_insert_timer.elapsed(),
        header.height,
        header.hash().to_hex(),
        published_block.body.to_counts_string()
    );

    // The tip bookkeeping is updated inside the timed section, exactly as before, so the reported
    // duration keeps covering it.
    let compaction_timer = Instant::now();
    self.unconfirmed_pool.compact();
    self.reorg_pool.compact();
    self.last_seen_height = header.height;
    self.last_seen_hash = header.hash();
    debug!(target: LOG_TARGET, "Compaction took {:.2?}", compaction_timer.elapsed());

    match self.stats() {
        Err(e) => warn!(target: LOG_TARGET, "error to obtain stats: {e}"),
        Ok(stats) => debug!(target: LOG_TARGET, "{stats}"),
    }

    #[cfg(feature = "metrics")]
    metrics::reorg_invalid_transactions().set(0);

    Ok(())
}
/// Handles a block that failed: runs `remove_published_and_discard_deprecated_transactions` on the
/// unconfirmed pool for `failed_block`, passes the returned transactions through `insert` again so
/// they are re-validated, and finally compacts the unconfirmed pool.
///
/// # Errors
/// Propagates the removal error (converted into `MempoolError`); a pool failure during
/// re-insertion is returned as `MempoolError::InternalError` with the underlying message.
pub fn clear_transactions_for_failed_block(&mut self, failed_block: &Block) -> Result<(), MempoolError> {
    warn!(
        target: LOG_TARGET,
        "Removing transaction from failed block #{} ({})",
        failed_block.header.height,
        failed_block.hash().to_hex()
    );

    let evicted = self
        .unconfirmed_pool
        .remove_published_and_discard_deprecated_transactions(failed_block)?;
    if let Err(e) = self.insert_txs(evicted) {
        return Err(MempoolError::InternalError(e.to_string()));
    }

    self.unconfirmed_pool.compact();
    Ok(())
}
pub fn process_reorg(
&mut self,
removed_blocks: &[Arc<Block>],
new_blocks: &[Arc<Block>],
) -> Result<(), MempoolError> {
debug!(target: LOG_TARGET, "Mempool processing reorg");
let mut num_invalid_txs: i64 = 0;
let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions();
let num_removed_txs = removed_txs.len();
for tx in removed_txs {
let resp = self
.insert(tx)
.map_err(|e| MempoolError::InternalError(e.to_string()))?;
if resp == TxStorageResponse::NotStoredAlreadySpent {
num_invalid_txs += 1;
}
}
let reorg_txs = self
.reorg_pool
.remove_reorged_txs_and_discard_double_spends(removed_blocks, new_blocks);
let num_reorg_txs = reorg_txs.len();
for tx in reorg_txs {
let resp = self
.insert(tx)
.map_err(|e| MempoolError::InternalError(e.to_string()))?;
if resp == TxStorageResponse::NotStoredAlreadySpent {
num_invalid_txs += 1;
}
}
if num_invalid_txs > 0 {
warn!(
target: LOG_TARGET,
"Mempool reorg: {num_invalid_txs} transaction(s) invalidated \
(from {num_removed_txs} unconfirmed and {num_reorg_txs} reorg pool transactions)"
);
}
#[cfg(feature = "metrics")]
metrics::reorg_invalid_transactions().set(num_invalid_txs);
if let Some((height, hash)) = new_blocks
.last()
.or_else(|| removed_blocks.first())
.map(|block| (block.header.height, block.header.hash()))
{
self.last_seen_height = height;
self.last_seen_hash = hash;
}
Ok(())
}
pub fn process_sync(&mut self) -> Result<(), MempoolError> {
debug!(target: LOG_TARGET, "Mempool processing sync finished");
let txs = self.unconfirmed_pool.drain_all_mempool_transactions();
self.insert_txs(txs)
.map_err(|e| MempoolError::InternalError(e.to_string()))?;
let txs = self.reorg_pool.clear_and_retrieve_all();
self.insert_txs(txs)
.map_err(|e| MempoolError::InternalError(e.to_string()))?;
Ok(())
}
/// Returns the unconfirmed pool's snapshot of its transactions. The reorg pool is not included.
pub fn snapshot(&self) -> Vec<Arc<Transaction>> {
self.unconfirmed_pool.snapshot()
}
/// Asks the unconfirmed pool for its highest-priority transactions for the given `total_weight`.
///
/// NOTE(review): `total_weight` is presumably the maximum combined weight of the returned
/// transactions; confirm against `UnconfirmedPool::fetch_highest_priority_txs`.
///
/// # Errors
/// Any pool error is returned as `MempoolError::InternalError` carrying the error's message.
pub fn retrieve(&self, total_weight: u64) -> Result<RetrieveResults, MempoolError> {
self.unconfirmed_pool
.fetch_highest_priority_txs(total_weight)
.map_err(|e| MempoolError::InternalError(e.to_string()))
}
/// Looks up transactions by excess signature, first in the unconfirmed pool and then, for the
/// signatures the unconfirmed pool did not resolve, in the reorg pool.
///
/// Returns the transactions found (unconfirmed-pool matches first, then reorg-pool matches) together
/// with the signatures that neither pool resolved.
///
/// # Errors
/// Propagates the error of whichever pool lookup fails.
pub fn retrieve_by_excess_sigs(
    &self,
    excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
    let (found_txns, remaining) = self.unconfirmed_pool.retrieve_by_excess_sigs(excess_sigs)?;
    // Only the signatures still unresolved are looked up in the reorg pool.
    let (found_published_transactions, remaining) = self.reorg_pool.retrieve_by_excess_sigs(&remaining)?;
    Ok((
        found_txns.into_iter().chain(found_published_transactions).collect(),
        remaining,
    ))
}
/// Delegates to the unconfirmed pool's `filter_outputs` for the given output hashes.
///
/// NOTE(review): presumably returns the subset of `output_hashes` that the unconfirmed pool holds
/// as outputs; confirm against `UnconfirmedPool::filter_outputs`.
pub fn filter_outputs_in_mempool(&self, output_hashes: &[HashOutput]) -> Vec<HashOutput> {
self.unconfirmed_pool.filter_outputs(output_hashes)
}
/// Reports which pool, if any, holds a transaction with the given excess signature.
///
/// The unconfirmed pool is checked first, so a signature present in both pools is reported as
/// `UnconfirmedPool`. `NotStored` is returned when neither pool knows the signature.
pub fn has_tx_with_excess_sig(&self, excess_sig: &CompressedSignature) -> TxStorageResponse {
    if self.unconfirmed_pool.has_tx_with_excess_sig(excess_sig) {
        return TxStorageResponse::UnconfirmedPool;
    }
    if self.reorg_pool.has_tx_with_excess_sig(excess_sig) {
        return TxStorageResponse::ReorgPool;
    }
    TxStorageResponse::NotStored
}
/// Determines the storage status of `tx` by combining the status of each of its kernels' excess
/// signatures (see `has_tx_with_excess_sig`).
///
/// Kernels are combined left to right:
/// - all kernels in the unconfirmed pool gives `UnconfirmedPool`;
/// - all kernels in the reorg pool gives `ReorgPool`;
/// - a mix of unconfirmed-pool and reorg-pool kernels gives `NotStoredAlreadySpent`;
/// - as soon as either side of a comparison is any other status, that status is carried forward.
///
/// # Errors
/// Returns `MempoolError::TransactionNoKernels` when the transaction has no kernels.
pub fn has_transaction(&self, tx: &Transaction) -> Result<TxStorageResponse, MempoolError> {
    let mut kernels = tx.body.kernels().iter();
    // The first kernel seeds the fold, which removes the need for an `Option` accumulator.
    let first = kernels.next().ok_or(MempoolError::TransactionNoKernels)?;
    let initial = self.has_tx_with_excess_sig(&first.excess_sig);

    let status = kernels.fold(initial, |stored, kernel| {
        match (self.has_tx_with_excess_sig(&kernel.excess_sig), stored) {
            // Every kernel so far is in the unconfirmed pool.
            (TxStorageResponse::UnconfirmedPool, TxStorageResponse::UnconfirmedPool) => {
                TxStorageResponse::UnconfirmedPool
            },
            // Kernels are split between the two pools.
            (TxStorageResponse::UnconfirmedPool, TxStorageResponse::ReorgPool) |
            (TxStorageResponse::ReorgPool, TxStorageResponse::UnconfirmedPool) => {
                TxStorageResponse::NotStoredAlreadySpent
            },
            // Every kernel so far is in the reorg pool.
            (TxStorageResponse::ReorgPool, TxStorageResponse::ReorgPool) => TxStorageResponse::ReorgPool,
            // One side is some other status: keep the accumulated one when the current kernel is
            // in a pool, otherwise take the current kernel's status.
            (TxStorageResponse::UnconfirmedPool, other) |
            (TxStorageResponse::ReorgPool, other) |
            (other, _) => other,
        }
    });
    Ok(status)
}
/// Collects the size of both pools and the weight of the unconfirmed pool, computed with the
/// transaction weight parameters in force at the last seen height.
///
/// # Errors
/// Propagates the error from the unconfirmed pool's weight calculation.
pub fn stats(&self) -> Result<StatsResponse, TransactionError> {
    let weighting = self.get_transaction_weighting();
    let unconfirmed_txs = self.unconfirmed_pool.len() as u64;
    let reorg_txs = self.reorg_pool.len() as u64;
    let unconfirmed_weight = self.unconfirmed_pool.calculate_weight(&weighting)?;
    Ok(StatsResponse {
        unconfirmed_txs,
        reorg_txs,
        unconfirmed_weight,
    })
}
/// Describes the current contents of both pools.
///
/// The unconfirmed pool is reported as its snapshot. The reorg pool is reported as one entry per
/// transaction in its snapshot: the first kernel's excess signature, or the default value when
/// `first_kernel_excess_sig` yields none.
pub fn state(&self) -> StateResponse {
    let unconfirmed_pool = self.unconfirmed_pool.snapshot();
    let reorg_snapshot = self.reorg_pool.snapshot();
    let reorg_pool = reorg_snapshot
        .iter()
        .map(|tx| {
            let first_sig = tx.first_kernel_excess_sig();
            first_sig.cloned().unwrap_or_default()
        })
        .collect::<Vec<_>>();
    StateResponse {
        unconfirmed_pool,
        reorg_pool,
    }
}
/// Returns the unconfirmed pool's fee-per-gram statistics.
///
/// `count` is passed to the pool unchanged. The target weight is the maximum block transaction
/// weight from the consensus constants at `tip_height`.
///
/// # Errors
/// Propagates the unconfirmed pool's error, converted into `MempoolError`.
pub fn get_fee_per_gram_stats(&self, count: usize, tip_height: u64) -> Result<Vec<FeePerGramStat>, MempoolError> {
    let constants = self.rules.consensus_constants(tip_height);
    let max_block_weight = constants.max_block_transaction_weight();
    let fee_stats = self
        .unconfirmed_pool
        .get_fee_per_gram_stats(count, max_block_weight)?;
    Ok(fee_stats)
}
}