use self::core::core::hash::{Hash, Hashed};
use self::core::core::id::ShortId;
use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting};
use self::util::RwLock;
use crate::pool::Pool;
use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource};
use chrono::prelude::*;
use epic_core as core;
use epic_util as util;
use std::collections::VecDeque;
use std::sync::Arc;
pub struct TransactionPool<B, P>
where
B: BlockChain,
P: PoolAdapter,
{
pub config: PoolConfig,
pub txpool: Pool<B>,
pub stempool: Pool<B>,
pub reorg_cache: Arc<RwLock<VecDeque<PoolEntry>>>,
pub blockchain: Arc<B>,
pub adapter: Arc<P>,
}
impl<B, P> TransactionPool<B, P>
where
B: BlockChain,
P: PoolAdapter,
{
pub fn new(config: PoolConfig, chain: Arc<B>, adapter: Arc<P>) -> Self {
TransactionPool {
config,
txpool: Pool::new(chain.clone(), "txpool".to_string()),
stempool: Pool::new(chain.clone(), "stempool".to_string()),
reorg_cache: Arc::new(RwLock::new(VecDeque::new())),
blockchain: chain,
adapter,
}
}
pub fn chain_head(&self) -> Result<BlockHeader, PoolError> {
self.blockchain.chain_head()
}
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
self.stempool
.add_to_pool(entry, self.txpool.all_transactions(), header)?;
Ok(())
}
fn add_to_reorg_cache(&mut self, entry: PoolEntry) {
let mut cache = self.reorg_cache.write();
cache.push_back(entry);
if cache.len() > self.config.max_pool_size {
let _ = cache.pop_front();
}
debug!("added tx to reorg_cache: size now {}", cache.len());
}
fn add_to_txpool(
&mut self,
mut entry: PoolEntry,
header: &BlockHeader,
) -> Result<(), PoolError> {
if entry.tx.kernels().len() > 1 {
let txs = self.txpool.find_matching_transactions(entry.tx.kernels());
if !txs.is_empty() {
let tx = transaction::deaggregate(entry.tx, txs)?;
tx.validate(Weighting::AsTransaction)?;
entry.tx = tx;
entry.src = TxSource::Deaggregate;
}
}
self.txpool.add_to_pool(entry.clone(), vec![], header)?;
{
let txpool_tx = self.txpool.all_transactions_aggregate()?;
self.stempool.reconcile(txpool_tx, header)?;
}
Ok(())
}
pub fn add_to_pool(
&mut self,
src: TxSource,
tx: Transaction,
stem: bool,
header: &BlockHeader,
) -> Result<(), PoolError> {
if !stem && self.txpool.contains_tx(tx.hash()) {
return Err(PoolError::DuplicateTx);
}
let acceptability = self.is_acceptable(&tx, stem);
let mut evict = false;
if !stem && acceptability.as_ref().err() == Some(&PoolError::OverCapacity) {
evict = true;
} else if acceptability.is_err() {
return acceptability;
}
tx.validate(Weighting::AsTransaction)
.map_err(PoolError::InvalidTx)?;
self.blockchain.verify_tx_lock_height(&tx)?;
self.blockchain.verify_coinbase_maturity(&tx)?;
let entry = PoolEntry {
src,
tx_at: Utc::now(),
tx,
};
if !stem
|| self
.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry))
.is_err()
{
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry);
}
if evict {
self.evict_from_txpool();
}
Ok(())
}
pub fn evict_from_txpool(&mut self) {
let bucket_transactions = self.txpool.bucket_transactions(Weighting::NoLimit);
match bucket_transactions.last() {
Some(evictable_transaction) => {
self.txpool.entries = self
.txpool
.entries
.iter()
.filter(|x| x.tx != *evictable_transaction)
.map(|x| x.clone())
.collect::<Vec<_>>();
}
None => (),
}
}
pub fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
let mut cache = self.reorg_cache.write();
while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
let _ = cache.pop_front();
}
debug!("truncate_reorg_cache: size: {}", cache.len());
}
pub fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
debug!(
"reconcile_reorg_cache: size: {}, block: {:?} ...",
entries.len(),
header.hash(),
);
for entry in entries {
let _ = &self.add_to_txpool(entry.clone(), header);
}
debug!(
"reconcile_reorg_cache: block: {:?} ... done.",
header.hash()
);
Ok(())
}
pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> {
self.txpool.reconcile_block(block);
self.txpool.reconcile(None, &block.header)?;
self.stempool.reconcile_block(block);
{
let txpool_tx = self.txpool.all_transactions_aggregate()?;
self.stempool.reconcile(txpool_tx, &block.header)?;
}
Ok(())
}
pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option<Transaction> {
self.txpool.retrieve_tx_by_kernel_hash(hash)
}
pub fn retrieve_transactions(
&self,
hash: Hash,
nonce: u64,
kern_ids: &[ShortId],
) -> (Vec<Transaction>, Vec<ShortId>) {
self.txpool.retrieve_transactions(hash, nonce, kern_ids)
}
fn is_acceptable(&self, tx: &Transaction, stem: bool) -> Result<(), PoolError> {
if self.total_size() > self.config.max_pool_size {
return Err(PoolError::OverCapacity);
}
if stem && self.stempool.size() > self.config.max_stempool_size {
return Err(PoolError::OverCapacity);
} else if self.total_size() > self.config.max_pool_size {
return Err(PoolError::OverCapacity);
}
if self.config.accept_fee_base > 0 {
let threshold = (tx.tx_weight() as u64) * self.config.accept_fee_base;
if tx.fee() < threshold {
return Err(PoolError::LowFeeTransaction(threshold));
}
}
Ok(())
}
pub fn total_size(&self) -> usize {
self.txpool.size()
}
pub fn prepare_mineable_transactions(&self) -> Result<Vec<Transaction>, PoolError> {
self.txpool
.prepare_mineable_transactions(self.config.mineable_max_weight)
}
}