extern crate slab;
use crate::component::pool_map::PoolMap;
use crate::component::{entry::TxEntry, sort_key::AncestorsScoreSortKey};
use ckb_logger::debug;
use ckb_types::{core::Cycle, packed::ProposalShortId};
use ckb_util::LinkedHashMap;
use multi_index_map::MultiIndexMap;
use std::collections::HashSet;
#[derive(MultiIndexMap, Clone)]
pub struct ModifiedTx {
#[multi_index(hashed_unique)]
pub id: ProposalShortId,
#[multi_index(ordered_non_unique)]
pub score: AncestorsScoreSortKey,
pub inner: TxEntry,
}
impl MultiIndexModifiedTxMap {
pub fn next_best_entry(&self) -> Option<&TxEntry> {
self.iter_by_score().last().map(|x| &x.inner)
}
pub fn get(&self, id: &ProposalShortId) -> Option<&TxEntry> {
self.get_by_id(id).map(|x| &x.inner)
}
pub fn contains_key(&self, id: &ProposalShortId) -> bool {
self.get_by_id(id).is_some()
}
pub fn insert_entry(&mut self, entry: TxEntry) {
let score = AncestorsScoreSortKey::from(&entry);
self.insert(ModifiedTx {
id: entry.proposal_short_id(),
score,
inner: entry,
});
}
pub fn remove(&mut self, id: &ProposalShortId) -> Option<TxEntry> {
self.remove_by_id(id).map(|x| x.inner)
}
}
const MAX_CONSECUTIVE_FAILURES: usize = 4000;
pub struct TxSelector<'a> {
pool_map: &'a PoolMap,
entries: Vec<TxEntry>,
modified_entries: MultiIndexModifiedTxMap,
fetched_txs: HashSet<ProposalShortId>,
failed_txs: HashSet<ProposalShortId>,
}
impl<'a> TxSelector<'a> {
pub fn new(pool_map: &'a PoolMap) -> TxSelector<'a> {
TxSelector {
entries: Vec::new(),
pool_map,
modified_entries: MultiIndexModifiedTxMap::default(),
fetched_txs: HashSet::default(),
failed_txs: HashSet::default(),
}
}
pub fn txs_to_commit(
mut self,
size_limit: usize,
cycles_limit: Cycle,
) -> (Vec<TxEntry>, usize, Cycle) {
let mut size: usize = 0;
let mut cycles: Cycle = 0;
let mut consecutive_failed = 0;
let mut iter = self
.pool_map
.sorted_proposed_iter()
.filter(|entry| {
entry.ancestors_size <= size_limit && entry.ancestors_cycles <= cycles_limit
})
.peekable();
loop {
let mut using_modified = false;
if let Some(entry) = iter.peek()
&& self.skip_proposed_entry(&entry.proposal_short_id())
{
iter.next();
continue;
}
let tx_entry: TxEntry = match (iter.peek(), self.modified_entries.next_best_entry()) {
(Some(entry), Some(best_modified)) => {
if &best_modified > entry {
using_modified = true;
best_modified.clone()
} else {
iter.next().cloned().expect("peek guard")
}
}
(Some(_), None) => {
iter.next().cloned().expect("peek guarded")
}
(None, Some(best_modified)) => {
using_modified = true;
best_modified.clone()
}
(None, None) => {
break;
}
};
let short_id = tx_entry.proposal_short_id();
let next_size = size.saturating_add(tx_entry.ancestors_size);
let next_cycles = cycles.saturating_add(tx_entry.ancestors_cycles);
if next_cycles > cycles_limit || next_size > size_limit {
consecutive_failed += 1;
if using_modified {
self.modified_entries.remove(&short_id);
self.failed_txs.insert(short_id.clone());
}
if consecutive_failed > MAX_CONSECUTIVE_FAILURES {
break;
}
continue;
}
let only_unconfirmed = |short_id| {
if self.fetched_txs.contains(short_id) {
None
} else {
let entry = self.retrieve_entry(short_id);
debug_assert!(entry.is_some(), "pool should be consistent");
entry
}
};
let ancestors_ids = self.pool_map.calc_ancestors(&short_id);
if ancestors_ids
.iter()
.any(|id| !self.pool_map.has_proposed(id))
{
if using_modified {
self.modified_entries.remove(&short_id);
self.failed_txs.insert(short_id.clone());
}
consecutive_failed += 1;
if consecutive_failed > MAX_CONSECUTIVE_FAILURES {
break;
}
continue;
}
let mut ancestors = ancestors_ids
.iter()
.filter_map(only_unconfirmed)
.cloned()
.collect::<Vec<TxEntry>>();
ancestors.sort_unstable_by_key(|entry| entry.ancestors_count);
ancestors.push(tx_entry.to_owned());
let ancestors: LinkedHashMap<ProposalShortId, TxEntry> = ancestors
.into_iter()
.map(|entry| (entry.proposal_short_id(), entry))
.collect();
for (short_id, entry) in &ancestors {
let is_new = self.fetched_txs.insert(short_id.clone());
if !is_new {
debug!("package duplicate txs {}", short_id);
continue;
}
cycles = cycles.saturating_add(entry.cycles);
size = size.saturating_add(entry.size);
self.entries.push(entry.to_owned());
self.modified_entries.remove(short_id);
}
self.update_modified_entries(&ancestors);
}
(self.entries, size, cycles)
}
fn retrieve_entry(&self, short_id: &ProposalShortId) -> Option<&TxEntry> {
self.modified_entries
.get(short_id)
.or_else(|| self.pool_map.get_proposed(short_id))
}
fn skip_proposed_entry(&self, short_id: &ProposalShortId) -> bool {
self.fetched_txs.contains(short_id)
|| self.modified_entries.contains_key(short_id)
|| self.failed_txs.contains(short_id)
}
fn update_modified_entries(&mut self, already_added: &LinkedHashMap<ProposalShortId, TxEntry>) {
for (id, entry) in already_added {
let descendants = self.pool_map.calc_descendants(id);
for desc_id in descendants
.iter()
.filter(|id| !already_added.contains_key(id) && self.pool_map.has_proposed(id))
{
if let Some(mut desc) = self
.modified_entries
.remove(desc_id)
.or_else(|| self.pool_map.get(desc_id).cloned())
{
desc.sub_ancestor_weight(entry);
self.modified_entries.insert_entry(desc);
}
}
}
}
}