#[macro_use]
extern crate log;
extern crate nimiq_account as account;
extern crate nimiq_accounts as accounts;
extern crate nimiq_block as block;
extern crate nimiq_blockchain as blockchain;
extern crate nimiq_collections as collections;
extern crate nimiq_hash as hash;
extern crate nimiq_keys as keys;
extern crate nimiq_primitives as primitives;
extern crate nimiq_transaction as transaction;
extern crate nimiq_utils as utils;
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use account::AccountTransactionInteraction;
use accounts::Accounts;
use beserial::Serialize;
use block::Block;
use blockchain::{Blockchain, BlockchainEvent};
use hash::{Blake2bHash, Hash};
use keys::Address;
use transaction::{Transaction, TransactionFlags};
use utils::observer::Notifier;
use crate::filter::{MempoolFilter, Rules};
pub mod filter;
/// Pool of pending transactions that is validated against a `Blockchain`.
///
/// Every mutating operation in this file (`push_transaction`,
/// `evict_transactions`, `restore_transactions`) locks `mut_lock` first, so
/// those operations never run concurrently with each other.
pub struct Mempool<'env> {
/// Chain whose accounts, transaction cache and head height are used for validation.
blockchain: Arc<Blockchain<'env>>,
/// Listeners that are notified with `MempoolEvent`s.
pub notifier: RwLock<Notifier<'env, MempoolEvent>>,
/// Transaction indexes and the transaction filter.
state: RwLock<MempoolState>,
/// Serializes all mutating operations on the mempool.
mut_lock: Mutex<()>,
}
/// Lock-protected contents of the mempool: several indexes over the same set
/// of transactions, plus the filter.
struct MempoolState {
/// Every pooled transaction, keyed by its hash.
transactions_by_hash: HashMap<Blake2bHash, Arc<Transaction>>,
/// Pooled transactions grouped by sender address, each group ordered by `Transaction`'s `Ord`.
transactions_by_sender: HashMap<Address, BTreeSet<Arc<Transaction>>>,
/// Pooled transactions grouped by recipient address, each group ordered by `Transaction`'s `Ord`.
transactions_by_recipient: HashMap<Address, BTreeSet<Arc<Transaction>>>,
/// Every pooled transaction ordered by `Transaction`'s `Ord`
/// (presumably fee-based, going by the field name; confirm against `Transaction`).
transactions_sorted_fee: BTreeSet<Arc<Transaction>>,
/// Acceptance rules and hash blacklist consulted by `push_transaction`.
filter: MempoolFilter,
}
/// Notifications sent to the listeners registered on `Mempool::notifier`.
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub enum MempoolEvent {
/// A transaction (with its hash) was accepted by `push_transaction`.
TransactionAdded(Blake2bHash, Arc<Transaction>),
/// A transaction from a reverted block was put back into the mempool.
TransactionRestored(Arc<Transaction>),
/// A pooled transaction was found in the chain's transaction cache and removed.
TransactionMined(Arc<Transaction>),
/// A pooled transaction was removed for any other reason
/// (no longer applicable, displaced by another transaction, or pool size limit).
TransactionEvicted(Arc<Transaction>),
}
/// Settings used to construct the mempool's `MempoolFilter`.
#[derive(Debug, Clone)]
pub struct MempoolConfig {
/// Rules handed to `MempoolFilter::new` as its first argument.
pub filter_rules: Rules,
/// Limit handed to `MempoolFilter::new` as its second argument
/// (presumably the blacklist capacity, given the default is `DEFAULT_BLACKLIST_SIZE`; confirm in `filter`).
pub filter_limit: usize,
}
impl Default for MempoolConfig {
    /// Default configuration: `Rules::default()` combined with
    /// `MempoolFilter::DEFAULT_BLACKLIST_SIZE`.
    fn default() -> Self {
        let filter_rules = Rules::default();
        let filter_limit = MempoolFilter::DEFAULT_BLACKLIST_SIZE;
        Self {
            filter_rules,
            filter_limit,
        }
    }
}
impl<'env> Mempool<'env> {
pub fn new(blockchain: Arc<Blockchain<'env>>, config: MempoolConfig) -> Arc<Self> {
let arc = Arc::new(Self {
blockchain: blockchain.clone(),
notifier: RwLock::new(Notifier::new()),
state: RwLock::new(MempoolState {
transactions_by_hash: HashMap::new(),
transactions_by_sender: HashMap::new(),
transactions_by_recipient: HashMap::new(),
transactions_sorted_fee: BTreeSet::new(),
filter: MempoolFilter::new(config.filter_rules, config.filter_limit),
}),
mut_lock: Mutex::new(()),
});
let arc_self = arc.clone();
blockchain.notifier.write().register(move |event: &BlockchainEvent| arc_self.on_blockchain_event(event));
arc
}
/// Returns whether `hash` is currently blacklisted by the mempool filter.
pub fn is_filtered(&self, hash: &Blake2bHash) -> bool {
    let state = self.state.read();
    state.filter.blacklisted(hash)
}
pub fn push_transaction(&self, mut transaction: Transaction) -> ReturnCode {
let hash: Blake2bHash = transaction.hash();
let _push_lock = self.blockchain.push_lock.lock();
let _lock = self.mut_lock.lock();
let mut txs_to_remove = Vec::new();
{
let state = self.state.upgradable_read();
if !state.filter.accepts_transaction(&transaction) || state.filter.blacklisted(&hash) {
let mut state = RwLockUpgradableReadGuard::upgrade(state);
state.filter.blacklist(hash);
trace!("Transaction was filtered: {}", transaction.hash::<Blake2bHash>());
return ReturnCode::Filtered;
}
if state.transactions_by_hash.contains_key(&hash) {
return ReturnCode::Known;
};
if transaction.verify_mut(self.blockchain.network_id).is_err() {
return ReturnCode::Invalid;
}
let txs_by_sender_opt = state.transactions_by_sender.get(&transaction.sender);
if transaction.fee_per_byte() < TRANSACTION_RELAY_FEE_MIN {
let mut num_free_tx = 0;
if let Some(transactions) = txs_by_sender_opt {
for tx in transactions {
if tx.fee_per_byte() < TRANSACTION_RELAY_FEE_MIN {
num_free_tx += 1;
if num_free_tx >= FREE_TRANSACTIONS_PER_SENDER_MAX {
return ReturnCode::FeeTooLow;
}
} else {
break;
}
}
}
}
let recipient_account;
let mut sender_account;
let block_height;
{
let blockchain_state = self.blockchain.state();
let accounts = blockchain_state.accounts();
let transaction_cache = blockchain_state.transaction_cache();
block_height = blockchain_state.main_chain().head.header.height + 1;
if !transaction.is_valid_at(block_height) {
return ReturnCode::Invalid;
}
if transaction_cache.contains(&hash) {
return ReturnCode::Invalid;
}
recipient_account = accounts.get(&transaction.recipient, None);
let is_contract_creation = transaction.flags.contains(TransactionFlags::CONTRACT_CREATION);
let is_type_change = recipient_account.account_type() != transaction.recipient_type;
if is_contract_creation != is_type_change {
return ReturnCode::Invalid;
}
match recipient_account.with_incoming_transaction(&transaction, block_height) {
Err(_) => return ReturnCode::Invalid,
Ok(r) => {
if !state.filter.accepts_recipient_account(&transaction, &recipient_account, &r) {
self.state.write().filter.blacklist(hash);
return ReturnCode::Filtered;
}
}
}
sender_account = accounts.get(&transaction.sender, None);
if sender_account.account_type() != transaction.sender_type {
return ReturnCode::Invalid;
}
}
let empty_btree;
let mut tx_count = 0;
let mut tx_iter = match txs_by_sender_opt {
Some(transactions) => transactions.iter(),
None => {
empty_btree = BTreeSet::new();
empty_btree.iter()
}
};
let mut tx_opt = tx_iter.next_back();
while let Some(tx) = tx_opt {
if transaction.cmp(tx) == Ordering::Greater {
break;
}
sender_account = match sender_account
.with_outgoing_transaction(tx, block_height) {
Ok(s) => s,
Err(_) => return ReturnCode::Invalid
};
tx_count += 1;
tx_opt = tx_iter.next_back();
}
if tx_count >= TRANSACTIONS_PER_SENDER_MAX {
return ReturnCode::FeeTooLow;
}
let old_sender_account = sender_account.clone();
sender_account = match sender_account.with_outgoing_transaction(&transaction, block_height) {
Ok(account) => account,
Err(_) => return ReturnCode::Invalid
};
if !state.filter.accepts_sender_account(&transaction, &old_sender_account, &sender_account) {
self.state.write().filter.blacklist(hash);
return ReturnCode::Filtered;
}
tx_count += 1;
while let Some(tx) = tx_opt {
if tx_count < TRANSACTIONS_PER_SENDER_MAX {
if let Ok(account) = sender_account.with_outgoing_transaction(tx, block_height) {
sender_account = account;
tx_count += 1;
} else {
txs_to_remove.push(tx.clone())
}
} else {
txs_to_remove.push(tx.clone())
}
tx_opt = tx_iter.next_back();
}
}
let tx_arc = Arc::new(transaction);
let mut removed_transactions;
{
let mut state = self.state.write();
Mempool::add_transaction(&mut state, hash.clone(), tx_arc.clone());
for tx in txs_to_remove.iter() {
Mempool::remove_transaction(&mut *state, tx);
}
removed_transactions = txs_to_remove;
if state.transactions_sorted_fee.len() > SIZE_MAX {
let tx = state.transactions_sorted_fee.iter().next().unwrap().clone();
Mempool::remove_transaction(&mut state, &tx);
removed_transactions.push(tx);
}
}
drop(_push_lock);
self.notifier.read().notify(MempoolEvent::TransactionAdded(hash, tx_arc));
for tx in removed_transactions {
self.notifier.read().notify(MempoolEvent::TransactionEvicted(tx));
}
ReturnCode::Accepted
}
/// Returns whether a transaction with the given hash is currently pooled.
pub fn contains(&self, hash: &Blake2bHash) -> bool {
    let state = self.state.read();
    state.transactions_by_hash.contains_key(hash)
}
/// Looks up a pooled transaction by hash, returning a shared handle to it.
pub fn get_transaction(&self, hash: &Blake2bHash) -> Option<Arc<Transaction>> {
    let state = self.state.read();
    state.transactions_by_hash.get(hash).map(Arc::clone)
}
/// Returns up to `max_count` pooled transactions whose fee per byte is at
/// least `min_fee_per_byte`, in the iteration order of the ordered set.
pub fn get_transactions(&self, max_count: usize, min_fee_per_byte: f64) -> Vec<Arc<Transaction>> {
    let state = self.state.read();
    let mut selected = Vec::new();
    let mut candidates = state.transactions_sorted_fee.iter();

    // Stop pulling candidates as soon as enough transactions were collected.
    while selected.len() < max_count {
        match candidates.next() {
            Some(tx) => {
                if tx.fee_per_byte() >= min_fee_per_byte {
                    selected.push(Arc::clone(tx));
                }
            }
            None => break,
        }
    }

    selected
}
/// Collects copies of pooled transactions whose combined serialized size
/// does not exceed `max_size`.
///
/// Transactions are considered in the forward iteration order of the ordered
/// set. One that does not fit is skipped; the scan ends once the remaining
/// space is smaller than `Transaction::MIN_SIZE`.
///
/// NOTE(review): other code in this file treats the front of the ordered set
/// as the lowest-priority end (it is where the size limit evicts from), so
/// this may pick the cheapest transactions first. Confirm against
/// `Transaction`'s `Ord`.
pub fn get_transactions_for_block(&self, max_size: usize) -> Vec<Transaction> {
    let state = self.state.read();
    let mut selected = Vec::new();
    let mut used = 0;

    for tx in state.transactions_sorted_fee.iter() {
        let tx_size = tx.serialized_size();

        if used + tx_size > max_size {
            // Not even the smallest possible transaction fits any more.
            if max_size - used < Transaction::MIN_SIZE {
                break;
            }
            continue;
        }

        selected.push(Transaction::clone(tx));
        used += tx_size;
    }

    selected
}
/// Returns at most `max_count` pooled transactions that are sent from or to
/// any of `addresses`.
///
/// For each address the sender index is consulted before the recipient
/// index, and each set is walked from its back. A transaction that matches
/// through both indexes (or through two queried addresses) is returned once
/// per match.
pub fn get_transactions_by_addresses(&self, addresses: HashSet<Address>, max_count: usize) -> Vec<Arc<Transaction>> {
    let state = self.state.read();
    let mut collected = Vec::new();

    for address in addresses {
        let indexes = [&state.transactions_by_sender, &state.transactions_by_recipient];
        for index in indexes.iter() {
            if let Some(transactions) = index.get(&address) {
                let remaining = max_count - collected.len();
                collected.extend(transactions.iter().rev().take(remaining).cloned());
            }
        }

        if collected.len() >= max_count {
            break;
        }
    }

    collected
}
/// Reacts to a blockchain event: after a rebranch the transactions of the
/// reverted blocks are restored first; in every case the pool is then
/// re-checked against the new chain state.
fn on_blockchain_event(&self, event: &BlockchainEvent) {
    match event {
        BlockchainEvent::Extended(_) => {}
        BlockchainEvent::Rebranched(reverted_blocks, _) => {
            self.restore_transactions(reverted_blocks);
        }
    }
    self.evict_transactions();
}
/// Re-validates every pooled transaction against the current chain state and
/// removes those that were mined or can no longer be applied.
///
/// Listeners receive `TransactionMined` for transactions found in the chain's
/// transaction cache and `TransactionEvicted` for all other removals.
fn evict_transactions(&self) {
    let _lock = self.mut_lock.lock();

    let mut txs_mined = Vec::new();
    let mut txs_evicted = Vec::new();

    // Phase 1: decide what has to go, holding only a read lock on the state.
    {
        let state = self.state.read();
        let blockchain_state = self.blockchain.state();
        let accounts = blockchain_state.accounts();
        let transaction_cache = blockchain_state.transaction_cache();
        let block_height = blockchain_state.main_chain().head.header.height + 1;

        for (address, transactions) in state.transactions_by_sender.iter() {
            let mut sender_account = accounts.get(&address, None);

            // Apply the sender's transactions starting from the back of the ordered set.
            for tx in transactions.iter().rev() {
                if !tx.is_valid_at(block_height) {
                    txs_evicted.push(tx.clone());
                    continue;
                }

                if transaction_cache.contains(&tx.hash()) {
                    txs_mined.push(tx.clone());
                    continue;
                }

                let recipient_account = accounts.get(&tx.recipient, None);
                if recipient_account.with_incoming_transaction(&tx, block_height).is_err() {
                    txs_evicted.push(tx.clone());
                    continue;
                }

                sender_account = match sender_account.with_outgoing_transaction(&tx, block_height) {
                    Ok(updated) => updated,
                    Err(_) => {
                        txs_evicted.push(tx.clone());
                        continue;
                    }
                };
            }
        }
    }

    // Phase 2: remove everything in one go under the write lock.
    {
        let mut state = self.state.write();
        for tx in txs_mined.iter().chain(txs_evicted.iter()) {
            Mempool::remove_transaction(&mut state, tx);
        }
    }

    // Phase 3: notify listeners, mined transactions first.
    for tx in txs_mined {
        self.notifier.read().notify(MempoolEvent::TransactionMined(tx));
    }
    for tx in txs_evicted {
        self.notifier.read().notify(MempoolEvent::TransactionEvicted(tx));
    }
}
/// Puts transactions from reverted blocks back into the mempool.
///
/// A reverted transaction is considered only if it is valid at the next block
/// height, is not in the chain's transaction cache, and can be applied to its
/// recipient account. Candidates are merged with the sender's pooled
/// transactions via `merge_transactions`, after which the global `SIZE_MAX`
/// limit is enforced.
///
/// Listeners receive `TransactionEvicted` for every removed transaction,
/// followed by `TransactionRestored` for every transaction that was added.
///
/// Panics if a reverted block has no body.
fn restore_transactions(&self, reverted_blocks: &[(Blake2bHash, Block)]) {
let _lock = self.mut_lock.lock();
let mut removed_transactions = Vec::new();
let mut restored_transactions = Vec::new();
{
let blockchain_state = self.blockchain.state();
let accounts = blockchain_state.accounts();
let transaction_cache = blockchain_state.transaction_cache();
// Transactions are validated for the block following the current head.
let block_height = blockchain_state.main_chain().head.header.height + 1;
// Candidate transactions from the reverted blocks, grouped by sender.
let mut txs_by_sender = HashMap::new();
for (_, block) in reverted_blocks {
for tx in block.body.as_ref().unwrap().transactions.iter() {
if !tx.is_valid_at(block_height) {
continue;
}
// Skip transactions that are already included in the chain.
if transaction_cache.contains(&tx.hash()) {
continue;
}
let recipient_account = accounts.get(&tx.recipient, None);
if recipient_account.with_incoming_transaction(&tx, block_height).is_err() {
continue;
}
let txs = txs_by_sender
.entry(&tx.sender)
.or_insert_with(BTreeSet::new);
txs.insert(tx);
}
}
{
let mut state = self.state.write();
for (sender, restored_txs) in txs_by_sender {
// Stand-in for senders that have no pooled transactions yet.
let empty_btree;
let existing_txs = match state.transactions_by_sender.get(&sender) {
Some(txs) => txs,
None => {
empty_btree = BTreeSet::new();
&empty_btree
}
};
// Decide which candidates to add and which pooled transactions they displace.
let (txs_to_add, txs_to_remove) = Mempool::merge_transactions(&accounts, sender, block_height, existing_txs, &restored_txs);
for tx in txs_to_add {
let transaction = Arc::new(tx.clone());
Mempool::add_transaction(&mut state, tx.hash(), transaction.clone());
restored_transactions.push(transaction);
}
for tx in txs_to_remove {
Mempool::remove_transaction(&mut state, &tx);
removed_transactions.push(tx);
}
}
// Enforce the global size limit by dropping transactions from the front of the ordered set.
// NOTE(review): a transaction restored above can be dropped again here; it stays in
// `restored_transactions`, so it is reported both as evicted and as restored.
let size = state.transactions_sorted_fee.len();
if size > SIZE_MAX {
let mut txs_to_remove = Vec::with_capacity(size - SIZE_MAX);
let mut iter = state.transactions_sorted_fee.iter();
for _ in 0..size - SIZE_MAX {
txs_to_remove.push(iter.next().unwrap().clone());
}
for tx in txs_to_remove.iter() {
Mempool::remove_transaction(&mut state, tx);
}
removed_transactions.extend(txs_to_remove);
}
}
}
// Notify only after all locks on the state have been released.
for tx in removed_transactions {
self.notifier.read().notify(MempoolEvent::TransactionEvicted(tx));
}
for tx in restored_transactions {
self.notifier.read().notify(MempoolEvent::TransactionRestored(tx));
}
}
fn add_transaction(state: &mut MempoolState, hash: Blake2bHash, tx: Arc<Transaction>) {
state.transactions_by_hash.insert(hash, tx.clone());
state.transactions_sorted_fee.insert(tx.clone());
let txs_by_recipient = state.transactions_by_recipient
.entry(tx.recipient.clone())
.or_insert_with(BTreeSet::new);
txs_by_recipient.insert(tx.clone());
let txs_by_sender = state.transactions_by_sender
.entry(tx.sender.clone())
.or_insert_with(BTreeSet::new);
txs_by_sender.insert(tx.clone());
}
/// Removes `tx` from every index of `state`, dropping per-address entries
/// that become empty. Does nothing for indexes that do not contain `tx`.
fn remove_transaction(state: &mut MempoolState, tx: &Transaction) {
    state.transactions_by_hash.remove(&tx.hash());
    state.transactions_sorted_fee.remove(tx);

    // Sender index: remove the transaction, then the whole entry if nothing is left.
    let sender_now_empty = match state.transactions_by_sender.get_mut(&tx.sender) {
        Some(pending) => {
            pending.remove(tx);
            pending.is_empty()
        }
        None => false,
    };
    if sender_now_empty {
        state.transactions_by_sender.remove(&tx.sender);
    }

    // Recipient index: same procedure.
    let recipient_now_empty = match state.transactions_by_recipient.get_mut(&tx.recipient) {
        Some(pending) => {
            pending.remove(tx);
            pending.is_empty()
        }
        None => false,
    };
    if recipient_now_empty {
        state.transactions_by_recipient.remove(&tx.recipient);
    }
}
/// Merges a sender's pooled transactions (`old_txs`) with candidate
/// transactions (`new_txs`) and decides which candidates to add and which
/// pooled transactions to drop.
///
/// Both sets are walked from their back in lockstep, always taking the
/// greater transaction next and applying it to the sender's account as of
/// `block_height`. At most `TRANSACTIONS_PER_SENDER_MAX` transactions are
/// accepted in total.
///
/// Returns `(txs_to_add, txs_to_remove)`: candidates that were accepted, and
/// pooled transactions that exceeded the limit or could not be applied.
/// Candidates that are not accepted are silently left out.
fn merge_transactions<'a>(accounts: &Accounts, sender: &Address, block_height: u32, old_txs: &BTreeSet<Arc<Transaction>>, new_txs: &BTreeSet<&'a Transaction>) -> (Vec<&'a Transaction>, Vec<Arc<Transaction>>) {
    let mut txs_to_add = Vec::new();
    let mut txs_to_remove = Vec::new();

    let mut sender_account = accounts.get(sender, None);
    let mut accepted = 0;

    let mut pooled_iter = old_txs.iter();
    let mut candidate_iter = new_txs.iter();
    let mut pooled = pooled_iter.next_back();
    let mut candidate = candidate_iter.next_back();

    loop {
        // Pick whichever side currently offers the greater transaction.
        let take_candidate = match (pooled, candidate) {
            (None, None) => break,
            (Some(_), None) => false,
            (None, Some(_)) => true,
            (Some(txc), Some(txn)) => txn.cmp(&txc.as_ref()) == Ordering::Greater,
        };

        if take_candidate {
            if let Some(&tx) = candidate {
                if accepted < TRANSACTIONS_PER_SENDER_MAX {
                    if let Ok(account) = sender_account.with_outgoing_transaction(tx, block_height) {
                        sender_account = account;
                        accepted += 1;
                        txs_to_add.push(tx);
                    }
                }
            }
            candidate = candidate_iter.next_back();
        } else {
            if let Some(tx) = pooled {
                // Only try to apply the transaction while the per-sender limit allows it.
                let applied = if accepted < TRANSACTIONS_PER_SENDER_MAX {
                    sender_account.with_outgoing_transaction(tx, block_height).ok()
                } else {
                    None
                };
                match applied {
                    Some(account) => {
                        sender_account = account;
                        accepted += 1;
                    }
                    None => txs_to_remove.push(tx.clone()),
                }
            }
            pooled = pooled_iter.next_back();
        }
    }

    (txs_to_add, txs_to_remove)
}
}
/// Outcome of `Mempool::push_transaction`.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReturnCode {
    /// Rejected because the sender exceeded a per-sender limit for this fee level.
    FeeTooLow,
    /// Rejected because the transaction failed validation.
    Invalid,
    /// The transaction was added to the mempool.
    Accepted,
    /// A transaction with the same hash is already in the mempool.
    Known,
    /// Rejected by the mempool filter; its hash has been blacklisted.
    Filtered,
}
/// Fee per byte below which a transaction is treated as free.
const TRANSACTION_RELAY_FEE_MIN: f64 = 1.0;
/// Maximum number of pooled transactions per sender.
const TRANSACTIONS_PER_SENDER_MAX: u32 = 500;
/// Maximum number of pooled free transactions per sender.
const FREE_TRANSACTIONS_PER_SENDER_MAX: u32 = 10;
/// Maximum number of transactions kept in the mempool.
pub const SIZE_MAX: usize = 100_000;