#![allow(missing_docs)]
extern crate rustc_hash;
extern crate slab;
use ckb_logger::error;
use ckb_network::PeerIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{
core::{Cycle, TransactionView, tx_pool::Reject},
packed::ProposalShortId,
};
use ckb_util::shrink_to_fit;
use multi_index_map::MultiIndexMap;
use std::sync::Arc;
use tokio::sync::Notify;
const DEFAULT_MAX_VERIFY_QUEUE_TX_SIZE: usize = 256_000_000;
const SHRINK_THRESHOLD: usize = 100;
#[derive(Debug, Clone, Eq)]
pub struct Entry {
pub(crate) tx: TransactionView,
pub(crate) remote: Option<(Cycle, PeerIndex)>,
}
impl PartialEq for Entry {
fn eq(&self, other: &Entry) -> bool {
self.tx == other.tx
}
}
#[derive(MultiIndexMap, Clone)]
struct VerifyEntry {
#[multi_index(hashed_unique)]
id: ProposalShortId,
#[multi_index(ordered_non_unique)]
added_time: u64,
#[multi_index(hashed_non_unique)]
is_large_cycle: bool,
is_proposal_tx: bool,
inner: Entry,
}
pub(crate) struct VerifyQueue {
inner: MultiIndexVerifyEntryMap,
ready_rx: Arc<Notify>,
total_tx_size: usize,
large_cycle_threshold: u64,
}
impl VerifyQueue {
pub(crate) fn new(large_cycle_threshold: u64) -> Self {
VerifyQueue {
inner: MultiIndexVerifyEntryMap::default(),
ready_rx: Arc::new(Notify::new()),
total_tx_size: 0,
large_cycle_threshold,
}
}
fn recompute_total_tx_size(&self) -> Option<usize> {
self.inner.iter().try_fold(0usize, |total, (_, entry)| {
total.checked_add(entry.inner.tx.data().serialized_size_in_block())
})
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn len(&self) -> usize {
self.inner.len()
}
#[cfg(test)]
pub fn total_tx_size(&self) -> usize {
self.total_tx_size
}
#[cfg(test)]
pub(crate) fn set_total_tx_size_for_test(&mut self, total_tx_size: usize) {
self.total_tx_size = total_tx_size;
}
pub fn is_full(&self, add_tx_size: usize) -> bool {
add_tx_size >= DEFAULT_MAX_VERIFY_QUEUE_TX_SIZE - self.total_tx_size
}
pub fn contains_key(&self, id: &ProposalShortId) -> bool {
self.inner.get_by_id(id).is_some()
}
pub fn get_tx_by_id(&self, id: &ProposalShortId) -> Option<&Entry> {
self.inner.get_by_id(id).map(|e| &e.inner)
}
pub fn shrink_to_fit(&mut self) {
shrink_to_fit!(self.inner, SHRINK_THRESHOLD);
}
pub fn subscribe(&self) -> Arc<Notify> {
Arc::clone(&self.ready_rx)
}
pub fn remove_tx(&mut self, id: &ProposalShortId) -> Option<Entry> {
self.inner.remove_by_id(id).map(|e| {
let tx_size = e.inner.tx.data().serialized_size_in_block();
if let Some(total_tx_size) = self.total_tx_size.checked_sub(tx_size) {
self.total_tx_size = total_tx_size;
} else if let Some(total_tx_size) = self.recompute_total_tx_size() {
error!(
"verify_queue total_tx_size {} underflowed by sub {}, recomputed {}",
self.total_tx_size, tx_size, total_tx_size
);
self.total_tx_size = total_tx_size;
} else {
error!(
"verify_queue total_tx_size {} underflowed by sub {}, and recomputing overflowed",
self.total_tx_size, tx_size
);
}
self.shrink_to_fit();
e.inner
})
}
pub fn remove_txs(&mut self, ids: impl Iterator<Item = ProposalShortId>) {
for id in ids {
self.remove_tx(&id);
}
}
pub fn remove_txs_by_peer(&mut self, peer: &PeerIndex) {
let ids: Vec<_> = self
.inner
.iter()
.filter(|&(_cycle, entry)| entry.inner.remote.as_ref().is_some_and(|(_, p)| p == peer))
.map(|(_cycle, entry)| entry.id.clone())
.collect();
self.remove_txs(ids.into_iter());
}
pub fn pop_front(&mut self, only_small_cycle: bool) -> Option<Entry> {
if let Some(short_id) = self.peek(only_small_cycle) {
self.remove_tx(&short_id)
} else {
None
}
}
pub fn peek(&self, only_small_cycle: bool) -> Option<ProposalShortId> {
let mut iter = self.inner.iter_by_added_time();
if let Some(proposal_entry) = iter.find(|e| e.is_proposal_tx) {
return Some(proposal_entry.inner.tx.proposal_short_id());
}
let entry = if only_small_cycle {
self.inner.iter_by_added_time().find(|e| !e.is_large_cycle)
} else {
self.inner.iter_by_added_time().next()
};
entry.map(|e| e.inner.tx.proposal_short_id())
}
pub fn add_tx(
&mut self,
tx: TransactionView,
is_proposal_tx: bool,
remote: Option<(Cycle, PeerIndex)>,
) -> Result<bool, Reject> {
if self.contains_key(&tx.proposal_short_id()) {
if is_proposal_tx {
self.remove_tx(&tx.proposal_short_id());
} else {
return Ok(false);
}
}
let tx_size = tx.data().serialized_size_in_block();
let is_large_cycle = remote
.map(|(cycles, _)| cycles > self.large_cycle_threshold)
.unwrap_or(false);
if self.is_full(tx_size) {
return Err(Reject::Full(format!(
"verify_queue total_tx_size exceeded, failed to add tx: {:#x}",
tx.hash()
)));
}
let total_tx_size = self.total_tx_size.checked_add(tx_size).ok_or_else(|| {
Reject::Full(format!(
"verify_queue total_tx_size overflowed, failed to add tx: {:#x}",
tx.hash()
))
})?;
self.inner.insert(VerifyEntry {
id: tx.proposal_short_id(),
added_time: unix_time_as_millis(),
inner: Entry { tx, remote },
is_large_cycle,
is_proposal_tx,
});
self.total_tx_size = total_tx_size;
self.ready_rx.notify_one();
Ok(true)
}
pub fn re_notify(&self) {
self.ready_rx.notify_one();
}
pub fn clear(&mut self) {
self.inner.clear();
self.total_tx_size = 0;
self.shrink_to_fit();
}
}