use std::{
cmp::Reverse,
collections::{BTreeMap, HashMap, hash_map::Entry},
num::NonZeroUsize,
};
use anyhow::{Result, bail};
use lru::LruCache;
use snarkvm::{ledger::Transaction, prelude::*};
use crate::{CAPACITY_FOR_DEPLOYMENTS, CAPACITY_FOR_EXECUTIONS};
pub struct TransactionsQueue<N: Network> {
pub deployments: TransactionsQueueInner<N>,
pub executions: TransactionsQueueInner<N>,
}
impl<N: Network> Default for TransactionsQueue<N> {
fn default() -> Self {
Self {
deployments: TransactionsQueueInner::new(CAPACITY_FOR_DEPLOYMENTS),
executions: TransactionsQueueInner::new(CAPACITY_FOR_EXECUTIONS),
}
}
}
impl<N: Network> TransactionsQueue<N> {
pub fn contains(&self, transaction_id: &N::TransactionID) -> bool {
self.executions.contains(transaction_id) || self.deployments.contains(transaction_id)
}
pub fn insert(
&mut self,
transaction_id: N::TransactionID,
transaction: Transaction<N>,
priority_fee: U64<N>,
) -> Result<()> {
if transaction.is_execute() {
self.executions.insert(transaction_id, transaction, priority_fee)
} else {
self.deployments.insert(transaction_id, transaction, priority_fee)
}
}
pub fn transactions(&self) -> impl Iterator<Item = (N::TransactionID, Transaction<N>)> + use<N> {
self.deployments
.priority_queue
.transactions
.clone()
.into_iter()
.chain(self.deployments.queue.clone())
.chain(self.executions.priority_queue.transactions.clone())
.chain(self.executions.queue.clone())
}
}
pub struct TransactionsQueueInner<N: Network> {
capacity: usize,
queue: LruCache<N::TransactionID, Transaction<N>>,
priority_queue: PriorityQueue<N>,
}
impl<N: Network> TransactionsQueueInner<N> {
fn new(capacity: usize) -> Self {
Self {
capacity,
queue: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
priority_queue: Default::default(),
}
}
pub fn len(&self) -> usize {
self.queue.len().saturating_add(self.priority_queue.len())
}
fn contains(&self, transaction_id: &N::TransactionID) -> bool {
self.queue.contains(transaction_id) || self.priority_queue.transactions.contains_key(transaction_id)
}
fn insert(
&mut self,
transaction_id: N::TransactionID,
transaction: Transaction<N>,
priority_fee: U64<N>,
) -> Result<()> {
if self.len() < self.capacity {
if priority_fee.is_zero() {
self.queue.get_or_insert(transaction_id, || transaction);
} else {
self.priority_queue.insert(transaction_id, transaction, priority_fee);
}
return Ok(());
}
match (self.priority_queue.len() < self.capacity, *priority_fee) {
(true, 0) => {
let _ = self.queue.get_or_insert(transaction_id, || transaction);
}
(true, _fee) => {
self.queue.pop_lru();
self.priority_queue.insert(transaction_id, transaction, priority_fee)
}
(false, 0) => bail!("The memory pool is full"),
(false, _fee) => self.priority_queue.compare_insert(transaction_id, transaction, priority_fee),
}
Ok(())
}
pub fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> {
self.priority_queue.pop().or_else(|| self.queue.pop_lru())
}
}
struct PriorityQueue<N: Network> {
counter: u64,
transaction_ids: BTreeMap<(Reverse<U64<N>>, u64), N::TransactionID>,
transactions: HashMap<N::TransactionID, Transaction<N>>,
}
impl<N: Network> Default for PriorityQueue<N> {
fn default() -> Self {
Self { counter: Default::default(), transaction_ids: Default::default(), transactions: Default::default() }
}
}
impl<N: Network> PriorityQueue<N> {
fn len(&self) -> usize {
self.transactions.len()
}
fn insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) {
if let Entry::Vacant(entry) = self.transactions.entry(transaction_id) {
entry.insert(transaction);
self.transaction_ids.insert((Reverse(fee), self.counter), transaction_id);
self.counter += 1;
}
}
fn compare_insert(&mut self, transaction_id: N::TransactionID, transaction: Transaction<N>, fee: U64<N>) {
if self.transaction_ids.is_empty() {
return;
}
let ((Reverse(lowest_fee), _), _) = self.transaction_ids.last_key_value().expect("item must be present");
if lowest_fee > &fee {
return;
}
let (_, id) = self.transaction_ids.pop_last().expect("item must be present");
self.transactions.remove(&id);
self.insert(transaction_id, transaction, fee);
}
fn pop(&mut self) -> Option<(N::TransactionID, Transaction<N>)> {
let (_, transaction_id) = self.transaction_ids.pop_first()?;
self.transactions.remove(&transaction_id).map(|transaction| (transaction_id, transaction))
}
}
#[cfg(test)]
mod tests {
use super::*;
use snarkvm::{
ledger::test_helpers::{sample_deployment_transaction, sample_execution_transaction_with_fee},
prelude::{MainnetV0, TestRng},
};
type CurrentNetwork = MainnetV0;
#[test]
fn insert_and_pop_low_priority_transactions() {
let mut rng = TestRng::default();
let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0);
let execution_id = execution_transaction.id();
let zero_fee = U64::new(0);
let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default();
assert!(!transactions_queue.contains(&execution_id));
transactions_queue.insert(execution_id, execution_transaction.clone(), zero_fee).unwrap();
assert!(transactions_queue.contains(&execution_id));
assert!(transactions_queue.executions.contains(&execution_id));
assert!(!transactions_queue.deployments.contains(&execution_id));
let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap();
assert_eq!(popped_execution_id, execution_id);
assert_eq!(popped_execution_transaction, execution_transaction);
assert!(!transactions_queue.contains(&execution_id));
let deployment_transaction = sample_deployment_transaction(2, 0, false, false, &mut rng);
let deployment_id = deployment_transaction.id();
assert!(!transactions_queue.contains(&deployment_id));
transactions_queue.insert(deployment_id, deployment_transaction.clone(), zero_fee).unwrap();
assert!(transactions_queue.contains(&deployment_id));
assert!(transactions_queue.deployments.contains(&deployment_id));
assert!(!transactions_queue.executions.contains(&deployment_id));
let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap();
assert_eq!(popped_deployment_id, deployment_id);
assert_eq!(popped_deployment_transaction, deployment_transaction);
assert!(!transactions_queue.contains(&deployment_id));
}
#[test]
fn insert_and_pop_high_priority_transactions() {
let mut rng = TestRng::default();
let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0);
let execution_id = execution_transaction.id();
let high_fee = U64::new(100);
let mut transactions_queue = TransactionsQueue::<CurrentNetwork>::default();
assert!(!transactions_queue.contains(&execution_id));
transactions_queue.insert(execution_id, execution_transaction.clone(), high_fee).unwrap();
assert!(transactions_queue.contains(&execution_id));
assert!(transactions_queue.executions.contains(&execution_id));
assert!(transactions_queue.executions.priority_queue.transactions.contains_key(&execution_id));
let (popped_execution_id, popped_execution_transaction) = transactions_queue.executions.pop().unwrap();
assert_eq!(popped_execution_id, execution_id);
assert_eq!(popped_execution_transaction, execution_transaction);
assert!(!transactions_queue.contains(&execution_id));
let deployment_transaction = sample_deployment_transaction(2, 0, false, false, &mut rng);
let deployment_id = deployment_transaction.id();
assert!(!transactions_queue.contains(&deployment_id));
transactions_queue.insert(deployment_id, deployment_transaction.clone(), high_fee).unwrap();
assert!(transactions_queue.contains(&deployment_id));
assert!(transactions_queue.deployments.contains(&deployment_id));
assert!(transactions_queue.deployments.priority_queue.transactions.contains_key(&deployment_id));
let (popped_deployment_id, popped_deployment_transaction) = transactions_queue.deployments.pop().unwrap();
assert_eq!(popped_deployment_id, deployment_id);
assert_eq!(popped_deployment_transaction, deployment_transaction);
assert!(!transactions_queue.contains(&deployment_id));
}
#[test]
fn insert_and_pop_ordering_with_eviction() {
let mut rng = TestRng::default();
let executions: Vec<_> = (0..10)
.map(|_| {
let execution_transaction = sample_execution_transaction_with_fee(false, &mut rng, 0);
(execution_transaction.id(), execution_transaction)
})
.collect();
let mut executions_queue = TransactionsQueueInner::new(4);
executions_queue.insert(executions[0].0, executions[0].1.clone(), U64::new(300)).unwrap();
executions_queue.insert(executions[1].0, executions[1].1.clone(), U64::new(0)).unwrap();
executions_queue.insert(executions[2].0, executions[2].1.clone(), U64::new(100)).unwrap();
executions_queue.insert(executions[3].0, executions[3].1.clone(), U64::new(200)).unwrap();
assert_eq!(executions_queue.len(), 4);
executions_queue.insert(executions[4].0, executions[4].1.clone(), U64::new(50)).unwrap();
assert_eq!(executions_queue.queue.len(), 0);
assert_eq!(executions_queue.priority_queue.len(), 4);
assert!(executions_queue.priority_queue.transactions.contains_key(&executions[4].0));
assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[1].0));
executions_queue.insert(executions[5].0, executions[5].1.clone(), U64::new(150)).unwrap();
assert_eq!(executions_queue.queue.len(), 0);
assert_eq!(executions_queue.priority_queue.len(), 4);
assert!(!executions_queue.priority_queue.transactions.contains_key(&executions[4].0));
assert!(executions_queue.priority_queue.transactions.contains_key(&executions[5].0));
assert!(executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).is_err());
assert_eq!(executions_queue.pop().unwrap(), executions[0]);
assert_eq!(executions_queue.pop().unwrap(), executions[3]);
assert_eq!(executions_queue.pop().unwrap(), executions[5]);
assert_eq!(executions_queue.pop().unwrap(), executions[2]);
assert_eq!(executions_queue.len(), 0);
assert_eq!(executions_queue.queue.len(), 0);
assert_eq!(executions_queue.priority_queue.len(), 0);
assert!(executions_queue.pop().is_none());
executions_queue.insert(executions[6].0, executions[6].1.clone(), U64::new(0)).unwrap();
assert_eq!(executions_queue.len(), 1);
assert_eq!(executions_queue.queue.len(), 1);
assert_eq!(executions_queue.priority_queue.len(), 0);
}
}