use crate::callback::Callbacks;
use crate::component::entry::TxEntry;
use crate::component::orphan::Entry as OrphanEntry;
use crate::component::pool_map::Status;
use crate::error::Reject;
use crate::pool::TxPool;
use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
use crate::try_or_return_with_snapshot;
use crate::util::{
after_delay_window, check_tx_fee, check_txid_collision, is_missing_input,
non_contextual_verify, time_relative_verify, verify_rtx,
};
use ckb_chain_spec::consensus::MAX_BLOCK_PROPOSALS_LIMIT;
use ckb_error::{AnyError, InternalErrorKind};
use ckb_fee_estimator::FeeEstimator;
use ckb_jsonrpc_types::BlockTemplate;
use ckb_logger::Level::Trace;
use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
use ckb_network::PeerIndex;
use ckb_script::ChunkCommand;
use ckb_snapshot::Snapshot;
use ckb_types::core::error::OutPointError;
use ckb_types::{
core::{
cell::ResolvedTransaction, BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView,
TransactionView,
},
packed::{Byte32, ProposalShortId},
};
use ckb_util::LinkedHashSet;
use ckb_verification::{
cache::{CacheEntry, Completed},
TxVerifyEnv,
};
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
/// Upper bound on the number of transactions kept in the delay buffer.
// NOTE(review): presumably 1_500 transactions per block over 21 blocks — confirm.
const DELAY_LIMIT: usize = 1_500 * 21;

/// Selects a destination sub-pool.
// NOTE(review): not referenced in this part of the file; meaning inferred from
// the variant names only — confirm against callers.
pub enum PlugTarget {
    /// Pending sub-pool.
    Pending,
    /// Proposed sub-pool.
    Proposed,
}
/// Proposal stage of a transaction relative to a snapshot's proposal view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
/// The short id is in neither the proposed set nor the gap set.
Fresh,
/// The short id is in the gap set of the snapshot's proposals.
Gap,
/// The short id is in the proposed set of the snapshot's proposals.
Proposed,
}
impl TxStatus {
    /// Builds the verification environment matching this status on top of `header`.
    ///
    /// `Fresh` uses the submit environment; `Gap` and `Proposed` use the proposed
    /// environment with `0` and `1` respectively.
    // NOTE(review): the numeric argument is presumably a block offset until the
    // transaction can be committed — confirm against `TxVerifyEnv::new_proposed`.
    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
        match self {
            Self::Fresh => TxVerifyEnv::new_submit(header),
            Self::Gap => TxVerifyEnv::new_proposed(header, 0),
            Self::Proposed => TxVerifyEnv::new_proposed(header, 1),
        }
    }
}
impl TxPoolService {
/// Returns the block assembler's current template.
///
/// # Errors
///
/// Returns a `Config` internal error when no block assembler is configured.
pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
    match self.block_assembler {
        Some(ref assembler) => Ok(assembler.get_current().await),
        None => Err(InternalErrorKind::Config
            .other("BlockAssembler disabled")
            .into()),
    }
}
/// Looks up the cached verification result for `tx`, keyed by its witness hash.
///
/// Uses `peek` under a read lock and returns a clone of the cached entry, if any.
pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
    let wtx_hash = tx.witness_hash();
    let cache = self.txs_verify_cache.read().await;
    cache.peek(&wtx_hash).cloned()
}
/// Collects the cached verification results for every transaction in `txs`.
///
/// The returned map is keyed by witness hash and only contains the transactions
/// that have an entry in the verify cache.
async fn fetch_txs_verify_cache(
    &self,
    txs: impl Iterator<Item = &TransactionView>,
) -> HashMap<Byte32, CacheEntry> {
    let cache = self.txs_verify_cache.read().await;
    let mut found = HashMap::new();
    for tx in txs {
        let wtx_hash = tx.witness_hash();
        if let Some(cached) = cache.peek(&wtx_hash) {
            found.insert(wtx_hash, cached.clone());
        }
    }
    found
}
/// Inserts a verified `entry` into the pool under the tx-pool write lock.
///
/// * `pre_resolve_tip` - tip hash of the snapshot the entry was resolved against.
/// * `entry` - the verified entry to insert.
/// * `status` - the status computed when the entry was resolved; recomputed if
///   the tip changed in the meantime.
///
/// Returns the submission result together with the snapshot that was current
/// while the write lock was held.
pub(crate) async fn submit_entry(
    &self,
    pre_resolve_tip: Byte32,
    entry: TxEntry,
    mut status: TxStatus,
) -> (Result<(), Reject>, Arc<Snapshot>) {
    let (ret, snapshot) = self
        .with_tx_pool_write_lock(move |tx_pool, snapshot| {
            // With RBF enabled the conflicting pool entries are collected for
            // replacement; without it any conflicting outpoint is a hard reject.
            let conflicts = if tx_pool.enable_rbf() {
                tx_pool.check_rbf(&snapshot, &entry)?
            } else {
                let conflicted_outpoint =
                    tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                if let Some(outpoint) = conflicted_outpoint {
                    return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                }
                HashSet::new()
            };

            // The tip moved between resolution and submission: re-check the
            // resolved transaction and redo the time-relative verification.
            let tip_hash = snapshot.tip_hash();
            if pre_resolve_tip != tip_hash {
                debug!(
                    "submit_entry {} context changed. previous:{} now:{}",
                    entry.proposal_short_id(),
                    pre_resolve_tip,
                    tip_hash
                );
                status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;
                let tip_header = snapshot.tip_header();
                let tx_env = status.with_env(tip_header);
                time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
            }

            let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
            let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

            // Entries evicted by the insertion were invalidated by the NEW
            // transaction, so the reject names the new transaction's hash.
            for evict in evicted {
                let reject = Reject::Invalidated(format!(
                    "invalidated by tx {}",
                    entry.transaction().hash()
                ));
                self.callbacks.call_reject(tx_pool, &evict, reject);
            }

            tx_pool.remove_conflict(&entry.proposal_short_id());
            tx_pool
                .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                .map_or(Ok(()), Err)?;

            // Transactions that may have become valid again after the RBF
            // removal are pushed back to the verify queue in the background.
            if !may_recovered_txs.is_empty() {
                let self_clone = self.clone();
                tokio::spawn(async move {
                    let mut queue = self_clone.verify_queue.write().await;
                    for tx in may_recovered_txs {
                        debug!("recover back: {:?}", tx.proposal_short_id());
                        let _ = queue.add_tx(tx, None);
                    }
                });
            }
            Ok(())
        })
        .await;
    (ret, snapshot)
}
/// Tells the block assembler that a transaction with `status` entered the pool.
///
/// `Fresh` maps to a `Pending` message and `Proposed` to a `Proposed` message;
/// `Gap` sends nothing. Nothing is sent either when
/// `should_notify_block_assembler` returns `false`.
pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
    if !self.should_notify_block_assembler() {
        return;
    }
    let message = match status {
        TxStatus::Fresh => BlockAssemblerMessage::Pending,
        TxStatus::Proposed => BlockAssemblerMessage::Proposed,
        TxStatus::Gap => return,
    };
    if self.block_assembler_sender.send(message).await.is_err() {
        error!("block_assembler receiver dropped");
    }
}
/// Removes the pool entries in `conflicts` (and their descendants) that are
/// replaced by `entry`, rejecting each of them with `RBFRejected`.
///
/// Returns the previously conflicted transactions that spend inputs released by
/// the removal (excluding inputs spent by `entry` itself); these may be valid
/// again. Returns an empty vector when there are no conflicts.
///
/// # Panics
///
/// Panics if the returned candidates contain the replacing transaction itself.
fn process_rbf(
    &self,
    tx_pool: &mut TxPool,
    entry: &TxEntry,
    conflicts: &HashSet<ProposalShortId>,
) -> Vec<TransactionView> {
    if conflicts.is_empty() {
        return Vec::new();
    }

    let all_removed: Vec<_> = conflicts
        .iter()
        .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
        .collect();

    // Inputs freed by the removed entries, minus those the new entry consumes.
    let mut available_inputs: HashSet<_> = all_removed
        .iter()
        .flat_map(|removed| removed.transaction().input_pts_iter())
        .collect();
    for input in entry.transaction().input_pts_iter() {
        available_inputs.remove(&input);
    }

    let may_recovered_txs =
        tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());

    for old in all_removed {
        debug!(
            "remove conflict tx {} for RBF by new tx {}",
            old.transaction().hash(),
            entry.transaction().hash()
        );
        let reject =
            Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
        tx_pool.record_conflict(old.transaction().clone());
        self.callbacks.call_reject(tx_pool, &old, reject);
    }

    assert!(!may_recovered_txs.contains(entry.transaction()));
    may_recovered_txs
}
/// Returns `true` when the verify queue holds a transaction with `tx`'s short id.
pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
    let short_id = tx.proposal_short_id();
    let verify_queue = self.verify_queue.read().await;
    verify_queue.contains_key(&short_id)
}
/// Returns `true` when the orphan pool holds a transaction with `tx`'s short id.
pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
    let short_id = tx.proposal_short_id();
    let orphan_pool = self.orphan.read().await;
    orphan_pool.contains_key(&short_id)
}
/// Runs `f` with shared access to the pool and a clone of its current snapshot.
///
/// Returns the closure's result together with that snapshot.
pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
    &self,
    mut f: F,
) -> (U, Arc<Snapshot>) {
    let guard = self.tx_pool.read().await;
    let snapshot = guard.cloned_snapshot();
    (f(&guard, Arc::clone(&snapshot)), snapshot)
}
/// Runs `f` with exclusive access to the pool and a clone of its current snapshot.
///
/// Returns the closure's result together with that snapshot.
pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
    &self,
    mut f: F,
) -> (U, Arc<Snapshot>) {
    let mut guard = self.tx_pool.write().await;
    let snapshot = guard.cloned_snapshot();
    (f(&mut guard, Arc::clone(&snapshot)), snapshot)
}
/// Performs the checks that only need a read lock on the pool: txid collision,
/// input resolution and the fee check.
///
/// If resolution fails with a dead outpoint, resolution is retried in RBF mode;
/// the retry is only accepted when the pool really contains a conflicting
/// outpoint, otherwise the original dead-outpoint reject is returned.
///
/// On success yields `(tip_hash, rtx, status, fee, tx_size)` alongside the
/// snapshot used for the check.
pub(crate) async fn pre_check(
    &self,
    tx: &TransactionView,
) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
    let tx_size = tx.data().serialized_size_in_block();
    self.with_tx_pool_read_lock(|tx_pool, snapshot| {
        let tip_hash = snapshot.tip_hash();
        check_txid_collision(tx_pool, tx)?;

        match resolve_tx(tx_pool, &snapshot, tx.clone(), false) {
            Ok((rtx, status)) => {
                let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                Ok((tip_hash, rtx, status, fee, tx_size))
            }
            Err(Reject::Resolve(OutPointError::Dead(out))) => {
                let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
                let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                if tx_pool.pool_map.find_conflict_outpoint(tx).is_none() {
                    error!(
                        "{} is resolved as Dead, but there is no conflicted tx",
                        rtx.transaction.proposal_short_id()
                    );
                    return Err(Reject::Resolve(OutPointError::Dead(out)));
                }
                Ok((tip_hash, rtx, status, fee, tx_size))
            }
            Err(other) => Err(other),
        }
    })
    .await
}
/// Runs the non-contextual verification of `tx` against the consensus rules.
///
/// When the transaction is malformed and came from a peer (`remote` is `Some`),
/// that peer is banned before the reject is returned.
pub(crate) fn non_contextual_verify(
    &self,
    tx: &TransactionView,
    remote: Option<(Cycle, PeerIndex)>,
) -> Result<(), Reject> {
    match non_contextual_verify(&self.consensus, tx) {
        Ok(_) => Ok(()),
        Err(reject) => {
            if reject.is_malformed_tx() {
                if let Some((_, peer)) = remote {
                    self.ban_malformed(peer, format!("reject {reject}"));
                }
            }
            Err(reject)
        }
    }
}
/// Verifies `tx` non-contextually and places it on the verify queue.
///
/// # Errors
///
/// Returns `Reject::Duplicated` when the transaction is already in the orphan
/// pool or in the verify queue, or whatever the non-contextual verification or
/// the queue insertion rejects with.
pub(crate) async fn resumeble_process_tx(
    &self,
    tx: TransactionView,
    remote: Option<(Cycle, PeerIndex)>,
) -> Result<bool, Reject> {
    self.non_contextual_verify(&tx, remote)?;

    let in_orphan = self.orphan_contains(&tx).await;
    if in_orphan {
        debug!("reject tx {} already in orphan pool", tx.hash());
    }
    // Short-circuits: the verify queue is only consulted when not in the orphan pool.
    if in_orphan || self.verify_queue_contains(&tx).await {
        return Err(Reject::Duplicated(tx.hash()));
    }

    self.enqueue_verify_queue(tx, remote).await
}
/// Checks whether `tx` would be accepted, without submitting it to the pool.
///
/// # Errors
///
/// Returns `Reject::Duplicated` when the transaction is already in the verify
/// queue or the orphan pool, or the reject produced by verification.
pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
    self.non_contextual_verify(&tx, None)?;

    if self.verify_queue_contains(&tx).await {
        return Err(Reject::Duplicated(tx.hash()));
    }

    if self.orphan_contains(&tx).await {
        debug!("reject tx {} already in orphan pool", tx.hash());
        return Err(Reject::Duplicated(tx.hash()));
    }

    // `tx` is owned and not used afterwards, so it is moved rather than cloned.
    self._test_accept_tx(tx).await
}
/// Verifies and submits `tx` synchronously, then runs `after_process`.
///
/// When `_process_tx` yields no result, a zero-cycle, zero-fee `Completed` is
/// returned.
///
/// # Errors
///
/// Returns `Reject::Duplicated` when the transaction is already queued for
/// verification or parked as an orphan, or the reject from verification.
pub(crate) async fn process_tx(
    &self,
    tx: TransactionView,
    remote: Option<(Cycle, PeerIndex)>,
) -> Result<Completed, Reject> {
    self.non_contextual_verify(&tx, remote)?;

    if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
        return Err(Reject::Duplicated(tx.hash()));
    }

    let declared_cycles = remote.map(|(cycles, _)| cycles);
    match self._process_tx(tx.clone(), declared_cycles, None).await {
        Some((ret, snapshot)) => {
            self.after_process(tx, remote, &snapshot, &ret).await;
            ret
        }
        None => Ok(Completed {
            cycles: 0,
            fee: Capacity::zero(),
        }),
    }
}
pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
let mut tx_pool = self.tx_pool.write().await;
if let Some(ref mut recent_reject) = tx_pool.recent_reject {
if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
error!(
"Failed to record recent_reject {} {} {}",
tx_hash, reject, e
);
}
}
}
/// Removes the transaction with `tx_hash` from wherever it currently lives.
///
/// The verify queue is tried first, then the orphan pool, then the pool itself;
/// each lock is released before the next one is taken. Returns `true` when a
/// transaction was removed.
pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
    let id = ProposalShortId::from_tx_hash(&tx_hash);

    if self.verify_queue.write().await.remove_tx(&id).is_some() {
        return true;
    }
    if self.orphan.write().await.remove_orphan_tx(&id).is_some() {
        return true;
    }

    let mut tx_pool = self.tx_pool.write().await;
    tx_pool.remove_tx(&id)
}
/// Follow-up work once `tx` has been processed with outcome `ret`.
///
/// * `tx` - the processed transaction.
/// * `remote` - declared cycles and originating peer, `None` for a transaction
///   without an originating peer.
/// * `snapshot` - snapshot used to decide whether the ckb2023 VM is enabled for
///   the next block.
/// * `ret` - the processing result.
///
/// Reports the outcome to the relayer, triggers processing of orphans that
/// depend on `tx` on success, parks the transaction as an orphan when inputs
/// are missing, bans peers that sent malformed transactions and records
/// rejects that should be remembered.
pub(crate) async fn after_process(
&self,
tx: TransactionView,
remote: Option<(Cycle, PeerIndex)>,
snapshot: &Snapshot,
ret: &Result<Completed, Reject>,
) {
let tx_hash = tx.hash();
// Whether VM version 2 / syscalls 3 (ckb2023) is enabled at the minimum epoch
// number one block after the snapshot tip.
let with_vm_2023 = {
let epoch = snapshot
.tip_header()
.epoch()
.minimum_epoch_number_after_n_blocks(1);
self.consensus
.hardfork_switch
.ckb2023
.is_vm_version_2_and_syscalls_3_enabled(epoch)
};
// Emit a JSON line with the tx hash and consumed cycles on the
// `ckb_tx_monitor` target, only when trace logging is enabled for it.
if log_enabled_target!("ckb_tx_monitor", Trace) {
if let Ok(c) = ret {
trace_target!(
"ckb_tx_monitor",
r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
tx_hash,
c.cycles
);
}
}
// An RBF rejection or a dead outpoint: remember the transaction as a conflict
// if the pool still holds an entry conflicting with one of its outpoints.
if matches!(
ret,
Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
) {
let mut tx_pool = self.tx_pool.write().await;
if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
tx_pool.record_conflict(tx.clone());
}
}
match remote {
// Transaction that came with a declared cycle count from `peer`.
Some((declared_cycle, peer)) => match ret {
Ok(_) => {
debug!(
"after_process remote send_result_to_relayer {} {}",
tx_hash, peer
);
self.send_result_to_relayer(TxVerificationResult::Ok {
original_peer: Some(peer),
with_vm_2023,
tx_hash,
});
// Orphans waiting on this transaction may now be processable.
self.process_orphan_tx(&tx).await;
}
Err(reject) => {
info!(
"after_process {} {} remote reject: {} ",
tx_hash, peer, reject
);
if is_missing_input(reject) {
// Missing inputs: report the unknown parents to the relayer and keep the
// transaction in the orphan pool.
self.send_result_to_relayer(TxVerificationResult::UnknownParents {
peer,
parents: tx.unique_parents(),
});
self.add_orphan(tx, peer, declared_cycle).await;
} else {
if reject.is_malformed_tx() {
self.ban_malformed(peer, format!("reject {reject}"));
}
if reject.is_allowed_relay() {
self.send_result_to_relayer(TxVerificationResult::Reject {
tx_hash: tx_hash.clone(),
});
}
if reject.should_recorded() {
self.put_recent_reject(&tx_hash, reject).await;
}
}
}
},
// Transaction without an originating peer.
None => {
match ret {
Ok(_) => {
debug!("after_process local send_result_to_relayer {}", tx_hash);
self.send_result_to_relayer(TxVerificationResult::Ok {
original_peer: None,
with_vm_2023,
tx_hash,
});
self.process_orphan_tx(&tx).await;
}
// A duplicate is still reported to the relayer as `Ok`.
Err(Reject::Duplicated(_)) => {
debug!("after_process {} duplicated", tx_hash);
self.send_result_to_relayer(TxVerificationResult::Ok {
original_peer: None,
with_vm_2023,
tx_hash,
});
}
Err(reject) => {
debug!("after_process {} reject: {} ", tx_hash, reject);
if reject.should_recorded() {
self.put_recent_reject(&tx_hash, reject).await;
}
}
}
}
}
}
/// Adds `tx` to the orphan pool and reports every transaction evicted by the
/// insertion to the relayer as rejected.
pub(crate) async fn add_orphan(
    &self,
    tx: TransactionView,
    peer: PeerIndex,
    declared_cycle: Cycle,
) {
    // The write guard is scoped so it is released before notifying the relayer.
    let evicted_txs = {
        let mut orphan_pool = self.orphan.write().await;
        orphan_pool.add_orphan_tx(tx, peer, declared_cycle)
    };
    for tx_hash in evicted_txs {
        self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
    }
}
/// Returns clones of the orphan entries that `find_by_previous` reports for `tx`.
///
/// Ids reported by the lookup that no longer resolve to an entry are skipped.
pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
    let orphan_pool = self.orphan.read().await;
    let mut entries = Vec::new();
    for id in orphan_pool.find_by_previous(tx).iter() {
        if let Some(found) = orphan_pool.get(id) {
            entries.push(found.clone());
        }
    }
    entries
}
/// Removes the orphan identified by `id` from the orphan pool; the removal
/// result is discarded.
pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
    let mut orphan_pool = self.orphan.write().await;
    orphan_pool.remove_orphan_tx(id);
}
pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
orphan_queue.push_back(tx.clone());
while let Some(previous) = orphan_queue.pop_front() {
let orphans = self.find_orphan_by_previous(&previous).await;
for orphan in orphans.into_iter() {
if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
debug!(
"process_orphan {} added to verify queue; find previous from {}",
orphan.tx.hash(),
tx.hash(),
);
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
.await
.expect("enqueue suspended tx");
} else if let Some((ret, snapshot)) = self
._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
.await
{
match ret {
Ok(_) => {
let with_vm_2023 = {
let epoch = snapshot
.tip_header()
.epoch()
.minimum_epoch_number_after_n_blocks(1);
self.consensus
.hardfork_switch
.ckb2023
.is_vm_version_2_and_syscalls_3_enabled(epoch)
};
self.send_result_to_relayer(TxVerificationResult::Ok {
original_peer: Some(orphan.peer),
with_vm_2023,
tx_hash: orphan.tx.hash(),
});
debug!(
"process_orphan {} success, find previous from {}",
orphan.tx.hash(),
tx.hash()
);
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
orphan_queue.push_back(orphan.tx);
}
Err(reject) => {
debug!(
"process_orphan {} reject {}, find previous from {}",
orphan.tx.hash(),
reject,
tx.hash(),
);
if !is_missing_input(&reject) {
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
if reject.is_malformed_tx() {
self.ban_malformed(orphan.peer, format!("reject {reject}"));
}
if reject.is_allowed_relay() {
self.send_result_to_relayer(TxVerificationResult::Reject {
tx_hash: orphan.tx.hash(),
});
}
if reject.should_recorded() {
self.put_recent_reject(&orphan.tx.hash(), &reject).await;
}
}
}
}
}
}
}
}
/// Sends `result` over the relay channel; a send failure is logged and dropped.
pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
    match self.tx_relay_sender.send(result) {
        Ok(_) => {}
        Err(e) => {
            error!("tx-pool tx_relay_sender internal error {}", e);
        }
    }
}
/// Bans `peer` for `DEFAULT_BAN_TIME` with the given `reason`.
///
/// When the `with_sentry` feature is enabled, an info-level message describing
/// the ban is additionally captured through sentry before banning.
fn ban_malformed(&self, peer: PeerIndex, reason: String) {
// 3600 * 24 * 3 seconds, i.e. three days.
const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
#[cfg(feature = "with_sentry")]
use sentry::{capture_message, with_scope, Level};
#[cfg(feature = "with_sentry")]
with_scope(
// Fixed fingerprint supplied for every such report.
|scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
|| {
capture_message(
&format!(
"Ban peer {} for {} seconds, reason: \
{}",
peer,
DEFAULT_BAN_TIME.as_secs(),
reason
),
Level::Info,
)
},
);
self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
}
/// Core processing pipeline for a single transaction: pre-check, script
/// verification, declared-cycle check and submission to the pool.
///
/// * `tx` - the transaction to process.
/// * `declared_cycles` - cycles declared for the transaction, if any; also used
///   as the cycle limit for verification. Without it the consensus maximum
///   block cycles is used.
/// * `command_rx` - optional command channel handed to `verify_rtx`; its
///   absence marks the call as synchronous for the metrics.
///
/// Returns `None` when the chain is in the delay window (the transaction is
/// parked in the delay buffer if there is room, otherwise dropped). Otherwise
/// returns the result together with a snapshot.
// NOTE(review): `try_or_return_with_snapshot!` is defined elsewhere; it
// presumably returns `Some((Err(..), snapshot))` early on error — confirm.
pub(crate) async fn _process_tx(
&self,
tx: TransactionView,
declared_cycles: Option<Cycle>,
command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
let wtx_hash = tx.witness_hash();
let instant = Instant::now();
let is_sync_process = command_rx.is_none();
let (ret, snapshot) = self.pre_check(&tx).await;
let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);
// In the delay window nothing is verified: the transaction is buffered for
// later as long as the buffer holds fewer than DELAY_LIMIT entries.
if self.is_in_delay_window(&snapshot) {
let mut delay = self.delay.write().await;
if delay.len() < DELAY_LIMIT {
delay.insert(tx.proposal_short_id(), tx);
}
return None;
}
let verify_cache = self.fetch_tx_verify_cache(&tx).await;
let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
let tip_header = snapshot.tip_header();
let tx_env = Arc::new(status.with_env(tip_header));
let verified_ret = verify_rtx(
Arc::clone(&snapshot),
Arc::clone(&rtx),
tx_env,
&verify_cache,
max_cycles,
command_rx,
)
.await;
let verified = try_or_return_with_snapshot!(verified_ret, snapshot);
// Declared cycles must match the verified cycles exactly.
if let Some(declared) = declared_cycles {
if declared != verified.cycles {
info!(
"process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
declared, verified.cycles, tx
);
return Some((
Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
snapshot,
));
}
}
let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, submit_snapshot);
self.notify_block_assembler(status).await;
// Only populate the verify cache on a miss; done in a background task keyed by
// the witness hash.
if verify_cache.is_none() {
let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
tokio::spawn(async move {
let mut guard = txs_verify_cache.write().await;
guard.put(wtx_hash, verified);
});
}
// Record the elapsed time in the sync or async histogram; only reached on success.
if let Some(metrics) = ckb_metrics::handle() {
let elapsed = instant.elapsed().as_secs_f64();
if is_sync_process {
metrics.ckb_tx_pool_sync_process.observe(elapsed);
} else {
metrics.ckb_tx_pool_async_process.observe(elapsed);
}
}
Some((Ok(verified), submit_snapshot))
}
/// Pre-checks and verifies `tx` against the current snapshot without
/// submitting it to the pool. Verification is capped at the consensus maximum
/// block cycles.
pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
    let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
    let (_, rtx, status, _, _) = pre_check_ret?;

    let verify_cache = self.fetch_tx_verify_cache(&tx).await;
    let max_cycles = self.consensus.max_block_cycles();
    let tx_env = Arc::new(status.with_env(snapshot.tip_header()));

    verify_rtx(
        Arc::clone(&snapshot),
        Arc::clone(&rtx),
        tx_env,
        &verify_cache,
        max_cycles,
        None,
    )
    .await
}
/// Brings the pool, the delay buffer, the orphan pool and the verify queue in
/// line with a chain reorganization.
///
/// * `detached_blocks` - blocks removed from the chain.
/// * `attached_blocks` - blocks added to the chain.
/// * `detached_proposal_id` - proposal ids detached by the reorg.
/// * `snapshot` - the new snapshot to install in the pool.
pub(crate) async fn update_tx_pool_for_reorg(
&self,
detached_blocks: VecDeque<BlockView>,
attached_blocks: VecDeque<BlockView>,
detached_proposal_id: HashSet<ProposalShortId>,
snapshot: Arc<Snapshot>,
) {
let mine_mode = self.block_assembler.is_some();
let mut detached = LinkedHashSet::default();
let mut attached = LinkedHashSet::default();
let epoch_of_next_block = snapshot
.tip_header()
.epoch()
.minimum_epoch_number_after_n_blocks(1);
let new_tip_after_delay = after_delay_window(&snapshot);
let is_in_delay_window = self.is_in_delay_window(&snapshot);
let detached_headers: HashSet<Byte32> = detached_blocks
.iter()
.map(|blk| blk.header().hash())
.collect();
// The first transaction of every block is skipped.
// NOTE(review): presumably the cellbase — confirm.
for blk in detached_blocks {
detached.extend(blk.transactions().into_iter().skip(1))
}
for blk in attached_blocks {
self.fee_estimator.commit_block(&blk);
attached.extend(blk.transactions().into_iter().skip(1));
}
// Detached transactions that were not re-attached are candidates for re-adding.
let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();
// No verify-cache lookup is done while in the delay window.
let fetched_cache = if is_in_delay_window {
HashMap::new()
} else {
self.fetch_txs_verify_cache(retain.iter()).await
};
let txs_opt = {
// Everything in this block runs under the tx-pool write lock.
let mut tx_pool = self.tx_pool.write().await;
// In the delay window the verify queue is cleared and the whole pool is
// drained; the drained transactions go to the delay buffer below.
let txs_opt = if is_in_delay_window {
{
self.verify_queue.write().await.clear();
}
Some(tx_pool.drain_all_transactions())
} else {
None
};
_update_tx_pool_for_reorg(
&mut tx_pool,
&attached,
&detached_headers,
detached_proposal_id,
snapshot,
&self.callbacks,
mine_mode,
);
// Initialize ckb2023 on the network once the feature is enabled for the
// epoch of the next block and it has not been loaded yet.
if !self.network.load_ckb2023()
&& self
.consensus
.hardfork_switch
.ckb2023
.is_vm_version_2_and_syscalls_3_enabled(epoch_of_next_block)
{
self.network.init_ckb2023()
}
self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
.await;
txs_opt
};
// Park drained transactions in the delay buffer. The limit is checked once,
// before inserting the batch.
if let Some(txs) = txs_opt {
let mut delay = self.delay.write().await;
if delay.len() < DELAY_LIMIT {
for tx in txs {
delay.insert(tx.proposal_short_id(), tx);
}
}
}
{
// Once the new tip is past the delay window, release up to
// MAX_BLOCK_PROPOSALS_LIMIT buffered transactions per call until the buffer
// is empty, then mark the delay phase as finished.
let delay_txs = if !self.after_delay() && new_tip_after_delay {
let limit = MAX_BLOCK_PROPOSALS_LIMIT as usize;
let mut txs = Vec::with_capacity(limit);
let mut delay = self.delay.write().await;
let keys: Vec<_> = { delay.keys().take(limit).cloned().collect() };
for k in keys {
if let Some(v) = delay.remove(&k) {
txs.push(v);
}
}
if delay.is_empty() {
self.set_after_delay_true();
}
Some(txs)
} else {
None
};
if let Some(txs) = delay_txs {
self.try_process_txs(txs).await;
}
}
// Attached transactions may unblock orphans and must leave the orphan pool
// and the verify queue.
self.remove_orphan_txs_by_attach(&attached).await;
{
let mut queue = self.verify_queue.write().await;
queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
}
}
/// Adds `tx` (with its optional declared cycles and peer) to the verify queue
/// and returns the queue's `add_tx` result.
async fn enqueue_verify_queue(
    &self,
    tx: TransactionView,
    remote: Option<(Cycle, PeerIndex)>,
) -> Result<bool, Reject> {
    self.verify_queue.write().await.add_tx(tx, remote)
}
/// Processes the orphans depending on each attached transaction, then removes
/// the attached transactions themselves from the orphan pool.
async fn remove_orphan_txs_by_attach(&self, txs: &LinkedHashSet<TransactionView>) {
    for tx in txs.iter() {
        self.process_orphan_tx(tx).await;
    }
    let mut orphan = self.orphan.write().await;
    orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
}
/// Re-submits transactions from detached blocks to the pool.
///
/// * `tx_pool` - the pool, already write-locked by the caller.
/// * `txs` - detached transactions that were not re-attached.
/// * `fetched_cache` - verify-cache entries fetched beforehand, keyed by
///   witness hash.
///
/// Transactions that fail resolution, the fee check or verification are
/// skipped silently; a failed submission is logged.
async fn readd_detached_tx(
    &self,
    tx_pool: &mut TxPool,
    txs: Vec<TransactionView>,
    fetched_cache: HashMap<Byte32, CacheEntry>,
) {
    let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
    for tx in txs {
        let tx_size = tx.data().serialized_size_in_block();
        let tx_hash = tx.hash();

        let (rtx, status) = match resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
            Ok(resolved) => resolved,
            Err(_) => continue,
        };
        let fee = match check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
            Ok(fee) => fee,
            Err(_) => continue,
        };

        // The cache is keyed by witness hash, so the lookup must use the
        // witness hash too; looking up by tx hash never hits.
        let verify_cache = fetched_cache
            .get(&rtx.transaction.witness_hash())
            .cloned();
        let snapshot = tx_pool.cloned_snapshot();
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        if let Ok(verified) = verify_rtx(
            snapshot,
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            None,
        )
        .await
        {
            let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
            if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
                error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
            } else {
                debug!("readd_detached_tx submit_entry {}", tx_hash);
            }
        }
    }
}
/// Clears the pool onto `new_snapshot` and sends a `Reset` message carrying
/// that snapshot to the block assembler. A closed channel is only logged.
pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
    // The write guard is a statement temporary and is released before the send.
    self.tx_pool
        .write()
        .await
        .clear(Arc::clone(&new_snapshot));

    let reset = BlockAssemblerMessage::Reset(new_snapshot);
    if self.block_assembler_sender.send(reset).await.is_err() {
        error!("block_assembler receiver dropped");
    }
}
pub(crate) async fn save_pool(&self) {
let mut tx_pool = self.tx_pool.write().await;
if let Err(err) = tx_pool.save_into_file() {
error!("failed to save pool, error: {:?}", err)
} else {
info!("TxPool saved successfully")
}
}
/// Forwards the `in_ibd` flag to the fee estimator.
// NOTE(review): "ibd" presumably stands for initial block download — confirm.
pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
self.fee_estimator.update_ibd_state(in_ibd);
}
/// Estimates a fee rate for `estimate_mode`.
///
/// The fee estimator is asked first, fed with the pool's current entry info.
/// If it fails and `enable_fallback` is set, the pool's own estimation for the
/// mode's target block count is used instead; otherwise the estimator's error
/// is returned.
pub(crate) async fn estimate_fee_rate(
    &self,
    estimate_mode: EstimateMode,
    enable_fallback: bool,
) -> Result<FeeRate, AnyError> {
    let all_entry_info = self.tx_pool.read().await.get_all_entry_info();

    let err = match self
        .fee_estimator
        .estimate_fee_rate(estimate_mode, all_entry_info)
    {
        Ok(fee_rate) => return Ok(fee_rate),
        Err(err) => err,
    };

    if !enable_fallback {
        return Err(err.into());
    }

    let target_blocks = FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
    self.tx_pool
        .read()
        .await
        .estimate_fee_rate(target_blocks)
        .map_err(Into::into)
}
/// Processes each transaction in `txs` as a transaction without a peer,
/// logging every failure and a final summary when at least one failed.
async fn try_process_txs(&self, txs: Vec<TransactionView>) {
    if txs.is_empty() {
        return;
    }

    let total = txs.len();
    let mut count = 0usize;
    for tx in txs {
        let tx_hash = tx.hash();
        match self.process_tx(tx, None).await {
            Ok(_) => {}
            Err(err) => {
                error!("failed to process {:#x}, error: {:?}", tx_hash, err);
                count += 1;
            }
        }
    }

    if count > 0 {
        info!("{}/{} transaction process failed.", count, total);
    }
}
/// Returns whether the epoch of `snapshot`'s tip header falls in the consensus
/// delay window.
pub(crate) fn is_in_delay_window(&self, snapshot: &Snapshot) -> bool {
    let tip_epoch = snapshot.tip_header().epoch();
    self.consensus.is_in_delay_window(&tip_epoch)
}
}
/// Successful output of `TxPoolService::pre_check`.
type PreCheckedTx = (
    Byte32,                   // tip hash of the snapshot used for the check
    Arc<ResolvedTransaction>, // resolved transaction
    TxStatus,                 // status of the transaction in that snapshot
    Capacity,                 // transaction fee
    usize,                    // serialized size in block
);

/// A resolved transaction with its status, or the reason it was rejected.
type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
/// Classifies `short_id` against the snapshot's proposals: `Proposed` takes
/// precedence over `Gap`; anything else is `Fresh`.
fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
    if snapshot.proposals().contains_proposed(short_id) {
        return TxStatus::Proposed;
    }
    if snapshot.proposals().contains_gap(short_id) {
        return TxStatus::Gap;
    }
    TxStatus::Fresh
}
/// Re-checks an already resolved transaction against the pool and returns its
/// status in `snapshot`.
fn check_rtx(
    tx_pool: &TxPool,
    snapshot: &Snapshot,
    rtx: &ResolvedTransaction,
) -> Result<TxStatus, Reject> {
    let tx_status = get_tx_status(snapshot, &rtx.transaction.proposal_short_id());
    tx_pool.check_rtx_from_pool(rtx)?;
    Ok(tx_status)
}
/// Resolves `tx` against the pool and pairs the result with the transaction's
/// status in `snapshot`. `rbf` is forwarded to `resolve_tx_from_pool`.
fn resolve_tx(
    tx_pool: &TxPool,
    snapshot: &Snapshot,
    tx: TransactionView,
    rbf: bool,
) -> ResolveResult {
    let tx_status = get_tx_status(snapshot, &tx.proposal_short_id());
    let rtx = tx_pool.resolve_tx_from_pool(tx, rbf)?;
    Ok((rtx, tx_status))
}
/// Adds `entry` to the sub-pool matching `status` and fires the corresponding
/// callback when the pool reports the entry as added.
///
/// `Fresh` and `Gap` entries both trigger the pending callback; `Proposed`
/// entries trigger the proposed callback. Returns the entries evicted by the
/// insertion.
fn _submit_entry(
    tx_pool: &mut TxPool,
    status: TxStatus,
    entry: TxEntry,
    callbacks: &Callbacks,
) -> Result<HashSet<TxEntry>, Reject> {
    debug!("submit_entry {:?} {}", status, entry.transaction().hash());

    let (succ, evicts) = match status {
        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
    };

    if succ {
        match status {
            TxStatus::Fresh | TxStatus::Gap => callbacks.call_pending(&entry),
            TxStatus::Proposed => callbacks.call_proposed(&entry),
        }
    }
    Ok(evicts)
}
/// Applies a reorg to the pool while the caller holds the write lock.
///
/// Installs `snapshot`, removes committed transactions and those tied to
/// detached proposals, and — in `mine_mode` only — moves entries between the
/// pending / gap / proposed sub-pools according to the new proposal view.
/// Finally removes expired entries and enforces the pool size limit.
fn _update_tx_pool_for_reorg(
    tx_pool: &mut TxPool,
    attached: &LinkedHashSet<TransactionView>,
    detached_headers: &HashSet<Byte32>,
    detached_proposal_id: HashSet<ProposalShortId>,
    snapshot: Arc<Snapshot>,
    callbacks: &Callbacks,
    mine_mode: bool,
) {
    tx_pool.snapshot = Arc::clone(&snapshot);

    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());

    if mine_mode {
        // Candidates are collected first because moving an entry needs
        // `&mut tx_pool` while the status iterators borrow it.
        let mut proposals = Vec::new();
        let mut gaps = Vec::new();

        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
            let short_id = entry.inner.proposal_short_id();
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push((short_id, entry.inner.clone()));
            }
        }

        // Only entries that actually change status are cloned.
        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
            let short_id = entry.inner.proposal_short_id();
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push((short_id, entry.inner.clone()));
            } else if snapshot.proposals().contains_gap(&short_id) {
                gaps.push((short_id, entry.inner.clone()));
            }
        }

        for (id, entry) in proposals {
            debug!("begin to proposed: {:x}", id);
            if let Err(e) = tx_pool.proposed_rtx(&id) {
                debug!(
                    "Failed to add proposed tx {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e);
            } else {
                callbacks.call_proposed(&entry)
            }
        }

        for (id, entry) in gaps {
            debug!("begin to gap: {:x}", id);
            if let Err(e) = tx_pool.gap_rtx(&id) {
                debug!(
                    "Failed to add tx to gap {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e);
            }
        }
    }

    tx_pool.remove_expired(callbacks);

    // A size-limit reject is deliberately ignored during a reorg.
    let _ = tx_pool.limit_size(callbacks, None);
}