use crate::{
core_mempool::{
index::TxnPointer,
transaction::{MempoolTransaction, TimelineState},
transaction_store::TransactionStore,
ttl_cache::TtlCache,
},
counters,
logging::{LogEntry, LogSchema, TxnsLog},
};
use aptos_config::config::NodeConfig;
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
use aptos_types::{
account_address::AccountAddress,
account_config::AccountSequenceInfo,
mempool_status::{MempoolStatus, MempoolStatusCode},
transaction::SignedTransaction,
};
use std::{
cmp::max,
collections::HashSet,
time::{Duration, SystemTime},
};
/// Core mempool state: the transaction store plus the per-account caches
/// that the methods of this type keep in sync with it.
pub struct Mempool {
/// Store that owns the pending transactions; every lookup, insert, commit,
/// reject and GC in this type is delegated to it.
transactions: TransactionStore,
/// Sequence number cached per account. Written by `add_txn` and
/// `remove_transaction`, read by `get_batch` to decide which transaction
/// is "next" for an account.
sequence_number_cache: TtlCache<AccountAddress, u64>,
/// Timestamp recorded per `(account, sequence number)` when a transaction
/// is added; `log_latency` uses it to compute elapsed time.
pub(crate) metrics_cache: TtlCache<(AccountAddress, u64), SystemTime>,
/// Added to the current time in `add_txn` to compute the expiration time
/// passed to `MempoolTransaction::new`.
pub system_transaction_timeout: Duration,
}
impl Mempool {
pub fn new(config: &NodeConfig) -> Self {
Mempool {
transactions: TransactionStore::new(&config.mempool),
sequence_number_cache: TtlCache::new(config.mempool.capacity, Duration::from_secs(100)),
metrics_cache: TtlCache::new(config.mempool.capacity, Duration::from_secs(100)),
system_transaction_timeout: Duration::from_secs(
config.mempool.system_transaction_timeout_secs,
),
}
}
/// Removes the transaction `(sender, sequence_number)` after it was either
/// committed (`is_rejected == false`) or rejected (`is_rejected == true`).
///
/// In both cases a latency sample is recorded under the matching commit
/// label, the transaction's entry is dropped from the metrics cache, and the
/// sender's entry is taken out of the sequence number cache.
///
/// * Rejected: the transaction store is asked to reject the transaction, but
///   only when `sequence_number` is not below the previously cached value.
///   The sequence number cache entry is not restored on this path.
/// * Committed: the cache is advanced to at least `sequence_number + 1` and
///   the store is told to commit up to the resulting sequence info.
pub(crate) fn remove_transaction(
    &mut self,
    sender: &AccountAddress,
    sequence_number: u64,
    is_rejected: bool,
) {
    trace!(
        LogSchema::new(LogEntry::RemoveTxn).txns(TxnsLog::new_txn(*sender, sequence_number)),
        is_rejected = is_rejected
    );

    let outcome_label = if is_rejected {
        counters::COMMIT_REJECTED_LABEL
    } else {
        counters::COMMIT_ACCEPTED_LABEL
    };
    // The latency must be logged before the metrics entry is removed,
    // because `log_latency` reads the timestamp from `metrics_cache`.
    self.log_latency(*sender, sequence_number, outcome_label);
    self.metrics_cache.remove(&(*sender, sequence_number));

    // A missing cache entry is treated as sequence number 0.
    let cached_seq = self
        .sequence_number_cache
        .remove(sender)
        .unwrap_or_default();

    if is_rejected {
        if sequence_number >= cached_seq {
            self.transactions
                .reject_transaction(sender, sequence_number);
        }
        return;
    }

    let next_seq = max(cached_seq, sequence_number + 1);
    // NOTE(review): this insert looks redundant, since the same key is written
    // again below; kept to preserve the exact sequence of cache operations.
    self.sequence_number_cache.insert(*sender, next_seq);

    // A CRSN account keeps the sequence info stored with the transaction;
    // a sequential account (or an unknown transaction) uses `next_seq`.
    let committed_info = match self.transactions.get_mempool_txn(sender, sequence_number) {
        Some(mempool_transaction) => {
            match mempool_transaction
                .sequence_info
                .account_sequence_number_type
            {
                crsn @ AccountSequenceInfo::CRSN { .. } => crsn,
                AccountSequenceInfo::Sequential(_) => AccountSequenceInfo::Sequential(next_seq),
            }
        }
        None => AccountSequenceInfo::Sequential(next_seq),
    };

    self.sequence_number_cache
        .insert(*sender, committed_info.min_seq());
    self.transactions.commit_transaction(sender, committed_info);
}
/// Observes, under the label `metric`, the time elapsed since the timestamp
/// stored in `metrics_cache` for `(account, sequence_number)`.
///
/// Does nothing when no timestamp is cached, or when the stored timestamp is
/// later than the current system time (`duration_since` returns `Err`).
fn log_latency(&self, account: AccountAddress, sequence_number: u64, metric: &str) {
    let recorded_at = match self.metrics_cache.get(&(account, sequence_number)) {
        Some(&timestamp) => timestamp,
        None => return,
    };

    if let Ok(elapsed) = SystemTime::now().duration_since(recorded_at) {
        counters::CORE_MEMPOOL_TXN_COMMIT_LATENCY
            .with_label_values(&[metric])
            .observe(elapsed.as_secs_f64());
    }
}
/// Looks up a transaction by `hash`, delegating to the transaction store's
/// `get_by_hash`. `None` presumably means the store holds no transaction
/// with that hash — confirm against `TransactionStore`.
pub(crate) fn get_by_hash(&self, hash: HashValue) -> Option<SignedTransaction> {
self.transactions.get_by_hash(hash)
}
/// Tries to add `txn` to the mempool and returns the resulting status.
///
/// * `txn` - the signed transaction to insert.
/// * `ranking_score` - forwarded unchanged to `MempoolTransaction::new`.
/// * `crsn_or_seqno` - the account's sequence info supplied by the caller;
///   its `min_seq()` is logged as `committed_seq_number`.
/// * `timeline_state` - forwarded to `MempoolTransaction::new`; any value
///   other than `NonQualified` also records a timestamp in `metrics_cache`.
///
/// For sequential accounts the effective sequence number is the larger of the
/// cached value and the supplied one; CRSN info is used as given. The cache
/// is updated before validation, so it is written even when the transaction
/// is refused with `InvalidSeqNumber`. Otherwise the status is whatever the
/// transaction store's `insert` returns.
pub(crate) fn add_txn(
    &mut self,
    txn: SignedTransaction,
    ranking_score: u64,
    crsn_or_seqno: AccountSequenceInfo,
    timeline_state: TimelineState,
) -> MempoolStatus {
    let sender = txn.sender();
    let txn_seq = txn.sequence_number();
    let db_sequence_number = crsn_or_seqno.min_seq();
    trace!(
        LogSchema::new(LogEntry::AddTxn).txns(TxnsLog::new_txn(sender, txn_seq)),
        committed_seq_number = db_sequence_number
    );

    // Never let the account's sequence number move backwards relative to
    // what the mempool has already cached for it.
    let cached_seq = self.sequence_number_cache.get(&sender).copied();
    let sequence_info = match crsn_or_seqno {
        AccountSequenceInfo::Sequential(_) => {
            let effective_seq = match cached_seq {
                Some(cached) => max(cached, db_sequence_number),
                None => db_sequence_number,
            };
            AccountSequenceInfo::Sequential(effective_seq)
        }
        AccountSequenceInfo::CRSN { .. } => crsn_or_seqno,
    };

    let min_seq = sequence_info.min_seq();
    self.sequence_number_cache.insert(sender, min_seq);

    // Refuse transactions that are older than the account's current number.
    if txn_seq < min_seq {
        return MempoolStatus::new(MempoolStatusCode::InvalidSeqNumber).with_message(format!(
            "transaction sequence number is {}, current sequence number is {}",
            txn_seq, min_seq,
        ));
    }

    let expiration_time =
        aptos_infallible::duration_since_epoch() + self.system_transaction_timeout;

    if timeline_state != TimelineState::NonQualified {
        self.metrics_cache
            .insert((sender, txn_seq), SystemTime::now());
    }

    self.transactions.insert(MempoolTransaction::new(
        txn,
        expiration_time,
        ranking_score,
        timeline_state,
        sequence_info,
    ))
}
/// Collects up to `batch_size` transactions from the store's priority queue.
///
/// The queue (`iter_queue`) is walked in order. A transaction that is not in
/// `seen` is picked when at least one of these holds:
/// * its predecessor `(address, seq - 1)` is already in `seen`,
/// * its sequence number equals the cached sequence number of its account,
/// * its account uses CRSN sequence info.
///
/// A transaction that is not yet eligible is remembered in a local `skipped`
/// set. As soon as its direct predecessor is picked, it (and any consecutive
/// skipped successors) is pulled into the batch as well.
///
/// * `batch_size` - maximum number of transactions to select. `0` returns an
///   empty batch without walking the queue.
/// * `seen` - transaction pointers to exclude (logged as `seen_consensus`,
///   so presumably transactions the caller already holds — confirm with the
///   caller). Picked transactions are added to this local copy.
///
/// Returns the selected transactions that the store can still resolve via
/// `get`, and records a latency sample under `GET_BLOCK_STAGE_LABEL` for
/// each returned transaction.
// `txn_walked` is reported in the debug log after the loop, so it is kept as
// an explicit counter rather than being derived from `enumerate()`.
#[allow(clippy::explicit_counter_loop)]
pub(crate) fn get_batch(
    &self,
    batch_size: u64,
    mut seen: HashSet<TxnPointer>,
) -> Vec<SignedTransaction> {
    // The size check below only runs after a push, so a zero-sized request
    // would never hit the stop condition and would return the whole queue.
    if batch_size == 0 {
        return Vec::new();
    }

    let mut result = vec![];
    // Transactions seen in the queue before their predecessor became
    // eligible (e.g. a later sequence number ranked ahead of an earlier one).
    let mut skipped = HashSet::new();
    let seen_size = seen.len();
    let mut txn_walked = 0usize;

    'main: for txn in self.transactions.iter_queue() {
        txn_walked += 1;
        if seen.contains(&TxnPointer::from(txn)) {
            continue;
        }
        let account_seqtype = txn.sequence_number.account_sequence_number_type;
        let tx_seq = txn.sequence_number.transaction_sequence_number;
        let account_sequence_number = self.sequence_number_cache.get(&txn.address);
        // `tx_seq > 0` guards the subtraction against underflow.
        let seen_previous = tx_seq > 0 && seen.contains(&(txn.address, tx_seq - 1));

        if seen_previous
            || account_sequence_number == Some(&tx_seq)
            || matches!(account_seqtype, AccountSequenceInfo::CRSN { .. })
        {
            let ptr = TxnPointer::from(txn);
            seen.insert(ptr);
            result.push(ptr);
            if (result.len() as u64) >= batch_size {
                break;
            }

            // Picking this transaction may unblock successors of the same
            // account that were skipped earlier in the walk.
            let mut skipped_txn = (txn.address, tx_seq + 1);
            while skipped.contains(&skipped_txn) {
                seen.insert(skipped_txn);
                result.push(skipped_txn);
                if (result.len() as u64) >= batch_size {
                    break 'main;
                }
                skipped_txn = (txn.address, skipped_txn.1 + 1);
            }
        } else {
            skipped.insert(TxnPointer::from(txn));
        }
    }

    let result_size = result.len();
    let mut block_log = TxnsLog::new();
    // Every selected pointer is logged, even if the store can no longer
    // resolve it; unresolved pointers are dropped from the returned block.
    let block: Vec<_> = result
        .into_iter()
        .filter_map(|(address, tx_seq)| {
            block_log.add(address, tx_seq);
            self.transactions.get(&address, tx_seq)
        })
        .collect();

    debug!(
        LogSchema::new(LogEntry::GetBlock).txns(block_log),
        seen_consensus = seen_size,
        walked = txn_walked,
        seen_after = seen.len(),
        result_size = result_size,
        block_size = block.len()
    );

    for transaction in &block {
        self.log_latency(
            transaction.sender(),
            transaction.sequence_number(),
            counters::GET_BLOCK_STAGE_LABEL,
        );
    }
    block
}
/// Runs periodic garbage collection.
///
/// First asks the transaction store to collect by system TTL (it is handed
/// the metrics cache), then garbage-collects both caches against a single
/// timestamp captured at the start of the call.
pub(crate) fn gc(&mut self) {
    let gc_started_at = SystemTime::now();

    // The store reads the metrics cache, so it runs before that cache is
    // itself garbage-collected.
    self.transactions.gc_by_system_ttl(&self.metrics_cache);

    self.metrics_cache.gc(gc_started_at);
    self.sequence_number_cache.gc(gc_started_at);
}
/// Delegates to the transaction store's `gc_by_expiration_time`, passing
/// `block_time` and the metrics cache.
///
/// NOTE(review): presumably this drops transactions whose expiration time is
/// not later than `block_time` — confirm against `TransactionStore`.
pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) {
self.transactions
.gc_by_expiration_time(block_time, &self.metrics_cache);
}
/// Reads transactions from the timeline by delegating to the transaction
/// store's `read_timeline` with `timeline_id` and `count`.
///
/// NOTE(review): presumably returns up to `count` transactions after
/// `timeline_id`, together with the timeline id to resume from — confirm
/// against `TransactionStore::read_timeline`.
pub(crate) fn read_timeline(
&self,
timeline_id: u64,
count: usize,
) -> (Vec<SignedTransaction>, u64) {
self.transactions.read_timeline(timeline_id, count)
}
/// Returns the timeline transactions for the id range given by `start_id`
/// and `end_id`, delegating to the transaction store's `timeline_range`.
///
/// NOTE(review): whether each bound is inclusive is decided by the store —
/// confirm against `TransactionStore::timeline_range`.
pub(crate) fn timeline_range(&self, start_id: u64, end_id: u64) -> Vec<SignedTransaction> {
self.transactions.timeline_range(start_id, end_id)
}
/// Produces a `TxnsLog` snapshot by delegating to the transaction store's
/// `gen_snapshot`, which is handed the metrics cache.
pub fn gen_snapshot(&self) -> TxnsLog {
self.transactions.gen_snapshot(&self.metrics_cache)
}
/// Test-only accessor: returns the value of the transaction store's
/// `get_parking_lot_size`.
#[cfg(test)]
pub fn get_parking_lot_size(&self) -> usize {
self.transactions.get_parking_lot_size()
}
}