ckb_tx_pool/
process.rs

1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10    after_delay_window, check_tx_fee, check_txid_collision, is_missing_input,
11    non_contextual_verify, time_relative_verify, verify_rtx,
12};
13use ckb_chain_spec::consensus::MAX_BLOCK_PROPOSALS_LIMIT;
14use ckb_error::{AnyError, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_jsonrpc_types::BlockTemplate;
17use ckb_logger::Level::Trace;
18use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
19use ckb_network::PeerIndex;
20use ckb_script::ChunkCommand;
21use ckb_snapshot::Snapshot;
22use ckb_types::core::error::OutPointError;
23use ckb_types::{
24    core::{
25        cell::ResolvedTransaction, BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView,
26        TransactionView,
27    },
28    packed::{Byte32, ProposalShortId},
29};
30use ckb_util::LinkedHashSet;
31use ckb_verification::{
32    cache::{CacheEntry, Completed},
33    TxVerifyEnv,
34};
35use std::collections::HashSet;
36use std::collections::{HashMap, VecDeque};
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tokio::sync::watch;
40
/// Maximum number of transactions kept in the service's `delay` buffer;
/// `_process_tx` stops inserting once the buffer reaches this length.
const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks
42
/// Destination pool accepted by the `plug_entry` method.
pub enum PlugTarget {
    /// Pending pool
    Pending,
    /// Proposed pool
    Proposed,
}
50
/// Status a transaction is verified and submitted under.
///
/// The variant decides which `TxVerifyEnv` is built by `TxStatus::with_env`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    /// Verified with `TxVerifyEnv::new_submit`.
    Fresh,
    /// Verified with `TxVerifyEnv::new_proposed(header, 0)`.
    Gap,
    /// Verified with `TxVerifyEnv::new_proposed(header, 1)`.
    Proposed,
}
57
58impl TxStatus {
59    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
60        match self {
61            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
62            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
63            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
64        }
65    }
66}
67
68impl TxPoolService {
69    pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
70        if let Some(ref block_assembler) = self.block_assembler {
71            Ok(block_assembler.get_current().await)
72        } else {
73            Err(InternalErrorKind::Config
74                .other("BlockAssembler disabled")
75                .into())
76        }
77    }
78
79    pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
80        let guard = self.txs_verify_cache.read().await;
81        guard.peek(&tx.witness_hash()).cloned()
82    }
83
84    async fn fetch_txs_verify_cache(
85        &self,
86        txs: impl Iterator<Item = &TransactionView>,
87    ) -> HashMap<Byte32, CacheEntry> {
88        let guard = self.txs_verify_cache.read().await;
89        txs.filter_map(|tx| {
90            let wtx_hash = tx.witness_hash();
91            guard
92                .peek(&wtx_hash)
93                .cloned()
94                .map(|value| (wtx_hash, value))
95        })
96        .collect()
97    }
98
    /// Inserts a verified `entry` into the tx-pool while holding the pool write lock.
    ///
    /// * `pre_resolve_tip` - tip hash the entry was resolved against before this call.
    /// * `entry` - the entry to insert.
    /// * `status` - status the entry was verified under; recomputed with `check_rtx`
    ///   when the tip changed in the meantime.
    ///
    /// Returns the submission result together with the snapshot that was current
    /// while the lock was held.
    pub(crate) async fn submit_entry(
        &self,
        pre_resolve_tip: Byte32,
        entry: TxEntry,
        mut status: TxStatus,
    ) -> (Result<(), Reject>, Arc<Snapshot>) {
        let (ret, snapshot) = self
            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
                // check_rbf must be invoked under the `write` lock to avoid concurrency issues.
                let conflicts = if tx_pool.enable_rbf() {
                    tx_pool.check_rbf(&snapshot, &entry)?
                } else {
                    // RBF is disabled but a conflict was found: return the error here,
                    // after_process will put this tx into the conflicts pool.
                    let conflicted_outpoint =
                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                    if let Some(outpoint) = conflicted_outpoint {
                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                    }
                    HashSet::new()
                };

                // If the snapshot changed since the entry was resolved, the status and the
                // time-relative verification have to be redone against the new tip.
                let tip_hash = snapshot.tip_hash();
                if pre_resolve_tip != tip_hash {
                    debug!(
                        "submit_entry {} context changed. previous:{} now:{}",
                        entry.proposal_short_id(),
                        pre_resolve_tip,
                        tip_hash
                    );

                    // destructuring assignments are not currently supported
                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;

                    let tip_header = snapshot.tip_header();
                    let tx_env = status.with_env(tip_header);
                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
                }

                // Conflicting entries are removed before the new entry is inserted.
                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

                // In a corner case, a tx with a lower fee rate may be rejected immediately
                // after being inserted into the pool; report a proper reject for each of them.
                for evict in evicted {
                    let reject = Reject::Invalidated(format!(
                        "invalidated by tx {}",
                        evict.transaction().hash()
                    ));
                    self.callbacks.call_reject(tx_pool, &evict, reject);
                }

                tx_pool.remove_conflict(&entry.proposal_short_id());
                // A `Some` returned by `limit_size` is turned into this call's error.
                tx_pool
                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                    .map_or(Ok(()), Err)?;

                if !may_recovered_txs.is_empty() {
                    let self_clone = self.clone();
                    // This closure is synchronous, so the async verify queue lock is taken
                    // from a spawned task instead of being awaited here.
                    tokio::spawn(async move {
                        // push the recovered txs back to verify queue, so that they can be verified and submitted again
                        let mut queue = self_clone.verify_queue.write().await;
                        for tx in may_recovered_txs {
                            debug!("recover back: {:?}", tx.proposal_short_id());
                            // Best effort: a failure to re-queue is ignored.
                            let _ = queue.add_tx(tx, None);
                        }
                    });
                }
                Ok(())
            })
            .await;

        (ret, snapshot)
    }
174
175    pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
176        if self.should_notify_block_assembler() {
177            match status {
178                TxStatus::Fresh => {
179                    if self
180                        .block_assembler_sender
181                        .send(BlockAssemblerMessage::Pending)
182                        .await
183                        .is_err()
184                    {
185                        error!("block_assembler receiver dropped");
186                    }
187                }
188                TxStatus::Proposed => {
189                    if self
190                        .block_assembler_sender
191                        .send(BlockAssemblerMessage::Proposed)
192                        .await
193                        .is_err()
194                    {
195                        error!("block_assembler receiver dropped");
196                    }
197                }
198                _ => {}
199            }
200        }
201    }
202
203    // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
204    // since they maybe not conflicted anymore
205    fn process_rbf(
206        &self,
207        tx_pool: &mut TxPool,
208        entry: &TxEntry,
209        conflicts: &HashSet<ProposalShortId>,
210    ) -> Vec<TransactionView> {
211        let mut may_recovered_txs = vec![];
212        let mut available_inputs = HashSet::new();
213
214        if conflicts.is_empty() {
215            return may_recovered_txs;
216        }
217
218        let all_removed: Vec<_> = conflicts
219            .iter()
220            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
221            .collect();
222
223        available_inputs.extend(
224            all_removed
225                .iter()
226                .flat_map(|removed| removed.transaction().input_pts_iter()),
227        );
228
229        for input in entry.transaction().input_pts_iter() {
230            available_inputs.remove(&input);
231        }
232
233        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
234        for old in all_removed {
235            debug!(
236                "remove conflict tx {} for RBF by new tx {}",
237                old.transaction().hash(),
238                entry.transaction().hash()
239            );
240            let reject =
241                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
242
243            // RBF replace successfully, put old transactions into conflicts pool
244            tx_pool.record_conflict(old.transaction().clone());
245            // after removing old tx from tx_pool, we call reject callbacks manually
246            self.callbacks.call_reject(tx_pool, &old, reject);
247        }
248        assert!(!may_recovered_txs.contains(entry.transaction()));
249        may_recovered_txs
250    }
251
252    pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
253        let queue = self.verify_queue.read().await;
254        queue.contains_key(&tx.proposal_short_id())
255    }
256
257    pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
258        let orphan = self.orphan.read().await;
259        orphan.contains_key(&tx.proposal_short_id())
260    }
261
262    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
263        &self,
264        mut f: F,
265    ) -> (U, Arc<Snapshot>) {
266        let tx_pool = self.tx_pool.read().await;
267        let snapshot = tx_pool.cloned_snapshot();
268
269        let ret = f(&tx_pool, Arc::clone(&snapshot));
270        (ret, snapshot)
271    }
272
273    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
274        &self,
275        mut f: F,
276    ) -> (U, Arc<Snapshot>) {
277        let mut tx_pool = self.tx_pool.write().await;
278        let snapshot = tx_pool.cloned_snapshot();
279
280        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
281        (ret, snapshot)
282    }
283
284    pub(crate) async fn pre_check(
285        &self,
286        tx: &TransactionView,
287    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
288        // Acquire read lock for cheap check
289        let tx_size = tx.data().serialized_size_in_block();
290
291        let (ret, snapshot) = self
292            .with_tx_pool_read_lock(|tx_pool, snapshot| {
293                let tip_hash = snapshot.tip_hash();
294
295                // Same txid means exactly the same transaction, including inputs, outputs, witnesses, etc.
296                // It's also not possible for RBF, reject it directly
297                check_txid_collision(tx_pool, tx)?;
298
299                // Try normal path first, if double-spending check success we don't need RBF check
300                // this make sure RBF won't introduce extra performance cost for hot path
301                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
302                match res {
303                    Ok((rtx, status)) => {
304                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
305                        Ok((tip_hash, rtx, status, fee, tx_size))
306                    }
307                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
308                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
309                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
310                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
311                        if conflicts.is_none() {
312                            // this mean one input's outpoint is dead, but there is no direct conflicted tx in tx_pool
313                            // we should reject it directly and don't need to put it into conflicts pool
314                            error!(
315                                "{} is resolved as Dead, but there is no conflicted tx",
316                                rtx.transaction.proposal_short_id()
317                            );
318                            return Err(Reject::Resolve(OutPointError::Dead(out)));
319                        }
320                        // we also return Ok here, so that the entry will be continue to be verified before submit
321                        // we only want to put it into conflicts pool after the verification stage passed
322                        // then we will double-check conflicts txs in `submit_entry`
323
324                        Ok((tip_hash, rtx, status, fee, tx_size))
325                    }
326                    Err(err) => Err(err),
327                }
328            })
329            .await;
330        (ret, snapshot)
331    }
332
333    pub(crate) fn non_contextual_verify(
334        &self,
335        tx: &TransactionView,
336        remote: Option<(Cycle, PeerIndex)>,
337    ) -> Result<(), Reject> {
338        if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
339            if reject.is_malformed_tx() {
340                if let Some(remote) = remote {
341                    self.ban_malformed(remote.1, format!("reject {reject}"));
342                }
343            }
344            return Err(reject);
345        }
346        Ok(())
347    }
348
349    pub(crate) async fn resumeble_process_tx(
350        &self,
351        tx: TransactionView,
352        remote: Option<(Cycle, PeerIndex)>,
353    ) -> Result<bool, Reject> {
354        // non contextual verify first
355        self.non_contextual_verify(&tx, remote)?;
356
357        if self.orphan_contains(&tx).await {
358            debug!("reject tx {} already in orphan pool", tx.hash());
359            return Err(Reject::Duplicated(tx.hash()));
360        }
361
362        if self.verify_queue_contains(&tx).await {
363            return Err(Reject::Duplicated(tx.hash()));
364        }
365        self.enqueue_verify_queue(tx, remote).await
366    }
367
368    pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
369        // non contextual verify first
370        self.non_contextual_verify(&tx, None)?;
371
372        if self.verify_queue_contains(&tx).await {
373            return Err(Reject::Duplicated(tx.hash()));
374        }
375
376        if self.orphan_contains(&tx).await {
377            debug!("reject tx {} already in orphan pool", tx.hash());
378            return Err(Reject::Duplicated(tx.hash()));
379        }
380        self._test_accept_tx(tx.clone()).await
381    }
382
383    pub(crate) async fn process_tx(
384        &self,
385        tx: TransactionView,
386        remote: Option<(Cycle, PeerIndex)>,
387    ) -> Result<Completed, Reject> {
388        // non contextual verify first
389        self.non_contextual_verify(&tx, remote)?;
390
391        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
392            return Err(Reject::Duplicated(tx.hash()));
393        }
394
395        if let Some((ret, snapshot)) = self
396            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
397            .await
398        {
399            self.after_process(tx, remote, &snapshot, &ret).await;
400            ret
401        } else {
402            // currently, the returned cycles is not been used, mock 0 if delay
403            Ok(Completed {
404                cycles: 0,
405                fee: Capacity::zero(),
406            })
407        }
408    }
409
410    pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
411        let mut tx_pool = self.tx_pool.write().await;
412        if let Some(ref mut recent_reject) = tx_pool.recent_reject {
413            if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
414                error!(
415                    "Failed to record recent_reject {} {} {}",
416                    tx_hash, reject, e
417                );
418            }
419        }
420    }
421
422    pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
423        let id = ProposalShortId::from_tx_hash(&tx_hash);
424        {
425            let mut queue = self.verify_queue.write().await;
426            if queue.remove_tx(&id).is_some() {
427                return true;
428            }
429        }
430        {
431            let mut orphan = self.orphan.write().await;
432            if orphan.remove_orphan_tx(&id).is_some() {
433                return true;
434            }
435        }
436        let mut tx_pool = self.tx_pool.write().await;
437        tx_pool.remove_tx(&id)
438    }
439
440    pub(crate) async fn after_process(
441        &self,
442        tx: TransactionView,
443        remote: Option<(Cycle, PeerIndex)>,
444        snapshot: &Snapshot,
445        ret: &Result<Completed, Reject>,
446    ) {
447        let tx_hash = tx.hash();
448
449        // The network protocol is switched after tx-pool confirms the cache,
450        // there will be no problem with the current state as the choice of the broadcast protocol.
451        let with_vm_2023 = {
452            let epoch = snapshot
453                .tip_header()
454                .epoch()
455                .minimum_epoch_number_after_n_blocks(1);
456
457            self.consensus
458                .hardfork_switch
459                .ckb2023
460                .is_vm_version_2_and_syscalls_3_enabled(epoch)
461        };
462
463        // log tx verification result for monitor node
464        if log_enabled_target!("ckb_tx_monitor", Trace) {
465            if let Ok(c) = ret {
466                trace_target!(
467                    "ckb_tx_monitor",
468                    r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
469                    tx_hash,
470                    c.cycles
471                );
472            }
473        }
474
475        if matches!(
476            ret,
477            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
478        ) {
479            let mut tx_pool = self.tx_pool.write().await;
480            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
481                tx_pool.record_conflict(tx.clone());
482            }
483        }
484
485        match remote {
486            Some((declared_cycle, peer)) => match ret {
487                Ok(_) => {
488                    debug!(
489                        "after_process remote send_result_to_relayer {} {}",
490                        tx_hash, peer
491                    );
492                    self.send_result_to_relayer(TxVerificationResult::Ok {
493                        original_peer: Some(peer),
494                        with_vm_2023,
495                        tx_hash,
496                    });
497                    self.process_orphan_tx(&tx).await;
498                }
499                Err(reject) => {
500                    info!(
501                        "after_process {} {} remote reject: {} ",
502                        tx_hash, peer, reject
503                    );
504                    if is_missing_input(reject) {
505                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
506                            peer,
507                            parents: tx.unique_parents(),
508                        });
509                        self.add_orphan(tx, peer, declared_cycle).await;
510                    } else {
511                        if reject.is_malformed_tx() {
512                            self.ban_malformed(peer, format!("reject {reject}"));
513                        }
514                        if reject.is_allowed_relay() {
515                            self.send_result_to_relayer(TxVerificationResult::Reject {
516                                tx_hash: tx_hash.clone(),
517                            });
518                        }
519                        if reject.should_recorded() {
520                            self.put_recent_reject(&tx_hash, reject).await;
521                        }
522                    }
523                }
524            },
525            None => {
526                match ret {
527                    Ok(_) => {
528                        debug!("after_process local send_result_to_relayer {}", tx_hash);
529                        self.send_result_to_relayer(TxVerificationResult::Ok {
530                            original_peer: None,
531                            with_vm_2023,
532                            tx_hash,
533                        });
534                        self.process_orphan_tx(&tx).await;
535                    }
536                    Err(Reject::Duplicated(_)) => {
537                        debug!("after_process {} duplicated", tx_hash);
538                        // re-broadcast tx when it's duplicated and submitted through local rpc
539                        self.send_result_to_relayer(TxVerificationResult::Ok {
540                            original_peer: None,
541                            with_vm_2023,
542                            tx_hash,
543                        });
544                    }
545                    Err(reject) => {
546                        debug!("after_process {} reject: {} ", tx_hash, reject);
547                        if reject.should_recorded() {
548                            self.put_recent_reject(&tx_hash, reject).await;
549                        }
550                    }
551                }
552            }
553        }
554    }
555
556    pub(crate) async fn add_orphan(
557        &self,
558        tx: TransactionView,
559        peer: PeerIndex,
560        declared_cycle: Cycle,
561    ) {
562        let evicted_txs = self
563            .orphan
564            .write()
565            .await
566            .add_orphan_tx(tx, peer, declared_cycle);
567        // for any evicted orphan tx, we should send reject to relayer
568        // so that we mark it as `unknown` in filter
569        for tx_hash in evicted_txs {
570            self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
571        }
572    }
573
574    pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
575        let orphan = self.orphan.read().await;
576        orphan
577            .find_by_previous(tx)
578            .iter()
579            .filter_map(|id| orphan.get(id).cloned())
580            .collect::<Vec<_>>()
581    }
582
583    pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
584        self.orphan.write().await.remove_orphan_tx(id);
585    }
586
587    /// Remove all orphans which are resolved by the given transaction
588    /// the process is like a breath first search, if there is a cycle in `orphan_queue`,
589    /// `_process_tx` will return `Reject` since we have checked duplicated tx
590    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
591        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
592        orphan_queue.push_back(tx.clone());
593
594        while let Some(previous) = orphan_queue.pop_front() {
595            let orphans = self.find_orphan_by_previous(&previous).await;
596            for orphan in orphans.into_iter() {
597                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
598                    debug!(
599                        "process_orphan {} added to verify queue; find previous from {}",
600                        orphan.tx.hash(),
601                        tx.hash(),
602                    );
603                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
604                    self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
605                        .await
606                        .expect("enqueue suspended tx");
607                } else if let Some((ret, snapshot)) = self
608                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
609                    .await
610                {
611                    match ret {
612                        Ok(_) => {
613                            let with_vm_2023 = {
614                                let epoch = snapshot
615                                    .tip_header()
616                                    .epoch()
617                                    .minimum_epoch_number_after_n_blocks(1);
618
619                                self.consensus
620                                    .hardfork_switch
621                                    .ckb2023
622                                    .is_vm_version_2_and_syscalls_3_enabled(epoch)
623                            };
624                            self.send_result_to_relayer(TxVerificationResult::Ok {
625                                original_peer: Some(orphan.peer),
626                                with_vm_2023,
627                                tx_hash: orphan.tx.hash(),
628                            });
629                            debug!(
630                                "process_orphan {} success, find previous from {}",
631                                orphan.tx.hash(),
632                                tx.hash()
633                            );
634                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
635                            orphan_queue.push_back(orphan.tx);
636                        }
637                        Err(reject) => {
638                            debug!(
639                                "process_orphan {} reject {}, find previous from {}",
640                                orphan.tx.hash(),
641                                reject,
642                                tx.hash(),
643                            );
644
645                            if !is_missing_input(&reject) {
646                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
647                                if reject.is_malformed_tx() {
648                                    self.ban_malformed(orphan.peer, format!("reject {reject}"));
649                                }
650                                if reject.is_allowed_relay() {
651                                    self.send_result_to_relayer(TxVerificationResult::Reject {
652                                        tx_hash: orphan.tx.hash(),
653                                    });
654                                }
655                                if reject.should_recorded() {
656                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
657                                }
658                            }
659                        }
660                    }
661                }
662            }
663        }
664    }
665
666    pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
667        if let Err(e) = self.tx_relay_sender.send(result) {
668            error!("tx-pool tx_relay_sender internal error {}", e);
669        }
670    }
671
672    fn ban_malformed(&self, peer: PeerIndex, reason: String) {
673        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
674
675        #[cfg(feature = "with_sentry")]
676        use sentry::{capture_message, with_scope, Level};
677
678        #[cfg(feature = "with_sentry")]
679        with_scope(
680            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
681            || {
682                capture_message(
683                    &format!(
684                        "Ban peer {} for {} seconds, reason: \
685                        {}",
686                        peer,
687                        DEFAULT_BAN_TIME.as_secs(),
688                        reason
689                    ),
690                    Level::Info,
691                )
692            },
693        );
694        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
695    }
696
    /// Runs the full pipeline for one transaction: pre-check, script
    /// verification, declared-cycles check and submission to the pool.
    ///
    /// * `declared_cycles` - cycles declared for the transaction, if any; they cap
    ///   the verification and must equal the verified cycles.
    /// * `command_rx` - optional `ChunkCommand` receiver handed to `verify_rtx`;
    ///   when absent the run is counted as a "sync" process in the metrics.
    ///
    /// Returns `None` when the pool is inside the delay window (the transaction
    /// is buffered in `delay` unless that buffer already holds `DELAY_LIMIT`
    /// entries). Otherwise returns the outcome together with a snapshot.
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        // NOTE(review): `try_or_return_with_snapshot!` is defined elsewhere; presumably it
        // returns `Some((Err(..), snapshot))` from this function on `Err` — confirm.
        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        if self.is_in_delay_window(&snapshot) {
            let mut delay = self.delay.write().await;
            // Once the buffer is full the transaction is dropped, but `None` is
            // still returned.
            if delay.len() < DELAY_LIMIT {
                delay.insert(tx.proposal_short_id(), tx);
            }
            return None;
        }

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        // Without declared cycles the consensus block cycle limit is used as the cap.
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        // Declared cycles must match the verified cycles exactly.
        if let Some(declared) = declared_cycles {
            if declared != verified.cycles {
                info!(
                    "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                    declared, verified.cycles, tx
                );
                return Some((
                    Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                    snapshot,
                ));
            }
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        // `tip_hash` lets `submit_entry` detect a tip change since the pre-check.
        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        if verify_cache.is_none() {
            // Update the verification cache from a spawned task, so this call does
            // not wait for the cache write lock.
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        // Only successful runs reach this point and are measured.
        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }
776
777    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
778        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
779
780        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
781
782        // skip check the delay window
783
784        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
785        let max_cycles = self.consensus.max_block_cycles();
786        let tip_header = snapshot.tip_header();
787        let tx_env = Arc::new(status.with_env(tip_header));
788
789        verify_rtx(
790            Arc::clone(&snapshot),
791            Arc::clone(&rtx),
792            tx_env,
793            &verify_cache,
794            max_cycles,
795            None,
796        )
797        .await
798    }
799
    /// Updates the pool after the chain tip changed (blocks detached and/or attached).
    ///
    /// Steps, in order:
    /// 1. Collect the transactions of the detached and attached blocks (skipping the first
    ///    transaction of every block) and compute `retain`: those detached but not re-attached.
    /// 2. Under the tx-pool write lock: when the new tip is in the delay window, clear the
    ///    verify queue and drain every pool transaction; run `_update_tx_pool_for_reorg`;
    ///    switch the network to ckb2023 if required; re-add the `retain` transactions.
    /// 3. Park the drained transactions in `self.delay`.
    /// 4. Once the tip is past the delay window, re-process a batch of parked transactions.
    /// 5. Handle orphans for the attached transactions and drop the attached transactions
    ///    from the verify queue.
    ///
    /// * `detached_blocks` / `attached_blocks` - blocks removed from / added to the chain.
    /// * `detached_proposal_id` - proposal ids forwarded to `_update_tx_pool_for_reorg`.
    /// * `snapshot` - the snapshot for the new tip; it becomes the pool's snapshot.
    pub(crate) async fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) {
        let mine_mode = self.block_assembler.is_some();
        let mut detached = LinkedHashSet::default();
        let mut attached = LinkedHashSet::default();

        let epoch_of_next_block = snapshot
            .tip_header()
            .epoch()
            .minimum_epoch_number_after_n_blocks(1);

        let new_tip_after_delay = after_delay_window(&snapshot);
        let is_in_delay_window = self.is_in_delay_window(&snapshot);

        let detached_headers: HashSet<Byte32> = detached_blocks
            .iter()
            .map(|blk| blk.header().hash())
            .collect();

        // `skip(1)` drops the first transaction of each block
        // (presumably the cellbase -- TODO confirm).
        for blk in detached_blocks {
            detached.extend(blk.transactions().into_iter().skip(1))
        }

        for blk in attached_blocks {
            self.fee_estimator.commit_block(&blk);
            attached.extend(blk.transactions().into_iter().skip(1));
        }
        // Transactions that were detached and not included again by the attached blocks.
        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();

        let fetched_cache = if is_in_delay_window {
            // If in delay_window, don't use the cache.
            HashMap::new()
        } else {
            self.fetch_txs_verify_cache(retain.iter()).await
        };

        // If any transactions require re-processing, return them.
        //
        // At present, there is only one such situation:
        // - If a hardfork has happened, then re-process all transactions.
        let txs_opt = {
            // This block scope is used to limit the lifetime of the mutable tx_pool guard.
            let mut tx_pool = self.tx_pool.write().await;

            let txs_opt = if is_in_delay_window {
                {
                    // Inner scope so the verify-queue write guard is released right away.
                    self.verify_queue.write().await.clear();
                }
                Some(tx_pool.drain_all_transactions())
            } else {
                None
            };

            _update_tx_pool_for_reorg(
                &mut tx_pool,
                &attached,
                &detached_headers,
                detached_proposal_id,
                snapshot,
                &self.callbacks,
                mine_mode,
            );

            // Updates network fork switch if required.
            //
            // This operation should be ahead of any transaction which is processed with new
            // hardfork features.
            if !self.network.load_ckb2023()
                && self
                    .consensus
                    .hardfork_switch
                    .ckb2023
                    .is_vm_version_2_and_syscalls_3_enabled(epoch_of_next_block)
            {
                self.network.init_ckb2023()
            }

            // Notice: readd_detached_tx doesn't update the verify cache.
            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
                .await;

            txs_opt
        };

        if let Some(txs) = txs_opt {
            let mut delay = self.delay.write().await;
            // NOTE(review): the limit is checked once before inserting, so a single batch
            // can push `delay` beyond DELAY_LIMIT; when the limit is already reached the
            // drained transactions are dropped.
            if delay.len() < DELAY_LIMIT {
                for tx in txs {
                    delay.insert(tx.proposal_short_id(), tx);
                }
            }
        }

        {
            // While the "after delay" flag is not yet set and the new tip is past the delay
            // window, take at most MAX_BLOCK_PROPOSALS_LIMIT parked transactions per call.
            let delay_txs = if !self.after_delay() && new_tip_after_delay {
                let limit = MAX_BLOCK_PROPOSALS_LIMIT as usize;
                let mut txs = Vec::with_capacity(limit);
                let mut delay = self.delay.write().await;
                // Keys are copied out first so entries can be removed while walking them.
                let keys: Vec<_> = { delay.keys().take(limit).cloned().collect() };
                for k in keys {
                    if let Some(v) = delay.remove(&k) {
                        txs.push(v);
                    }
                }
                // Nothing left to replay: mark the delay phase as finished.
                if delay.is_empty() {
                    self.set_after_delay_true();
                }
                Some(txs)
            } else {
                None
            };
            if let Some(txs) = delay_txs {
                self.try_process_txs(txs).await;
            }
        }

        self.remove_orphan_txs_by_attach(&attached).await;
        {
            // Attached transactions no longer need verification.
            let mut queue = self.verify_queue.write().await;
            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
        }
    }
927
928    async fn enqueue_verify_queue(
929        &self,
930        tx: TransactionView,
931        remote: Option<(Cycle, PeerIndex)>,
932    ) -> Result<bool, Reject> {
933        let mut queue = self.verify_queue.write().await;
934        queue.add_tx(tx, remote)
935    }
936
937    async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
938        for tx in txs.iter() {
939            self.process_orphan_tx(tx).await;
940        }
941        let mut orphan = self.orphan.write().await;
942        orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
943    }
944
945    async fn readd_detached_tx(
946        &self,
947        tx_pool: &mut TxPool,
948        txs: Vec<TransactionView>,
949        fetched_cache: HashMap<Byte32, CacheEntry>,
950    ) {
951        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
952        for tx in txs {
953            let tx_size = tx.data().serialized_size_in_block();
954            let tx_hash = tx.hash();
955            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
956                if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
957                    let verify_cache = fetched_cache.get(&tx_hash).cloned();
958                    let snapshot = tx_pool.cloned_snapshot();
959                    let tip_header = snapshot.tip_header();
960                    let tx_env = Arc::new(status.with_env(tip_header));
961                    if let Ok(verified) = verify_rtx(
962                        snapshot,
963                        Arc::clone(&rtx),
964                        tx_env,
965                        &verify_cache,
966                        max_cycles,
967                        None,
968                    )
969                    .await
970                    {
971                        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
972                        if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
973                            error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
974                        } else {
975                            debug!("readd_detached_tx submit_entry {}", tx_hash);
976                        }
977                    }
978                }
979            }
980        }
981    }
982
983    pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
984        {
985            let mut tx_pool = self.tx_pool.write().await;
986            tx_pool.clear(Arc::clone(&new_snapshot));
987        }
988        // reset block_assembler
989        if self
990            .block_assembler_sender
991            .send(BlockAssemblerMessage::Reset(new_snapshot))
992            .await
993            .is_err()
994        {
995            error!("block_assembler receiver dropped");
996        }
997    }
998
999    pub(crate) async fn save_pool(&self) {
1000        let mut tx_pool = self.tx_pool.write().await;
1001        if let Err(err) = tx_pool.save_into_file() {
1002            error!("failed to save pool, error: {:?}", err)
1003        } else {
1004            info!("TxPool saved successfully")
1005        }
1006    }
1007
1008    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
1009        self.fee_estimator.update_ibd_state(in_ibd);
1010    }
1011
1012    pub(crate) async fn estimate_fee_rate(
1013        &self,
1014        estimate_mode: EstimateMode,
1015        enable_fallback: bool,
1016    ) -> Result<FeeRate, AnyError> {
1017        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
1018        match self
1019            .fee_estimator
1020            .estimate_fee_rate(estimate_mode, all_entry_info)
1021        {
1022            Ok(fee_rate) => Ok(fee_rate),
1023            Err(err) => {
1024                if enable_fallback {
1025                    let target_blocks =
1026                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
1027                    self.tx_pool
1028                        .read()
1029                        .await
1030                        .estimate_fee_rate(target_blocks)
1031                        .map_err(Into::into)
1032                } else {
1033                    Err(err.into())
1034                }
1035            }
1036        }
1037    }
1038
1039    // # Notice
1040    //
1041    // This method assumes that the inputs transactions are sorted.
1042    async fn try_process_txs(&self, txs: Vec<TransactionView>) {
1043        if txs.is_empty() {
1044            return;
1045        }
1046        let total = txs.len();
1047        let mut count = 0usize;
1048        for tx in txs {
1049            let tx_hash = tx.hash();
1050            if let Err(err) = self.process_tx(tx, None).await {
1051                error!("failed to process {:#x}, error: {:?}", tx_hash, err);
1052                count += 1;
1053            }
1054        }
1055        if count != 0 {
1056            info!("{}/{} transaction process failed.", count, total);
1057        }
1058    }
1059
1060    pub(crate) fn is_in_delay_window(&self, snapshot: &Snapshot) -> bool {
1061        let epoch = snapshot.tip_header().epoch();
1062        self.consensus.is_in_delay_window(&epoch)
1063    }
1064}
1065
/// Data gathered for a transaction before script verification.
///
/// NOTE(review): presumably the success payload of `pre_check` -- the tuple is destructured
/// in this order by its callers; confirm against `pre_check`'s signature.
type PreCheckedTx = (
    Byte32,                   // tip_hash
    Arc<ResolvedTransaction>, // rtx
    TxStatus,                 // status
    Capacity,                 // tx fee
    usize,                    // tx size
);
1073
/// Result of `resolve_tx`: the resolved transaction together with its `TxStatus`, or the
/// `Reject` explaining why resolution failed.
type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
1075
1076fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
1077    if snapshot.proposals().contains_proposed(short_id) {
1078        TxStatus::Proposed
1079    } else if snapshot.proposals().contains_gap(short_id) {
1080        TxStatus::Gap
1081    } else {
1082        TxStatus::Fresh
1083    }
1084}
1085
1086fn check_rtx(
1087    tx_pool: &TxPool,
1088    snapshot: &Snapshot,
1089    rtx: &ResolvedTransaction,
1090) -> Result<TxStatus, Reject> {
1091    let short_id = rtx.transaction.proposal_short_id();
1092    let tx_status = get_tx_status(snapshot, &short_id);
1093    tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
1094}
1095
1096fn resolve_tx(
1097    tx_pool: &TxPool,
1098    snapshot: &Snapshot,
1099    tx: TransactionView,
1100    rbf: bool,
1101) -> ResolveResult {
1102    let short_id = tx.proposal_short_id();
1103    let tx_status = get_tx_status(snapshot, &short_id);
1104    tx_pool
1105        .resolve_tx_from_pool(tx, rbf)
1106        .map(|rtx| (rtx, tx_status))
1107}
1108
1109fn _submit_entry(
1110    tx_pool: &mut TxPool,
1111    status: TxStatus,
1112    entry: TxEntry,
1113    callbacks: &Callbacks,
1114) -> Result<HashSet<TxEntry>, Reject> {
1115    let tx_hash = entry.transaction().hash();
1116    debug!("submit_entry {:?} {}", status, tx_hash);
1117    let (succ, evicts) = match status {
1118        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
1119        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
1120        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
1121    };
1122    if succ {
1123        match status {
1124            TxStatus::Fresh => callbacks.call_pending(&entry),
1125            TxStatus::Gap => callbacks.call_pending(&entry),
1126            TxStatus::Proposed => callbacks.call_proposed(&entry),
1127        }
1128    }
1129    Ok(evicts)
1130}
1131
1132fn _update_tx_pool_for_reorg(
1133    tx_pool: &mut TxPool,
1134    attached: &LinkedHashSet<TransactionView>,
1135    detached_headers: &HashSet<Byte32>,
1136    detached_proposal_id: HashSet<ProposalShortId>,
1137    snapshot: Arc<Snapshot>,
1138    callbacks: &Callbacks,
1139    mine_mode: bool,
1140) {
1141    tx_pool.snapshot = Arc::clone(&snapshot);
1142
1143    // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
1144    // pending-pool if they can be found within txpool. As for a transaction
1145    // which is both expired and committed at the one time(commit at its end of commit-window),
1146    // we should treat it as a committed and not re-put into pending-pool. So we should ensure
1147    // that involves `remove_committed_txs` before `remove_expired`.
1148    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1149    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1150
1151    // mine mode:
1152    // pending ---> gap ----> proposed
1153    // try move gap to proposed
1154    if mine_mode {
1155        let mut proposals = Vec::new();
1156        let mut gaps = Vec::new();
1157
1158        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1159            let short_id = entry.inner.proposal_short_id();
1160            if snapshot.proposals().contains_proposed(&short_id) {
1161                proposals.push((short_id, entry.inner.clone()));
1162            }
1163        }
1164
1165        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1166            let short_id = entry.inner.proposal_short_id();
1167            let elem = (short_id.clone(), entry.inner.clone());
1168            if snapshot.proposals().contains_proposed(&short_id) {
1169                proposals.push(elem);
1170            } else if snapshot.proposals().contains_gap(&short_id) {
1171                gaps.push(elem);
1172            }
1173        }
1174
1175        for (id, entry) in proposals {
1176            debug!("begin to proposed: {:x}", id);
1177            if let Err(e) = tx_pool.proposed_rtx(&id) {
1178                debug!(
1179                    "Failed to add proposed tx {}, reason: {}",
1180                    entry.transaction().hash(),
1181                    e
1182                );
1183                callbacks.call_reject(tx_pool, &entry, e);
1184            } else {
1185                callbacks.call_proposed(&entry)
1186            }
1187        }
1188
1189        for (id, entry) in gaps {
1190            debug!("begin to gap: {:x}", id);
1191            if let Err(e) = tx_pool.gap_rtx(&id) {
1192                debug!(
1193                    "Failed to add tx to gap {}, reason: {}",
1194                    entry.transaction().hash(),
1195                    e
1196                );
1197                callbacks.call_reject(tx_pool, &entry, e.clone());
1198            }
1199        }
1200    }
1201
1202    // Remove expired transaction from pending
1203    tx_pool.remove_expired(callbacks);
1204
1205    // Remove transactions from the pool until its size <= size_limit.
1206    let _ = tx_pool.limit_size(callbacks, None);
1207}