use self::core::core::hash::{Hash, Hashed};
use self::core::core::id::{ShortId, ShortIdentifiable};
use self::core::core::transaction;
use self::core::core::{
Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting,
};
use crate::types::{BlockChain, PoolEntry, PoolError};
use epic_core as core;
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
/// A named collection of transaction entries checked against a backing chain.
pub struct Pool<B: BlockChain> {
    /// The entries currently held by the pool.
    pub entries: Vec<PoolEntry>,
    /// Shared handle to the chain the pool validates against.
    pub blockchain: Arc<B>,
    /// Label identifying this pool; included in the pool's log output.
    pub name: String,
}
impl<B> Pool<B>
where
    B: BlockChain,
{
    /// Creates an empty pool backed by `chain` and labelled `name`.
    pub fn new(chain: Arc<B>, name: String) -> Self {
        Pool {
            entries: vec![],
            blockchain: chain,
            name,
        }
    }

    /// Returns `true` if some entry's transaction hashes to `hash`.
    pub fn contains_tx(&self, hash: Hash) -> bool {
        self.entries.iter().any(|x| x.tx.hash() == hash)
    }

    /// Returns a clone of the first pooled transaction whose hash equals `hash`.
    pub fn get_tx(&self, hash: Hash) -> Option<Transaction> {
        self.entries
            .iter()
            .find(|x| x.tx.hash() == hash)
            .map(|x| x.tx.clone())
    }

    /// Returns a clone of the first pooled transaction that contains a kernel
    /// whose hash equals `hash`.
    pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option<Transaction> {
        self.entries
            .iter()
            .find(|x| x.tx.kernels().iter().any(|k| k.hash() == hash))
            .map(|x| x.tx.clone())
    }

    /// Looks up pooled transactions by kernel short id.
    ///
    /// Every kernel in the pool is re-hashed into a short id using `hash` and
    /// `nonce`, and compared against `kern_ids`.
    ///
    /// Returns `(txs, missing)` where `txs` are clones of the transactions that
    /// had at least one matching kernel and `missing` are the ids from
    /// `kern_ids` for which no kernel was found.
    pub fn retrieve_transactions(
        &self,
        hash: Hash,
        nonce: u64,
        kern_ids: &[ShortId],
    ) -> (Vec<Transaction>, Vec<ShortId>) {
        let mut txs = vec![];
        let mut found_ids = vec![];

        'outer: for x in &self.entries {
            for k in x.tx.kernels() {
                let short_id = k.short_id(&hash, nonce);
                if kern_ids.contains(&short_id) {
                    txs.push(x.tx.clone());
                    found_ids.push(short_id);
                }
                // Stop scanning as soon as we have as many hits as requested ids.
                if found_ids.len() == kern_ids.len() {
                    break 'outer;
                }
            }
        }

        // A tx with several matching kernels was pushed once per kernel; those
        // pushes are adjacent, so `dedup` collapses them to a single copy.
        txs.dedup();

        let missing = kern_ids
            .iter()
            .filter(|id| !found_ids.contains(id))
            .cloned()
            .collect();

        (txs, missing)
    }

    /// Selects pooled transactions for inclusion in a block.
    ///
    /// Transactions are bucketed and ordered by `bucket_transactions` under an
    /// `AsLimitedTransaction(max_weight)` weighting, then filtered through
    /// `validate_raw_txs` against the current chain head.
    ///
    /// # Errors
    /// Fails if the chain head cannot be read or if aggregation fails during
    /// validation.
    pub fn prepare_mineable_transactions(
        &self,
        max_weight: usize,
    ) -> Result<Vec<Transaction>, PoolError> {
        let weighting = Weighting::AsLimitedTransaction(max_weight);
        let txs = self.bucket_transactions(weighting);
        let header = self.blockchain.chain_head()?;
        self.validate_raw_txs(&txs, None, &header, weighting)
    }

    /// Returns clones of all pooled transactions, in pool order.
    pub fn all_transactions(&self) -> Vec<Transaction> {
        self.entries.iter().map(|x| x.tx.clone()).collect()
    }

    /// Aggregates every pooled transaction into one and validates the result
    /// with `Weighting::NoLimit`.
    ///
    /// Returns `Ok(None)` when the pool is empty.
    ///
    /// # Errors
    /// Fails if aggregation or validation of the aggregate fails.
    pub fn all_transactions_aggregate(&self) -> Result<Option<Transaction>, PoolError> {
        let txs = self.all_transactions();
        if txs.is_empty() {
            return Ok(None);
        }
        let tx = transaction::aggregate(txs)?;
        tx.validate(Weighting::NoLimit)?;
        Ok(Some(tx))
    }

    /// Adds `entry` to the pool after validating it together with everything
    /// already in the pool plus `extra_txs`.
    ///
    /// The pool contents, `extra_txs` and the new tx are aggregated into a
    /// single transaction which must validate against `header`.
    ///
    /// # Errors
    /// * `PoolError::DuplicateTx` if an identical tx is already pooled.
    /// * Any error raised by aggregation or validation; the entry is not added.
    pub fn add_to_pool(
        &mut self,
        entry: PoolEntry,
        extra_txs: Vec<Transaction>,
        header: &BlockHeader,
    ) -> Result<(), PoolError> {
        let mut txs = self.all_transactions();

        // Only the pool itself is checked for duplicates, not `extra_txs`.
        if txs.contains(&entry.tx) {
            return Err(PoolError::DuplicateTx);
        }

        txs.extend(extra_txs);

        let agg_tx = if txs.is_empty() {
            // Nothing to aggregate with: validate the tx on its own.
            entry.tx.clone()
        } else {
            txs.push(entry.tx.clone());
            transaction::aggregate(txs)?
        };

        self.validate_raw_tx(&agg_tx, header, Weighting::NoLimit)?;

        self.log_pool_add(&entry, header);
        self.entries.push(entry);
        Ok(())
    }

    /// Emits a debug log line describing an entry about to be added.
    fn log_pool_add(&self, entry: &PoolEntry, header: &BlockHeader) {
        debug!(
            "add_to_pool [{}]: {} ({:?}) [in/out/kern: {}/{}/{}] pool: {} (at block {})",
            self.name,
            entry.tx.hash(),
            entry.src,
            entry.tx.inputs().len(),
            entry.tx.outputs().len(),
            entry.tx.kernels().len(),
            self.size(),
            header.hash(),
        );
    }

    /// Validates a single (possibly aggregated) transaction.
    ///
    /// Runs the tx's own validation under `weighting`, then the chain's
    /// `validate_tx`, then applies the tx to the block sums at `header`.
    /// Returns the resulting block sums.
    fn validate_raw_tx(
        &self,
        tx: &Transaction,
        header: &BlockHeader,
        weighting: Weighting,
    ) -> Result<BlockSums, PoolError> {
        tx.validate(weighting)?;
        self.blockchain.validate_tx(tx)?;
        let new_sums = self.apply_tx_to_block_sums(tx, header)?;
        Ok(new_sums)
    }

    /// Filters `txs` down to those that can be added one after another while
    /// keeping the running aggregate valid.
    ///
    /// For each candidate, `extra_tx` (if any), all transactions accepted so
    /// far and the candidate are aggregated and validated against `header`.
    /// Candidates whose aggregate fails validation are skipped.
    ///
    /// # Errors
    /// Fails only if aggregation itself fails; validation failures merely drop
    /// the candidate.
    pub fn validate_raw_txs(
        &self,
        txs: &[Transaction],
        extra_tx: Option<Transaction>,
        header: &BlockHeader,
        weighting: Weighting,
    ) -> Result<Vec<Transaction>, PoolError> {
        let mut valid_txs: Vec<Transaction> = vec![];

        for tx in txs {
            // Order matters for the aggregate input: extra tx, accepted txs, candidate.
            let mut candidate_txs: Vec<Transaction> = Vec::with_capacity(valid_txs.len() + 2);
            candidate_txs.extend(extra_tx.iter().cloned());
            candidate_txs.extend(valid_txs.iter().cloned());
            candidate_txs.push(tx.clone());

            let agg_tx = transaction::aggregate(candidate_txs)?;

            if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() {
                valid_txs.push(tx.clone());
            }
        }

        Ok(valid_txs)
    }

    /// Verifies the kernel sums of `tx` on top of the block sums stored for
    /// `header` and returns the updated sums.
    ///
    /// # Errors
    /// Fails if the kernel offsets cannot be added, the block sums for
    /// `header` cannot be fetched, or the kernel sum verification fails.
    fn apply_tx_to_block_sums(
        &self,
        tx: &Transaction,
        header: &BlockHeader,
    ) -> Result<BlockSums, PoolError> {
        let overage = tx.overage();
        let offset = (header.total_kernel_offset() + tx.offset.clone())?;
        let block_sums = self.blockchain.get_block_sums(&header.hash())?;

        // Verify the sums of the existing block sums and the tx taken together.
        let (utxo_sum, kernel_sum) =
            (block_sums, tx as &dyn Committed).verify_kernel_sums(overage, offset)?;

        Ok(BlockSums {
            utxo_sum,
            kernel_sum,
        })
    }

    /// Rebuilds the pool by re-adding every existing entry against `header`.
    ///
    /// `extra_tx`, when present, is included in the validation of each
    /// re-added entry. Entries that fail to re-add are dropped; their errors
    /// are intentionally ignored. Always returns `Ok(())`.
    pub fn reconcile(
        &mut self,
        extra_tx: Option<Transaction>,
        header: &BlockHeader,
    ) -> Result<(), PoolError> {
        // Move the entries out (leaving the pool empty) rather than cloning them.
        let existing_entries: Vec<PoolEntry> = self.entries.drain(..).collect();

        let extra_txs: Vec<Transaction> = extra_tx.into_iter().collect();

        for x in existing_entries {
            // Best effort: an entry that no longer validates is simply evicted.
            let _ = self.add_to_pool(x, extra_txs.clone(), header);
        }

        Ok(())
    }

    /// Groups pooled transactions into buckets of dependent transactions and
    /// returns them flattened, best bucket first.
    ///
    /// A tx that spends no output of an earlier pooled tx starts a new bucket.
    /// A tx that spends outputs tracked under exactly one bucket is aggregated
    /// into that bucket when the aggregate validates under `weighting` and does
    /// not lower the bucket's `fee_to_weight`; if it would lower it, the tx gets
    /// a bucket of its own. A tx is rejected (left out of the result) when it
    /// spends an output of a rejected tx, when a second of its inputs also
    /// matches a tracked output, or when aggregation fails.
    ///
    /// Buckets are ordered by descending `fee_to_weight`, then by creation
    /// order.
    pub fn bucket_transactions(&self, weighting: Weighting) -> Vec<Transaction> {
        let mut tx_buckets: Vec<Bucket> = Vec::new();
        // Output commitment -> index of the bucket recorded for the tx that created it.
        let mut output_commits = HashMap::new();
        // Output commitments created by rejected txs.
        let mut rejected = HashSet::new();

        for entry in &self.entries {
            let mut insert_pos = None;
            let mut is_rejected = false;

            for input in entry.tx.inputs() {
                if rejected.contains(&input.commitment()) {
                    // Depends on a rejected tx; no need to look at further inputs.
                    is_rejected = true;
                    break;
                } else if let Some(pos) = output_commits.get(&input.commitment()) {
                    if insert_pos.is_some() {
                        // A second input matches a tracked output: reject.
                        is_rejected = true;
                        break;
                    } else {
                        insert_pos = Some(*pos);
                    }
                }
            }

            if !is_rejected {
                match insert_pos {
                    None => {
                        // No dependency on another pooled tx: start a new bucket.
                        let age_idx = tx_buckets.len();
                        insert_pos = Some(age_idx);
                        tx_buckets.push(Bucket::new(entry.tx.clone(), age_idx));
                    }
                    Some(pos) => {
                        let bucket = &tx_buckets[pos];
                        if let Ok(new_bucket) =
                            bucket.aggregate_with_tx(entry.tx.clone(), weighting)
                        {
                            if new_bucket.fee_to_weight >= bucket.fee_to_weight {
                                // Aggregating does not reduce the ratio: merge.
                                tx_buckets[pos] = new_bucket;
                            } else {
                                // Would reduce the ratio: give it its own bucket.
                                // NOTE(review): the outputs of this tx are still
                                // recorded against `pos` below, not the new
                                // bucket - confirm this is intended.
                                let age_idx = tx_buckets.len();
                                tx_buckets.push(Bucket::new(entry.tx.clone(), age_idx));
                            }
                        } else {
                            // Aggregate failed to build or validate.
                            is_rejected = true;
                        }
                    }
                }
            }

            if is_rejected {
                for out in entry.tx.outputs() {
                    rejected.insert(out.commitment());
                }
            } else if let Some(insert_pos) = insert_pos {
                for out in entry.tx.outputs() {
                    output_commits.insert(out.commitment(), insert_pos);
                }
            }
        }

        // Highest fee_to_weight first; ties broken by bucket creation order.
        tx_buckets.sort_unstable_by_key(|x| (Reverse(x.fee_to_weight), x.age_idx));

        tx_buckets.into_iter().flat_map(|x| x.raw_txs).collect()
    }

    /// Returns clones of every pooled transaction whose kernels are all
    /// contained in `kernels`.
    pub fn find_matching_transactions(&self, kernels: &[TxKernel]) -> Vec<Transaction> {
        let kernel_set = kernels.iter().collect::<HashSet<_>>();

        self.entries
            .iter()
            .filter(|entry| entry.tx.kernels().iter().all(|k| kernel_set.contains(&k)))
            .map(|entry| entry.tx.clone())
            .collect()
    }

    /// Removes every entry that shares a kernel or an input with `block`.
    pub fn reconcile_block(&mut self, block: &Block) {
        self.entries.retain(|x| {
            !x.tx.kernels().iter().any(|y| block.kernels().contains(y))
                && !x.tx.inputs().iter().any(|y| block.inputs().contains(y))
        });
    }

    /// Number of entries in the pool.
    pub fn size(&self) -> usize {
        self.entries.len()
    }

    /// Total number of kernels across all pooled transactions.
    pub fn kernel_count(&self) -> usize {
        self.entries.iter().map(|x| x.tx.kernels().len()).sum()
    }

    /// Returns `true` if the pool holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
/// A group of transactions that are ordered and selected together when
/// bucketing the pool.
struct Bucket {
    /// Fee-to-weight figure for the bucket as a whole; primary sort key.
    fee_to_weight: u64,
    /// Index assigned when the bucket was created; used as the sort tie-breaker.
    age_idx: usize,
    /// The individual transactions making up the bucket, in the order added.
    raw_txs: Vec<Transaction>,
}
impl Bucket {
    /// Builds a bucket holding only `tx`, tagged with `age_idx`.
    fn new(tx: Transaction, age_idx: usize) -> Bucket {
        Bucket {
            // Read the ratio before `tx` is moved into the vec below.
            fee_to_weight: tx.fee_to_weight(),
            raw_txs: vec![tx],
            age_idx,
        }
    }

    /// Returns a new bucket consisting of this bucket's transactions plus
    /// `new_tx`, leaving `self` untouched.
    ///
    /// The combined transactions are aggregated and the aggregate is validated
    /// under `weighting`; the new bucket's `fee_to_weight` is taken from that
    /// aggregate and its `age_idx` is inherited from `self`.
    ///
    /// # Errors
    /// Fails if aggregation or validation of the aggregate fails.
    fn aggregate_with_tx(
        &self,
        new_tx: Transaction,
        weighting: Weighting,
    ) -> Result<Bucket, PoolError> {
        let mut raw_txs = self.raw_txs.clone();
        raw_txs.push(new_tx);

        // The aggregate is built from a copy; the raw txs are kept for the bucket.
        let agg_tx = transaction::aggregate(raw_txs.clone())?;
        agg_tx.validate(weighting)?;

        Ok(Bucket {
            fee_to_weight: agg_tx.fee_to_weight(),
            raw_txs,
            age_idx: self.age_idx,
        })
    }
}