Skip to main content

ckb_tx_pool/
process.rs

1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10    check_tx_fee, check_txid_collision, is_missing_input, non_contextual_verify,
11    time_relative_verify, verify_rtx,
12};
13use ckb_error::{AnyError, InternalErrorKind};
14use ckb_fee_estimator::FeeEstimator;
15use ckb_jsonrpc_types::BlockTemplate;
16use ckb_logger::Level::Trace;
17use ckb_logger::{debug, error, info, log_enabled_target, trace_target, warn};
18use ckb_network::PeerIndex;
19use ckb_script::ChunkCommand;
20use ckb_snapshot::Snapshot;
21use ckb_types::core::error::OutPointError;
22use ckb_types::{
23    core::{
24        BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
25        cell::ResolvedTransaction,
26    },
27    packed::{Byte32, ProposalShortId},
28};
29use ckb_util::LinkedHashSet;
30use ckb_verification::{
31    TxVerifyEnv,
32    cache::{CacheEntry, Completed},
33};
34use std::collections::HashSet;
35use std::collections::{HashMap, VecDeque};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use tokio::sync::watch;
39
/// Destination pool selected when plugging an entry through the `plug_entry` method.
pub enum PlugTarget {
    /// Plug the entry into the pending pool.
    Pending,
    /// Plug the entry into the proposed pool.
    Proposed,
}
47
/// Status attached to a transaction while it is processed by this module.
///
/// The variant decides which [`TxVerifyEnv`] `with_env` builds and which
/// message, if any, `notify_block_assembler` sends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    /// Verified with `TxVerifyEnv::new_submit`; notifies the block assembler with `Pending`.
    Fresh,
    /// Verified with `TxVerifyEnv::new_proposed(header, 0)`; sends no block assembler message.
    Gap,
    /// Verified with `TxVerifyEnv::new_proposed(header, 1)`; notifies the block assembler
    /// with `Proposed`.
    Proposed,
}
54
55impl TxStatus {
56    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
57        match self {
58            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
59            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
60            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
61        }
62    }
63}
64
65impl TxPoolService {
66    pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
67        if let Some(ref block_assembler) = self.block_assembler {
68            Ok(block_assembler.get_current().await)
69        } else {
70            Err(InternalErrorKind::Config
71                .other("BlockAssembler disabled")
72                .into())
73        }
74    }
75
76    pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
77        let guard = self.txs_verify_cache.read().await;
78        guard.peek(&tx.witness_hash()).cloned()
79    }
80
81    async fn fetch_txs_verify_cache(
82        &self,
83        txs: impl Iterator<Item = &TransactionView>,
84    ) -> HashMap<Byte32, CacheEntry> {
85        let guard = self.txs_verify_cache.read().await;
86        txs.filter_map(|tx| {
87            let wtx_hash = tx.witness_hash();
88            guard
89                .peek(&wtx_hash)
90                .cloned()
91                .map(|value| (wtx_hash, value))
92        })
93        .collect()
94    }
95
    /// Submits a verified `entry` into the tx-pool while holding the pool write lock.
    ///
    /// Steps performed inside the lock, in order:
    /// 1. Collect RBF conflicts via `check_rbf` when RBF is enabled; otherwise reject
    ///    with `OutPointError::Dead` if the pool already holds a conflicting outpoint.
    /// 2. If the snapshot tip differs from `pre_resolve_tip`, recompute `status` with
    ///    `check_rtx` and redo `time_relative_verify`.
    /// 3. Remove the conflicting entries (`process_rbf`), insert the entry
    ///    (`_submit_entry`), and invoke the reject callback for every evicted entry.
    /// 4. Drop the entry from the conflict records and enforce the pool size limit.
    /// 5. Spawn a task that pushes the transactions returned by `process_rbf` back into
    ///    the verify queue.
    ///
    /// Returns the submission result together with the snapshot that was current while
    /// the lock was held.
    pub(crate) async fn submit_entry(
        &self,
        pre_resolve_tip: Byte32,
        entry: TxEntry,
        mut status: TxStatus,
    ) -> (Result<(), Reject>, Arc<Snapshot>) {
        let (ret, snapshot) = self
            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
                // check_rbf must be invoked in `write` lock to avoid concurrent issues.
                let conflicts = if tx_pool.enable_rbf() {
                    tx_pool.check_rbf(&snapshot, &entry)?
                } else {
                    // RBF is disabled but we found conflicts, return error here
                    // after_process will put this tx into conflicts_pool
                    let conflicted_outpoint =
                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                    if let Some(outpoint) = conflicted_outpoint {
                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                    }
                    HashSet::new()
                };

                // If the snapshot changed since the transaction was resolved, the
                // time-relative verification has to be redone against the new tip.
                let tip_hash = snapshot.tip_hash();
                if pre_resolve_tip != tip_hash {
                    debug!(
                        "submit_entry {} context changed. previous:{} now:{}",
                        entry.proposal_short_id(),
                        pre_resolve_tip,
                        tip_hash
                    );

                    // destructuring assignments are not currently supported
                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;

                    let tip_header = snapshot.tip_header();
                    let tx_env = status.with_env(tip_header);
                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
                }

                // Conflicting entries are removed before the new entry is inserted.
                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

                // in a corner case, a tx with lower fee rate may be rejected immediately
                // after inserting into pool, return proper reject error here
                for evict in evicted {
                    let reject = Reject::Invalidated(format!(
                        "invalidated by tx {}",
                        evict.transaction().hash()
                    ));
                    self.callbacks.call_reject(tx_pool, &evict, reject);
                }

                tx_pool.remove_conflict(&entry.proposal_short_id());
                // `limit_size` yields `Some(reject)` on failure; turn that into an `Err`.
                tx_pool
                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                    .map_or(Ok(()), Err)?;

                if !may_recovered_txs.is_empty() {
                    let self_clone = self.clone();
                    tokio::spawn(async move {
                        // push the recovered txs back to verify queue, so that they can be verified and submitted again
                        let mut queue = self_clone.verify_queue.write().await;
                        for tx in may_recovered_txs {
                            debug!("recover back: {:?}", tx.proposal_short_id());
                            // Best effort: a failure to re-enqueue is deliberately ignored.
                            let _ = queue.add_tx(tx, false, None);
                        }
                    });
                }
                Ok(())
            })
            .await;

        (ret, snapshot)
    }
171
172    pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
173        if self.should_notify_block_assembler() {
174            let message = match status {
175                TxStatus::Fresh => Some(BlockAssemblerMessage::Pending),
176                TxStatus::Proposed => Some(BlockAssemblerMessage::Proposed),
177                _ => None,
178            };
179
180            if let Some(message) = message
181                && self.block_assembler_sender.send(message).await.is_err()
182            {
183                error!("block_assembler receiver dropped");
184            }
185        }
186    }
187
188    // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
189    // since they maybe not conflicted anymore
190    fn process_rbf(
191        &self,
192        tx_pool: &mut TxPool,
193        entry: &TxEntry,
194        conflicts: &HashSet<ProposalShortId>,
195    ) -> Vec<TransactionView> {
196        let mut may_recovered_txs = vec![];
197        let mut available_inputs = HashSet::new();
198
199        if conflicts.is_empty() {
200            return may_recovered_txs;
201        }
202
203        let all_removed: Vec<_> = conflicts
204            .iter()
205            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
206            .collect();
207
208        available_inputs.extend(
209            all_removed
210                .iter()
211                .flat_map(|removed| removed.transaction().input_pts_iter()),
212        );
213
214        for input in entry.transaction().input_pts_iter() {
215            available_inputs.remove(&input);
216        }
217
218        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
219        for old in all_removed {
220            debug!(
221                "remove conflict tx {} for RBF by new tx {}",
222                old.transaction().hash(),
223                entry.transaction().hash()
224            );
225            let reject =
226                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
227
228            // RBF replace successfully, put old transactions into conflicts pool
229            tx_pool.record_conflict(old.transaction().clone());
230            // after removing old tx from tx_pool, we call reject callbacks manually
231            self.callbacks.call_reject(tx_pool, &old, reject);
232        }
233        assert!(!may_recovered_txs.contains(entry.transaction()));
234        may_recovered_txs
235    }
236
237    pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
238        let queue = self.verify_queue.read().await;
239        queue.contains_key(&tx.proposal_short_id())
240    }
241
242    pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
243        let orphan = self.orphan.read().await;
244        orphan.contains_key(&tx.proposal_short_id())
245    }
246
247    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
248        &self,
249        mut f: F,
250    ) -> (U, Arc<Snapshot>) {
251        let tx_pool = self.tx_pool.read().await;
252        let snapshot = tx_pool.cloned_snapshot();
253
254        let ret = f(&tx_pool, Arc::clone(&snapshot));
255        (ret, snapshot)
256    }
257
258    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
259        &self,
260        mut f: F,
261    ) -> (U, Arc<Snapshot>) {
262        let mut tx_pool = self.tx_pool.write().await;
263        let snapshot = tx_pool.cloned_snapshot();
264
265        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
266        (ret, snapshot)
267    }
268
269    pub(crate) async fn pre_check(
270        &self,
271        tx: &TransactionView,
272    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
273        // Acquire read lock for cheap check
274        let tx_size = tx.data().serialized_size_in_block();
275
276        let (ret, snapshot) = self
277            .with_tx_pool_read_lock(|tx_pool, snapshot| {
278                let tip_hash = snapshot.tip_hash();
279
280                // Same txid means exactly the same transaction, including inputs, outputs, witnesses, etc.
281                // It's also not possible for RBF, reject it directly
282                check_txid_collision(tx_pool, tx)?;
283
284                // Try normal path first, if double-spending check success we don't need RBF check
285                // this make sure RBF won't introduce extra performance cost for hot path
286                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
287                match res {
288                    Ok((rtx, status)) => {
289                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
290                        Ok((tip_hash, rtx, status, fee, tx_size))
291                    }
292                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
293                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
294                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
295                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
296                        if conflicts.is_none() {
297                            // this mean one input's outpoint is dead, but there is no direct conflicted tx in tx_pool
298                            // we should reject it directly and don't need to put it into conflicts pool
299                            error!(
300                                "{} is resolved as Dead, but there is no conflicted tx",
301                                rtx.transaction.proposal_short_id()
302                            );
303                            return Err(Reject::Resolve(OutPointError::Dead(out)));
304                        }
305                        // we also return Ok here, so that the entry will be continue to be verified before submit
306                        // we only want to put it into conflicts pool after the verification stage passed
307                        // then we will double-check conflicts txs in `submit_entry`
308
309                        Ok((tip_hash, rtx, status, fee, tx_size))
310                    }
311                    Err(err) => Err(err),
312                }
313            })
314            .await;
315        (ret, snapshot)
316    }
317
318    pub(crate) async fn non_contextual_verify(
319        &self,
320        tx: &TransactionView,
321        remote: Option<(Cycle, PeerIndex)>,
322    ) -> Result<(), Reject> {
323        if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
324            if reject.is_malformed_tx()
325                && let Some(remote) = remote
326            {
327                self.ban_malformed(remote.1, format!("reject {reject}"))
328                    .await;
329            }
330            return Err(reject);
331        }
332        Ok(())
333    }
334
335    pub(crate) async fn resumeble_process_tx(
336        &self,
337        tx: TransactionView,
338        is_proposal_tx: bool,
339        remote: Option<(Cycle, PeerIndex)>,
340    ) -> Result<bool, Reject> {
341        // non contextual verify first
342        self.non_contextual_verify(&tx, remote).await?;
343
344        if self.orphan_contains(&tx).await {
345            debug!("reject tx {} already in orphan pool", tx.hash());
346            return Err(Reject::Duplicated(tx.hash()));
347        }
348
349        if self.verify_queue_contains(&tx).await {
350            return Err(Reject::Duplicated(tx.hash()));
351        }
352        self.enqueue_verify_queue(tx, is_proposal_tx, remote).await
353    }
354
355    pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
356        // non contextual verify first
357        self.non_contextual_verify(&tx, None).await?;
358
359        if self.verify_queue_contains(&tx).await {
360            return Err(Reject::Duplicated(tx.hash()));
361        }
362
363        if self.orphan_contains(&tx).await {
364            debug!("reject tx {} already in orphan pool", tx.hash());
365            return Err(Reject::Duplicated(tx.hash()));
366        }
367        self._test_accept_tx(tx.clone()).await
368    }
369
370    pub(crate) async fn process_tx(
371        &self,
372        tx: TransactionView,
373        remote: Option<(Cycle, PeerIndex)>,
374    ) -> Result<Completed, Reject> {
375        // non contextual verify first
376        self.non_contextual_verify(&tx, remote).await?;
377
378        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
379            return Err(Reject::Duplicated(tx.hash()));
380        }
381
382        if let Some((ret, snapshot)) = self
383            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
384            .await
385        {
386            self.after_process(tx, remote, &snapshot, &ret).await;
387            ret
388        } else {
389            // currently, the returned cycles is not been used, mock 0 if delay
390            Ok(Completed {
391                cycles: 0,
392                fee: Capacity::zero(),
393            })
394        }
395    }
396
397    pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
398        let mut tx_pool = self.tx_pool.write().await;
399        if let Some(ref mut recent_reject) = tx_pool.recent_reject
400            && let Err(e) = recent_reject.put(tx_hash, reject.clone())
401        {
402            error!(
403                "Failed to record recent_reject {} {} {}",
404                tx_hash, reject, e
405            );
406        }
407    }
408
409    pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
410        let id = ProposalShortId::from_tx_hash(&tx_hash);
411        {
412            let mut queue = self.verify_queue.write().await;
413            if queue.remove_tx(&id).is_some() {
414                return true;
415            }
416        }
417        {
418            let mut orphan = self.orphan.write().await;
419            if orphan.remove_orphan_tx(&id).is_some() {
420                return true;
421            }
422        }
423        let mut tx_pool = self.tx_pool.write().await;
424        tx_pool.remove_tx(&id)
425    }
426
427    pub(crate) async fn after_process(
428        &self,
429        tx: TransactionView,
430        remote: Option<(Cycle, PeerIndex)>,
431        _snapshot: &Snapshot,
432        ret: &Result<Completed, Reject>,
433    ) {
434        let tx_hash = tx.hash();
435
436        // log tx verification result for monitor node
437        if log_enabled_target!("ckb_tx_monitor", Trace)
438            && let Ok(c) = ret
439        {
440            trace_target!(
441                "ckb_tx_monitor",
442                r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
443                tx_hash,
444                c.cycles
445            );
446        }
447
448        if matches!(
449            ret,
450            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
451        ) {
452            let mut tx_pool = self.tx_pool.write().await;
453            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
454                tx_pool.record_conflict(tx.clone());
455            }
456        }
457
458        match remote {
459            Some((declared_cycle, peer)) => match ret {
460                Ok(_) => {
461                    debug!(
462                        "after_process remote send_result_to_relayer {} {}",
463                        tx_hash, peer
464                    );
465                    self.send_result_to_relayer(TxVerificationResult::Ok {
466                        original_peer: Some(peer),
467                        tx_hash,
468                    });
469                    self.process_orphan_tx(&tx).await;
470                }
471                Err(reject) => {
472                    info!(
473                        "after_process {} {} remote reject: {} ",
474                        tx_hash, peer, reject
475                    );
476                    if is_missing_input(reject) {
477                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
478                            peer,
479                            parents: tx.unique_parents(),
480                        });
481                        self.add_orphan(tx, peer, declared_cycle).await;
482                    } else {
483                        if reject.is_malformed_tx() {
484                            self.ban_malformed(peer, format!("reject {reject}")).await;
485                        }
486                        if reject.is_allowed_relay() {
487                            self.send_result_to_relayer(TxVerificationResult::Reject {
488                                tx_hash: tx_hash.clone(),
489                            });
490                        }
491                        if reject.should_recorded() {
492                            self.put_recent_reject(&tx_hash, reject).await;
493                        }
494                    }
495                }
496            },
497            None => {
498                match ret {
499                    Ok(_) => {
500                        debug!("after_process local send_result_to_relayer {}", tx_hash);
501                        self.send_result_to_relayer(TxVerificationResult::Ok {
502                            original_peer: None,
503                            tx_hash,
504                        });
505                        self.process_orphan_tx(&tx).await;
506                    }
507                    Err(Reject::Duplicated(_)) => {
508                        debug!("after_process {} duplicated", tx_hash);
509                        // re-broadcast tx when it's duplicated and submitted through local rpc
510                        self.send_result_to_relayer(TxVerificationResult::Ok {
511                            original_peer: None,
512                            tx_hash,
513                        });
514                    }
515                    Err(reject) => {
516                        debug!("after_process {} reject: {} ", tx_hash, reject);
517                        if reject.should_recorded() {
518                            self.put_recent_reject(&tx_hash, reject).await;
519                        }
520                    }
521                }
522            }
523        }
524    }
525
526    pub(crate) async fn add_orphan(
527        &self,
528        tx: TransactionView,
529        peer: PeerIndex,
530        declared_cycle: Cycle,
531    ) {
532        let evicted_txs = self
533            .orphan
534            .write()
535            .await
536            .add_orphan_tx(tx, peer, declared_cycle);
537        // for any evicted orphan tx, we should send reject to relayer
538        // so that we mark it as `unknown` in filter
539        for tx_hash in evicted_txs {
540            self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
541        }
542    }
543
544    pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
545        let orphan = self.orphan.read().await;
546        orphan
547            .find_by_previous(tx)
548            .iter()
549            .filter_map(|id| orphan.get(id).cloned())
550            .collect::<Vec<_>>()
551    }
552
553    pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
554        self.orphan.write().await.remove_orphan_tx(id);
555    }
556
    /// Processes all orphans that are resolved by the given transaction.
    ///
    /// The traversal is a breadth-first search starting from `tx`: each orphan that is
    /// successfully processed is pushed onto the queue so that its own dependents are
    /// examined next. If there is a cycle in `orphan_queue`, `_process_tx` will return
    /// `Reject` since duplicated transactions are checked.
    ///
    /// Orphans whose declared cycles exceed `max_tx_verify_cycles` are handed to the
    /// verify queue instead of being processed inline; they are not pushed onto the
    /// traversal queue.
    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
        orphan_queue.push_back(tx.clone());

        while let Some(previous) = orphan_queue.pop_front() {
            let orphans = self.find_orphan_by_previous(&previous).await;
            for orphan in orphans.into_iter() {
                // NOTE(review): the log messages below print the hash of the root `tx`,
                // not of `previous`, the transaction that actually resolved the orphan.
                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
                    debug!(
                        "process_orphan {} added to verify queue; find previous from {}",
                        orphan.tx.hash(),
                        tx.hash(),
                    );
                    let orphan_id = orphan.tx.proposal_short_id();
                    match self
                        .enqueue_verify_queue(
                            orphan.tx.clone(),
                            false,
                            Some((orphan.cycle, orphan.peer)),
                        )
                        .await
                    {
                        Ok(_) => {
                            // Only drop the orphan once the verify queue accepted it.
                            self.remove_orphan_tx(&orphan_id).await;
                        }
                        Err(reject) => {
                            warn!(
                                "process_orphan {} failed to enqueue verify queue: {}; keep orphan from {}",
                                orphan.tx.hash(),
                                reject,
                                tx.hash(),
                            );
                        }
                    }
                } else if let Some((ret, _snapshot)) = self
                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
                    .await
                {
                    match ret {
                        Ok(_) => {
                            self.send_result_to_relayer(TxVerificationResult::Ok {
                                original_peer: Some(orphan.peer),
                                tx_hash: orphan.tx.hash(),
                            });
                            debug!(
                                "process_orphan {} success, find previous from {}",
                                orphan.tx.hash(),
                                tx.hash()
                            );
                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                            // Continue the search with the orphans depending on this one.
                            orphan_queue.push_back(orphan.tx);
                        }
                        Err(reject) => {
                            debug!(
                                "process_orphan {} reject {}, find previous from {}",
                                orphan.tx.hash(),
                                reject,
                                tx.hash(),
                            );

                            // An orphan that still misses inputs stays in the orphan
                            // pool; any other reject removes it.
                            if !is_missing_input(&reject) {
                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                                if reject.is_malformed_tx() {
                                    self.ban_malformed(orphan.peer, format!("reject {reject}"))
                                        .await;
                                }
                                if reject.is_allowed_relay() {
                                    self.send_result_to_relayer(TxVerificationResult::Reject {
                                        tx_hash: orphan.tx.hash(),
                                    });
                                }
                                if reject.should_recorded() {
                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
                                }
                            }
                        }
                    }
                }
            }
        }
    }
641
642    pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
643        if let Err(e) = self.tx_relay_sender.send(result) {
644            error!("tx-pool tx_relay_sender internal error {}", e);
645        }
646    }
647
648    async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
649        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
650
651        #[cfg(feature = "with_sentry")]
652        use sentry::{Level, capture_message, with_scope};
653
654        #[cfg(feature = "with_sentry")]
655        with_scope(
656            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
657            || {
658                capture_message(
659                    &format!(
660                        "Ban peer {} for {} seconds, reason: \
661                        {}",
662                        peer,
663                        DEFAULT_BAN_TIME.as_secs(),
664                        reason
665                    ),
666                    Level::Info,
667                )
668            },
669        );
670        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
671        self.verify_queue.write().await.remove_txs_by_peer(&peer);
672    }
673
    /// Runs the full pipeline for `tx`: pre-check, script verification, declared-cycle
    /// check, pool submission, block assembler notification and verify-cache update.
    ///
    /// * `declared_cycles` - cycles declared for the transaction; when present they cap
    ///   the verification and must equal the verified cycles, otherwise
    ///   `Reject::DeclaredWrongCycles` is returned. When absent, the consensus
    ///   `max_block_cycles` is used as the limit.
    /// * `command_rx` - optional `ChunkCommand` receiver forwarded to `verify_rtx`; its
    ///   absence marks the call as a "sync" process for metrics.
    ///
    /// Returns the outcome together with the snapshot it was computed against.
    /// NOTE(review): early exits go through `try_or_return_with_snapshot!`, which is
    /// defined elsewhere; presumably it returns `Some((Err(..), snapshot))`, and may be
    /// where a `None` result originates — confirm against the macro.
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        // Recorded before `command_rx` is moved into `verify_rtx`.
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        // Declared cycles must match the verified cycles exactly.
        if let Some(declared) = declared_cycles
            && declared != verified.cycles
        {
            info!(
                "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                declared, verified.cycles, tx
            );
            return Some((
                Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                snapshot,
            ));
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        // `tip_hash` lets `submit_entry` detect a tip change since `pre_check`.
        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        if verify_cache.is_none() {
            // update cache
            // Done in a spawned task so the cache write lock is not awaited here.
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        // Processing time is only observed for transactions that reached this point.
        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }
745
746    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
747        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
748
749        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
750
751        // skip check the delay window
752
753        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
754        let max_cycles = self.consensus.max_block_cycles();
755        let tip_header = snapshot.tip_header();
756        let tx_env = Arc::new(status.with_env(tip_header));
757
758        verify_rtx(
759            Arc::clone(&snapshot),
760            Arc::clone(&rtx),
761            tx_env,
762            &verify_cache,
763            max_cycles,
764            None,
765        )
766        .await
767    }
768
769    pub(crate) async fn update_tx_pool_for_reorg(
770        &self,
771        detached_blocks: VecDeque<BlockView>,
772        attached_blocks: VecDeque<BlockView>,
773        detached_proposal_id: HashSet<ProposalShortId>,
774        snapshot: Arc<Snapshot>,
775    ) {
776        let mine_mode = self.block_assembler.is_some();
777        let mut detached = LinkedHashSet::default();
778        let mut attached = LinkedHashSet::default();
779
780        let detached_headers: HashSet<Byte32> = detached_blocks
781            .iter()
782            .map(|blk| blk.header().hash())
783            .collect();
784
785        for blk in detached_blocks {
786            detached.extend(blk.transactions().into_iter().skip(1))
787        }
788
789        for blk in attached_blocks {
790            self.fee_estimator.commit_block(&blk);
791            attached.extend(blk.transactions().into_iter().skip(1));
792        }
793        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();
794
795        let fetched_cache = self.fetch_txs_verify_cache(retain.iter()).await;
796
797        // If there are any transactions requires re-process, return them.
798        //
799        // At present, there is only one situation:
800        // - If the hardfork was happened, then re-process all transactions.
801        {
802            // This closure is used to limit the lifetime of mutable tx_pool.
803            let mut tx_pool = self.tx_pool.write().await;
804
805            _update_tx_pool_for_reorg(
806                &mut tx_pool,
807                &attached,
808                &detached_headers,
809                detached_proposal_id,
810                snapshot,
811                &self.callbacks,
812                mine_mode,
813            );
814
815            // notice: readd_detached_tx don't update cache
816            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
817                .await;
818        }
819
820        self.remove_orphan_txs_by_attach(&attached).await;
821        {
822            let mut queue = self.verify_queue.write().await;
823            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
824        }
825    }
826
827    async fn enqueue_verify_queue(
828        &self,
829        tx: TransactionView,
830        is_proposal_tx: bool,
831        remote: Option<(Cycle, PeerIndex)>,
832    ) -> Result<bool, Reject> {
833        let mut queue = self.verify_queue.write().await;
834        queue.add_tx(tx, is_proposal_tx, remote)
835    }
836
837    async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
838        for tx in txs.iter() {
839            self.process_orphan_tx(tx).await;
840        }
841        let mut orphan = self.orphan.write().await;
842        orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
843    }
844
845    async fn readd_detached_tx(
846        &self,
847        tx_pool: &mut TxPool,
848        txs: Vec<TransactionView>,
849        fetched_cache: HashMap<Byte32, CacheEntry>,
850    ) {
851        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
852        for tx in txs {
853            let tx_size = tx.data().serialized_size_in_block();
854            let tx_hash = tx.hash();
855            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false)
856                && let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size)
857            {
858                let verify_cache = fetched_cache.get(&tx_hash).cloned();
859                let snapshot = tx_pool.cloned_snapshot();
860                let tip_header = snapshot.tip_header();
861                let tx_env = Arc::new(status.with_env(tip_header));
862                if let Ok(verified) = verify_rtx(
863                    snapshot,
864                    Arc::clone(&rtx),
865                    tx_env,
866                    &verify_cache,
867                    max_cycles,
868                    None,
869                )
870                .await
871                {
872                    let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
873                    if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
874                        error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
875                    } else {
876                        debug!("readd_detached_tx submit_entry {}", tx_hash);
877                    }
878                }
879            }
880        }
881    }
882
883    pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
884        {
885            let mut tx_pool = self.tx_pool.write().await;
886            tx_pool.clear(Arc::clone(&new_snapshot));
887        }
888        // reset block_assembler
889        if self
890            .block_assembler_sender
891            .send(BlockAssemblerMessage::Reset(new_snapshot))
892            .await
893            .is_err()
894        {
895            error!("block_assembler receiver dropped");
896        }
897    }
898
899    pub(crate) async fn save_pool(&self) {
900        let mut tx_pool = self.tx_pool.write().await;
901        if let Err(err) = tx_pool.save_into_file() {
902            error!("failed to save pool, error: {:?}", err)
903        } else {
904            info!("TxPool saved successfully")
905        }
906    }
907
908    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
909        self.fee_estimator.update_ibd_state(in_ibd);
910    }
911
912    pub(crate) async fn estimate_fee_rate(
913        &self,
914        estimate_mode: EstimateMode,
915        enable_fallback: bool,
916    ) -> Result<FeeRate, AnyError> {
917        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
918        match self
919            .fee_estimator
920            .estimate_fee_rate(estimate_mode, all_entry_info)
921        {
922            Ok(fee_rate) => Ok(fee_rate),
923            Err(err) => {
924                if enable_fallback {
925                    let target_blocks =
926                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
927                    self.tx_pool
928                        .read()
929                        .await
930                        .estimate_fee_rate(target_blocks)
931                        .map_err(Into::into)
932                } else {
933                    Err(err.into())
934                }
935            }
936        }
937    }
938}
939
/// Values produced for a transaction by the pre-check step (presumably the
/// success payload of `pre_check` — confirm against its signature).
///
/// The tuple is positional; the meaning of each slot is given by the
/// per-field comments below.
type PreCheckedTx = (
    Byte32,                   // tip_hash
    Arc<ResolvedTransaction>, // rtx
    TxStatus,                 // status
    Capacity,                 // tx fee
    usize,                    // tx size
);
947
/// Outcome of resolving a transaction: the resolved transaction paired with
/// its `TxStatus` on success, or the `Reject` reason on failure.
type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
949
950fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
951    if snapshot.proposals().contains_proposed(short_id) {
952        TxStatus::Proposed
953    } else if snapshot.proposals().contains_gap(short_id) {
954        TxStatus::Gap
955    } else {
956        TxStatus::Fresh
957    }
958}
959
960fn check_rtx(
961    tx_pool: &TxPool,
962    snapshot: &Snapshot,
963    rtx: &ResolvedTransaction,
964) -> Result<TxStatus, Reject> {
965    let short_id = rtx.transaction.proposal_short_id();
966    let tx_status = get_tx_status(snapshot, &short_id);
967    tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
968}
969
970fn resolve_tx(
971    tx_pool: &TxPool,
972    snapshot: &Snapshot,
973    tx: TransactionView,
974    rbf: bool,
975) -> ResolveResult {
976    let short_id = tx.proposal_short_id();
977    let tx_status = get_tx_status(snapshot, &short_id);
978    tx_pool
979        .resolve_tx_from_pool(tx, rbf)
980        .map(|rtx| (rtx, tx_status))
981}
982
983fn _submit_entry(
984    tx_pool: &mut TxPool,
985    status: TxStatus,
986    entry: TxEntry,
987    callbacks: &Callbacks,
988) -> Result<HashSet<TxEntry>, Reject> {
989    let tx_hash = entry.transaction().hash();
990    debug!("submit_entry {:?} {}", status, tx_hash);
991    let (succ, evicts) = match status {
992        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
993        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
994        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
995    };
996    if succ {
997        match status {
998            TxStatus::Fresh => callbacks.call_pending(&entry),
999            TxStatus::Gap => callbacks.call_pending(&entry),
1000            TxStatus::Proposed => callbacks.call_proposed(&entry),
1001        }
1002    }
1003    Ok(evicts)
1004}
1005
1006fn _update_tx_pool_for_reorg(
1007    tx_pool: &mut TxPool,
1008    attached: &LinkedHashSet<TransactionView>,
1009    detached_headers: &HashSet<Byte32>,
1010    detached_proposal_id: HashSet<ProposalShortId>,
1011    snapshot: Arc<Snapshot>,
1012    callbacks: &Callbacks,
1013    mine_mode: bool,
1014) {
1015    tx_pool.snapshot = Arc::clone(&snapshot);
1016
1017    // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
1018    // pending-pool if they can be found within txpool. As for a transaction
1019    // which is both expired and committed at the one time(commit at its end of commit-window),
1020    // we should treat it as a committed and not re-put into pending-pool. So we should ensure
1021    // that involves `remove_committed_txs` before `remove_expired`.
1022    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1023    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1024
1025    // mine mode:
1026    // pending ---> gap ----> proposed
1027    // try move gap to proposed
1028    if mine_mode {
1029        let mut proposals = Vec::new();
1030        let mut gaps = Vec::new();
1031
1032        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1033            let short_id = entry.inner.proposal_short_id();
1034            if snapshot.proposals().contains_proposed(&short_id) {
1035                proposals.push((short_id, entry.inner.clone()));
1036            }
1037        }
1038
1039        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1040            let short_id = entry.inner.proposal_short_id();
1041            let elem = (short_id.clone(), entry.inner.clone());
1042            if snapshot.proposals().contains_proposed(&short_id) {
1043                proposals.push(elem);
1044            } else if snapshot.proposals().contains_gap(&short_id) {
1045                gaps.push(elem);
1046            }
1047        }
1048
1049        for (id, entry) in proposals {
1050            debug!("begin to proposed: {:x}", id);
1051            if let Err(e) = tx_pool.proposed_rtx(&id) {
1052                debug!(
1053                    "Failed to add proposed tx {}, reason: {}",
1054                    entry.transaction().hash(),
1055                    e
1056                );
1057                callbacks.call_reject(tx_pool, &entry, e);
1058            } else {
1059                callbacks.call_proposed(&entry)
1060            }
1061        }
1062
1063        for (id, entry) in gaps {
1064            debug!("begin to gap: {:x}", id);
1065            if let Err(e) = tx_pool.gap_rtx(&id) {
1066                debug!(
1067                    "Failed to add tx to gap {}, reason: {}",
1068                    entry.transaction().hash(),
1069                    e
1070                );
1071                callbacks.call_reject(tx_pool, &entry, e.clone());
1072            }
1073        }
1074    }
1075
1076    // Remove expired transaction from pending
1077    tx_pool.remove_expired(callbacks);
1078
1079    // Remove transactions from the pool until its size <= size_limit.
1080    let _ = tx_pool.limit_size(callbacks, None);
1081}