use self::core::core::hash::{Hash, Hashed};
use self::core::core::id::ShortId;
use self::core::core::verifier_cache::VerifierCache;
use self::core::core::{
transaction, Block, BlockHeader, HeaderVersion, OutputIdentifier, Transaction, Weighting,
};
use self::core::global;
use self::util::RwLock;
use crate::pool::Pool;
use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource};
use chrono::prelude::*;
use grin_core as core;
use grin_util as util;
use std::collections::VecDeque;
use std::sync::Arc;
/// Transaction pool made up of a regular `txpool`, a separate `stempool`
/// and a bounded cache of recently accepted entries used after a reorg.
///
/// Generic over the blockchain handle `B`, the pool adapter `P` and the
/// verifier cache `V`.
pub struct TransactionPool<B, P, V>
where
B: BlockChain,
P: PoolAdapter,
V: VerifierCache,
{
/// Pool configuration (size limits, minimum fee base, mineable weight).
pub config: PoolConfig,
/// The main pool; the source for tx retrieval and for mineable txs.
pub txpool: Pool<B, V>,
/// The pool that receives txs added with the `stem` flag set.
pub stempool: Pool<B, V>,
/// Entries recently accepted into the txpool, oldest at the front.
/// Replayed into the txpool by `reconcile_reorg_cache`.
pub reorg_cache: Arc<RwLock<VecDeque<PoolEntry>>>,
/// Chain handle used for the head lookup and for lock height and
/// coinbase maturity verification.
pub blockchain: Arc<B>,
/// Verifier cache shared with both pools and passed to tx validation.
pub verifier_cache: Arc<RwLock<V>>,
/// Adapter notified whenever a tx is accepted into either pool.
pub adapter: Arc<P>,
}
impl<B, P, V> TransactionPool<B, P, V>
where
B: BlockChain,
P: PoolAdapter,
V: VerifierCache + 'static,
{
pub fn new(
config: PoolConfig,
chain: Arc<B>,
verifier_cache: Arc<RwLock<V>>,
adapter: Arc<P>,
) -> Self {
TransactionPool {
config,
txpool: Pool::new(chain.clone(), verifier_cache.clone(), "txpool".to_string()),
stempool: Pool::new(
chain.clone(),
verifier_cache.clone(),
"stempool".to_string(),
),
reorg_cache: Arc::new(RwLock::new(VecDeque::new())),
blockchain: chain,
verifier_cache,
adapter,
}
}
/// Returns the chain head header as reported by the underlying blockchain.
///
/// # Errors
/// Propagates whatever error the blockchain's `chain_head` returns.
pub fn chain_head(&self) -> Result<BlockHeader, PoolError> {
self.blockchain.chain_head()
}
/// Adds a clone of `entry` to the stempool.
///
/// `extra_tx` and `header` are forwarded unchanged to the stempool's
/// `add_to_pool`.
///
/// # Errors
/// Propagates any error returned by the stempool.
fn add_to_stempool(
&mut self,
entry: &PoolEntry,
header: &BlockHeader,
extra_tx: Option<Transaction>,
) -> Result<(), PoolError> {
self.stempool.add_to_pool(entry.clone(), extra_tx, header)
}
/// Appends a clone of `entry` to the back of the reorg cache.
///
/// If the cache then holds more than `config.max_pool_size` entries, the
/// oldest entry (the front of the queue) is dropped.
fn add_to_reorg_cache(&mut self, entry: &PoolEntry) {
    let limit = self.config.max_pool_size;
    let mut cached = self.reorg_cache.write();

    cached.push_back(entry.clone());
    // Hard cap: only one entry is added per call, so one pop is enough.
    if cached.len() > limit {
        cached.pop_front();
    }

    debug!("added tx to reorg_cache: size now {}", cached.len());
}
/// Attempts to deaggregate the entry's tx against txs in the txpool.
///
/// The entry is returned unchanged when its tx has at most one kernel or
/// when the txpool reports no txs matching its kernels. Otherwise a new
/// entry is returned, holding the deaggregated tx and tagged with
/// `TxSource::Deaggregate`.
///
/// # Errors
/// Propagates any error from `transaction::deaggregate`.
fn deaggregate_tx(&self, entry: PoolEntry) -> Result<PoolEntry, PoolError> {
    // A single-kernel tx cannot be split any further.
    if entry.tx.kernels().len() <= 1 {
        return Ok(entry);
    }

    let matching = self.txpool.find_matching_transactions(entry.tx.kernels());
    if matching.is_empty() {
        return Ok(entry);
    }

    let deaggregated = transaction::deaggregate(entry.tx, &matching)?;
    Ok(PoolEntry::new(deaggregated, TxSource::Deaggregate))
}
/// Adds a clone of `entry` to the txpool, then reconciles the stempool
/// against the aggregate of everything now in the txpool.
///
/// # Errors
/// Propagates errors from adding to the txpool, from building the txpool
/// aggregate, or from reconciling the stempool.
fn add_to_txpool(&mut self, entry: &PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
    self.txpool.add_to_pool(entry.clone(), None, header)?;

    // The txpool just changed, so the stempool must be re-checked against
    // its new aggregate state.
    let txpool_aggregate = self.txpool.all_transactions_aggregate(None)?;
    self.stempool.reconcile(txpool_aggregate, header)?;

    Ok(())
}
/// Checks that the kernel variants used by `tx` are acceptable for the
/// given header.
///
/// Only NRD kernels are restricted: a tx containing at least one NRD
/// kernel is rejected unless the NRD feature is globally enabled and the
/// header version is at least 4.
///
/// # Errors
/// * `PoolError::NRDKernelNotEnabled` - NRD kernel present but the feature is off.
/// * `PoolError::NRDKernelPreHF3` - NRD kernel present but header version < 4.
fn verify_kernel_variants(
    &self,
    tx: &Transaction,
    header: &BlockHeader,
) -> Result<(), PoolError> {
    let has_nrd_kernel = tx.kernels().iter().any(|kernel| kernel.is_nrd());
    if !has_nrd_kernel {
        return Ok(());
    }

    if !global::is_nrd_enabled() {
        return Err(PoolError::NRDKernelNotEnabled);
    }
    if header.version < HeaderVersion(4) {
        return Err(PoolError::NRDKernelPreHF3);
    }
    Ok(())
}
/// Adds `tx` to the pool, routing it to the stempool when `stem` is set
/// and to the txpool otherwise.
///
/// Flow:
/// * A stem tx already present in the stempool is re-submitted as a
///   non-stem tx. Otherwise a tx already in the txpool is rejected as a
///   duplicate.
/// * Non-stem txs are first deaggregated against the txpool.
/// * The tx is checked for kernel variants, pool capacity / fee, validity,
///   lock height and coinbase maturity, and its inputs are rewritten from
///   the located spends.
/// * A stem tx is added to the stempool; if the adapter then accepts it we
///   are done, otherwise it falls through and is added to the txpool too.
/// * Txpool additions are also recorded in the reorg cache and announced
///   through the adapter.
/// * A non-stem tx that hit `OverCapacity` is still added, after which one
///   tx is evicted from the txpool.
///
/// # Errors
/// `PoolError::DuplicateTx` for a tx already in the txpool, or any error
/// raised by the checks and pool operations described above.
pub fn add_to_pool(
    &mut self,
    src: TxSource,
    tx: Transaction,
    stem: bool,
    header: &BlockHeader,
) -> Result<(), PoolError> {
    // Seen in the stempool already: retry as a regular (non-stem) tx.
    if stem && self.stempool.contains_tx(&tx) {
        return self.add_to_pool(src, tx, false, header);
    }
    if self.txpool.contains_tx(&tx) {
        return Err(PoolError::DuplicateTx);
    }

    // Only non-stem txs are deaggregated against the txpool.
    let candidate = PoolEntry::new(tx, src);
    let candidate = if stem {
        candidate
    } else {
        self.deaggregate_tx(candidate)?
    };
    let pending_tx = &candidate.tx;

    self.verify_kernel_variants(pending_tx, header)?;

    // An over-capacity non-stem tx is still accepted, at the price of
    // evicting another tx afterwards; any other rejection is final.
    let verdict = self.is_acceptable(pending_tx, stem);
    let evict = !stem && verdict.as_ref().err() == Some(&PoolError::OverCapacity);
    if !evict && verdict.is_err() {
        return verdict;
    }

    pending_tx
        .validate(Weighting::AsTransaction, self.verifier_cache.clone())
        .map_err(PoolError::InvalidTx)?;
    self.blockchain.verify_tx_lock_height(pending_tx)?;

    // A stem tx is checked against the stempool with the txpool aggregate
    // as extra context; a regular tx is checked against the txpool alone.
    let (extra_tx, located) = if stem {
        let txpool_aggregate = self.txpool.all_transactions_aggregate(None)?;
        let located = self
            .stempool
            .locate_spends(pending_tx, txpool_aggregate.clone());
        (txpool_aggregate, located)
    } else {
        (None, self.txpool.locate_spends(pending_tx, None))
    };
    let (spent_pool, spent_utxo) = located?;

    // Only coinbase outputs spent from the utxo set need a maturity check.
    let coinbase_inputs: Vec<_> = spent_utxo
        .iter()
        .filter(|output| output.is_coinbase())
        .cloned()
        .collect();
    self.blockchain
        .verify_coinbase_maturity(&coinbase_inputs.as_slice().into())?;

    let accepted = self.convert_tx_v2(candidate, &spent_pool, &spent_utxo)?;

    if stem {
        self.add_to_stempool(&accepted, header, extra_tx)?;
        if self.adapter.stem_tx_accepted(&accepted).is_ok() {
            return Ok(());
        }
        // The adapter refused the stem tx: fall through to the txpool.
    }

    self.add_to_txpool(&accepted, header)?;
    self.add_to_reorg_cache(&accepted);
    self.adapter.tx_accepted(&accepted);

    if evict {
        self.evict_from_txpool();
    }
    Ok(())
}
/// Rebuilds the entry's tx with its inputs replaced by the outputs it was
/// found to spend.
///
/// The new inputs are `spent_utxo` followed by `spent_pool`, sorted. The
/// rebuilt tx is validated again before being wrapped in a new entry that
/// keeps the original entry's source.
///
/// # Errors
/// Propagates any validation error for the rebuilt tx.
fn convert_tx_v2(
    &self,
    entry: PoolEntry,
    spent_pool: &[OutputIdentifier],
    spent_utxo: &[OutputIdentifier],
) -> Result<PoolEntry, PoolError> {
    let original = entry.tx;
    debug!(
        "convert_tx_v2: {} ({} -> v2)",
        original.hash(),
        original.inputs().version_str(),
    );

    let mut resolved_inputs: Vec<OutputIdentifier> = spent_utxo
        .iter()
        .chain(spent_pool.iter())
        .cloned()
        .collect();
    resolved_inputs.sort_unstable();

    let converted = Transaction {
        body: original
            .body
            .replace_inputs(resolved_inputs.as_slice().into()),
        ..original
    };

    // Re-validate so a bad input substitution is caught here.
    converted.validate(Weighting::AsTransaction, self.verifier_cache.clone())?;

    Ok(PoolEntry::new(converted, entry.src))
}
/// Asks the txpool to evict a transaction.
///
/// Which transaction is removed is decided by the txpool's
/// `evict_transaction`.
pub fn evict_from_txpool(&mut self) {
self.txpool.evict_transaction()
}
/// Drops entries from the front of the reorg cache while their `tx_at`
/// timestamp is strictly earlier than `cutoff`.
///
/// Stops at the first entry that is not older than `cutoff`, or when the
/// cache is empty.
pub fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
    let mut cached = self.reorg_cache.write();

    while let Some(oldest) = cached.front() {
        if oldest.tx_at < cutoff {
            cached.pop_front();
        } else {
            break;
        }
    }

    debug!("truncate_reorg_cache: size: {}", cached.len());
}
pub fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
debug!(
"reconcile_reorg_cache: size: {}, block: {:?} ...",
entries.len(),
header.hash(),
);
for entry in entries {
let _ = self.add_to_txpool(&entry, header);
}
debug!(
"reconcile_reorg_cache: block: {:?} ... done.",
header.hash()
);
Ok(())
}
/// Reconciles both pools against `block`.
///
/// The txpool is reconciled first; the stempool is then reconciled using
/// the aggregate of the updated txpool as extra context.
///
/// # Errors
/// Propagates errors from reconciling either pool or from building the
/// txpool aggregate.
pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> {
    let header = &block.header;

    // Txpool first, with no extra context.
    self.txpool.reconcile_block(block);
    self.txpool.reconcile(None, header)?;

    // Then the stempool, on top of the updated txpool contents.
    self.stempool.reconcile_block(block);
    let txpool_aggregate = self.txpool.all_transactions_aggregate(None)?;
    self.stempool.reconcile(txpool_aggregate, header)?;

    Ok(())
}
/// Looks up a transaction in the txpool by kernel hash.
///
/// Delegates to the txpool; the stempool is not consulted. Returns `None`
/// when the txpool has no result for `hash`.
pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option<Transaction> {
self.txpool.retrieve_tx_by_kernel_hash(hash)
}
/// Retrieves transactions from the txpool for the given kernel short ids.
///
/// `hash`, `nonce` and `kern_ids` are forwarded unchanged to the txpool;
/// the stempool is not consulted.
/// NOTE(review): the returned pair is presumably (matched transactions,
/// short ids that could not be matched) - confirm against
/// `Pool::retrieve_transactions`.
pub fn retrieve_transactions(
&self,
hash: Hash,
nonce: u64,
kern_ids: &[ShortId],
) -> (Vec<Transaction>, Vec<ShortId>) {
self.txpool.retrieve_transactions(hash, nonce, kern_ids)
}
/// Decides whether `tx` may be accepted, based on pool capacity and the
/// minimum fee.
///
/// Checks, in order:
/// 1. the txpool size must not exceed `config.max_pool_size`;
/// 2. for a stem tx, the stempool size must not exceed
///    `config.max_stempool_size`;
/// 3. when `config.accept_fee_base` is non-zero, the tx fee must be at
///    least `tx_weight * accept_fee_base`.
///
/// # Errors
/// * `PoolError::OverCapacity` - a capacity check failed.
/// * `PoolError::LowFeeTransaction(threshold)` - the fee is below the
///   required threshold.
fn is_acceptable(&self, tx: &Transaction, stem: bool) -> Result<(), PoolError> {
    if self.total_size() > self.config.max_pool_size {
        return Err(PoolError::OverCapacity);
    }

    // The txpool limit is already enforced above, so only the stempool
    // limit is left to check here.
    if stem && self.stempool.size() > self.config.max_stempool_size {
        return Err(PoolError::OverCapacity);
    }

    if self.config.accept_fee_base > 0 {
        // Saturate rather than overflow: a wrapped product would yield a
        // tiny threshold and let an underpaying tx through.
        let threshold = (tx.tx_weight() as u64).saturating_mul(self.config.accept_fee_base);
        if tx.fee() < threshold {
            return Err(PoolError::LowFeeTransaction(threshold));
        }
    }
    Ok(())
}
/// Returns the size of the txpool as reported by its `size` method.
///
/// The stempool is not included.
pub fn total_size(&self) -> usize {
self.txpool.size()
}
/// Returns the transactions the txpool selects for mining, with
/// `config.mineable_max_weight` passed as the weight limit.
///
/// # Errors
/// Propagates any error returned by the txpool.
pub fn prepare_mineable_transactions(&self) -> Result<Vec<Transaction>, PoolError> {
self.txpool
.prepare_mineable_transactions(self.config.mineable_max_weight)
}
}