use crate::{
feerate::{FeerateEstimator, FeerateEstimatorArgs},
mempool::{
config::Config,
errors::{RuleError, RuleResult},
model::{
map::MempoolTransactionCollection,
pool::{Pool, TransactionsEdges},
tx::{DoubleSpend, MempoolTransaction},
utxo_set::MempoolUtxoSet,
},
tx::Priority,
},
model::{topological_index::TopologicalIndex, TransactionIdSet},
Policy,
};
use kaspa_consensus_core::{
block::TemplateTransactionSelector,
tx::{MutableTransaction, TransactionId, TransactionOutpoint},
};
use kaspa_core::{debug, time::unix_now, trace};
use std::{
collections::{hash_map::Keys, hash_set::Iter},
iter::once,
sync::Arc,
};
use super::frontier::Frontier;
pub(crate) struct TransactionsPool {
config: Arc<Config>,
all_transactions: MempoolTransactionCollection,
parent_transactions: TransactionsEdges,
chained_transactions: TransactionsEdges,
ready_transactions: Frontier,
last_expire_scan_daa_score: u64,
last_expire_scan_time: u64,
estimated_size: usize,
utxo_set: MempoolUtxoSet,
}
impl TransactionsPool {
pub(crate) fn new(config: Arc<Config>) -> Self {
Self {
config,
all_transactions: MempoolTransactionCollection::default(),
parent_transactions: TransactionsEdges::default(),
chained_transactions: TransactionsEdges::default(),
ready_transactions: Default::default(),
last_expire_scan_daa_score: 0,
last_expire_scan_time: unix_now(),
utxo_set: MempoolUtxoSet::new(),
estimated_size: 0,
}
}
pub(crate) fn add_transaction(
&mut self,
transaction: MutableTransaction,
virtual_daa_score: u64,
priority: Priority,
transaction_size: usize,
) -> RuleResult<&MempoolTransaction> {
let transaction = MempoolTransaction::new(transaction, priority, virtual_daa_score);
let id = transaction.id();
self.add_mempool_transaction(transaction, transaction_size)?;
Ok(self.get(&id).unwrap())
}
pub(crate) fn add_mempool_transaction(&mut self, transaction: MempoolTransaction, transaction_size: usize) -> RuleResult<()> {
let id = transaction.id();
assert!(!self.all_transactions.contains_key(&id), "transaction {id} to be added already exists in the transactions pool");
assert!(transaction.mtx.is_fully_populated(), "transaction {id} to be added in the transactions pool is not fully populated");
let parents = self.get_parent_transaction_ids_in_pool(&transaction.mtx);
self.parent_transactions.insert(id, parents.clone());
if parents.is_empty() {
self.ready_transactions.insert((&transaction).into());
}
for parent_id in parents {
let entry = self.chained_transactions.entry(parent_id).or_default();
entry.insert(id);
}
self.utxo_set.add_transaction(&transaction.mtx);
self.estimated_size += transaction_size;
self.all_transactions.insert(id, transaction);
trace!("Added transaction {}", id);
Ok(())
}
pub(crate) fn remove_transaction(&mut self, transaction_id: &TransactionId) -> RuleResult<MempoolTransaction> {
if let Some(parents) = self.parent_transactions.get(transaction_id) {
for parent in parents.iter() {
if let Some(chains) = self.chained_transactions.get_mut(parent) {
chains.remove(transaction_id);
}
}
}
if let Some(chains) = self.chained_transactions.get(transaction_id) {
for chain in chains.iter() {
if let Some(parents) = self.parent_transactions.get_mut(chain) {
parents.remove(transaction_id);
if parents.is_empty() {
let tx = self.all_transactions.get(chain).unwrap();
self.ready_transactions.insert(tx.into());
}
}
}
}
self.parent_transactions.remove(transaction_id);
self.chained_transactions.remove(transaction_id);
let removed_tx = self.all_transactions.remove(transaction_id).ok_or(RuleError::RejectMissingTransaction(*transaction_id))?;
self.ready_transactions.remove(&(&removed_tx).into());
let parent_ids = self.get_parent_transaction_ids_in_pool(&removed_tx.mtx);
self.utxo_set.remove_transaction(&removed_tx.mtx, &parent_ids);
self.estimated_size -= removed_tx.mtx.mempool_estimated_bytes();
if self.all_transactions.is_empty() {
assert_eq!(0, self.estimated_size, "Sanity test -- if tx pool is empty, estimated byte size should be zero");
}
Ok(removed_tx)
}
pub(crate) fn update_revalidated_transaction(&mut self, transaction: MutableTransaction) -> bool {
if let Some(tx) = self.all_transactions.get_mut(&transaction.id()) {
self.estimated_size -= tx.mtx.mempool_estimated_bytes();
tx.mtx = transaction;
self.estimated_size += tx.mtx.mempool_estimated_bytes();
true
} else {
false
}
}
pub(crate) fn ready_transaction_count(&self) -> usize {
self.ready_transactions.len()
}
pub(crate) fn ready_transaction_total_mass(&self) -> u64 {
self.ready_transactions.total_mass()
}
pub(crate) fn build_selector(&self) -> Box<dyn TemplateTransactionSelector> {
self.ready_transactions.build_selector(&Policy::new(self.config.maximum_mass_per_block))
}
pub(crate) fn build_feerate_estimator(&self, args: FeerateEstimatorArgs) -> FeerateEstimator {
self.ready_transactions.build_feerate_estimator(args)
}
pub(crate) fn limit_transaction_count(
&self,
transaction: &MutableTransaction,
transaction_size: usize,
) -> RuleResult<Vec<TransactionId>> {
if self.len() < self.config.maximum_transaction_count
&& self.estimated_size + transaction_size <= self.config.mempool_size_limit
{
return Ok(Default::default());
}
let feerate_threshold = transaction.calculated_feerate().unwrap();
let mut txs_to_remove = Vec::with_capacity(1); let mut selection_overall_size = 0;
for tx in self
.ready_transactions
.ascending_iter()
.map(|tx| self.all_transactions.get(&tx.id()).unwrap())
.filter(|mtx| mtx.priority == Priority::Low)
{
let redeemers = self.get_redeemer_ids_in_pool(&tx.id()).into_iter().chain(once(tx.id())).collect::<TransactionIdSet>();
if transaction.has_parent_in_set(&redeemers) {
continue;
}
if tx.fee_rate() > feerate_threshold {
let err = RuleError::RejectMempoolIsFull;
debug!("Transaction {} with feerate {} has been rejected: {}", transaction.id(), feerate_threshold, err);
return Err(err);
}
txs_to_remove.push(tx.id());
selection_overall_size += tx.mtx.mempool_estimated_bytes();
if self.len() + 1 - txs_to_remove.len() <= self.config.maximum_transaction_count
&& self.estimated_size + transaction_size - selection_overall_size <= self.config.mempool_size_limit
{
return Ok(txs_to_remove);
}
}
debug!(
"Mempool is filled with high-priority/ancestor txs (count: {}, bytes: {}). Transaction {} with feerate {} and size {} has been rejected: {}",
self.len(),
self.estimated_size,
transaction.id(),
feerate_threshold,
transaction_size,
RuleError::RejectMempoolIsFull
);
Err(RuleError::RejectMempoolIsFull)
}
pub(crate) fn get_estimated_size(&self) -> usize {
self.estimated_size
}
pub(crate) fn all_transaction_ids_with_priority(&self, priority: Priority) -> Vec<TransactionId> {
self.all().values().filter_map(|x| if x.priority == priority { Some(x.id()) } else { None }).collect()
}
pub(crate) fn get_outpoint_owner_id(&self, outpoint: &TransactionOutpoint) -> Option<&TransactionId> {
self.utxo_set.get_outpoint_owner_id(outpoint)
}
pub(crate) fn check_double_spends(&self, transaction: &MutableTransaction) -> RuleResult<()> {
self.utxo_set.check_double_spends(transaction)
}
pub(crate) fn get_double_spend_transaction_ids(&self, transaction: &MutableTransaction) -> Vec<DoubleSpend> {
self.utxo_set.get_double_spend_transaction_ids(transaction)
}
pub(crate) fn get_double_spend_owner<'a>(&'a self, double_spend: &DoubleSpend) -> RuleResult<&'a MempoolTransaction> {
match self.get(&double_spend.owner_id) {
Some(transaction) => Ok(transaction),
None => {
Err(double_spend.into())
}
}
}
pub(crate) fn collect_expired_low_priority_transactions(&mut self, virtual_daa_score: u64) -> Vec<TransactionId> {
let now = unix_now();
if virtual_daa_score < self.last_expire_scan_daa_score + self.config.transaction_expire_scan_interval_daa_score
|| now < self.last_expire_scan_time + self.config.transaction_expire_scan_interval_milliseconds
{
return vec![];
}
self.last_expire_scan_daa_score = virtual_daa_score;
self.last_expire_scan_time = now;
self.all_transactions
.values()
.filter_map(|x| {
if (x.priority == Priority::Low)
&& virtual_daa_score > x.added_at_daa_score + self.config.transaction_expire_interval_daa_score
{
Some(x.id())
} else {
None
}
})
.collect()
}
}
type IterTxId<'a> = Iter<'a, TransactionId>;
type KeysTxId<'a> = Keys<'a, TransactionId, MempoolTransaction>;
impl<'a> TopologicalIndex<'a, KeysTxId<'a>, IterTxId<'a>, TransactionId> for TransactionsPool {
fn topology_nodes(&'a self) -> KeysTxId<'a> {
self.all_transactions.keys()
}
fn topology_node_edges(&'a self, key: &TransactionId) -> Option<IterTxId<'a>> {
self.chained_transactions.get(key).map(|x| x.iter())
}
}
impl Pool for TransactionsPool {
#[inline]
fn all(&self) -> &MempoolTransactionCollection {
&self.all_transactions
}
#[inline]
fn chained(&self) -> &TransactionsEdges {
&self.chained_transactions
}
}