use alloc::{
collections::{BTreeSet, btree_set},
vec::Vec,
};
use core::{fmt, iter, mem, ops};
use hashbrown::HashSet;
mod tests;
pub use super::validate::ValidTransaction;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TransactionId(usize);
pub struct Config {
pub capacity: usize,
pub finalized_block_height: u64,
pub randomness_seed: [u8; 16],
}
pub struct Pool<TTx> {
transactions: slab::Slab<Transaction<TTx>>,
not_validated: HashSet<TransactionId, fnv::FnvBuildHasher>,
by_hash: BTreeSet<([u8; 32], TransactionId)>,
by_height: BTreeSet<(u64, TransactionId)>,
includable: BTreeSet<(u64, TransactionId)>,
by_validation_expiration: BTreeSet<(u64, TransactionId)>,
tags: hashbrown::HashMap<Vec<u8>, TagInfo, crate::util::SipHasherBuild>,
best_block_height: u64,
}
struct Transaction<TTx> {
scale_encoded: Vec<u8>,
validation: Option<(u64, ValidTransaction)>,
included_block_height: Option<u64>,
user_data: TTx,
}
struct TagInfo {
provided_by: hashbrown::HashSet<TransactionId, fnv::FnvBuildHasher>,
required_by: hashbrown::HashSet<TransactionId, fnv::FnvBuildHasher>,
known_to_be_included_in_chain: usize,
}
impl<TTx> Pool<TTx> {
pub fn new(config: Config) -> Self {
Pool {
transactions: slab::Slab::with_capacity(config.capacity),
not_validated: HashSet::with_capacity_and_hasher(config.capacity, Default::default()),
by_hash: BTreeSet::new(),
by_height: BTreeSet::new(),
includable: BTreeSet::new(),
by_validation_expiration: BTreeSet::new(),
tags: hashbrown::HashMap::with_capacity_and_hasher(
8,
crate::util::SipHasherBuild::new(config.randomness_seed),
),
best_block_height: config.finalized_block_height,
}
}
pub fn is_empty(&self) -> bool {
self.transactions.is_empty()
}
pub fn len(&self) -> usize {
self.transactions.len()
}
pub fn add_unvalidated(&mut self, scale_encoded: Vec<u8>, user_data: TTx) -> TransactionId {
self.add_unvalidated_inner(scale_encoded, None, user_data)
}
fn add_unvalidated_inner(
&mut self,
scale_encoded: impl AsRef<[u8]> + Into<Vec<u8>>,
included_block_height: Option<u64>,
user_data: TTx,
) -> TransactionId {
let hash = blake2_hash(scale_encoded.as_ref());
let tx_id = TransactionId(self.transactions.insert(Transaction {
scale_encoded: scale_encoded.into(),
validation: None,
included_block_height,
user_data,
}));
let _was_inserted = self.by_hash.insert((hash, tx_id));
debug_assert!(_was_inserted);
let _was_inserted = self.not_validated.insert(tx_id);
debug_assert!(_was_inserted);
if let Some(included_block_height) = included_block_height {
let _was_inserted = self.by_height.insert((included_block_height, tx_id));
debug_assert!(_was_inserted);
}
tx_id
}
#[track_caller]
pub fn remove(&mut self, id: TransactionId) -> TTx {
self.unvalidate_transaction(id);
let _removed = self.not_validated.remove(&id);
debug_assert!(_removed);
let tx = self.transactions.remove(id.0);
if let Some(included_block_height) = tx.included_block_height {
let _removed = self.by_height.remove(&(included_block_height, id));
debug_assert!(_removed);
}
let _removed = self.by_hash.remove(&(blake2_hash(&tx.scale_encoded), id));
debug_assert!(_removed);
tx.user_data
}
pub fn remove_included(
&mut self,
block_inferior_of_equal: u64,
) -> impl Iterator<Item = (TransactionId, TTx)> {
for tx_id in self
.by_height
.range(
(0, TransactionId(usize::MIN))
..=(block_inferior_of_equal, TransactionId(usize::MAX)),
)
.map(|(_, id)| *id)
.collect::<Vec<_>>()
{
self.unvalidate_transaction(tx_id);
}
let to_remove = {
let remaining_txs = self
.by_height
.split_off(&(block_inferior_of_equal + 1, TransactionId(usize::MIN)));
mem::replace(&mut self.by_height, remaining_txs)
};
struct ToRemoveIterator<'a, TTx> {
pool: &'a mut Pool<TTx>,
transactions: btree_set::IntoIter<(u64, TransactionId)>,
}
impl<'a, TTx> Iterator for ToRemoveIterator<'a, TTx>
where
btree_set::IntoIter<(u64, TransactionId)>: iter::FusedIterator,
{
type Item = (TransactionId, TTx);
fn next(&mut self) -> Option<Self::Item> {
let (_height, tx_id) = self.transactions.next()?;
let tx = self.pool.transactions.remove(tx_id.0);
debug_assert!(tx.validation.is_none());
debug_assert_eq!(tx.included_block_height, Some(_height));
let _removed = self
.pool
.by_hash
.remove(&(blake2_hash(&tx.scale_encoded), tx_id));
debug_assert!(_removed);
Some((tx_id, tx.user_data))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.transactions.size_hint()
}
}
impl<'a, TTx> ExactSizeIterator for ToRemoveIterator<'a, TTx> where
btree_set::IntoIter<(u64, TransactionId)>: ExactSizeIterator
{
}
impl<'a, TTx> Drop for ToRemoveIterator<'a, TTx> {
fn drop(&mut self) {
while self.next().is_some() {}
}
}
ToRemoveIterator {
pool: self,
transactions: to_remove.into_iter(),
}
}
pub fn unvalidated_transactions(
&self,
) -> impl ExactSizeIterator<Item = (TransactionId, &TTx, u64)> {
self.not_validated.iter().copied().map(move |tx_id| {
let tx = self.transactions.get(tx_id.0).unwrap();
let height = tx
.included_block_height
.map(|n| n.checked_sub(1).unwrap())
.unwrap_or(self.best_block_height);
(tx_id, &tx.user_data, height)
})
}
pub fn iter(&self) -> impl Iterator<Item = (TransactionId, &TTx)> {
self.transactions
.iter()
.map(|(id, tx)| (TransactionId(id), &tx.user_data))
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (TransactionId, &mut TTx)> {
self.transactions
.iter_mut()
.map(|(id, tx)| (TransactionId(id), &mut tx.user_data))
}
pub fn included_block_height(&self, id: TransactionId) -> Option<u64> {
self.transactions.get(id.0)?.included_block_height
}
pub fn scale_encoding(&self, id: TransactionId) -> Option<&[u8]> {
Some(&self.transactions.get(id.0)?.scale_encoded)
}
pub fn transactions_by_scale_encoding(
&self,
scale_encoded: &[u8],
) -> impl Iterator<Item = TransactionId> {
let hash = blake2_hash(scale_encoded);
self.by_hash
.range((hash, TransactionId(usize::MIN))..=(hash, TransactionId(usize::MAX)))
.map(|(_, tx_id)| *tx_id)
}
pub fn best_block_height(&self) -> u64 {
self.best_block_height
}
pub fn append_empty_block(&mut self) {
self.best_block_height = self.best_block_height.checked_add(1).unwrap();
for tx_id in self
.by_validation_expiration
.range(
(0, TransactionId(usize::MIN))
..=(self.best_block_height, TransactionId(usize::MAX)),
)
.map(|(_, id)| *id)
.collect::<Vec<_>>()
{
self.unvalidate_transaction(tx_id);
}
}
pub fn retract_blocks(
&mut self,
num_to_retract: u64,
) -> impl Iterator<Item = (TransactionId, u64)> {
debug_assert!(
self.by_height
.range((self.best_block_height + 1, TransactionId(usize::MIN),)..,)
.next()
.is_none()
);
self.best_block_height = self.best_block_height.checked_sub(num_to_retract).unwrap();
let transactions_to_retract = self
.by_height
.range((self.best_block_height + 1, TransactionId(usize::MIN))..)
.map(|(block_height, tx_id)| (*tx_id, *block_height))
.collect::<Vec<_>>();
for (transaction_id, _) in &transactions_to_retract {
self.unvalidate_transaction(*transaction_id);
let tx_data = self.transactions.get_mut(transaction_id.0).unwrap();
debug_assert!(tx_data.included_block_height.unwrap() > self.best_block_height);
self.by_height
.remove(&(tx_data.included_block_height.unwrap(), *transaction_id));
tx_data.included_block_height = None;
}
transactions_to_retract.into_iter().rev()
}
pub fn best_block_includable_transactions(
&self,
) -> impl Iterator<Item = (TransactionId, &TTx)> {
self.includable
.iter()
.rev()
.map(|(_, tx_id)| (*tx_id, &self.transactions[tx_id.0].user_data))
}
pub fn best_block_add_transaction_by_scale_encoding<'a, 'b>(
&'a mut self,
bytes: &'b [u8],
) -> AppendBlockTransaction<'a, 'b, TTx> {
let non_included = {
let hash = blake2_hash(bytes);
self.by_hash
.range((hash, TransactionId(usize::MIN))..=(hash, TransactionId(usize::MAX)))
.find(|(_, tx_id)| {
self.transactions
.get(tx_id.0)
.unwrap()
.included_block_height
.is_none()
})
.map(|(_, tx_id)| *tx_id)
};
if let Some(tx_id) = non_included {
debug_assert_eq!(self.transactions[tx_id.0].scale_encoded, bytes);
self.best_block_add_transaction_by_id(tx_id);
AppendBlockTransaction::NonIncludedUpdated {
id: tx_id,
user_data: &mut self.transactions[tx_id.0].user_data,
}
} else {
AppendBlockTransaction::Unknown(Vacant { inner: self, bytes })
}
}
pub fn best_block_add_transaction_by_id(&mut self, id: TransactionId) {
assert!(self.transactions[id.0].included_block_height.is_none());
let revalidation = if let Some(validation) = self.transactions[id.0].validation.as_ref() {
if validation.0 + 1 == self.best_block_height {
Some(validation.clone())
} else {
None
}
} else {
None
};
self.unvalidate_transaction(id);
self.transactions[id.0].included_block_height = Some(self.best_block_height);
self.by_height.insert((self.best_block_height, id));
if let Some((block_number_validated_against, result)) = revalidation {
self.set_validation_result(id, block_number_validated_against, result);
}
}
pub fn set_validation_result(
&mut self,
id: TransactionId,
block_number_validated_against: u64,
result: ValidTransaction,
) {
if self
.transactions
.get(id.0)
.unwrap()
.included_block_height
.map_or(false, |b| b != block_number_validated_against + 1)
{
return;
}
if self
.transactions
.get(id.0)
.unwrap()
.validation
.as_ref()
.map_or(false, |(b, _)| *b >= block_number_validated_against)
{
return;
}
self.unvalidate_transaction(id);
debug_assert!(self.transactions[id.0].validation.is_none());
let mut includable = self.transactions[id.0].included_block_height.is_none();
for tag in &result.provides {
let tag_info = self.tags.entry(tag.clone()).or_insert_with(|| TagInfo {
provided_by: Default::default(),
required_by: Default::default(),
known_to_be_included_in_chain: 0,
});
if self.transactions[id.0].included_block_height.is_some() {
tag_info.known_to_be_included_in_chain += 1;
if tag_info.known_to_be_included_in_chain == 1 {
for other_tx_id in &tag_info.provided_by {
let _was_in = self.includable.remove(&(
self.transactions[other_tx_id.0]
.validation
.as_ref()
.unwrap()
.1
.priority,
*other_tx_id,
));
debug_assert!(_was_in);
}
for other_tx_id in &tag_info.required_by {
let _was_inserted = self.includable.insert((
self.transactions[other_tx_id.0]
.validation
.as_ref()
.unwrap()
.1
.priority,
*other_tx_id,
));
debug_assert!(_was_inserted);
}
}
}
if tag_info.known_to_be_included_in_chain >= 1 {
includable = false;
}
tag_info.provided_by.insert(id);
}
for tag in &result.requires {
let tag_info = self.tags.entry(tag.clone()).or_insert_with(|| TagInfo {
provided_by: Default::default(),
required_by: Default::default(),
known_to_be_included_in_chain: 0,
});
tag_info.required_by.insert(id);
if tag_info.known_to_be_included_in_chain == 0 {
includable = false;
}
}
self.by_validation_expiration.insert((
block_number_validated_against.saturating_add(result.longevity.get()),
id,
));
if includable {
self.includable.insert((result.priority, id));
}
self.transactions[id.0].validation = Some((block_number_validated_against, result));
}
fn unvalidate_transaction(&mut self, tx_id: TransactionId) {
let Some((block_height_validated_against, validation)) =
self.transactions[tx_id.0].validation.take()
else {
return;
};
self.includable.remove(&(validation.priority, tx_id));
for tag in validation.provides {
let tag_info = self.tags.get_mut(&tag).unwrap();
let _was_in = tag_info.provided_by.remove(&tx_id);
debug_assert!(_was_in);
if self.transactions[tx_id.0].included_block_height.is_some() {
tag_info.known_to_be_included_in_chain -= 1;
if tag_info.known_to_be_included_in_chain == 0 {
for other_tx_id in &tag_info.provided_by {
let _was_inserted = self.includable.insert((
self.transactions[other_tx_id.0]
.validation
.as_ref()
.unwrap()
.1
.priority,
*other_tx_id,
));
debug_assert!(_was_inserted);
}
for other_tx_id in &tag_info.required_by {
let _was_in = self.includable.remove(&(
self.transactions[other_tx_id.0]
.validation
.as_ref()
.unwrap()
.1
.priority,
*other_tx_id,
));
debug_assert!(_was_in);
}
}
}
if tag_info.provided_by.is_empty() && tag_info.required_by.is_empty() {
self.tags.remove(&tag).unwrap();
}
}
for tag in validation.requires {
let tag_info = self.tags.get_mut(&tag).unwrap();
let _was_in = tag_info.required_by.remove(&tx_id);
debug_assert!(_was_in);
if tag_info.provided_by.is_empty() && tag_info.required_by.is_empty() {
self.tags.remove(&tag).unwrap();
}
}
self.not_validated.insert(tx_id);
let _was_in = self.by_validation_expiration.remove(&(
block_height_validated_against.saturating_add(validation.longevity.get()),
tx_id,
));
debug_assert!(_was_in);
}
}
impl<TTx> ops::Index<TransactionId> for Pool<TTx> {
type Output = TTx;
fn index(&self, index: TransactionId) -> &Self::Output {
&self.transactions[index.0].user_data
}
}
impl<TTx> ops::IndexMut<TransactionId> for Pool<TTx> {
fn index_mut(&mut self, index: TransactionId) -> &mut Self::Output {
&mut self.transactions[index.0].user_data
}
}
impl<TTx: fmt::Debug> fmt::Debug for Pool<TTx> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list()
.entries(
self.transactions
.iter()
.map(|t| (TransactionId(t.0), &t.1.user_data)),
)
.finish()
}
}
#[derive(Debug)]
pub enum AppendBlockTransaction<'a, 'b, TTx> {
Unknown(Vacant<'a, 'b, TTx>),
NonIncludedUpdated {
id: TransactionId,
user_data: &'a mut TTx,
},
}
pub struct Vacant<'a, 'b, TTx> {
inner: &'a mut Pool<TTx>,
bytes: &'b [u8],
}
impl<'a, 'b, TTx> Vacant<'a, 'b, TTx> {
pub fn insert(self, user_data: TTx) -> TransactionId {
self.inner
.add_unvalidated_inner(self.bytes, Some(self.inner.best_block_height), user_data)
}
}
impl<'a, 'b, TTx: fmt::Debug> fmt::Debug for Vacant<'a, 'b, TTx> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.inner, f)
}
}
fn blake2_hash(bytes: &[u8]) -> [u8; 32] {
<[u8; 32]>::try_from(blake2_rfc::blake2b::blake2b(32, &[], bytes).as_bytes()).unwrap()
}