use crate::core_mempool::transaction::{MempoolTransaction, SequenceInfo, TimelineState};
use crate::{
counters,
logging::{LogEntry, LogSchema},
};
use aptos_logger::prelude::*;
use aptos_types::account_address::AccountAddress;
use rand::seq::SliceRandom;
use std::{
cmp::Ordering,
collections::{btree_set::Iter, BTreeMap, BTreeSet, HashMap},
iter::Rev,
ops::Bound,
time::Duration,
};
/// Ordered map from a `u64` key to a [`MempoolTransaction`].
///
/// NOTE(review): judging by the name, this presumably holds the transactions of a single
/// account keyed by sequence number — confirm against the callers.
pub type AccountTransactions = BTreeMap<u64, MempoolTransaction>;
/// Index of transactions kept as a sorted set of [`OrderedQueueKey`]s.
///
/// The position of a transaction in the index is decided entirely by the `Ord`
/// implementation of [`OrderedQueueKey`]; `iter` walks the set in reverse, i.e. from the
/// greatest key down to the smallest.
pub struct PriorityIndex {
// Sorted set of keys, one per indexed transaction.
data: BTreeSet<OrderedQueueKey>,
}
/// Iterator type returned by `PriorityIndex::iter`: a reversed walk over the underlying
/// `BTreeSet`, so keys are produced from the greatest to the smallest.
pub type PriorityQueueIter<'a> = Rev<Iter<'a, OrderedQueueKey>>;
impl PriorityIndex {
    /// Creates an index that contains no keys.
    pub(crate) fn new() -> Self {
        let data = BTreeSet::new();
        Self { data }
    }

    /// Adds the key derived from `txn`; inserting an already present key is a no-op.
    pub(crate) fn insert(&mut self, txn: &MempoolTransaction) {
        let key = self.make_key(txn);
        self.data.insert(key);
    }

    /// Drops the key derived from `txn`, if it is present.
    pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
        let key = self.make_key(txn);
        self.data.remove(&key);
    }

    /// Reports whether the key derived from `txn` is currently indexed.
    pub(crate) fn contains(&self, txn: &MempoolTransaction) -> bool {
        let key = self.make_key(txn);
        self.data.contains(&key)
    }

    /// Builds the ordering key for `txn` from its ranking score, expiration time, sender
    /// and sequence info.
    fn make_key(&self, txn: &MempoolTransaction) -> OrderedQueueKey {
        let address = txn.get_sender();
        OrderedQueueKey {
            gas_ranking_score: txn.ranking_score,
            expiration_time: txn.expiration_time,
            address,
            sequence_number: txn.sequence_info,
        }
    }

    /// Iterates over the keys from the greatest to the smallest.
    pub(crate) fn iter(&self) -> PriorityQueueIter {
        let ascending = self.data.iter();
        ascending.rev()
    }

    /// Number of keys currently held by the index.
    pub(crate) fn size(&self) -> usize {
        self.data.len()
    }
}
/// Sort key stored by [`PriorityIndex`].
///
/// Equality and hashing are derived over all four fields, while ordering is provided by the
/// hand-written `Ord` implementation for this type.
#[derive(Eq, PartialEq, Clone, Debug, Hash)]
pub struct OrderedQueueKey {
/// Ranking score of the transaction; compared first by `Ord`.
pub gas_ranking_score: u64,
/// Expiration time of the transaction; compared second, in reverse order.
pub expiration_time: Duration,
/// Sender of the transaction; compared third.
pub address: AccountAddress,
/// Sequence info of the transaction; only its `transaction_sequence_number` is compared
/// by `Ord` (last, in reverse order).
pub sequence_number: SequenceInfo,
}
impl PartialOrd for OrderedQueueKey {
fn partial_cmp(&self, other: &OrderedQueueKey) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for OrderedQueueKey {
    /// Total order used to rank keys. Criteria are applied in sequence, each one only
    /// breaking ties left by the previous one:
    ///
    /// 1. a higher `gas_ranking_score` sorts greater;
    /// 2. an earlier `expiration_time` sorts greater (comparison is reversed);
    /// 3. a higher `address` sorts greater;
    /// 4. a lower `transaction_sequence_number` sorts greater (comparison is reversed).
    ///
    /// NOTE(review): only `transaction_sequence_number` of the sequence info takes part in
    /// the ordering, whereas the derived `Eq` compares the whole `SequenceInfo`. If that type
    /// has further fields, `cmp` may report `Equal` for keys that are not `==` — confirm.
    fn cmp(&self, other: &OrderedQueueKey) -> Ordering {
        let own_seq = self.sequence_number.transaction_sequence_number;
        let other_seq = other.sequence_number.transaction_sequence_number;

        self.gas_ranking_score
            .cmp(&other.gas_ranking_score)
            // Swapping the operands is the same as reversing the comparison.
            .then_with(|| other.expiration_time.cmp(&self.expiration_time))
            .then_with(|| self.address.cmp(&other.address))
            .then_with(|| other_seq.cmp(&own_seq))
    }
}
/// Index of transactions ordered by an expiration time, used to find the ones that have
/// expired (see `gc`).
///
/// The expiration time of a transaction is not read from a fixed field; it is obtained by
/// calling the `get_expiration_time` closure supplied at construction.
pub struct TTLIndex {
// Sorted set of keys; the expiration time is the primary sort criterion.
data: BTreeSet<TTLOrderingKey>,
// Extracts the expiration time that is used as the key for a given transaction.
get_expiration_time: Box<dyn Fn(&MempoolTransaction) -> Duration + Send + Sync>,
}
impl TTLIndex {
    /// Creates an empty index.
    ///
    /// `get_expiration_time` is invoked every time a key is built for a transaction and
    /// decides which expiration time that transaction is indexed under.
    pub(crate) fn new<F>(get_expiration_time: Box<F>) -> Self
    where
        F: Fn(&MempoolTransaction) -> Duration + 'static + Send + Sync,
    {
        let data = BTreeSet::new();
        Self {
            data,
            get_expiration_time,
        }
    }

    /// Adds the key derived from `txn`; inserting an already present key is a no-op.
    pub(crate) fn insert(&mut self, txn: &MempoolTransaction) {
        let key = self.make_key(txn);
        self.data.insert(key);
    }

    /// Drops the key derived from `txn`, if it is present.
    pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
        let key = self.make_key(txn);
        self.data.remove(&key);
    }

    /// Removes and returns, in ascending order, every key that sorts strictly before
    /// `(now, AccountAddress::ZERO, 0)`.
    ///
    /// Keys with an expiration time earlier than `now` are therefore always collected.
    /// Keys expiring exactly at `now` stay in the index (assuming `AccountAddress::ZERO`
    /// is the smallest address — TODO confirm).
    pub(crate) fn gc(&mut self, now: Duration) -> Vec<TTLOrderingKey> {
        let boundary = TTLOrderingKey {
            expiration_time: now,
            address: AccountAddress::ZERO,
            sequence_number: 0,
        };
        // `split_off` leaves the keys below the boundary in `self.data` and hands back the
        // rest; swapping the two sets keeps the survivors and yields the expired keys.
        let still_active = self.data.split_off(&boundary);
        let expired = std::mem::replace(&mut self.data, still_active);
        expired.into_iter().collect()
    }

    /// Builds the ordering key for `txn`, asking the configured closure for its expiration
    /// time.
    fn make_key(&self, txn: &MempoolTransaction) -> TTLOrderingKey {
        let expiration_time = (self.get_expiration_time)(txn);
        TTLOrderingKey {
            expiration_time,
            address: txn.get_sender(),
            sequence_number: txn.sequence_info.transaction_sequence_number,
        }
    }

    /// Number of keys currently held by the index.
    pub(crate) fn size(&self) -> usize {
        self.data.len()
    }
}
/// Sort key stored by [`TTLIndex`]: expiration time first, then sender address, then
/// sequence number.
// `PartialOrd` is derived here while `Ord` is implemented by hand, which is what this
// clippy lint reports; both compare the fields in the same order.
#[allow(clippy::derive_ord_xor_partial_ord)]
#[derive(Eq, PartialEq, PartialOrd, Clone, Debug)]
pub struct TTLOrderingKey {
/// Expiration time the transaction is indexed under.
pub expiration_time: Duration,
/// Sender of the transaction.
pub address: AccountAddress,
/// Sequence number of the transaction.
pub sequence_number: u64,
}
// `Ord` is written by hand while `PartialOrd` is derived on the struct, hence the allow.
#[allow(clippy::derive_ord_xor_partial_ord)]
impl Ord for TTLOrderingKey {
    /// Orders keys by expiration time, breaking ties by address and then by sequence
    /// number, all in ascending order.
    fn cmp(&self, other: &TTLOrderingKey) -> Ordering {
        self.expiration_time
            .cmp(&other.expiration_time)
            .then_with(|| self.address.cmp(&other.address))
            .then_with(|| self.sequence_number.cmp(&other.sequence_number))
    }
}
/// Index that assigns each inserted transaction a monotonically increasing timeline id and
/// remembers which `(sender, sequence_number)` pair every id refers to.
pub struct TimelineIndex {
// Id that the next inserted transaction will receive; starts at 1 and is incremented on
// every insert.
timeline_id: u64,
// Maps an assigned timeline id to the (sender address, sequence number) of its transaction.
timeline: BTreeMap<u64, (AccountAddress, u64)>,
}
impl TimelineIndex {
    /// Creates an empty timeline; the first inserted transaction receives id `1`.
    pub(crate) fn new() -> Self {
        Self {
            timeline_id: 1,
            timeline: BTreeMap::new(),
        }
    }

    /// Returns at most `count` `(sender, sequence_number)` entries whose timeline id is
    /// strictly greater than `timeline_id`, in increasing id order.
    ///
    /// A `count` of `0` yields an empty batch.
    pub(crate) fn read_timeline(
        &self,
        timeline_id: u64,
        count: usize,
    ) -> Vec<(AccountAddress, u64)> {
        self.timeline
            .range((Bound::Excluded(timeline_id), Bound::Unbounded))
            // `take` also covers `count == 0`; a "push, then compare the length" loop
            // would never hit the limit in that case and return the whole timeline.
            .take(count)
            .map(|(_id, &entry)| entry)
            .collect()
    }

    /// Returns every `(sender, sequence_number)` entry whose timeline id lies in
    /// `(start_id, end_id]`, in increasing id order.
    ///
    /// An empty or inverted interval (`start_id >= end_id`) yields an empty vector.
    pub(crate) fn timeline_range(&self, start_id: u64, end_id: u64) -> Vec<(AccountAddress, u64)> {
        // `BTreeMap::range` panics when the start bound is greater than the end bound, so
        // inverted intervals are answered here instead of being passed through.
        if start_id >= end_id {
            return Vec::new();
        }
        self.timeline
            .range((Bound::Excluded(start_id), Bound::Included(end_id)))
            .map(|(_id, &entry)| entry)
            .collect()
    }

    /// Records `txn` under the next free timeline id and marks the transaction as
    /// `TimelineState::Ready` with that id.
    pub(crate) fn insert(&mut self, txn: &mut MempoolTransaction) {
        let assigned_id = self.timeline_id;
        self.timeline.insert(
            assigned_id,
            (
                txn.get_sender(),
                txn.sequence_info.transaction_sequence_number,
            ),
        );
        txn.timeline_state = TimelineState::Ready(assigned_id);
        self.timeline_id += 1;
    }

    /// Forgets the timeline entry of `txn`. Transactions whose state is not
    /// `TimelineState::Ready` were never given an id and are ignored.
    pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
        if let TimelineState::Ready(timeline_id) = txn.timeline_state {
            self.timeline.remove(&timeline_id);
        }
    }

    /// Number of transactions currently recorded in the timeline.
    pub(crate) fn size(&self) -> usize {
        self.timeline.len()
    }
}
/// Index of transactions grouped per sender account, each account holding the ordered set
/// of sequence numbers recorded for it.
///
/// Accounts live in a `Vec` so that one can be picked at random (see `get_poppable`);
/// `account_indices` gives the position of each account inside that `Vec`.
pub struct ParkingLotIndex {
// One entry per account that has at least one sequence number recorded.
data: Vec<(AccountAddress, BTreeSet<u64>)>,
// Position of each account's entry inside `data`.
account_indices: HashMap<AccountAddress, usize>,
// Total number of sequence numbers stored across all accounts.
size: usize,
}
impl ParkingLotIndex {
    /// Creates an index with no accounts and a size of zero.
    pub(crate) fn new() -> Self {
        Self {
            data: Vec::new(),
            account_indices: HashMap::new(),
            size: 0,
        }
    }

    /// Records the sequence number of `txn` under its sender.
    ///
    /// The size counter only grows when the sequence number was not recorded before. If
    /// the account has an index that points outside `data`, the invariant violation is
    /// counted and logged and nothing is inserted.
    pub(crate) fn insert(&mut self, txn: &MempoolTransaction) {
        let sender = &txn.txn.sender();
        let sequence_number = txn.txn.sequence_number();
        let known_slot = self.account_indices.get(sender).copied();

        let newly_recorded = match known_slot {
            Some(slot) => match self.data.get_mut(slot) {
                Some((_account, seq_nums)) => seq_nums.insert(sequence_number),
                None => {
                    counters::CORE_MEMPOOL_INVARIANT_VIOLATION_COUNT.inc();
                    error!(
                        LogSchema::new(LogEntry::InvariantViolated),
                        "Parking lot invariant violated: for account {}, account index exists but missing entry in data",
                        sender
                    );
                    return;
                }
            },
            None => {
                // First transaction of this account: append a fresh entry and remember
                // where it lives.
                let mut seq_nums = BTreeSet::new();
                seq_nums.insert(sequence_number);
                let slot = self.data.len();
                self.data.push((*sender, seq_nums));
                self.account_indices.insert(*sender, slot);
                true
            }
        };

        if newly_recorded {
            self.size += 1;
        }
    }

    /// Forgets the sequence number of `txn`; accounts left without any sequence number
    /// are dropped from the index.
    pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
        let sender = &txn.txn.sender();
        let index = match self.account_indices.get(sender) {
            Some(&index) => index,
            None => return,
        };
        let txns = match self.data.get_mut(index) {
            Some((_account, txns)) => txns,
            None => return,
        };

        if txns.remove(&txn.txn.sequence_number()) {
            self.size -= 1;
        }
        if !txns.is_empty() {
            return;
        }

        // `swap_remove` moves the last entry of `data` into the freed slot, so the index
        // of that moved account has to be updated as well.
        self.data.swap_remove(index);
        self.account_indices.remove(sender);
        if let Some((swapped_account, _)) = self.data.get(index) {
            self.account_indices.insert(*swapped_account, index);
        }
    }

    /// Reports whether `seq_num` is recorded for `account`.
    pub(crate) fn contains(&self, account: &AccountAddress, seq_num: &u64) -> bool {
        let entry = self
            .account_indices
            .get(account)
            .and_then(|idx| self.data.get(*idx));
        match entry {
            Some((_account, txns)) => txns.contains(seq_num),
            None => false,
        }
    }

    /// Picks a random account and returns a pointer to its highest recorded sequence
    /// number, or `None` when the index holds no accounts.
    pub(crate) fn get_poppable(&self) -> Option<TxnPointer> {
        let mut rng = rand::thread_rng();
        let (sender, txns) = self.data.choose(&mut rng)?;
        txns.iter()
            .next_back()
            .map(|seq_num| (*sender, *seq_num))
    }

    /// Total number of sequence numbers recorded across all accounts.
    pub(crate) fn size(&self) -> usize {
        self.size
    }
}
/// Lightweight reference to a transaction: `(sender address, transaction sequence number)`.
pub type TxnPointer = (AccountAddress, u64);
impl From<&MempoolTransaction> for TxnPointer {
    /// Builds the pointer from the transaction's sender and its transaction sequence number.
    fn from(transaction: &MempoolTransaction) -> Self {
        let sender = transaction.get_sender();
        let sequence_number = transaction.sequence_info.transaction_sequence_number;
        (sender, sequence_number)
    }
}
impl From<&OrderedQueueKey> for TxnPointer {
fn from(key: &OrderedQueueKey) -> Self {
(key.address, key.sequence_number.transaction_sequence_number)
}
}