use crate::{
blocks::Block,
mempool::{
consts::{MEMPOOL_UNCONFIRMED_POOL_STORAGE_CAPACITY, MEMPOOL_UNCONFIRMED_POOL_WEIGHT_TRANSACTION_SKIP_COUNT},
priority::{FeePriority, PrioritizedTransaction},
unconfirmed_pool::UnconfirmedPoolError,
},
transactions::{transaction::Transaction, types::Signature},
};
use log::*;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
sync::Arc,
};
use tari_crypto::tari_utilities::hex::Hex;
/// Log target attached to every `debug!`/`trace!` statement emitted by the unconfirmed pool storage.
pub const LOG_TARGET: &str = "c::mp::unconfirmed_pool::unconfirmed_pool_storage";
/// Configuration for the `UnconfirmedPool`.
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct UnconfirmedPoolConfig {
/// Number of stored transactions at which the pool is considered full. Once reached, inserting a new transaction
/// either evicts the lowest-priority entry or drops the new transaction.
pub storage_capacity: usize,
/// Number of transactions that may fail to fit into the remaining weight budget before
/// `UnconfirmedPool::highest_priority_txs` stops searching for further candidates.
pub weight_tx_skip_count: usize,
}
impl Default for UnconfirmedPoolConfig {
fn default() -> Self {
Self {
storage_capacity: MEMPOOL_UNCONFIRMED_POOL_STORAGE_CAPACITY,
weight_tx_skip_count: MEMPOOL_UNCONFIRMED_POOL_WEIGHT_TRANSACTION_SKIP_COUNT,
}
}
}
/// Storage for unconfirmed transactions, indexed in two ways: by the excess signature of the transaction's first
/// kernel (for lookup and removal) and by fee priority (for ordered selection and eviction).
///
/// The two indexes are expected to describe the same set of transactions; `highest_priority_txs` reports
/// `UnconfirmedPoolError::StorageOutofSync` when the priority index references a missing signature.
pub struct UnconfirmedPool {
// Capacity and skip-count settings supplied at construction.
config: UnconfirmedPoolConfig,
// Signature -> prioritized transaction; this map defines what is "in" the pool (see `len`).
txs_by_signature: HashMap<Signature, PrioritizedTransaction>,
// Priority -> signature; the ordered map places the lowest priority first.
txs_by_priority: BTreeMap<FeePriority, Signature>,
}
impl UnconfirmedPool {
pub fn new(config: UnconfirmedPoolConfig) -> Self {
Self {
config,
txs_by_signature: HashMap::new(),
txs_by_priority: BTreeMap::new(),
}
}
/// Returns the lowest priority currently held in the priority index.
///
/// # Panics
/// Panics if the priority index is empty.
fn lowest_priority(&self) -> &FeePriority {
    // A `BTreeMap` iterates its keys in ascending order, so the first key is the lowest priority.
    let mut ascending = self.txs_by_priority.keys();
    ascending.next().unwrap()
}
/// Evicts the transaction with the lowest priority from both indexes. Does nothing when the pool is empty.
fn remove_lowest_priority_tx(&mut self) {
    // Clone the key so the shared borrow of the index ends before it is mutated.
    let lowest = self.txs_by_priority.keys().next().cloned();
    if let Some(priority) = lowest {
        // Removing the index entry hands back the signature needed to drop the transaction itself.
        if let Some(sig) = self.txs_by_priority.remove(&priority) {
            self.txs_by_signature.remove(&sig);
        }
    }
}
/// Inserts a transaction into the pool, keyed by the excess signature of its first kernel.
///
/// A transaction whose key is already present is ignored. When the pool already holds `storage_capacity` (or more)
/// transactions, the new transaction is dropped if its priority is lower than the lowest stored priority;
/// otherwise the lowest-priority entry is evicted to make room for it.
///
/// A pool with nothing to evict (for example one configured with `storage_capacity == 0`) drops the transaction
/// instead of panicking on the empty priority index.
///
/// # Errors
/// Propagates the error raised when the transaction cannot be converted into a `PrioritizedTransaction`.
///
/// # Panics
/// Panics if the transaction has no kernels, because the first kernel's signature is used as the key.
// `contains_key` followed by `insert` is kept on purpose: between the check and the insert the pool may reject the
// transaction or evict another entry, so the map entry API suggested by clippy does not fit here.
#[allow(clippy::map_entry)]
pub fn insert(&mut self, tx: Arc<Transaction>) -> Result<(), UnconfirmedPoolError> {
    let tx_key = tx.body.kernels()[0].excess_sig.clone();
    if self.txs_by_signature.contains_key(&tx_key) {
        return Ok(());
    }
    debug!(
        target: LOG_TARGET,
        "Inserting tx into unconfirmed pool: {}",
        tx_key.get_signature().to_hex()
    );
    trace!(target: LOG_TARGET, "Transaction inserted: {}", tx);
    let prioritized_tx = PrioritizedTransaction::try_from((*tx).clone())?;
    if self.txs_by_signature.len() >= self.config.storage_capacity {
        // Nothing can be evicted from an empty index (e.g. zero capacity), so the transaction cannot be stored.
        // Checking this first also keeps `lowest_priority` from unwrapping on an empty map.
        if self.txs_by_priority.is_empty() || prioritized_tx.priority < *self.lowest_priority() {
            return Ok(());
        }
        self.remove_lowest_priority_tx();
    }
    // NOTE(review): assumes `FeePriority` values are unique per transaction; a collision would overwrite the
    // existing index entry and leave the two maps out of sync — confirm against `FeePriority`.
    self.txs_by_priority
        .insert(prioritized_tx.priority.clone(), tx_key.clone());
    self.txs_by_signature.insert(tx_key, prioritized_tx);
    Ok(())
}
/// Test helper: inserts each transaction in order, stopping at the first insertion error.
#[cfg(test)]
pub fn insert_txs(&mut self, txs: Vec<Arc<Transaction>>) -> Result<(), UnconfirmedPoolError> {
    txs.into_iter().try_for_each(|tx| self.insert(tx))
}
/// Returns `true` when a transaction keyed by `excess_sig` is stored in the pool.
pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> bool {
    let stored = &self.txs_by_signature;
    stored.contains_key(excess_sig)
}
/// Returns the highest-priority transactions whose combined weight does not exceed `total_weight`.
///
/// Transactions are visited from highest to lowest priority. A transaction that fits into the remaining weight
/// budget is selected unless it shares an input with an already selected transaction. Every transaction that does
/// not fit counts as a skip, and the search stops once `weight_tx_skip_count` skips have accumulated.
///
/// # Errors
/// Returns `UnconfirmedPoolError::StorageOutofSync` when the priority index references a signature that is missing
/// from the signature map.
pub fn highest_priority_txs(&self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, UnconfirmedPoolError> {
    let mut selected_txs: Vec<Arc<Transaction>> = Vec::new();
    let mut curr_weight: u64 = 0;
    let mut curr_skip_count: usize = 0;
    // The index is ordered ascending by priority, so walk it in reverse to see the best transactions first.
    for tx_key in self.txs_by_priority.values().rev() {
        let ptx = self
            .txs_by_signature
            .get(tx_key)
            .ok_or(UnconfirmedPoolError::StorageOutofSync)?;
        // `checked_add` treats an overflowing sum as "does not fit" instead of panicking (debug) or wrapping
        // around and slipping under the limit (release).
        match curr_weight.checked_add(ptx.weight) {
            Some(new_weight) if new_weight <= total_weight => {
                // A transaction that conflicts with an earlier selection is passed over without counting as a skip.
                if !UnconfirmedPool::find_duplicate_input(&selected_txs, &ptx.transaction) {
                    curr_weight = new_weight;
                    selected_txs.push(ptx.transaction.clone());
                }
            },
            _ => {
                curr_skip_count += 1;
                if curr_skip_count >= self.config.weight_tx_skip_count {
                    break;
                }
            },
        }
    }
    Ok(selected_txs)
}
/// Returns `true` when any input of any transaction in `array_of_tx` is also an input of `tx`.
fn find_duplicate_input(array_of_tx: &[Arc<Transaction>], tx: &Arc<Transaction>) -> bool {
    let candidate_inputs = tx.body.inputs();
    array_of_tx.iter().any(|selected| {
        selected
            .body
            .inputs()
            .iter()
            .any(|input| candidate_inputs.contains(input))
    })
}
/// Removes every stored transaction that spends an input also spent by `published_block`.
///
/// Both indexes are updated. Each affected transaction is removed and logged exactly once, even when several of
/// its inputs appear in the block.
fn discard_double_spends(&mut self, published_block: &Block) {
    let block_inputs = published_block.body.inputs();
    // Borrow the priority index on its own so the closure below does not have to capture all of `self` while the
    // signature map is mutably borrowed by `retain`.
    let txs_by_priority = &mut self.txs_by_priority;
    self.txs_by_signature.retain(|tx_key, ptx| {
        let is_double_spend = ptx
            .transaction
            .body
            .inputs()
            .iter()
            .any(|input| block_inputs.contains(input));
        if is_double_spend {
            trace!(
                target: LOG_TARGET,
                "Removing double spends from unconfirmed pool: {:?}",
                tx_key
            );
            txs_by_priority.remove(&ptx.priority);
        }
        // `retain` keeps the entries for which the closure returns `true`.
        !is_double_spend
    });
}
/// Removes the transactions whose kernel excess signatures appear in `published_block`, then discards any
/// remaining transactions that double-spend one of the block's inputs.
///
/// Returns the transactions that were removed because their kernel was found in the block; transactions discarded
/// as double spends are not part of the returned list.
pub fn remove_published_and_discard_double_spends(&mut self, published_block: &Block) -> Vec<Arc<Transaction>> {
    let mut published_txs = Vec::new();
    for kernel in published_block.body.kernels().iter() {
        // Taking the entry out of the signature map yields the priority needed to clean up the other index.
        if let Some(ptx) = self.txs_by_signature.remove(&kernel.excess_sig) {
            self.txs_by_priority.remove(&ptx.priority);
            published_txs.push(ptx.transaction);
        }
    }
    self.discard_double_spends(published_block);
    published_txs
}
/// Removes and returns every transaction whose minimum spendable height is greater than `tip_height + 1`.
pub fn remove_timelocked(&mut self, tip_height: u64) -> Vec<Arc<Transaction>> {
    // Collect the keys first; the maps cannot be modified while they are being iterated.
    let locked_keys: Vec<Signature> = self
        .txs_by_signature
        .iter()
        .filter(|(_, ptx)| ptx.transaction.min_spendable_height() > tip_height + 1)
        .map(|(tx_key, _)| tx_key.clone())
        .collect();
    let mut removed_txs: Vec<Arc<Transaction>> = Vec::new();
    for tx_key in locked_keys {
        trace!(
            target: LOG_TARGET,
            "Removing time locked transaction from unconfirmed pool: {:?}",
            tx_key
        );
        if let Some(ptx) = self.txs_by_signature.remove(&tx_key) {
            self.txs_by_priority.remove(&ptx.priority);
            removed_txs.push(ptx.transaction);
        }
    }
    removed_txs
}
/// Returns the number of transactions stored in the pool, as counted by the signature map.
pub fn len(&self) -> usize {
    let stored = &self.txs_by_signature;
    stored.len()
}
/// Returns a handle to every transaction currently stored in the pool, in the signature map's iteration order.
pub fn snapshot(&self) -> Vec<Arc<Transaction>> {
    let mut txs = Vec::new();
    for ptx in self.txs_by_signature.values() {
        txs.push(ptx.transaction.clone());
    }
    txs
}
/// Returns the sum of `calculate_weight()` over every transaction stored in the pool.
pub fn calculate_weight(&self) -> u64 {
    self.txs_by_signature
        .values()
        .map(|ptx| ptx.transaction.calculate_weight())
        .sum()
}
/// Test helper: returns `true` when both indexes hold the same number of entries and every signature referenced
/// by the priority index is present in the signature map.
#[cfg(test)]
pub fn check_status(&self) -> bool {
    let same_size = self.txs_by_priority.len() == self.txs_by_signature.len();
    same_size &&
        self.txs_by_priority
            .values()
            .all(|tx_key| self.txs_by_signature.contains_key(tx_key))
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
consensus::{ConsensusManagerBuilder, Network},
test_helpers::create_orphan_block,
transactions::tari_amount::MicroTari,
tx,
};
#[test]
fn test_insert_and_retrieve_highest_priority_txs() {
    let tx1 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(50), inputs: 2, outputs: 1).0);
    let tx2 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(20), inputs: 4, outputs: 1).0);
    let tx3 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(100), inputs: 5, outputs: 1).0);
    let tx4 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(30), inputs: 3, outputs: 1).0);
    let tx5 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(55), inputs: 5, outputs: 1).0);

    // Five transactions compete for four slots, so one of them has to give way.
    let config = UnconfirmedPoolConfig {
        storage_capacity: 4,
        weight_tx_skip_count: 3,
    };
    let mut unconfirmed_pool = UnconfirmedPool::new(config);
    unconfirmed_pool
        .insert_txs(vec![tx1.clone(), tx2.clone(), tx3.clone(), tx4.clone(), tx5.clone()])
        .unwrap();

    // The pool is expected to have dropped tx2 and kept the other four.
    let in_pool = |tx: &Arc<Transaction>| unconfirmed_pool.has_tx_with_excess_sig(&tx.body.kernels()[0].excess_sig);
    assert!(in_pool(&tx1));
    assert!(!in_pool(&tx2));
    assert!(in_pool(&tx3));
    assert!(in_pool(&tx4));
    assert!(in_pool(&tx5));

    // A weight budget sized for exactly tx1 + tx3 + tx5 should select those three.
    let desired_weight = tx1.calculate_weight() + tx3.calculate_weight() + tx5.calculate_weight();
    let selected_txs = unconfirmed_pool.highest_priority_txs(desired_weight).unwrap();
    assert_eq!(selected_txs.len(), 3);
    assert!(selected_txs.contains(&tx1));
    assert!(selected_txs.contains(&tx3));
    assert!(selected_txs.contains(&tx5));

    // Both indexes must still agree with each other.
    assert!(unconfirmed_pool.check_status());
}
#[test]
fn test_remove_reorg_txs() {
    let network = Network::LocalNet;
    let consensus = ConsensusManagerBuilder::new(network).build();
    let tx1 = Arc::new(tx!(MicroTari(10_000), fee: MicroTari(50), inputs:2, outputs: 1).0);
    let tx2 = Arc::new(tx!(MicroTari(10_000), fee: MicroTari(20), inputs:3, outputs: 1).0);
    let tx3 = Arc::new(tx!(MicroTari(10_000), fee: MicroTari(100), inputs:2, outputs: 1).0);
    let tx4 = Arc::new(tx!(MicroTari(10_000), fee: MicroTari(30), inputs:4, outputs: 1).0);
    let tx5 = Arc::new(tx!(MicroTari(10_000), fee: MicroTari(50), inputs:3, outputs: 1).0);
    // tx6 is never inserted; it only checks that an unknown transaction is reported as absent.
    let tx6 = Arc::new(tx!(MicroTari(10_000), fee: MicroTari(75), inputs:2, outputs: 1).0);

    let config = UnconfirmedPoolConfig {
        storage_capacity: 10,
        weight_tx_skip_count: 3,
    };
    let mut unconfirmed_pool = UnconfirmedPool::new(config);
    unconfirmed_pool
        .insert_txs(vec![tx1.clone(), tx2.clone(), tx3.clone(), tx4.clone(), tx5.clone()])
        .unwrap();

    // Everything that was inserted shows up in the snapshot.
    let snapshot_txs = unconfirmed_pool.snapshot();
    assert_eq!(snapshot_txs.len(), 5);
    assert!(snapshot_txs.contains(&tx1));
    assert!(snapshot_txs.contains(&tx2));
    assert!(snapshot_txs.contains(&tx3));
    assert!(snapshot_txs.contains(&tx4));
    assert!(snapshot_txs.contains(&tx5));

    // Publish a block containing tx1, tx3 and tx5; those must leave the pool while tx2 and tx4 stay.
    let published_block = create_orphan_block(0, vec![(*tx1).clone(), (*tx3).clone(), (*tx5).clone()], &consensus);
    let _ = unconfirmed_pool.remove_published_and_discard_double_spends(&published_block);

    let in_pool = |tx: &Arc<Transaction>| unconfirmed_pool.has_tx_with_excess_sig(&tx.body.kernels()[0].excess_sig);
    assert!(!in_pool(&tx1));
    assert!(in_pool(&tx2));
    assert!(!in_pool(&tx3));
    assert!(in_pool(&tx4));
    assert!(!in_pool(&tx5));
    assert!(!in_pool(&tx6));

    assert!(unconfirmed_pool.check_status());
}
#[test]
fn test_discard_double_spend_txs() {
    let network = Network::LocalNet;
    let consensus = ConsensusManagerBuilder::new(network).build();
    let tx1 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(50), inputs:2, outputs:1).0);
    let tx2 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(20), inputs:3, outputs:1).0);
    let tx3 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(100), inputs:2, outputs:1).0);
    let tx4 = Arc::new(tx!(MicroTari(5_000), fee: MicroTari(30), inputs:2, outputs:1).0);
    let mut tx5 = tx!(MicroTari(5_000), fee:MicroTari(50), inputs:3, outputs:1).0;
    let mut tx6 = tx!(MicroTari(5_000), fee:MicroTari(75), inputs: 2, outputs: 1).0;
    // Make tx5 and tx6 double spends: tx5 reuses an input of tx1 and tx6 reuses an input of tx3.
    tx5.body.inputs_mut()[0] = tx1.body.inputs()[0].clone();
    tx6.body.inputs_mut()[1] = tx3.body.inputs()[1].clone();
    let tx5 = Arc::new(tx5);
    let tx6 = Arc::new(tx6);

    let config = UnconfirmedPoolConfig {
        storage_capacity: 10,
        weight_tx_skip_count: 3,
    };
    let mut unconfirmed_pool = UnconfirmedPool::new(config);
    unconfirmed_pool
        .insert_txs(vec![
            tx1.clone(),
            tx2.clone(),
            tx3.clone(),
            tx4.clone(),
            tx5.clone(),
            tx6.clone(),
        ])
        .unwrap();

    // Publishing tx1, tx2 and tx3 removes them and must also discard tx5 and tx6, leaving only tx4.
    let published_block = create_orphan_block(0, vec![(*tx1).clone(), (*tx2).clone(), (*tx3).clone()], &consensus);
    let _ = unconfirmed_pool.remove_published_and_discard_double_spends(&published_block);

    let in_pool = |tx: &Arc<Transaction>| unconfirmed_pool.has_tx_with_excess_sig(&tx.body.kernels()[0].excess_sig);
    assert!(!in_pool(&tx1));
    assert!(!in_pool(&tx2));
    assert!(!in_pool(&tx3));
    assert!(in_pool(&tx4));
    assert!(!in_pool(&tx5));
    assert!(!in_pool(&tx6));

    assert!(unconfirmed_pool.check_status());
}
}