extern crate rustc_hash;
extern crate slab;
use super::links::TxLinks;
use crate::TxEntry;
use crate::component::edges::Edges;
use crate::component::links::{Relation, TxLinksMap};
use crate::component::sort_key::{AncestorsScoreSortKey, EvictKey};
use crate::error::Reject;
use ckb_logger::{debug, error, trace};
use ckb_types::core::error::OutPointError;
use ckb_types::core::{Cycle, FeeRate};
use ckb_types::packed::OutPoint;
use ckb_types::{
bytes::Bytes,
core::TransactionView,
packed::{Byte32, CellOutput, ProposalShortId},
};
use multi_index_map::MultiIndexMap;
use std::collections::HashSet;
/// A transaction entry removed from the pool, paired with the `Reject` reason for its removal.
type ConflictEntry = (TxEntry, Reject);
/// Lifecycle state tracked for every entry held by the pool map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Status {
    Pending,
    Gap,
    Proposed,
}

impl std::fmt::Display for Status {
    /// Writes the lowercase name of the status (`pending`, `gap` or `proposed`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Status::Pending => "pending",
            Status::Gap => "gap",
            Status::Proposed => "proposed",
        };
        f.write_str(label)
    }
}
/// Direction of a weight update applied to the relatives of an entry:
/// `Add` when the entry joins the pool, `Remove` when it leaves.
#[derive(Copy, Clone)]
enum EntryOp {
Add,
Remove,
}
/// A pooled transaction together with the keys it is indexed by.
///
/// NOTE(review): the `MultiIndexMap` derive presumably generates the
/// `MultiIndexPoolEntryMap` container used by `PoolMap` — confirm against the crate docs.
#[derive(MultiIndexMap, Clone)]
pub struct PoolEntry {
// Proposal short id; unique hashed index.
#[multi_index(hashed_unique)]
pub id: ProposalShortId,
// Ordering key kept in sync with `inner.as_score_key()`.
#[multi_index(ordered_non_unique)]
pub score: AncestorsScoreSortKey,
// Current lifecycle status; hashed, non-unique index.
#[multi_index(hashed_non_unique)]
pub status: Status,
// Ordering key kept in sync with `inner.as_evict_key()`.
#[multi_index(ordered_non_unique)]
pub evict_key: EvictKey,
// The wrapped transaction entry (not indexed).
pub inner: TxEntry,
}
/// Container for all pooled transactions plus the bookkeeping needed to relate them.
pub struct PoolMap {
// All entries, indexed by id, score, status and evict key.
pub(crate) entries: MultiIndexPoolEntryMap,
// Records of inputs, cell deps and header deps referenced by pooled transactions.
pub(crate) edges: Edges,
// Parent/child links between pooled transactions.
pub(crate) links: TxLinksMap,
// Upper bound on (ancestors + 1) accepted for a new entry.
pub(crate) max_ancestors_count: usize,
// Running sum of `size` over all entries.
pub(crate) total_tx_size: usize,
// Running sum of `cycles` over all entries.
pub(crate) total_tx_cycles: Cycle,
// Number of entries per status; their sum is asserted to equal `entries.len()`.
pub(crate) pending_count: usize,
pub(crate) gap_count: usize,
pub(crate) proposed_count: usize,
}
impl PoolMap {
pub fn new(max_ancestors_count: usize) -> Self {
PoolMap {
entries: MultiIndexPoolEntryMap::default(),
edges: Edges::default(),
links: TxLinksMap::new(),
max_ancestors_count,
total_tx_size: 0,
total_tx_cycles: 0,
pending_count: 0,
gap_count: 0,
proposed_count: 0,
}
}
/// Test helper: forwards to `Edges::header_deps_len`.
#[cfg(test)]
pub(crate) fn header_deps_len(&self) -> usize {
self.edges.header_deps_len()
}
/// Test helper: forwards to `Edges::deps_len`.
#[cfg(test)]
pub(crate) fn deps_len(&self) -> usize {
self.edges.deps_len()
}
/// Test helper: forwards to `Edges::inputs_len`.
#[cfg(test)]
pub(crate) fn inputs_len(&self) -> usize {
self.edges.inputs_len()
}
/// Test helper: number of entries currently stored, regardless of status.
#[cfg(test)]
pub(crate) fn size(&self) -> usize {
self.entries.len()
}
/// Test helper: whether an entry with this short id is stored.
#[cfg(test)]
pub(crate) fn contains_key(&self, id: &ProposalShortId) -> bool {
self.entries.get_by_id(id).is_some()
}
/// Test helper: the transaction stored under `id`, if any.
#[cfg(test)]
pub(crate) fn get_tx(&self, id: &ProposalShortId) -> Option<&TransactionView> {
    let pool_entry = self.entries.get_by_id(id)?;
    Some(pool_entry.inner.transaction())
}
/// Test helper: adds `entry` with `Status::Proposed` and reports only whether
/// it was newly inserted, dropping the set of evicted entries.
#[cfg(test)]
pub(crate) fn add_proposed(&mut self, entry: TxEntry) -> Result<bool, Reject> {
    let (inserted, _evicted) = self.add_entry(entry, Status::Proposed)?;
    Ok(inserted)
}
/// Largest `timestamp` among all stored entries, or `0` when the map is empty.
pub(crate) fn get_max_update_time(&self) -> u64 {
    self.entries
        .iter()
        .fold(0, |latest, (_, entry)| latest.max(entry.inner.timestamp))
}
/// Looks up the full `PoolEntry` (status and index keys included) by short id.
pub(crate) fn get_by_id(&self, id: &ProposalShortId) -> Option<&PoolEntry> {
self.entries.get_by_id(id)
}
/// Like `get_by_id`, for ids that are expected to be present.
///
/// # Panics
/// Panics with "inconsistent pool" when no entry is stored under `id`.
fn get_by_id_checked(&self, id: &ProposalShortId) -> &PoolEntry {
self.get_by_id(id).expect("inconsistent pool")
}
/// Number of entries that are not yet proposed: pending plus gap.
pub(crate) fn pending_size(&self) -> usize {
self.pending_count + self.gap_count
}
/// Number of entries whose status is `Proposed`.
pub(crate) fn proposed_size(&self) -> usize {
self.proposed_count
}
/// Iterates proposed entries in the order produced by
/// `score_sorted_iter_by_status` (reverse of the score index).
pub(crate) fn sorted_proposed_iter(&self) -> impl Iterator<Item = &TxEntry> {
self.score_sorted_iter_by_status(Status::Proposed)
}
/// Returns the transaction entry stored under `id`, whatever its status.
pub(crate) fn get(&self, id: &ProposalShortId) -> Option<&TxEntry> {
    let pool_entry = self.get_by_id(id)?;
    Some(&pool_entry.inner)
}
/// Returns the transaction entry stored under `id` only when its status is
/// `Proposed`; entries in any other status yield `None`.
pub(crate) fn get_proposed(&self, id: &ProposalShortId) -> Option<&TxEntry> {
    self.get_by_id(id)
        .filter(|pool_entry| pool_entry.status == Status::Proposed)
        .map(|pool_entry| &pool_entry.inner)
}
/// Whether an entry with this id exists and is in `Proposed` status.
pub(crate) fn has_proposed(&self, id: &ProposalShortId) -> bool {
self.get_proposed(id).is_some()
}
/// Forwards to `TxLinksMap::calc_ancestors` for `short_id`.
pub(crate) fn calc_ancestors(&self, short_id: &ProposalShortId) -> HashSet<ProposalShortId> {
self.links.calc_ancestors(short_id)
}
/// Forwards to `TxLinksMap::calc_descendants` for `short_id`.
pub(crate) fn calc_descendants(&self, short_id: &ProposalShortId) -> HashSet<ProposalShortId> {
self.links.calc_descendants(short_id)
}
/// Resolves `out_point` against the pooled transactions.
///
/// Returns the referenced output and its data when the producing transaction
/// is in the pool and has an output at that index; `None` otherwise.
pub(crate) fn get_output_with_data(&self, out_point: &OutPoint) -> Option<(CellOutput, Bytes)> {
    let producer_id = ProposalShortId::from_tx_hash(&out_point.tx_hash());
    let producer = self.get(&producer_id)?;
    producer
        .transaction()
        .output_with_data(out_point.index().into())
}
/// Inserts `entry` with the given `status`.
///
/// Returns `Ok((false, {}))` without touching the pool when an entry with the
/// same short id is already stored, otherwise `Ok((true, evicted))` where
/// `evicted` is whatever `check_and_record_ancestors` removed to make room.
///
/// # Errors
/// Propagates the `Reject` from the stat overflow check, from
/// `check_and_record_ancestors`, or from `record_entry_edges`.
pub(crate) fn add_entry(
&mut self,
mut entry: TxEntry,
status: Status,
) -> Result<(bool, HashSet<TxEntry>), Reject> {
let tx_short_id = entry.proposal_short_id();
let mut evicts = Default::default();
if self.entries.get_by_id(&tx_short_id).is_some() {
return Ok((false, evicts));
}
// Compute the new totals first so an overflow rejects before any mutation;
// they are only committed once the entry is fully recorded.
let (total_tx_size, total_tx_cycles) =
self.updated_stat_for_add_tx(entry.size, entry.cycles)?;
trace!("pool_map.add_{:?} {}", status, entry.transaction().hash());
evicts = self.check_and_record_ancestors(&mut entry)?;
self.record_entry_edges(&entry)?;
self.insert_entry(&entry, status);
self.record_entry_descendants(&entry);
self.track_entry_statics(None, Some(status));
self.total_tx_size = total_tx_size;
self.total_tx_cycles = total_tx_cycles;
Ok((true, evicts))
}
/// Changes the status of an existing entry and updates the per-status counters.
///
/// # Panics
/// Panics with "inconsistent pool" when no entry is stored under `short_id`.
pub(crate) fn set_entry(&mut self, short_id: &ProposalShortId, status: Status) {
let mut old_status = None;
self.entries
.modify_by_id(short_id, |e| {
// Capture the previous status so the counters can be moved over.
old_status = Some(e.status);
e.status = status;
})
.expect("inconsistent pool");
self.track_entry_statics(old_status, Some(status));
}
/// Removes the entry stored under `id` and unwinds all of its bookkeeping.
///
/// Returns the removed transaction entry, or `None` (with no side effects)
/// when the id is unknown.
pub(crate) fn remove_entry(&mut self, id: &ProposalShortId) -> Option<TxEntry> {
    let removed = self.entries.remove_by_id(id)?;
    debug!(
        "remove entry {} from status: {:?}",
        removed.inner.transaction().hash(),
        removed.status
    );
    // Relatives are re-weighted while the links still describe them; the
    // edges and links of the removed entry are dropped afterwards.
    self.update_ancestors_index_key(&removed.inner, EntryOp::Remove);
    self.update_descendants_index_key(&removed.inner, EntryOp::Remove);
    self.remove_entry_edges(&removed.inner);
    self.remove_entry_links(id);
    self.track_entry_statics(Some(removed.status), None);
    self.update_stat_for_remove_tx(removed.inner.size, removed.inner.cycles);
    Some(removed.inner)
}
/// Removes the entry stored under `id` along with every descendant reported
/// by `calc_descendants`, returning the entries that were actually removed.
pub(crate) fn remove_entry_and_descendants(&mut self, id: &ProposalShortId) -> Vec<TxEntry> {
    let mut targets = Vec::from([id.to_owned()]);
    targets.extend(self.calc_descendants(id));

    // Drop the links of the whole group up front, before any entry is removed.
    for target in &targets {
        self.remove_entry_links(target);
    }

    let mut removed = Vec::new();
    for target in &targets {
        if let Some(entry) = self.remove_entry(target) {
            removed.push(entry);
        }
    }
    removed
}
/// Removes every pooled transaction (and its descendants) that lists one of
/// `headers` among its header deps.
///
/// Each removed entry is returned with an `OutPointError::InvalidHeader`
/// rejection carrying the first matching header hash of the offending
/// transaction.
pub(crate) fn resolve_conflict_header_dep(
    &mut self,
    headers: &HashSet<Byte32>,
) -> Vec<ConflictEntry> {
    // Collect the offenders first; the pool cannot be mutated while the
    // header-dep records are being iterated.
    let offenders: Vec<_> = self
        .edges
        .header_deps
        .iter()
        .filter_map(|(tx_id, deps)| {
            deps.iter()
                .find(|hash| headers.contains(*hash))
                .map(|hash| (hash.clone(), tx_id.clone()))
        })
        .collect();

    let mut conflicts = Vec::new();
    for (blk_hash, id) in offenders {
        let removed = self.remove_entry_and_descendants(&id);
        conflicts.extend(removed.into_iter().map(|entry| {
            let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash.clone()));
            (entry, reject)
        }));
    }
    conflicts
}
/// Collects the ids of pooled transactions recorded as spending any input of `tx`.
pub(crate) fn find_conflict_tx(&self, tx: &TransactionView) -> HashSet<ProposalShortId> {
    let mut conflicting = HashSet::new();
    for out_point in tx.input_pts_iter() {
        if let Some(spender) = self.edges.get_input_ref(&out_point) {
            conflicting.insert(spender.clone());
        }
    }
    conflicting
}
/// Returns the first input of `tx` that is already recorded as spent by a
/// pooled transaction, or `None` when there is no such input.
pub(crate) fn find_conflict_outpoint(&self, tx: &TransactionView) -> Option<OutPoint> {
    tx.input_pts_iter()
        .find(|out_point| self.edges.get_input_ref(out_point).is_some())
}
/// Evicts every pooled transaction that conflicts with `tx`.
///
/// For each input of `tx`, the pooled transaction recorded as spending the
/// same out point and all pooled transactions recorded as using it as a dep
/// are removed together with their descendants. Each removed entry is
/// returned with an `OutPointError::Dead` rejection for that out point.
pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
    let mut conflicts = Vec::new();

    for out_point in tx.input_pts_iter() {
        // The pooled spender of the same out point, if any.
        if let Some(spender) = self.edges.remove_input(&out_point) {
            let removed = self.remove_entry_and_descendants(&spender);
            conflicts.extend(removed.into_iter().map(|entry| {
                (entry, Reject::Resolve(OutPointError::Dead(out_point.clone())))
            }));
        }

        // Pooled transactions that reference the out point as a dep.
        if let Some(dependents) = self.edges.remove_deps(&out_point) {
            for dependent in dependents {
                let removed = self.remove_entry_and_descendants(&dependent);
                conflicts.extend(removed.into_iter().map(|entry| {
                    (entry, Reject::Resolve(OutPointError::Dead(out_point.clone())))
                }));
            }
        }
    }

    conflicts
}
/// Estimates the fee rate needed to be included within `target_blocks` blocks.
///
/// Entries are walked in reverse score-index order and packed into virtual
/// blocks bounded by `max_block_bytes` / `max_block_cycles`. The fee rate of
/// the entry that fills the `target_blocks`-th block is returned; when the
/// pool does not fill that many blocks, `min_fee_rate` is returned.
///
/// `target_blocks` must be greater than zero (checked with `debug_assert!`).
pub(crate) fn estimate_fee_rate(
    &self,
    mut target_blocks: usize,
    max_block_bytes: usize,
    max_block_cycles: Cycle,
    min_fee_rate: FeeRate,
) -> FeeRate {
    debug_assert!(target_blocks > 0);
    let mut block_bytes = 0;
    let mut block_cycles = 0;
    for candidate in self.entries.iter_by_score().rev() {
        let tx_entry = &candidate.inner;
        block_bytes += tx_entry.size;
        block_cycles += tx_entry.cycles;
        let block_is_full = block_bytes >= max_block_bytes || block_cycles >= max_block_cycles;
        if !block_is_full {
            continue;
        }
        target_blocks -= 1;
        if target_blocks == 0 {
            return tx_entry.fee_rate();
        }
        // The entry that overflowed the block opens the next one.
        block_bytes = tx_entry.size;
        block_cycles = tx_entry.cycles;
    }
    min_fee_rate
}
/// Picks up to `limit` short ids of pending entries, in score-sorted order,
/// skipping any id contained in `exclusion`.
pub(crate) fn get_proposals(
    &self,
    limit: usize,
    exclusion: &HashSet<ProposalShortId>,
) -> HashSet<ProposalShortId> {
    self.score_sorted_iter_by_status(Status::Pending)
        .map(|entry| entry.proposal_short_id())
        .filter(|id| !exclusion.contains(id))
        .take(limit)
        .collect()
}
/// Iterates over all stored entries, discarding the first element of each
/// pair yielded by the underlying map iterator.
pub(crate) fn iter(&self) -> impl Iterator<Item = &PoolEntry> {
self.entries.iter().map(|(_, entry)| entry)
}
/// Returns the id of the first entry, in evict-key order, whose status equals
/// `status`; `None` when no entry has that status.
pub(crate) fn next_evict_entry(&self, status: Status) -> Option<ProposalShortId> {
    let victim = self
        .entries
        .iter_by_evict_key()
        .find(|candidate| candidate.status == status)?;
    Some(victim.id.clone())
}
/// Empties the pool map: entries, edges, links, running totals and
/// per-status counters are all reset. `max_ancestors_count` is kept.
pub(crate) fn clear(&mut self) {
self.entries = MultiIndexPoolEntryMap::default();
self.edges.clear();
self.links.clear();
self.total_tx_size = 0;
self.total_tx_cycles = 0;
self.pending_count = 0;
self.gap_count = 0;
self.proposed_count = 0;
}
/// Iterates, in reverse score-index order, over the entries whose status
/// equals `status`.
pub(crate) fn score_sorted_iter_by_status(
    &self,
    status: Status,
) -> impl Iterator<Item = &TxEntry> {
    self.entries
        .iter_by_score()
        .rev()
        .filter(move |pool_entry| pool_entry.status == status)
        .map(|pool_entry| &pool_entry.inner)
}
/// Iterates, in reverse score-index order, over the entries whose status is
/// one of `statuses`.
pub(crate) fn score_sorted_iter_by_statuses(
    &self,
    statuses: Vec<Status>,
) -> impl Iterator<Item = &TxEntry> {
    self.entries
        .iter_by_score()
        .rev()
        .filter(move |pool_entry| statuses.contains(&pool_entry.status))
        .map(|pool_entry| &pool_entry.inner)
}
/// Detaches `id` from the link graph: it is removed from the child set of
/// each of its parents and from the parent set of each of its children, and
/// then its own link record is dropped.
fn remove_entry_links(&mut self, id: &ProposalShortId) {
    // The relative sets are cloned so `self.links` can be mutated while they
    // are walked.
    let parents = self.links.get_parents(id).cloned();
    for parent in parents.into_iter().flatten() {
        self.links.remove_child(&parent, id);
    }
    let children = self.links.get_children(id).cloned();
    for child in children.into_iter().flatten() {
        self.links.remove_parent(&child, id);
    }
    self.links.remove(id);
}
/// Adds or subtracts `child`'s weight on every ancestor reported by the link
/// graph and refreshes each ancestor's `evict_key` accordingly.
fn update_ancestors_index_key(&mut self, child: &TxEntry, op: EntryOp) {
let ancestors: HashSet<ProposalShortId> =
self.links.calc_ancestors(&child.proposal_short_id());
for anc_id in &ancestors {
// NOTE(review): the result of `modify_by_id` is discarded here, unlike in
// `set_entry` — presumably an ancestor id missing from `entries` is skipped
// silently; confirm that this is intended.
self.entries.modify_by_id(anc_id, |e| {
match op {
EntryOp::Remove => e.inner.sub_descendant_weight(child),
EntryOp::Add => e.inner.add_descendant_weight(child),
};
// Keep the indexed key in sync with the updated entry.
e.evict_key = e.inner.as_evict_key();
});
}
}
/// Adds or subtracts `parent`'s weight on every descendant reported by the
/// link graph and refreshes each descendant's `score` accordingly.
fn update_descendants_index_key(&mut self, parent: &TxEntry, op: EntryOp) {
let descendants: HashSet<ProposalShortId> =
self.links.calc_descendants(&parent.proposal_short_id());
for desc_id in &descendants {
// NOTE(review): the result of `modify_by_id` is discarded here, unlike in
// `set_entry` — presumably a descendant id missing from `entries` is skipped
// silently; confirm that this is intended.
self.entries.modify_by_id(desc_id, |e| {
match op {
EntryOp::Remove => e.inner.sub_ancestor_weight(parent),
EntryOp::Add => e.inner.add_ancestor_weight(parent),
};
// Keep the indexed key in sync with the updated entry.
e.score = e.inner.as_score_key();
});
}
}
/// Records the inputs, related dep out points and header deps of `entry`
/// in `self.edges`, keyed by the entry's short id.
///
/// # Errors
/// Returns the `Reject` produced by `Edges::insert_input`; inputs recorded
/// before the failing one are not rolled back here.
fn record_entry_edges(&mut self, entry: &TxEntry) -> Result<(), Reject> {
    let short_id = entry.proposal_short_id();

    for out_point in entry.transaction().input_pts_iter() {
        self.edges.insert_input(out_point, short_id.clone())?;
    }

    for dep in entry.related_dep_out_points() {
        self.edges.insert_deps(dep.clone(), short_id.clone());
    }

    // Transactions without header deps get no record at all.
    let header_deps = entry.transaction().header_deps();
    if !header_deps.is_empty() {
        self.edges
            .header_deps
            .insert(short_id, header_deps.into_iter().collect());
    }
    Ok(())
}
/// Links `entry` to pooled transactions that already reference its outputs
/// and propagates weights to its relatives.
fn record_entry_descendants(&mut self, entry: &TxEntry) {
let tx_short_id: ProposalShortId = entry.proposal_short_id();
let outputs = entry.transaction().output_pts();
let mut children = HashSet::new();
// A child is any pooled transaction recorded as using one of this entry's
// outputs, either as a dep or as an input.
for o in outputs {
if let Some(ids) = self.edges.get_deps_ref(&o).cloned() {
children.extend(ids);
}
if let Some(id) = self.edges.get_input_ref(&o).cloned() {
children.insert(id);
}
}
if !children.is_empty() {
for child in &children {
self.links.add_parent(child, tx_short_id.clone());
}
// Only extends an existing link record; nothing happens when the entry
// has no record in `links.inner`.
if let Some(links) = self.links.inner.get_mut(&tx_short_id) {
links.children.extend(children);
}
self.update_descendants_index_key(entry, EntryOp::Add);
}
// Ancestors are updated unconditionally, whether or not children were found.
self.update_ancestors_index_key(entry, EntryOp::Add);
}
/// Computes the in-pool relatives of a transaction that is about to be added.
///
/// Returns `(ancestors, parents, cell_ref_parents)`:
/// - `parents`: pooled transactions that produce one of the inputs or cell
///   deps, plus those in `cell_ref_parents`;
/// - `cell_ref_parents`: pooled transactions recorded in `edges.deps` as
///   using one of this transaction's inputs as a dep;
/// - `ancestors`: result of `calc_relation_ids` over `parents`.
fn get_tx_ancenstors(
&self,
entry: &TransactionView,
) -> (
HashSet<ProposalShortId>,
HashSet<ProposalShortId>,
HashSet<ProposalShortId>,
) {
let mut parents: HashSet<ProposalShortId> =
HashSet::with_capacity(entry.inputs().len() + entry.cell_deps().len());
let mut cell_ref_parents: HashSet<ProposalShortId> = Default::default();
for input in entry.inputs() {
let input_pt = input.previous_output();
// Transactions that reference the consumed cell as a dep count as parents too.
if let Some(deps) = self.edges.deps.get(&input_pt) {
cell_ref_parents.extend(deps.iter().cloned());
parents.extend(deps.iter().cloned());
}
// The producer of the input counts only if it has a link record in the pool.
let id = ProposalShortId::from_tx_hash(&input_pt.tx_hash());
if self.links.inner.contains_key(&id) {
parents.insert(id);
}
}
for cell_dep in entry.cell_deps() {
let dep_pt = cell_dep.out_point();
let id = ProposalShortId::from_tx_hash(&dep_pt.tx_hash());
if self.links.inner.contains_key(&id) {
parents.insert(id);
}
}
let ancestors = self
.links
.calc_relation_ids(parents.clone(), Relation::Parents);
(ancestors, parents, cell_ref_parents)
}
/// Adds the weight of every ancestor to `entry` and registers `entry` in the
/// link graph with the given `parents`.
///
/// # Panics
/// Panics with "inconsistent pool" (via `get_by_id_checked`) when an id in
/// `ancestors` has no stored entry.
fn _record_ancestors(
&mut self,
entry: &mut TxEntry,
ancestors: HashSet<ProposalShortId>,
parents: HashSet<ProposalShortId>,
) {
for ancestor_id in &ancestors {
let ancestor = self.get_by_id_checked(ancestor_id);
entry.add_ancestor_weight(&ancestor.inner);
}
let short_id = entry.proposal_short_id();
// Register the new entry as a child of each parent, then create its own
// link record with an empty child set.
for parent in &parents {
self.links.add_child(parent, short_id.clone());
}
self.links.add_link(
short_id,
TxLinks {
parents,
children: Default::default(),
},
);
}
/// Enforces the ancestor limit for `entry` and records its ancestors.
///
/// When `entry` plus its ancestors fits within `max_ancestors_count` the
/// ancestors are recorded directly. Otherwise pooled transactions that merely
/// reference one of `entry`'s inputs as a dep ("cell-ref parents") are evicted
/// in evict-key order, each together with its descendants, until the count
/// fits.
///
/// Returns the set of entries evicted to make room (empty when none were).
///
/// # Errors
/// Returns `Reject::ExceededMaximumAncestorsCount` when even evicting every
/// cell-ref parent could not bring the count within the limit.
///
/// # Panics
/// Panics if, after eviction, the recomputed ancestor set still does not fit
/// below `max_ancestors_count`.
fn check_and_record_ancestors(
    &mut self,
    entry: &mut TxEntry,
) -> Result<HashSet<TxEntry>, Reject> {
    let tx = entry.transaction();
    let (ancestors, mut parents, cell_ref_parents) = self.get_tx_ancenstors(tx);
    let mut ancestors_count = ancestors.len() + 1;
    let mut evicted: HashSet<TxEntry> = Default::default();

    if ancestors_count <= self.max_ancestors_count {
        self._record_ancestors(entry, ancestors, parents);
        return Ok(evicted);
    }

    if ancestors_count.saturating_sub(cell_ref_parents.len()) > self.max_ancestors_count {
        return Err(Reject::ExceededMaximumAncestorsCount);
    }

    // Snapshot the candidates first: the pool is mutated while evicting.
    let evict_candidates: Vec<ProposalShortId> = self
        .entries
        .iter_by_evict_key()
        .filter(|candidate| cell_ref_parents.contains(&candidate.id))
        .map(|candidate| candidate.id.clone())
        .collect();

    for next_id in &evict_candidates {
        if ancestors_count <= self.max_ancestors_count {
            break;
        }
        let removed = self.remove_entry_and_descendants(next_id);
        ancestors_count = ancestors_count.saturating_sub(1);
        parents.remove(next_id);
        // Descendants of the evicted candidate leave the pool with it. Any of
        // them that is also a direct parent of `entry` must be forgotten too;
        // otherwise a removed id stays in `parents`, is stored as a dangling
        // parent link, and (presumably, via `calc_relation_ids`) reaches
        // `_record_ancestors`, whose checked lookup panics on a missing entry.
        for removed_entry in &removed {
            parents.remove(&removed_entry.proposal_short_id());
        }
        evicted.extend(removed);
    }

    // Recompute from the surviving parents; the pre-eviction set is stale.
    let ancestors = self
        .links
        .calc_relation_ids(parents.clone(), Relation::Parents);
    assert!(ancestors.len() < self.max_ancestors_count);
    self._record_ancestors(entry, ancestors, parents);
    Ok(evicted)
}
/// Drops every edge record owned by `entry`: its inputs, its registration
/// under each related dep out point, and its header-dep record.
fn remove_entry_edges(&mut self, entry: &TxEntry) {
    let short_id = entry.proposal_short_id();
    for out_point in entry.transaction().input_pts_iter() {
        self.edges.remove_input(&out_point);
    }
    for dep in entry.related_dep_out_points() {
        self.edges.delete_txid_by_dep(dep.clone(), &short_id);
    }
    self.edges.header_deps.remove(&short_id);
}
/// Stores a clone of `entry` under its short id with the given `status`,
/// deriving the score and evict index keys from the entry itself.
fn insert_entry(&mut self, entry: &TxEntry, status: Status) {
    let pool_entry = PoolEntry {
        id: entry.proposal_short_id(),
        score: entry.as_score_key(),
        status,
        evict_key: entry.as_evict_key(),
        inner: entry.clone(),
    };
    self.entries.insert(pool_entry);
}
/// Moves one entry between the per-status counters and publishes the
/// counters to the metrics handle when one is available.
///
/// `remove` is the status an entry is leaving and `add` the status it is
/// entering; either may be `None`.
///
/// # Panics
/// Panics when the three counters no longer sum to `entries.len()`.
fn track_entry_statics(&mut self, remove: Option<Status>, add: Option<Status>) {
    if let Some(leaving) = remove {
        match leaving {
            Status::Pending => self.pending_count -= 1,
            Status::Gap => self.gap_count -= 1,
            Status::Proposed => self.proposed_count -= 1,
        }
    }
    if let Some(entering) = add {
        match entering {
            Status::Pending => self.pending_count += 1,
            Status::Gap => self.gap_count += 1,
            Status::Proposed => self.proposed_count += 1,
        }
    }
    assert_eq!(
        self.pending_count + self.gap_count + self.proposed_count,
        self.entries.len()
    );
    if let Some(metrics) = ckb_metrics::handle() {
        let gauges = &metrics.ckb_tx_pool_entry;
        gauges.pending.set(self.pending_count as i64);
        gauges.gap.set(self.gap_count as i64);
        gauges.proposed.set(self.proposed_count as i64);
    }
}
/// Recomputes the total size and cycles from the stored entries.
///
/// Returns `None` if either sum overflows.
fn recompute_total_stat(&self) -> Option<(usize, Cycle)> {
    let mut size_sum = 0usize;
    let mut cycles_sum: Cycle = 0;
    for (_, entry) in self.entries.iter() {
        size_sum = size_sum.checked_add(entry.inner.size)?;
        cycles_sum = cycles_sum.checked_add(entry.inner.cycles)?;
    }
    Some((size_sum, cycles_sum))
}
/// Computes the totals the pool would have after adding a transaction of
/// `tx_size` bytes and `cycles` cycles, without modifying `self`.
///
/// # Errors
/// Returns `Reject::Full` when either total would overflow.
fn updated_stat_for_add_tx(
    &self,
    tx_size: usize,
    cycles: Cycle,
) -> Result<(usize, Cycle), Reject> {
    let Some(next_size) = self.total_tx_size.checked_add(tx_size) else {
        return Err(Reject::Full(format!(
            "tx-pool total_tx_size {} overflows by add {}",
            self.total_tx_size, tx_size
        )));
    };
    let Some(next_cycles) = self.total_tx_cycles.checked_add(cycles) else {
        return Err(Reject::Full(format!(
            "tx-pool total_tx_cycles {} overflows by add {}",
            self.total_tx_cycles, cycles
        )));
    };
    Ok((next_size, next_cycles))
}
/// Subtracts a removed transaction's size and cycles from the running totals.
///
/// If either subtraction would underflow, the totals are rebuilt from the
/// stored entries and the incident is logged. If the rebuild itself
/// overflows, the totals are left untouched and an error is logged.
fn update_stat_for_remove_tx(&mut self, tx_size: usize, cycles: Cycle) {
    let size_left = self.total_tx_size.checked_sub(tx_size);
    let cycles_left = self.total_tx_cycles.checked_sub(cycles);
    if let (Some(size_left), Some(cycles_left)) = (size_left, cycles_left) {
        self.total_tx_size = size_left;
        self.total_tx_cycles = cycles_left;
        return;
    }

    let Some((fresh_size, fresh_cycles)) = self.recompute_total_stat() else {
        error!(
            "tx-pool total stats underflowed when removing size {} cycles {}, and recomputing overflowed",
            tx_size, cycles
        );
        return;
    };
    error!(
        "tx-pool total stats underflowed when removing size {} cycles {}, recomputed size {} cycles {}",
        tx_size, cycles, fresh_size, fresh_cycles
    );
    self.total_tx_size = fresh_size;
    self.total_tx_cycles = fresh_cycles;
}
}