ckb_tx_pool/
process.rs

1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10    after_delay_window, check_tx_fee, check_txid_collision, is_missing_input,
11    non_contextual_verify, time_relative_verify, verify_rtx,
12};
13use ckb_chain_spec::consensus::MAX_BLOCK_PROPOSALS_LIMIT;
14use ckb_error::{AnyError, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_jsonrpc_types::BlockTemplate;
17use ckb_logger::Level::Trace;
18use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
19use ckb_network::PeerIndex;
20use ckb_script::ChunkCommand;
21use ckb_snapshot::Snapshot;
22use ckb_types::core::error::OutPointError;
23use ckb_types::{
24    core::{
25        BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
26        cell::ResolvedTransaction,
27    },
28    packed::{Byte32, ProposalShortId},
29};
30use ckb_util::LinkedHashSet;
31use ckb_verification::{
32    TxVerifyEnv,
33    cache::{CacheEntry, Completed},
34};
35use std::collections::HashSet;
36use std::collections::{HashMap, VecDeque};
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tokio::sync::watch;
40
41const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks
42
43/// A list for plug target for `plug_entry` method
44pub enum PlugTarget {
45    /// Pending pool
46    Pending,
47    /// Proposed pool
48    Proposed,
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum TxStatus {
53    Fresh,
54    Gap,
55    Proposed,
56}
57
58impl TxStatus {
59    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
60        match self {
61            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
62            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
63            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
64        }
65    }
66}
67
68impl TxPoolService {
69    pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
70        if let Some(ref block_assembler) = self.block_assembler {
71            Ok(block_assembler.get_current().await)
72        } else {
73            Err(InternalErrorKind::Config
74                .other("BlockAssembler disabled")
75                .into())
76        }
77    }
78
79    pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
80        let guard = self.txs_verify_cache.read().await;
81        guard.peek(&tx.witness_hash()).cloned()
82    }
83
84    async fn fetch_txs_verify_cache(
85        &self,
86        txs: impl Iterator<Item = &TransactionView>,
87    ) -> HashMap<Byte32, CacheEntry> {
88        let guard = self.txs_verify_cache.read().await;
89        txs.filter_map(|tx| {
90            let wtx_hash = tx.witness_hash();
91            guard
92                .peek(&wtx_hash)
93                .cloned()
94                .map(|value| (wtx_hash, value))
95        })
96        .collect()
97    }
98
99    pub(crate) async fn submit_entry(
100        &self,
101        pre_resolve_tip: Byte32,
102        entry: TxEntry,
103        mut status: TxStatus,
104    ) -> (Result<(), Reject>, Arc<Snapshot>) {
105        let (ret, snapshot) = self
106            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
107                // check_rbf must be invoked in `write` lock to avoid concurrent issues.
108                let conflicts = if tx_pool.enable_rbf() {
109                    tx_pool.check_rbf(&snapshot, &entry)?
110                } else {
111                    // RBF is disabled but we found conflicts, return error here
112                    // after_process will put this tx into conflicts_pool
113                    let conflicted_outpoint =
114                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
115                    if let Some(outpoint) = conflicted_outpoint {
116                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
117                    }
118                    HashSet::new()
119                };
120
121                // if snapshot changed by context switch we need redo time_relative verify
122                let tip_hash = snapshot.tip_hash();
123                if pre_resolve_tip != tip_hash {
124                    debug!(
125                        "submit_entry {} context changed. previous:{} now:{}",
126                        entry.proposal_short_id(),
127                        pre_resolve_tip,
128                        tip_hash
129                    );
130
131                    // destructuring assignments are not currently supported
132                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;
133
134                    let tip_header = snapshot.tip_header();
135                    let tx_env = status.with_env(tip_header);
136                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
137                }
138
139                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
140                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;
141
142                // in a corner case, a tx with lower fee rate may be rejected immediately
143                // after inserting into pool, return proper reject error here
144                for evict in evicted {
145                    let reject = Reject::Invalidated(format!(
146                        "invalidated by tx {}",
147                        evict.transaction().hash()
148                    ));
149                    self.callbacks.call_reject(tx_pool, &evict, reject);
150                }
151
152                tx_pool.remove_conflict(&entry.proposal_short_id());
153                tx_pool
154                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
155                    .map_or(Ok(()), Err)?;
156
157                if !may_recovered_txs.is_empty() {
158                    let self_clone = self.clone();
159                    tokio::spawn(async move {
160                        // push the recovered txs back to verify queue, so that they can be verified and submitted again
161                        let mut queue = self_clone.verify_queue.write().await;
162                        for tx in may_recovered_txs {
163                            debug!("recover back: {:?}", tx.proposal_short_id());
164                            let _ = queue.add_tx(tx, None);
165                        }
166                    });
167                }
168                Ok(())
169            })
170            .await;
171
172        (ret, snapshot)
173    }
174
175    pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
176        if self.should_notify_block_assembler() {
177            match status {
178                TxStatus::Fresh => {
179                    if self
180                        .block_assembler_sender
181                        .send(BlockAssemblerMessage::Pending)
182                        .await
183                        .is_err()
184                    {
185                        error!("block_assembler receiver dropped");
186                    }
187                }
188                TxStatus::Proposed => {
189                    if self
190                        .block_assembler_sender
191                        .send(BlockAssemblerMessage::Proposed)
192                        .await
193                        .is_err()
194                    {
195                        error!("block_assembler receiver dropped");
196                    }
197                }
198                _ => {}
199            }
200        }
201    }
202
203    // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
204    // since they maybe not conflicted anymore
205    fn process_rbf(
206        &self,
207        tx_pool: &mut TxPool,
208        entry: &TxEntry,
209        conflicts: &HashSet<ProposalShortId>,
210    ) -> Vec<TransactionView> {
211        let mut may_recovered_txs = vec![];
212        let mut available_inputs = HashSet::new();
213
214        if conflicts.is_empty() {
215            return may_recovered_txs;
216        }
217
218        let all_removed: Vec<_> = conflicts
219            .iter()
220            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
221            .collect();
222
223        available_inputs.extend(
224            all_removed
225                .iter()
226                .flat_map(|removed| removed.transaction().input_pts_iter()),
227        );
228
229        for input in entry.transaction().input_pts_iter() {
230            available_inputs.remove(&input);
231        }
232
233        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
234        for old in all_removed {
235            debug!(
236                "remove conflict tx {} for RBF by new tx {}",
237                old.transaction().hash(),
238                entry.transaction().hash()
239            );
240            let reject =
241                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
242
243            // RBF replace successfully, put old transactions into conflicts pool
244            tx_pool.record_conflict(old.transaction().clone());
245            // after removing old tx from tx_pool, we call reject callbacks manually
246            self.callbacks.call_reject(tx_pool, &old, reject);
247        }
248        assert!(!may_recovered_txs.contains(entry.transaction()));
249        may_recovered_txs
250    }
251
252    pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
253        let queue = self.verify_queue.read().await;
254        queue.contains_key(&tx.proposal_short_id())
255    }
256
257    pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
258        let orphan = self.orphan.read().await;
259        orphan.contains_key(&tx.proposal_short_id())
260    }
261
262    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
263        &self,
264        mut f: F,
265    ) -> (U, Arc<Snapshot>) {
266        let tx_pool = self.tx_pool.read().await;
267        let snapshot = tx_pool.cloned_snapshot();
268
269        let ret = f(&tx_pool, Arc::clone(&snapshot));
270        (ret, snapshot)
271    }
272
273    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
274        &self,
275        mut f: F,
276    ) -> (U, Arc<Snapshot>) {
277        let mut tx_pool = self.tx_pool.write().await;
278        let snapshot = tx_pool.cloned_snapshot();
279
280        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
281        (ret, snapshot)
282    }
283
284    pub(crate) async fn pre_check(
285        &self,
286        tx: &TransactionView,
287    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
288        // Acquire read lock for cheap check
289        let tx_size = tx.data().serialized_size_in_block();
290
291        let (ret, snapshot) = self
292            .with_tx_pool_read_lock(|tx_pool, snapshot| {
293                let tip_hash = snapshot.tip_hash();
294
295                // Same txid means exactly the same transaction, including inputs, outputs, witnesses, etc.
296                // It's also not possible for RBF, reject it directly
297                check_txid_collision(tx_pool, tx)?;
298
299                // Try normal path first, if double-spending check success we don't need RBF check
300                // this make sure RBF won't introduce extra performance cost for hot path
301                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
302                match res {
303                    Ok((rtx, status)) => {
304                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
305                        Ok((tip_hash, rtx, status, fee, tx_size))
306                    }
307                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
308                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
309                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
310                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
311                        if conflicts.is_none() {
312                            // this mean one input's outpoint is dead, but there is no direct conflicted tx in tx_pool
313                            // we should reject it directly and don't need to put it into conflicts pool
314                            error!(
315                                "{} is resolved as Dead, but there is no conflicted tx",
316                                rtx.transaction.proposal_short_id()
317                            );
318                            return Err(Reject::Resolve(OutPointError::Dead(out)));
319                        }
320                        // we also return Ok here, so that the entry will be continue to be verified before submit
321                        // we only want to put it into conflicts pool after the verification stage passed
322                        // then we will double-check conflicts txs in `submit_entry`
323
324                        Ok((tip_hash, rtx, status, fee, tx_size))
325                    }
326                    Err(err) => Err(err),
327                }
328            })
329            .await;
330        (ret, snapshot)
331    }
332
333    pub(crate) async fn non_contextual_verify(
334        &self,
335        tx: &TransactionView,
336        remote: Option<(Cycle, PeerIndex)>,
337    ) -> Result<(), Reject> {
338        if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
339            if reject.is_malformed_tx() {
340                if let Some(remote) = remote {
341                    self.ban_malformed(remote.1, format!("reject {reject}"))
342                        .await;
343                }
344            }
345            return Err(reject);
346        }
347        Ok(())
348    }
349
350    pub(crate) async fn resumeble_process_tx(
351        &self,
352        tx: TransactionView,
353        remote: Option<(Cycle, PeerIndex)>,
354    ) -> Result<bool, Reject> {
355        // non contextual verify first
356        self.non_contextual_verify(&tx, remote).await?;
357
358        if self.orphan_contains(&tx).await {
359            debug!("reject tx {} already in orphan pool", tx.hash());
360            return Err(Reject::Duplicated(tx.hash()));
361        }
362
363        if self.verify_queue_contains(&tx).await {
364            return Err(Reject::Duplicated(tx.hash()));
365        }
366        self.enqueue_verify_queue(tx, remote).await
367    }
368
369    pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
370        // non contextual verify first
371        self.non_contextual_verify(&tx, None).await?;
372
373        if self.verify_queue_contains(&tx).await {
374            return Err(Reject::Duplicated(tx.hash()));
375        }
376
377        if self.orphan_contains(&tx).await {
378            debug!("reject tx {} already in orphan pool", tx.hash());
379            return Err(Reject::Duplicated(tx.hash()));
380        }
381        self._test_accept_tx(tx.clone()).await
382    }
383
384    pub(crate) async fn process_tx(
385        &self,
386        tx: TransactionView,
387        remote: Option<(Cycle, PeerIndex)>,
388    ) -> Result<Completed, Reject> {
389        // non contextual verify first
390        self.non_contextual_verify(&tx, remote).await?;
391
392        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
393            return Err(Reject::Duplicated(tx.hash()));
394        }
395
396        if let Some((ret, snapshot)) = self
397            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
398            .await
399        {
400            self.after_process(tx, remote, &snapshot, &ret).await;
401            ret
402        } else {
403            // currently, the returned cycles is not been used, mock 0 if delay
404            Ok(Completed {
405                cycles: 0,
406                fee: Capacity::zero(),
407            })
408        }
409    }
410
411    pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
412        let mut tx_pool = self.tx_pool.write().await;
413        if let Some(ref mut recent_reject) = tx_pool.recent_reject {
414            if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
415                error!(
416                    "Failed to record recent_reject {} {} {}",
417                    tx_hash, reject, e
418                );
419            }
420        }
421    }
422
423    pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
424        let id = ProposalShortId::from_tx_hash(&tx_hash);
425        {
426            let mut queue = self.verify_queue.write().await;
427            if queue.remove_tx(&id).is_some() {
428                return true;
429            }
430        }
431        {
432            let mut orphan = self.orphan.write().await;
433            if orphan.remove_orphan_tx(&id).is_some() {
434                return true;
435            }
436        }
437        let mut tx_pool = self.tx_pool.write().await;
438        tx_pool.remove_tx(&id)
439    }
440
441    pub(crate) async fn after_process(
442        &self,
443        tx: TransactionView,
444        remote: Option<(Cycle, PeerIndex)>,
445        snapshot: &Snapshot,
446        ret: &Result<Completed, Reject>,
447    ) {
448        let tx_hash = tx.hash();
449
450        // The network protocol is switched after tx-pool confirms the cache,
451        // there will be no problem with the current state as the choice of the broadcast protocol.
452        let with_vm_2023 = {
453            let epoch = snapshot
454                .tip_header()
455                .epoch()
456                .minimum_epoch_number_after_n_blocks(1);
457
458            self.consensus
459                .hardfork_switch
460                .ckb2023
461                .is_vm_version_2_and_syscalls_3_enabled(epoch)
462        };
463
464        // log tx verification result for monitor node
465        if log_enabled_target!("ckb_tx_monitor", Trace) {
466            if let Ok(c) = ret {
467                trace_target!(
468                    "ckb_tx_monitor",
469                    r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
470                    tx_hash,
471                    c.cycles
472                );
473            }
474        }
475
476        if matches!(
477            ret,
478            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
479        ) {
480            let mut tx_pool = self.tx_pool.write().await;
481            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
482                tx_pool.record_conflict(tx.clone());
483            }
484        }
485
486        match remote {
487            Some((declared_cycle, peer)) => match ret {
488                Ok(_) => {
489                    debug!(
490                        "after_process remote send_result_to_relayer {} {}",
491                        tx_hash, peer
492                    );
493                    self.send_result_to_relayer(TxVerificationResult::Ok {
494                        original_peer: Some(peer),
495                        with_vm_2023,
496                        tx_hash,
497                    });
498                    self.process_orphan_tx(&tx).await;
499                }
500                Err(reject) => {
501                    info!(
502                        "after_process {} {} remote reject: {} ",
503                        tx_hash, peer, reject
504                    );
505                    if is_missing_input(reject) {
506                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
507                            peer,
508                            parents: tx.unique_parents(),
509                        });
510                        self.add_orphan(tx, peer, declared_cycle).await;
511                    } else {
512                        if reject.is_malformed_tx() {
513                            self.ban_malformed(peer, format!("reject {reject}")).await;
514                        }
515                        if reject.is_allowed_relay() {
516                            self.send_result_to_relayer(TxVerificationResult::Reject {
517                                tx_hash: tx_hash.clone(),
518                            });
519                        }
520                        if reject.should_recorded() {
521                            self.put_recent_reject(&tx_hash, reject).await;
522                        }
523                    }
524                }
525            },
526            None => {
527                match ret {
528                    Ok(_) => {
529                        debug!("after_process local send_result_to_relayer {}", tx_hash);
530                        self.send_result_to_relayer(TxVerificationResult::Ok {
531                            original_peer: None,
532                            with_vm_2023,
533                            tx_hash,
534                        });
535                        self.process_orphan_tx(&tx).await;
536                    }
537                    Err(Reject::Duplicated(_)) => {
538                        debug!("after_process {} duplicated", tx_hash);
539                        // re-broadcast tx when it's duplicated and submitted through local rpc
540                        self.send_result_to_relayer(TxVerificationResult::Ok {
541                            original_peer: None,
542                            with_vm_2023,
543                            tx_hash,
544                        });
545                    }
546                    Err(reject) => {
547                        debug!("after_process {} reject: {} ", tx_hash, reject);
548                        if reject.should_recorded() {
549                            self.put_recent_reject(&tx_hash, reject).await;
550                        }
551                    }
552                }
553            }
554        }
555    }
556
557    pub(crate) async fn add_orphan(
558        &self,
559        tx: TransactionView,
560        peer: PeerIndex,
561        declared_cycle: Cycle,
562    ) {
563        let evicted_txs = self
564            .orphan
565            .write()
566            .await
567            .add_orphan_tx(tx, peer, declared_cycle);
568        // for any evicted orphan tx, we should send reject to relayer
569        // so that we mark it as `unknown` in filter
570        for tx_hash in evicted_txs {
571            self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
572        }
573    }
574
575    pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
576        let orphan = self.orphan.read().await;
577        orphan
578            .find_by_previous(tx)
579            .iter()
580            .filter_map(|id| orphan.get(id).cloned())
581            .collect::<Vec<_>>()
582    }
583
584    pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
585        self.orphan.write().await.remove_orphan_tx(id);
586    }
587
588    /// Remove all orphans which are resolved by the given transaction
589    /// the process is like a breath first search, if there is a cycle in `orphan_queue`,
590    /// `_process_tx` will return `Reject` since we have checked duplicated tx
591    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
592        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
593        orphan_queue.push_back(tx.clone());
594
595        while let Some(previous) = orphan_queue.pop_front() {
596            let orphans = self.find_orphan_by_previous(&previous).await;
597            for orphan in orphans.into_iter() {
598                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
599                    debug!(
600                        "process_orphan {} added to verify queue; find previous from {}",
601                        orphan.tx.hash(),
602                        tx.hash(),
603                    );
604                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
605                    self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
606                        .await
607                        .expect("enqueue suspended tx");
608                } else if let Some((ret, snapshot)) = self
609                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
610                    .await
611                {
612                    match ret {
613                        Ok(_) => {
614                            let with_vm_2023 = {
615                                let epoch = snapshot
616                                    .tip_header()
617                                    .epoch()
618                                    .minimum_epoch_number_after_n_blocks(1);
619
620                                self.consensus
621                                    .hardfork_switch
622                                    .ckb2023
623                                    .is_vm_version_2_and_syscalls_3_enabled(epoch)
624                            };
625                            self.send_result_to_relayer(TxVerificationResult::Ok {
626                                original_peer: Some(orphan.peer),
627                                with_vm_2023,
628                                tx_hash: orphan.tx.hash(),
629                            });
630                            debug!(
631                                "process_orphan {} success, find previous from {}",
632                                orphan.tx.hash(),
633                                tx.hash()
634                            );
635                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
636                            orphan_queue.push_back(orphan.tx);
637                        }
638                        Err(reject) => {
639                            debug!(
640                                "process_orphan {} reject {}, find previous from {}",
641                                orphan.tx.hash(),
642                                reject,
643                                tx.hash(),
644                            );
645
646                            if !is_missing_input(&reject) {
647                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
648                                if reject.is_malformed_tx() {
649                                    self.ban_malformed(orphan.peer, format!("reject {reject}"))
650                                        .await;
651                                }
652                                if reject.is_allowed_relay() {
653                                    self.send_result_to_relayer(TxVerificationResult::Reject {
654                                        tx_hash: orphan.tx.hash(),
655                                    });
656                                }
657                                if reject.should_recorded() {
658                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
659                                }
660                            }
661                        }
662                    }
663                }
664            }
665        }
666    }
667
668    pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
669        if let Err(e) = self.tx_relay_sender.send(result) {
670            error!("tx-pool tx_relay_sender internal error {}", e);
671        }
672    }
673
674    async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
675        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
676
677        #[cfg(feature = "with_sentry")]
678        use sentry::{Level, capture_message, with_scope};
679
680        #[cfg(feature = "with_sentry")]
681        with_scope(
682            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
683            || {
684                capture_message(
685                    &format!(
686                        "Ban peer {} for {} seconds, reason: \
687                        {}",
688                        peer,
689                        DEFAULT_BAN_TIME.as_secs(),
690                        reason
691                    ),
692                    Level::Info,
693                )
694            },
695        );
696        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
697        self.verify_queue.write().await.remove_txs_by_peer(&peer);
698    }
699
700    pub(crate) async fn _process_tx(
701        &self,
702        tx: TransactionView,
703        declared_cycles: Option<Cycle>,
704        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
705    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
706        let wtx_hash = tx.witness_hash();
707        let instant = Instant::now();
708        let is_sync_process = command_rx.is_none();
709
710        let (ret, snapshot) = self.pre_check(&tx).await;
711
712        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);
713
714        if self.is_in_delay_window(&snapshot) {
715            let mut delay = self.delay.write().await;
716            if delay.len() < DELAY_LIMIT {
717                delay.insert(tx.proposal_short_id(), tx);
718            }
719            return None;
720        }
721
722        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
723        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
724        let tip_header = snapshot.tip_header();
725        let tx_env = Arc::new(status.with_env(tip_header));
726
727        let verified_ret = verify_rtx(
728            Arc::clone(&snapshot),
729            Arc::clone(&rtx),
730            tx_env,
731            &verify_cache,
732            max_cycles,
733            command_rx,
734        )
735        .await;
736
737        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);
738
739        if let Some(declared) = declared_cycles {
740            if declared != verified.cycles {
741                info!(
742                    "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
743                    declared, verified.cycles, tx
744                );
745                return Some((
746                    Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
747                    snapshot,
748                ));
749            }
750        }
751
752        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
753
754        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
755        try_or_return_with_snapshot!(ret, submit_snapshot);
756
757        self.notify_block_assembler(status).await;
758
759        if verify_cache.is_none() {
760            // update cache
761            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
762            tokio::spawn(async move {
763                let mut guard = txs_verify_cache.write().await;
764                guard.put(wtx_hash, verified);
765            });
766        }
767
768        if let Some(metrics) = ckb_metrics::handle() {
769            let elapsed = instant.elapsed().as_secs_f64();
770            if is_sync_process {
771                metrics.ckb_tx_pool_sync_process.observe(elapsed);
772            } else {
773                metrics.ckb_tx_pool_async_process.observe(elapsed);
774            }
775        }
776
777        Some((Ok(verified), submit_snapshot))
778    }
779
780    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
781        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
782
783        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
784
785        // skip check the delay window
786
787        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
788        let max_cycles = self.consensus.max_block_cycles();
789        let tip_header = snapshot.tip_header();
790        let tx_env = Arc::new(status.with_env(tip_header));
791
792        verify_rtx(
793            Arc::clone(&snapshot),
794            Arc::clone(&rtx),
795            tx_env,
796            &verify_cache,
797            max_cycles,
798            None,
799        )
800        .await
801    }
802
803    pub(crate) async fn update_tx_pool_for_reorg(
804        &self,
805        detached_blocks: VecDeque<BlockView>,
806        attached_blocks: VecDeque<BlockView>,
807        detached_proposal_id: HashSet<ProposalShortId>,
808        snapshot: Arc<Snapshot>,
809    ) {
810        let mine_mode = self.block_assembler.is_some();
811        let mut detached = LinkedHashSet::default();
812        let mut attached = LinkedHashSet::default();
813
814        let epoch_of_next_block = snapshot
815            .tip_header()
816            .epoch()
817            .minimum_epoch_number_after_n_blocks(1);
818
819        let new_tip_after_delay = after_delay_window(&snapshot);
820        let is_in_delay_window = self.is_in_delay_window(&snapshot);
821
822        let detached_headers: HashSet<Byte32> = detached_blocks
823            .iter()
824            .map(|blk| blk.header().hash())
825            .collect();
826
827        for blk in detached_blocks {
828            detached.extend(blk.transactions().into_iter().skip(1))
829        }
830
831        for blk in attached_blocks {
832            self.fee_estimator.commit_block(&blk);
833            attached.extend(blk.transactions().into_iter().skip(1));
834        }
835        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();
836
837        let fetched_cache = if is_in_delay_window {
838            // If in delay_window, don't use the cache.
839            HashMap::new()
840        } else {
841            self.fetch_txs_verify_cache(retain.iter()).await
842        };
843
844        // If there are any transactions requires re-process, return them.
845        //
846        // At present, there is only one situation:
847        // - If the hardfork was happened, then re-process all transactions.
848        let txs_opt = {
849            // This closure is used to limit the lifetime of mutable tx_pool.
850            let mut tx_pool = self.tx_pool.write().await;
851
852            let txs_opt = if is_in_delay_window {
853                {
854                    self.verify_queue.write().await.clear();
855                }
856                Some(tx_pool.drain_all_transactions())
857            } else {
858                None
859            };
860
861            _update_tx_pool_for_reorg(
862                &mut tx_pool,
863                &attached,
864                &detached_headers,
865                detached_proposal_id,
866                snapshot,
867                &self.callbacks,
868                mine_mode,
869            );
870
871            // Updates network fork switch if required.
872            //
873            // This operation should be ahead of any transaction which is processed with new
874            // hardfork features.
875            if !self.network.load_ckb2023()
876                && self
877                    .consensus
878                    .hardfork_switch
879                    .ckb2023
880                    .is_vm_version_2_and_syscalls_3_enabled(epoch_of_next_block)
881            {
882                self.network.init_ckb2023()
883            }
884
885            // notice: readd_detached_tx don't update cache
886            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
887                .await;
888
889            txs_opt
890        };
891
892        if let Some(txs) = txs_opt {
893            let mut delay = self.delay.write().await;
894            if delay.len() < DELAY_LIMIT {
895                for tx in txs {
896                    delay.insert(tx.proposal_short_id(), tx);
897                }
898            }
899        }
900
901        {
902            let delay_txs = if !self.after_delay() && new_tip_after_delay {
903                let limit = MAX_BLOCK_PROPOSALS_LIMIT as usize;
904                let mut txs = Vec::with_capacity(limit);
905                let mut delay = self.delay.write().await;
906                let keys: Vec<_> = { delay.keys().take(limit).cloned().collect() };
907                for k in keys {
908                    if let Some(v) = delay.remove(&k) {
909                        txs.push(v);
910                    }
911                }
912                if delay.is_empty() {
913                    self.set_after_delay_true();
914                }
915                Some(txs)
916            } else {
917                None
918            };
919            if let Some(txs) = delay_txs {
920                self.try_process_txs(txs).await;
921            }
922        }
923
924        self.remove_orphan_txs_by_attach(&attached).await;
925        {
926            let mut queue = self.verify_queue.write().await;
927            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
928        }
929    }
930
931    async fn enqueue_verify_queue(
932        &self,
933        tx: TransactionView,
934        remote: Option<(Cycle, PeerIndex)>,
935    ) -> Result<bool, Reject> {
936        let mut queue = self.verify_queue.write().await;
937        queue.add_tx(tx, remote)
938    }
939
940    async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
941        for tx in txs.iter() {
942            self.process_orphan_tx(tx).await;
943        }
944        let mut orphan = self.orphan.write().await;
945        orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
946    }
947
948    async fn readd_detached_tx(
949        &self,
950        tx_pool: &mut TxPool,
951        txs: Vec<TransactionView>,
952        fetched_cache: HashMap<Byte32, CacheEntry>,
953    ) {
954        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
955        for tx in txs {
956            let tx_size = tx.data().serialized_size_in_block();
957            let tx_hash = tx.hash();
958            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
959                if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
960                    let verify_cache = fetched_cache.get(&tx_hash).cloned();
961                    let snapshot = tx_pool.cloned_snapshot();
962                    let tip_header = snapshot.tip_header();
963                    let tx_env = Arc::new(status.with_env(tip_header));
964                    if let Ok(verified) = verify_rtx(
965                        snapshot,
966                        Arc::clone(&rtx),
967                        tx_env,
968                        &verify_cache,
969                        max_cycles,
970                        None,
971                    )
972                    .await
973                    {
974                        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
975                        if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
976                            error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
977                        } else {
978                            debug!("readd_detached_tx submit_entry {}", tx_hash);
979                        }
980                    }
981                }
982            }
983        }
984    }
985
986    pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
987        {
988            let mut tx_pool = self.tx_pool.write().await;
989            tx_pool.clear(Arc::clone(&new_snapshot));
990        }
991        // reset block_assembler
992        if self
993            .block_assembler_sender
994            .send(BlockAssemblerMessage::Reset(new_snapshot))
995            .await
996            .is_err()
997        {
998            error!("block_assembler receiver dropped");
999        }
1000    }
1001
1002    pub(crate) async fn save_pool(&self) {
1003        let mut tx_pool = self.tx_pool.write().await;
1004        if let Err(err) = tx_pool.save_into_file() {
1005            error!("failed to save pool, error: {:?}", err)
1006        } else {
1007            info!("TxPool saved successfully")
1008        }
1009    }
1010
1011    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
1012        self.fee_estimator.update_ibd_state(in_ibd);
1013    }
1014
1015    pub(crate) async fn estimate_fee_rate(
1016        &self,
1017        estimate_mode: EstimateMode,
1018        enable_fallback: bool,
1019    ) -> Result<FeeRate, AnyError> {
1020        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
1021        match self
1022            .fee_estimator
1023            .estimate_fee_rate(estimate_mode, all_entry_info)
1024        {
1025            Ok(fee_rate) => Ok(fee_rate),
1026            Err(err) => {
1027                if enable_fallback {
1028                    let target_blocks =
1029                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
1030                    self.tx_pool
1031                        .read()
1032                        .await
1033                        .estimate_fee_rate(target_blocks)
1034                        .map_err(Into::into)
1035                } else {
1036                    Err(err.into())
1037                }
1038            }
1039        }
1040    }
1041
1042    // # Notice
1043    //
1044    // This method assumes that the inputs transactions are sorted.
1045    async fn try_process_txs(&self, txs: Vec<TransactionView>) {
1046        if txs.is_empty() {
1047            return;
1048        }
1049        let total = txs.len();
1050        let mut count = 0usize;
1051        for tx in txs {
1052            let tx_hash = tx.hash();
1053            if let Err(err) = self.process_tx(tx, None).await {
1054                error!("failed to process {:#x}, error: {:?}", tx_hash, err);
1055                count += 1;
1056            }
1057        }
1058        if count != 0 {
1059            info!("{}/{} transaction process failed.", count, total);
1060        }
1061    }
1062
1063    pub(crate) fn is_in_delay_window(&self, snapshot: &Snapshot) -> bool {
1064        let epoch = snapshot.tip_header().epoch();
1065        self.consensus.is_in_delay_window(&epoch)
1066    }
1067}
1068
1069type PreCheckedTx = (
1070    Byte32,                   // tip_hash
1071    Arc<ResolvedTransaction>, // rtx
1072    TxStatus,                 // status
1073    Capacity,                 // tx fee
1074    usize,                    // tx size
1075);
1076
1077type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
1078
1079fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
1080    if snapshot.proposals().contains_proposed(short_id) {
1081        TxStatus::Proposed
1082    } else if snapshot.proposals().contains_gap(short_id) {
1083        TxStatus::Gap
1084    } else {
1085        TxStatus::Fresh
1086    }
1087}
1088
1089fn check_rtx(
1090    tx_pool: &TxPool,
1091    snapshot: &Snapshot,
1092    rtx: &ResolvedTransaction,
1093) -> Result<TxStatus, Reject> {
1094    let short_id = rtx.transaction.proposal_short_id();
1095    let tx_status = get_tx_status(snapshot, &short_id);
1096    tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
1097}
1098
1099fn resolve_tx(
1100    tx_pool: &TxPool,
1101    snapshot: &Snapshot,
1102    tx: TransactionView,
1103    rbf: bool,
1104) -> ResolveResult {
1105    let short_id = tx.proposal_short_id();
1106    let tx_status = get_tx_status(snapshot, &short_id);
1107    tx_pool
1108        .resolve_tx_from_pool(tx, rbf)
1109        .map(|rtx| (rtx, tx_status))
1110}
1111
1112fn _submit_entry(
1113    tx_pool: &mut TxPool,
1114    status: TxStatus,
1115    entry: TxEntry,
1116    callbacks: &Callbacks,
1117) -> Result<HashSet<TxEntry>, Reject> {
1118    let tx_hash = entry.transaction().hash();
1119    debug!("submit_entry {:?} {}", status, tx_hash);
1120    let (succ, evicts) = match status {
1121        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
1122        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
1123        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
1124    };
1125    if succ {
1126        match status {
1127            TxStatus::Fresh => callbacks.call_pending(&entry),
1128            TxStatus::Gap => callbacks.call_pending(&entry),
1129            TxStatus::Proposed => callbacks.call_proposed(&entry),
1130        }
1131    }
1132    Ok(evicts)
1133}
1134
1135fn _update_tx_pool_for_reorg(
1136    tx_pool: &mut TxPool,
1137    attached: &LinkedHashSet<TransactionView>,
1138    detached_headers: &HashSet<Byte32>,
1139    detached_proposal_id: HashSet<ProposalShortId>,
1140    snapshot: Arc<Snapshot>,
1141    callbacks: &Callbacks,
1142    mine_mode: bool,
1143) {
1144    tx_pool.snapshot = Arc::clone(&snapshot);
1145
1146    // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
1147    // pending-pool if they can be found within txpool. As for a transaction
1148    // which is both expired and committed at the one time(commit at its end of commit-window),
1149    // we should treat it as a committed and not re-put into pending-pool. So we should ensure
1150    // that involves `remove_committed_txs` before `remove_expired`.
1151    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1152    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1153
1154    // mine mode:
1155    // pending ---> gap ----> proposed
1156    // try move gap to proposed
1157    if mine_mode {
1158        let mut proposals = Vec::new();
1159        let mut gaps = Vec::new();
1160
1161        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1162            let short_id = entry.inner.proposal_short_id();
1163            if snapshot.proposals().contains_proposed(&short_id) {
1164                proposals.push((short_id, entry.inner.clone()));
1165            }
1166        }
1167
1168        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1169            let short_id = entry.inner.proposal_short_id();
1170            let elem = (short_id.clone(), entry.inner.clone());
1171            if snapshot.proposals().contains_proposed(&short_id) {
1172                proposals.push(elem);
1173            } else if snapshot.proposals().contains_gap(&short_id) {
1174                gaps.push(elem);
1175            }
1176        }
1177
1178        for (id, entry) in proposals {
1179            debug!("begin to proposed: {:x}", id);
1180            if let Err(e) = tx_pool.proposed_rtx(&id) {
1181                debug!(
1182                    "Failed to add proposed tx {}, reason: {}",
1183                    entry.transaction().hash(),
1184                    e
1185                );
1186                callbacks.call_reject(tx_pool, &entry, e);
1187            } else {
1188                callbacks.call_proposed(&entry)
1189            }
1190        }
1191
1192        for (id, entry) in gaps {
1193            debug!("begin to gap: {:x}", id);
1194            if let Err(e) = tx_pool.gap_rtx(&id) {
1195                debug!(
1196                    "Failed to add tx to gap {}, reason: {}",
1197                    entry.transaction().hash(),
1198                    e
1199                );
1200                callbacks.call_reject(tx_pool, &entry, e.clone());
1201            }
1202        }
1203    }
1204
1205    // Remove expired transaction from pending
1206    tx_pool.remove_expired(callbacks);
1207
1208    // Remove transactions from the pool until its size <= size_limit.
1209    let _ = tx_pool.limit_size(callbacks, None);
1210}