Skip to main content

ckb_tx_pool/
process.rs

1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10    check_tx_fee, check_txid_collision, is_missing_input, non_contextual_verify,
11    time_relative_verify, verify_rtx,
12};
13use ckb_error::{AnyError, InternalErrorKind};
14use ckb_fee_estimator::FeeEstimator;
15use ckb_jsonrpc_types::BlockTemplate;
16use ckb_logger::Level::Trace;
17use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
18use ckb_network::PeerIndex;
19use ckb_script::ChunkCommand;
20use ckb_snapshot::Snapshot;
21use ckb_types::core::error::OutPointError;
22use ckb_types::{
23    core::{
24        BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
25        cell::ResolvedTransaction,
26    },
27    packed::{Byte32, ProposalShortId},
28};
29use ckb_util::LinkedHashSet;
30use ckb_verification::{
31    TxVerifyEnv,
32    cache::{CacheEntry, Completed},
33};
34use std::collections::HashSet;
35use std::collections::{HashMap, VecDeque};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use tokio::sync::watch;
39
/// Destination sub-pool selected when plugging an entry with the `plug_entry` method.
///
/// The enum is a plain, field-less tag, so it derives the usual value-type traits;
/// this lets callers log, copy and compare targets without extra boilerplate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlugTarget {
    /// Pending pool
    Pending,
    /// Proposed pool
    Proposed,
}
47
/// Status assigned to a transaction while the pool resolves, verifies and submits it.
///
/// The status selects the verification environment (see `TxStatus::with_env`) and
/// decides which message, if any, is sent to the block assembler after submission.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxStatus {
    /// Verified with the submit environment; the block assembler is told about a
    /// new pending transaction.
    Fresh,
    /// Verified with the proposed environment at offset `0`; the block assembler
    /// is not notified.
    Gap,
    /// Verified with the proposed environment at offset `1`; the block assembler
    /// is told about a new proposed transaction.
    Proposed,
}
54
55impl TxStatus {
56    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
57        match self {
58            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
59            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
60            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
61        }
62    }
63}
64
65impl TxPoolService {
66    pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
67        if let Some(ref block_assembler) = self.block_assembler {
68            Ok(block_assembler.get_current().await)
69        } else {
70            Err(InternalErrorKind::Config
71                .other("BlockAssembler disabled")
72                .into())
73        }
74    }
75
76    pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
77        let guard = self.txs_verify_cache.read().await;
78        guard.peek(&tx.witness_hash()).cloned()
79    }
80
81    async fn fetch_txs_verify_cache(
82        &self,
83        txs: impl Iterator<Item = &TransactionView>,
84    ) -> HashMap<Byte32, CacheEntry> {
85        let guard = self.txs_verify_cache.read().await;
86        txs.filter_map(|tx| {
87            let wtx_hash = tx.witness_hash();
88            guard
89                .peek(&wtx_hash)
90                .cloned()
91                .map(|value| (wtx_hash, value))
92        })
93        .collect()
94    }
95
    /// Inserts `entry` into the pool while holding the pool write lock.
    ///
    /// * `pre_resolve_tip` - tip hash observed when the entry was resolved. If the
    ///   snapshot seen under the write lock has a different tip, the entry is
    ///   re-checked (`check_rtx`) and its time-relative rules are verified again.
    /// * `entry` - the entry to insert.
    /// * `status` - status computed at resolve time; replaced by the re-checked
    ///   status when the tip changed.
    ///
    /// Returns the submission result together with the snapshot the pool held when
    /// the lock was taken. Any step that fails aborts the submission with its `Reject`.
    pub(crate) async fn submit_entry(
        &self,
        pre_resolve_tip: Byte32,
        entry: TxEntry,
        mut status: TxStatus,
    ) -> (Result<(), Reject>, Arc<Snapshot>) {
        let (ret, snapshot) = self
            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
                // check_rbf must be invoked in `write` lock to avoid concurrent issues.
                let conflicts = if tx_pool.enable_rbf() {
                    tx_pool.check_rbf(&snapshot, &entry)?
                } else {
                    // RBF is disabled but we found conflicts, return error here
                    // after_process will put this tx into conflicts_pool
                    let conflicted_outpoint =
                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                    if let Some(outpoint) = conflicted_outpoint {
                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                    }
                    HashSet::new()
                };

                // If the snapshot changed between resolving and taking the write lock,
                // the time-relative verification has to be redone against the new tip.
                let tip_hash = snapshot.tip_hash();
                if pre_resolve_tip != tip_hash {
                    debug!(
                        "submit_entry {} context changed. previous:{} now:{}",
                        entry.proposal_short_id(),
                        pre_resolve_tip,
                        tip_hash
                    );

                    // Re-check the resolved transaction; the captured `status` is
                    // overwritten with the status valid for the new tip.
                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;

                    let tip_header = snapshot.tip_header();
                    let tx_env = status.with_env(tip_header);
                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
                }

                // Remove the conflicting entries first (no-op when `conflicts` is empty),
                // then insert the new entry.
                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

                // in a corner case, a tx with lower fee rate may be rejected immediately
                // after inserting into pool, return proper reject error here
                // NOTE(review): the message embeds the evicted tx's own hash — confirm
                // whether the hash of the submitted entry was intended instead.
                for evict in evicted {
                    let reject = Reject::Invalidated(format!(
                        "invalidated by tx {}",
                        evict.transaction().hash()
                    ));
                    self.callbacks.call_reject(tx_pool, &evict, reject);
                }

                tx_pool.remove_conflict(&entry.proposal_short_id());
                // `limit_size` hands back an optional reject; `Some` is turned into
                // the error result of the submission.
                tx_pool
                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                    .map_or(Ok(()), Err)?;

                if !may_recovered_txs.is_empty() {
                    // The verify queue has its own async lock, so the re-queueing is
                    // done in a spawned task rather than inside this synchronous closure.
                    let self_clone = self.clone();
                    tokio::spawn(async move {
                        // push the recovered txs back to verify queue, so that they can be verified and submitted again
                        let mut queue = self_clone.verify_queue.write().await;
                        for tx in may_recovered_txs {
                            debug!("recover back: {:?}", tx.proposal_short_id());
                            // Best effort: a failed re-queue is ignored.
                            let _ = queue.add_tx(tx, false, None);
                        }
                    });
                }
                Ok(())
            })
            .await;

        (ret, snapshot)
    }
171
172    pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
173        if self.should_notify_block_assembler() {
174            match status {
175                TxStatus::Fresh => {
176                    if self
177                        .block_assembler_sender
178                        .send(BlockAssemblerMessage::Pending)
179                        .await
180                        .is_err()
181                    {
182                        error!("block_assembler receiver dropped");
183                    }
184                }
185                TxStatus::Proposed => {
186                    if self
187                        .block_assembler_sender
188                        .send(BlockAssemblerMessage::Proposed)
189                        .await
190                        .is_err()
191                    {
192                        error!("block_assembler receiver dropped");
193                    }
194                }
195                _ => {}
196            }
197        }
198    }
199
200    // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
201    // since they maybe not conflicted anymore
202    fn process_rbf(
203        &self,
204        tx_pool: &mut TxPool,
205        entry: &TxEntry,
206        conflicts: &HashSet<ProposalShortId>,
207    ) -> Vec<TransactionView> {
208        let mut may_recovered_txs = vec![];
209        let mut available_inputs = HashSet::new();
210
211        if conflicts.is_empty() {
212            return may_recovered_txs;
213        }
214
215        let all_removed: Vec<_> = conflicts
216            .iter()
217            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
218            .collect();
219
220        available_inputs.extend(
221            all_removed
222                .iter()
223                .flat_map(|removed| removed.transaction().input_pts_iter()),
224        );
225
226        for input in entry.transaction().input_pts_iter() {
227            available_inputs.remove(&input);
228        }
229
230        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
231        for old in all_removed {
232            debug!(
233                "remove conflict tx {} for RBF by new tx {}",
234                old.transaction().hash(),
235                entry.transaction().hash()
236            );
237            let reject =
238                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
239
240            // RBF replace successfully, put old transactions into conflicts pool
241            tx_pool.record_conflict(old.transaction().clone());
242            // after removing old tx from tx_pool, we call reject callbacks manually
243            self.callbacks.call_reject(tx_pool, &old, reject);
244        }
245        assert!(!may_recovered_txs.contains(entry.transaction()));
246        may_recovered_txs
247    }
248
249    pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
250        let queue = self.verify_queue.read().await;
251        queue.contains_key(&tx.proposal_short_id())
252    }
253
254    pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
255        let orphan = self.orphan.read().await;
256        orphan.contains_key(&tx.proposal_short_id())
257    }
258
259    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
260        &self,
261        mut f: F,
262    ) -> (U, Arc<Snapshot>) {
263        let tx_pool = self.tx_pool.read().await;
264        let snapshot = tx_pool.cloned_snapshot();
265
266        let ret = f(&tx_pool, Arc::clone(&snapshot));
267        (ret, snapshot)
268    }
269
270    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
271        &self,
272        mut f: F,
273    ) -> (U, Arc<Snapshot>) {
274        let mut tx_pool = self.tx_pool.write().await;
275        let snapshot = tx_pool.cloned_snapshot();
276
277        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
278        (ret, snapshot)
279    }
280
    /// Runs the cheap checks on `tx` under the pool read lock.
    ///
    /// Checks for a txid collision, resolves the transaction and checks its fee.
    /// On success the result carries, in order: the tip hash of the snapshot used,
    /// the resolved transaction, its status, the fee and the serialized size.
    ///
    /// When the normal resolve reports a dead out-point, the transaction is resolved
    /// a second time with the last `resolve_tx` argument set to `true` (the RBF path,
    /// per the comments below); it is accepted for further verification only if the
    /// pool actually contains a conflicting transaction.
    ///
    /// Returns the check result together with the snapshot it was evaluated against.
    pub(crate) async fn pre_check(
        &self,
        tx: &TransactionView,
    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
        // The size is computed before the read lock is acquired for the cheap checks.
        let tx_size = tx.data().serialized_size_in_block();

        let (ret, snapshot) = self
            .with_tx_pool_read_lock(|tx_pool, snapshot| {
                let tip_hash = snapshot.tip_hash();

                // Same txid means exactly the same transaction, including inputs, outputs, witnesses, etc.
                // It's also not possible for RBF, reject it directly
                check_txid_collision(tx_pool, tx)?;

                // Try the normal path first: if the double-spending check succeeds, no RBF check is needed.
                // This makes sure RBF does not add extra cost to the hot path.
                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
                match res {
                    Ok((rtx, status)) => {
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
                        // Second pass: resolve again with the RBF flag enabled.
                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
                        if conflicts.is_none() {
                            // This means one input's out-point is dead, but there is no directly conflicting tx in tx_pool.
                            // Reject it directly; it does not need to be put into the conflicts pool.
                            error!(
                                "{} is resolved as Dead, but there is no conflicted tx",
                                rtx.transaction.proposal_short_id()
                            );
                            return Err(Reject::Resolve(OutPointError::Dead(out)));
                        }
                        // Ok is returned here as well, so that the entry continues to be verified before submit.
                        // It should only be put into the conflicts pool after the verification stage has passed;
                        // the conflicting txs are double-checked in `submit_entry`.

                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(err) => Err(err),
                }
            })
            .await;
        (ret, snapshot)
    }
329
330    pub(crate) async fn non_contextual_verify(
331        &self,
332        tx: &TransactionView,
333        remote: Option<(Cycle, PeerIndex)>,
334    ) -> Result<(), Reject> {
335        if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
336            if reject.is_malformed_tx()
337                && let Some(remote) = remote
338            {
339                self.ban_malformed(remote.1, format!("reject {reject}"))
340                    .await;
341            }
342            return Err(reject);
343        }
344        Ok(())
345    }
346
347    pub(crate) async fn resumeble_process_tx(
348        &self,
349        tx: TransactionView,
350        is_proposal_tx: bool,
351        remote: Option<(Cycle, PeerIndex)>,
352    ) -> Result<bool, Reject> {
353        // non contextual verify first
354        self.non_contextual_verify(&tx, remote).await?;
355
356        if self.orphan_contains(&tx).await {
357            debug!("reject tx {} already in orphan pool", tx.hash());
358            return Err(Reject::Duplicated(tx.hash()));
359        }
360
361        if self.verify_queue_contains(&tx).await {
362            return Err(Reject::Duplicated(tx.hash()));
363        }
364        self.enqueue_verify_queue(tx, is_proposal_tx, remote).await
365    }
366
367    pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
368        // non contextual verify first
369        self.non_contextual_verify(&tx, None).await?;
370
371        if self.verify_queue_contains(&tx).await {
372            return Err(Reject::Duplicated(tx.hash()));
373        }
374
375        if self.orphan_contains(&tx).await {
376            debug!("reject tx {} already in orphan pool", tx.hash());
377            return Err(Reject::Duplicated(tx.hash()));
378        }
379        self._test_accept_tx(tx.clone()).await
380    }
381
382    pub(crate) async fn process_tx(
383        &self,
384        tx: TransactionView,
385        remote: Option<(Cycle, PeerIndex)>,
386    ) -> Result<Completed, Reject> {
387        // non contextual verify first
388        self.non_contextual_verify(&tx, remote).await?;
389
390        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
391            return Err(Reject::Duplicated(tx.hash()));
392        }
393
394        if let Some((ret, snapshot)) = self
395            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
396            .await
397        {
398            self.after_process(tx, remote, &snapshot, &ret).await;
399            ret
400        } else {
401            // currently, the returned cycles is not been used, mock 0 if delay
402            Ok(Completed {
403                cycles: 0,
404                fee: Capacity::zero(),
405            })
406        }
407    }
408
409    pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
410        let mut tx_pool = self.tx_pool.write().await;
411        if let Some(ref mut recent_reject) = tx_pool.recent_reject
412            && let Err(e) = recent_reject.put(tx_hash, reject.clone())
413        {
414            error!(
415                "Failed to record recent_reject {} {} {}",
416                tx_hash, reject, e
417            );
418        }
419    }
420
421    pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
422        let id = ProposalShortId::from_tx_hash(&tx_hash);
423        {
424            let mut queue = self.verify_queue.write().await;
425            if queue.remove_tx(&id).is_some() {
426                return true;
427            }
428        }
429        {
430            let mut orphan = self.orphan.write().await;
431            if orphan.remove_orphan_tx(&id).is_some() {
432                return true;
433            }
434        }
435        let mut tx_pool = self.tx_pool.write().await;
436        tx_pool.remove_tx(&id)
437    }
438
439    pub(crate) async fn after_process(
440        &self,
441        tx: TransactionView,
442        remote: Option<(Cycle, PeerIndex)>,
443        _snapshot: &Snapshot,
444        ret: &Result<Completed, Reject>,
445    ) {
446        let tx_hash = tx.hash();
447
448        // log tx verification result for monitor node
449        if log_enabled_target!("ckb_tx_monitor", Trace)
450            && let Ok(c) = ret
451        {
452            trace_target!(
453                "ckb_tx_monitor",
454                r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
455                tx_hash,
456                c.cycles
457            );
458        }
459
460        if matches!(
461            ret,
462            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
463        ) {
464            let mut tx_pool = self.tx_pool.write().await;
465            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
466                tx_pool.record_conflict(tx.clone());
467            }
468        }
469
470        match remote {
471            Some((declared_cycle, peer)) => match ret {
472                Ok(_) => {
473                    debug!(
474                        "after_process remote send_result_to_relayer {} {}",
475                        tx_hash, peer
476                    );
477                    self.send_result_to_relayer(TxVerificationResult::Ok {
478                        original_peer: Some(peer),
479                        tx_hash,
480                    });
481                    self.process_orphan_tx(&tx).await;
482                }
483                Err(reject) => {
484                    info!(
485                        "after_process {} {} remote reject: {} ",
486                        tx_hash, peer, reject
487                    );
488                    if is_missing_input(reject) {
489                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
490                            peer,
491                            parents: tx.unique_parents(),
492                        });
493                        self.add_orphan(tx, peer, declared_cycle).await;
494                    } else {
495                        if reject.is_malformed_tx() {
496                            self.ban_malformed(peer, format!("reject {reject}")).await;
497                        }
498                        if reject.is_allowed_relay() {
499                            self.send_result_to_relayer(TxVerificationResult::Reject {
500                                tx_hash: tx_hash.clone(),
501                            });
502                        }
503                        if reject.should_recorded() {
504                            self.put_recent_reject(&tx_hash, reject).await;
505                        }
506                    }
507                }
508            },
509            None => {
510                match ret {
511                    Ok(_) => {
512                        debug!("after_process local send_result_to_relayer {}", tx_hash);
513                        self.send_result_to_relayer(TxVerificationResult::Ok {
514                            original_peer: None,
515                            tx_hash,
516                        });
517                        self.process_orphan_tx(&tx).await;
518                    }
519                    Err(Reject::Duplicated(_)) => {
520                        debug!("after_process {} duplicated", tx_hash);
521                        // re-broadcast tx when it's duplicated and submitted through local rpc
522                        self.send_result_to_relayer(TxVerificationResult::Ok {
523                            original_peer: None,
524                            tx_hash,
525                        });
526                    }
527                    Err(reject) => {
528                        debug!("after_process {} reject: {} ", tx_hash, reject);
529                        if reject.should_recorded() {
530                            self.put_recent_reject(&tx_hash, reject).await;
531                        }
532                    }
533                }
534            }
535        }
536    }
537
538    pub(crate) async fn add_orphan(
539        &self,
540        tx: TransactionView,
541        peer: PeerIndex,
542        declared_cycle: Cycle,
543    ) {
544        let evicted_txs = self
545            .orphan
546            .write()
547            .await
548            .add_orphan_tx(tx, peer, declared_cycle);
549        // for any evicted orphan tx, we should send reject to relayer
550        // so that we mark it as `unknown` in filter
551        for tx_hash in evicted_txs {
552            self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
553        }
554    }
555
556    pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
557        let orphan = self.orphan.read().await;
558        orphan
559            .find_by_previous(tx)
560            .iter()
561            .filter_map(|id| orphan.get(id).cloned())
562            .collect::<Vec<_>>()
563    }
564
565    pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
566        self.orphan.write().await.remove_orphan_tx(id);
567    }
568
569    /// Remove all orphans which are resolved by the given transaction
570    /// the process is like a breath first search, if there is a cycle in `orphan_queue`,
571    /// `_process_tx` will return `Reject` since we have checked duplicated tx
572    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
573        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
574        orphan_queue.push_back(tx.clone());
575
576        while let Some(previous) = orphan_queue.pop_front() {
577            let orphans = self.find_orphan_by_previous(&previous).await;
578            for orphan in orphans.into_iter() {
579                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
580                    debug!(
581                        "process_orphan {} added to verify queue; find previous from {}",
582                        orphan.tx.hash(),
583                        tx.hash(),
584                    );
585                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
586                    self.enqueue_verify_queue(orphan.tx, false, Some((orphan.cycle, orphan.peer)))
587                        .await
588                        .expect("enqueue suspended tx");
589                } else if let Some((ret, _snapshot)) = self
590                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
591                    .await
592                {
593                    match ret {
594                        Ok(_) => {
595                            self.send_result_to_relayer(TxVerificationResult::Ok {
596                                original_peer: Some(orphan.peer),
597                                tx_hash: orphan.tx.hash(),
598                            });
599                            debug!(
600                                "process_orphan {} success, find previous from {}",
601                                orphan.tx.hash(),
602                                tx.hash()
603                            );
604                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
605                            orphan_queue.push_back(orphan.tx);
606                        }
607                        Err(reject) => {
608                            debug!(
609                                "process_orphan {} reject {}, find previous from {}",
610                                orphan.tx.hash(),
611                                reject,
612                                tx.hash(),
613                            );
614
615                            if !is_missing_input(&reject) {
616                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
617                                if reject.is_malformed_tx() {
618                                    self.ban_malformed(orphan.peer, format!("reject {reject}"))
619                                        .await;
620                                }
621                                if reject.is_allowed_relay() {
622                                    self.send_result_to_relayer(TxVerificationResult::Reject {
623                                        tx_hash: orphan.tx.hash(),
624                                    });
625                                }
626                                if reject.should_recorded() {
627                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
628                                }
629                            }
630                        }
631                    }
632                }
633            }
634        }
635    }
636
637    pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
638        if let Err(e) = self.tx_relay_sender.send(result) {
639            error!("tx-pool tx_relay_sender internal error {}", e);
640        }
641    }
642
643    async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
644        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
645
646        #[cfg(feature = "with_sentry")]
647        use sentry::{Level, capture_message, with_scope};
648
649        #[cfg(feature = "with_sentry")]
650        with_scope(
651            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
652            || {
653                capture_message(
654                    &format!(
655                        "Ban peer {} for {} seconds, reason: \
656                        {}",
657                        peer,
658                        DEFAULT_BAN_TIME.as_secs(),
659                        reason
660                    ),
661                    Level::Info,
662                )
663            },
664        );
665        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
666        self.verify_queue.write().await.remove_txs_by_peer(&peer);
667    }
668
    /// Runs the full pipeline for one transaction: pre-check, script verification,
    /// declared-cycles check and submission to the pool.
    ///
    /// * `declared_cycles` - cycles declared for the transaction, if any. They cap the
    ///   verification and must equal the verified cycles; without them the consensus
    ///   `max_block_cycles` is used as the limit.
    /// * `command_rx` - optional chunk-command receiver passed on to `verify_rtx`.
    ///   Its absence marks the call as a synchronous process for the metrics.
    ///
    /// Returns the result paired with the snapshot it was evaluated against. The early
    /// exits go through `try_or_return_with_snapshot!`, which is defined outside this
    /// block; NOTE(review): presumably that macro is also where a `None` (delayed)
    /// result originates — confirm against the macro definition.
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        // Must be read before `command_rx` is moved into `verify_rtx`.
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        // A transaction that declared a cycle count different from the verified one is rejected.
        if let Some(declared) = declared_cycles
            && declared != verified.cycles
        {
            info!(
                "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                declared, verified.cycles, tx
            );
            return Some((
                Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                snapshot,
            ));
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        // `tip_hash` lets `submit_entry` detect a tip change since the pre-check.
        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        if verify_cache.is_none() {
            // Update the cache in a spawned task so this call does not wait for the
            // cache write lock.
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        // The processing time is only observed for transactions that reach this point.
        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }
740
741    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
742        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
743
744        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
745
746        // skip check the delay window
747
748        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
749        let max_cycles = self.consensus.max_block_cycles();
750        let tip_header = snapshot.tip_header();
751        let tx_env = Arc::new(status.with_env(tip_header));
752
753        verify_rtx(
754            Arc::clone(&snapshot),
755            Arc::clone(&rtx),
756            tx_env,
757            &verify_cache,
758            max_cycles,
759            None,
760        )
761        .await
762    }
763
    /// Brings the pool, the orphan pool and the verify queue in line with a chain reorg.
    ///
    /// * `detached_blocks` - blocks removed from the chain.
    /// * `attached_blocks` - blocks added to the chain; each one is also committed to
    ///   the fee estimator.
    /// * `detached_proposal_id` - proposal ids forwarded to `_update_tx_pool_for_reorg`.
    /// * `snapshot` - snapshot forwarded to `_update_tx_pool_for_reorg`.
    ///
    /// Transactions that appear only in detached blocks are re-added to the pool through
    /// `readd_detached_tx`; transactions in attached blocks are removed from the verify
    /// queue and used to process the orphan pool.
    pub(crate) async fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) {
        let mine_mode = self.block_assembler.is_some();
        let mut detached = LinkedHashSet::default();
        let mut attached = LinkedHashSet::default();

        let detached_headers: HashSet<Byte32> = detached_blocks
            .iter()
            .map(|blk| blk.header().hash())
            .collect();

        // `skip(1)` leaves out the first transaction of every block
        // (presumably the cellbase — confirm).
        for blk in detached_blocks {
            detached.extend(blk.transactions().into_iter().skip(1))
        }

        for blk in attached_blocks {
            self.fee_estimator.commit_block(&blk);
            attached.extend(blk.transactions().into_iter().skip(1));
        }
        // Transactions that were detached and not re-attached are candidates for the pool.
        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();

        // Cached verification results are fetched before the pool write lock is taken.
        let fetched_cache = self.fetch_txs_verify_cache(retain.iter()).await;

        // NOTE(review): an earlier comment here described returning the transactions that
        // need re-processing (all of them after a hardfork); nothing is returned from this
        // block, so that description looks stale — confirm.
        {
            // This block scope limits the lifetime of the mutable tx_pool guard.
            let mut tx_pool = self.tx_pool.write().await;

            _update_tx_pool_for_reorg(
                &mut tx_pool,
                &attached,
                &detached_headers,
                detached_proposal_id,
                snapshot,
                &self.callbacks,
                mine_mode,
            );

            // notice: readd_detached_tx doesn't update the cache
            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
                .await;
        }

        // The pool write lock is released before the orphan pool and verify queue are touched.
        self.remove_orphan_txs_by_attach(&attached).await;
        {
            let mut queue = self.verify_queue.write().await;
            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
        }
    }
821
822    async fn enqueue_verify_queue(
823        &self,
824        tx: TransactionView,
825        is_proposal_tx: bool,
826        remote: Option<(Cycle, PeerIndex)>,
827    ) -> Result<bool, Reject> {
828        let mut queue = self.verify_queue.write().await;
829        queue.add_tx(tx, is_proposal_tx, remote)
830    }
831
832    async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
833        for tx in txs.iter() {
834            self.process_orphan_tx(tx).await;
835        }
836        let mut orphan = self.orphan.write().await;
837        orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
838    }
839
840    async fn readd_detached_tx(
841        &self,
842        tx_pool: &mut TxPool,
843        txs: Vec<TransactionView>,
844        fetched_cache: HashMap<Byte32, CacheEntry>,
845    ) {
846        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
847        for tx in txs {
848            let tx_size = tx.data().serialized_size_in_block();
849            let tx_hash = tx.hash();
850            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false)
851                && let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size)
852            {
853                let verify_cache = fetched_cache.get(&tx_hash).cloned();
854                let snapshot = tx_pool.cloned_snapshot();
855                let tip_header = snapshot.tip_header();
856                let tx_env = Arc::new(status.with_env(tip_header));
857                if let Ok(verified) = verify_rtx(
858                    snapshot,
859                    Arc::clone(&rtx),
860                    tx_env,
861                    &verify_cache,
862                    max_cycles,
863                    None,
864                )
865                .await
866                {
867                    let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
868                    if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
869                        error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
870                    } else {
871                        debug!("readd_detached_tx submit_entry {}", tx_hash);
872                    }
873                }
874            }
875        }
876    }
877
878    pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
879        {
880            let mut tx_pool = self.tx_pool.write().await;
881            tx_pool.clear(Arc::clone(&new_snapshot));
882        }
883        // reset block_assembler
884        if self
885            .block_assembler_sender
886            .send(BlockAssemblerMessage::Reset(new_snapshot))
887            .await
888            .is_err()
889        {
890            error!("block_assembler receiver dropped");
891        }
892    }
893
894    pub(crate) async fn save_pool(&self) {
895        let mut tx_pool = self.tx_pool.write().await;
896        if let Err(err) = tx_pool.save_into_file() {
897            error!("failed to save pool, error: {:?}", err)
898        } else {
899            info!("TxPool saved successfully")
900        }
901    }
902
903    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
904        self.fee_estimator.update_ibd_state(in_ibd);
905    }
906
907    pub(crate) async fn estimate_fee_rate(
908        &self,
909        estimate_mode: EstimateMode,
910        enable_fallback: bool,
911    ) -> Result<FeeRate, AnyError> {
912        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
913        match self
914            .fee_estimator
915            .estimate_fee_rate(estimate_mode, all_entry_info)
916        {
917            Ok(fee_rate) => Ok(fee_rate),
918            Err(err) => {
919                if enable_fallback {
920                    let target_blocks =
921                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
922                    self.tx_pool
923                        .read()
924                        .await
925                        .estimate_fee_rate(target_blocks)
926                        .map_err(Into::into)
927                } else {
928                    Err(err.into())
929                }
930            }
931        }
932    }
933}
934
935type PreCheckedTx = (
936    Byte32,                   // tip_hash
937    Arc<ResolvedTransaction>, // rtx
938    TxStatus,                 // status
939    Capacity,                 // tx fee
940    usize,                    // tx size
941);
942
943type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
944
945fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
946    if snapshot.proposals().contains_proposed(short_id) {
947        TxStatus::Proposed
948    } else if snapshot.proposals().contains_gap(short_id) {
949        TxStatus::Gap
950    } else {
951        TxStatus::Fresh
952    }
953}
954
955fn check_rtx(
956    tx_pool: &TxPool,
957    snapshot: &Snapshot,
958    rtx: &ResolvedTransaction,
959) -> Result<TxStatus, Reject> {
960    let short_id = rtx.transaction.proposal_short_id();
961    let tx_status = get_tx_status(snapshot, &short_id);
962    tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
963}
964
965fn resolve_tx(
966    tx_pool: &TxPool,
967    snapshot: &Snapshot,
968    tx: TransactionView,
969    rbf: bool,
970) -> ResolveResult {
971    let short_id = tx.proposal_short_id();
972    let tx_status = get_tx_status(snapshot, &short_id);
973    tx_pool
974        .resolve_tx_from_pool(tx, rbf)
975        .map(|rtx| (rtx, tx_status))
976}
977
978fn _submit_entry(
979    tx_pool: &mut TxPool,
980    status: TxStatus,
981    entry: TxEntry,
982    callbacks: &Callbacks,
983) -> Result<HashSet<TxEntry>, Reject> {
984    let tx_hash = entry.transaction().hash();
985    debug!("submit_entry {:?} {}", status, tx_hash);
986    let (succ, evicts) = match status {
987        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
988        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
989        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
990    };
991    if succ {
992        match status {
993            TxStatus::Fresh => callbacks.call_pending(&entry),
994            TxStatus::Gap => callbacks.call_pending(&entry),
995            TxStatus::Proposed => callbacks.call_proposed(&entry),
996        }
997    }
998    Ok(evicts)
999}
1000
1001fn _update_tx_pool_for_reorg(
1002    tx_pool: &mut TxPool,
1003    attached: &LinkedHashSet<TransactionView>,
1004    detached_headers: &HashSet<Byte32>,
1005    detached_proposal_id: HashSet<ProposalShortId>,
1006    snapshot: Arc<Snapshot>,
1007    callbacks: &Callbacks,
1008    mine_mode: bool,
1009) {
1010    tx_pool.snapshot = Arc::clone(&snapshot);
1011
1012    // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
1013    // pending-pool if they can be found within txpool. As for a transaction
1014    // which is both expired and committed at the one time(commit at its end of commit-window),
1015    // we should treat it as a committed and not re-put into pending-pool. So we should ensure
1016    // that involves `remove_committed_txs` before `remove_expired`.
1017    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1018    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1019
1020    // mine mode:
1021    // pending ---> gap ----> proposed
1022    // try move gap to proposed
1023    if mine_mode {
1024        let mut proposals = Vec::new();
1025        let mut gaps = Vec::new();
1026
1027        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1028            let short_id = entry.inner.proposal_short_id();
1029            if snapshot.proposals().contains_proposed(&short_id) {
1030                proposals.push((short_id, entry.inner.clone()));
1031            }
1032        }
1033
1034        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1035            let short_id = entry.inner.proposal_short_id();
1036            let elem = (short_id.clone(), entry.inner.clone());
1037            if snapshot.proposals().contains_proposed(&short_id) {
1038                proposals.push(elem);
1039            } else if snapshot.proposals().contains_gap(&short_id) {
1040                gaps.push(elem);
1041            }
1042        }
1043
1044        for (id, entry) in proposals {
1045            debug!("begin to proposed: {:x}", id);
1046            if let Err(e) = tx_pool.proposed_rtx(&id) {
1047                debug!(
1048                    "Failed to add proposed tx {}, reason: {}",
1049                    entry.transaction().hash(),
1050                    e
1051                );
1052                callbacks.call_reject(tx_pool, &entry, e);
1053            } else {
1054                callbacks.call_proposed(&entry)
1055            }
1056        }
1057
1058        for (id, entry) in gaps {
1059            debug!("begin to gap: {:x}", id);
1060            if let Err(e) = tx_pool.gap_rtx(&id) {
1061                debug!(
1062                    "Failed to add tx to gap {}, reason: {}",
1063                    entry.transaction().hash(),
1064                    e
1065                );
1066                callbacks.call_reject(tx_pool, &entry, e.clone());
1067            }
1068        }
1069    }
1070
1071    // Remove expired transaction from pending
1072    tx_pool.remove_expired(callbacks);
1073
1074    // Remove transactions from the pool until its size <= size_limit.
1075    let _ = tx_pool.limit_size(callbacks, None);
1076}