ckb_tx_pool/
process.rs

1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10    check_tx_fee, check_txid_collision, is_missing_input, non_contextual_verify,
11    time_relative_verify, verify_rtx,
12};
13use ckb_error::{AnyError, InternalErrorKind};
14use ckb_fee_estimator::FeeEstimator;
15use ckb_jsonrpc_types::BlockTemplate;
16use ckb_logger::Level::Trace;
17use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
18use ckb_network::PeerIndex;
19use ckb_script::ChunkCommand;
20use ckb_snapshot::Snapshot;
21use ckb_types::core::error::OutPointError;
22use ckb_types::{
23    core::{
24        BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
25        cell::ResolvedTransaction,
26    },
27    packed::{Byte32, ProposalShortId},
28};
29use ckb_util::LinkedHashSet;
30use ckb_verification::{
31    TxVerifyEnv,
32    cache::{CacheEntry, Completed},
33};
34use std::collections::HashSet;
35use std::collections::{HashMap, VecDeque};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use tokio::sync::watch;
39
40/// A list for plug target for `plug_entry` method
41pub enum PlugTarget {
42    /// Pending pool
43    Pending,
44    /// Proposed pool
45    Proposed,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum TxStatus {
50    Fresh,
51    Gap,
52    Proposed,
53}
54
55impl TxStatus {
56    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
57        match self {
58            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
59            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
60            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
61        }
62    }
63}
64
65impl TxPoolService {
66    pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
67        if let Some(ref block_assembler) = self.block_assembler {
68            Ok(block_assembler.get_current().await)
69        } else {
70            Err(InternalErrorKind::Config
71                .other("BlockAssembler disabled")
72                .into())
73        }
74    }
75
76    pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
77        let guard = self.txs_verify_cache.read().await;
78        guard.peek(&tx.witness_hash()).cloned()
79    }
80
81    async fn fetch_txs_verify_cache(
82        &self,
83        txs: impl Iterator<Item = &TransactionView>,
84    ) -> HashMap<Byte32, CacheEntry> {
85        let guard = self.txs_verify_cache.read().await;
86        txs.filter_map(|tx| {
87            let wtx_hash = tx.witness_hash();
88            guard
89                .peek(&wtx_hash)
90                .cloned()
91                .map(|value| (wtx_hash, value))
92        })
93        .collect()
94    }
95
96    pub(crate) async fn submit_entry(
97        &self,
98        pre_resolve_tip: Byte32,
99        entry: TxEntry,
100        mut status: TxStatus,
101    ) -> (Result<(), Reject>, Arc<Snapshot>) {
102        let (ret, snapshot) = self
103            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
104                // check_rbf must be invoked in `write` lock to avoid concurrent issues.
105                let conflicts = if tx_pool.enable_rbf() {
106                    tx_pool.check_rbf(&snapshot, &entry)?
107                } else {
108                    // RBF is disabled but we found conflicts, return error here
109                    // after_process will put this tx into conflicts_pool
110                    let conflicted_outpoint =
111                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
112                    if let Some(outpoint) = conflicted_outpoint {
113                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
114                    }
115                    HashSet::new()
116                };
117
118                // if snapshot changed by context switch we need redo time_relative verify
119                let tip_hash = snapshot.tip_hash();
120                if pre_resolve_tip != tip_hash {
121                    debug!(
122                        "submit_entry {} context changed. previous:{} now:{}",
123                        entry.proposal_short_id(),
124                        pre_resolve_tip,
125                        tip_hash
126                    );
127
128                    // destructuring assignments are not currently supported
129                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;
130
131                    let tip_header = snapshot.tip_header();
132                    let tx_env = status.with_env(tip_header);
133                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
134                }
135
136                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
137                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;
138
139                // in a corner case, a tx with lower fee rate may be rejected immediately
140                // after inserting into pool, return proper reject error here
141                for evict in evicted {
142                    let reject = Reject::Invalidated(format!(
143                        "invalidated by tx {}",
144                        evict.transaction().hash()
145                    ));
146                    self.callbacks.call_reject(tx_pool, &evict, reject);
147                }
148
149                tx_pool.remove_conflict(&entry.proposal_short_id());
150                tx_pool
151                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
152                    .map_or(Ok(()), Err)?;
153
154                if !may_recovered_txs.is_empty() {
155                    let self_clone = self.clone();
156                    tokio::spawn(async move {
157                        // push the recovered txs back to verify queue, so that they can be verified and submitted again
158                        let mut queue = self_clone.verify_queue.write().await;
159                        for tx in may_recovered_txs {
160                            debug!("recover back: {:?}", tx.proposal_short_id());
161                            let _ = queue.add_tx(tx, None);
162                        }
163                    });
164                }
165                Ok(())
166            })
167            .await;
168
169        (ret, snapshot)
170    }
171
172    pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
173        if self.should_notify_block_assembler() {
174            match status {
175                TxStatus::Fresh => {
176                    if self
177                        .block_assembler_sender
178                        .send(BlockAssemblerMessage::Pending)
179                        .await
180                        .is_err()
181                    {
182                        error!("block_assembler receiver dropped");
183                    }
184                }
185                TxStatus::Proposed => {
186                    if self
187                        .block_assembler_sender
188                        .send(BlockAssemblerMessage::Proposed)
189                        .await
190                        .is_err()
191                    {
192                        error!("block_assembler receiver dropped");
193                    }
194                }
195                _ => {}
196            }
197        }
198    }
199
200    // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
201    // since they maybe not conflicted anymore
202    fn process_rbf(
203        &self,
204        tx_pool: &mut TxPool,
205        entry: &TxEntry,
206        conflicts: &HashSet<ProposalShortId>,
207    ) -> Vec<TransactionView> {
208        let mut may_recovered_txs = vec![];
209        let mut available_inputs = HashSet::new();
210
211        if conflicts.is_empty() {
212            return may_recovered_txs;
213        }
214
215        let all_removed: Vec<_> = conflicts
216            .iter()
217            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
218            .collect();
219
220        available_inputs.extend(
221            all_removed
222                .iter()
223                .flat_map(|removed| removed.transaction().input_pts_iter()),
224        );
225
226        for input in entry.transaction().input_pts_iter() {
227            available_inputs.remove(&input);
228        }
229
230        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
231        for old in all_removed {
232            debug!(
233                "remove conflict tx {} for RBF by new tx {}",
234                old.transaction().hash(),
235                entry.transaction().hash()
236            );
237            let reject =
238                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
239
240            // RBF replace successfully, put old transactions into conflicts pool
241            tx_pool.record_conflict(old.transaction().clone());
242            // after removing old tx from tx_pool, we call reject callbacks manually
243            self.callbacks.call_reject(tx_pool, &old, reject);
244        }
245        assert!(!may_recovered_txs.contains(entry.transaction()));
246        may_recovered_txs
247    }
248
249    pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
250        let queue = self.verify_queue.read().await;
251        queue.contains_key(&tx.proposal_short_id())
252    }
253
254    pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
255        let orphan = self.orphan.read().await;
256        orphan.contains_key(&tx.proposal_short_id())
257    }
258
259    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
260        &self,
261        mut f: F,
262    ) -> (U, Arc<Snapshot>) {
263        let tx_pool = self.tx_pool.read().await;
264        let snapshot = tx_pool.cloned_snapshot();
265
266        let ret = f(&tx_pool, Arc::clone(&snapshot));
267        (ret, snapshot)
268    }
269
270    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
271        &self,
272        mut f: F,
273    ) -> (U, Arc<Snapshot>) {
274        let mut tx_pool = self.tx_pool.write().await;
275        let snapshot = tx_pool.cloned_snapshot();
276
277        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
278        (ret, snapshot)
279    }
280
281    pub(crate) async fn pre_check(
282        &self,
283        tx: &TransactionView,
284    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
285        // Acquire read lock for cheap check
286        let tx_size = tx.data().serialized_size_in_block();
287
288        let (ret, snapshot) = self
289            .with_tx_pool_read_lock(|tx_pool, snapshot| {
290                let tip_hash = snapshot.tip_hash();
291
292                // Same txid means exactly the same transaction, including inputs, outputs, witnesses, etc.
293                // It's also not possible for RBF, reject it directly
294                check_txid_collision(tx_pool, tx)?;
295
296                // Try normal path first, if double-spending check success we don't need RBF check
297                // this make sure RBF won't introduce extra performance cost for hot path
298                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
299                match res {
300                    Ok((rtx, status)) => {
301                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
302                        Ok((tip_hash, rtx, status, fee, tx_size))
303                    }
304                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
305                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
306                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
307                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
308                        if conflicts.is_none() {
309                            // this mean one input's outpoint is dead, but there is no direct conflicted tx in tx_pool
310                            // we should reject it directly and don't need to put it into conflicts pool
311                            error!(
312                                "{} is resolved as Dead, but there is no conflicted tx",
313                                rtx.transaction.proposal_short_id()
314                            );
315                            return Err(Reject::Resolve(OutPointError::Dead(out)));
316                        }
317                        // we also return Ok here, so that the entry will be continue to be verified before submit
318                        // we only want to put it into conflicts pool after the verification stage passed
319                        // then we will double-check conflicts txs in `submit_entry`
320
321                        Ok((tip_hash, rtx, status, fee, tx_size))
322                    }
323                    Err(err) => Err(err),
324                }
325            })
326            .await;
327        (ret, snapshot)
328    }
329
330    pub(crate) async fn non_contextual_verify(
331        &self,
332        tx: &TransactionView,
333        remote: Option<(Cycle, PeerIndex)>,
334    ) -> Result<(), Reject> {
335        if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
336            if reject.is_malformed_tx() {
337                if let Some(remote) = remote {
338                    self.ban_malformed(remote.1, format!("reject {reject}"))
339                        .await;
340                }
341            }
342            return Err(reject);
343        }
344        Ok(())
345    }
346
347    pub(crate) async fn resumeble_process_tx(
348        &self,
349        tx: TransactionView,
350        remote: Option<(Cycle, PeerIndex)>,
351    ) -> Result<bool, Reject> {
352        // non contextual verify first
353        self.non_contextual_verify(&tx, remote).await?;
354
355        if self.orphan_contains(&tx).await {
356            debug!("reject tx {} already in orphan pool", tx.hash());
357            return Err(Reject::Duplicated(tx.hash()));
358        }
359
360        if self.verify_queue_contains(&tx).await {
361            return Err(Reject::Duplicated(tx.hash()));
362        }
363        self.enqueue_verify_queue(tx, remote).await
364    }
365
366    pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
367        // non contextual verify first
368        self.non_contextual_verify(&tx, None).await?;
369
370        if self.verify_queue_contains(&tx).await {
371            return Err(Reject::Duplicated(tx.hash()));
372        }
373
374        if self.orphan_contains(&tx).await {
375            debug!("reject tx {} already in orphan pool", tx.hash());
376            return Err(Reject::Duplicated(tx.hash()));
377        }
378        self._test_accept_tx(tx.clone()).await
379    }
380
381    pub(crate) async fn process_tx(
382        &self,
383        tx: TransactionView,
384        remote: Option<(Cycle, PeerIndex)>,
385    ) -> Result<Completed, Reject> {
386        // non contextual verify first
387        self.non_contextual_verify(&tx, remote).await?;
388
389        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
390            return Err(Reject::Duplicated(tx.hash()));
391        }
392
393        if let Some((ret, snapshot)) = self
394            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
395            .await
396        {
397            self.after_process(tx, remote, &snapshot, &ret).await;
398            ret
399        } else {
400            // currently, the returned cycles is not been used, mock 0 if delay
401            Ok(Completed {
402                cycles: 0,
403                fee: Capacity::zero(),
404            })
405        }
406    }
407
408    pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
409        let mut tx_pool = self.tx_pool.write().await;
410        if let Some(ref mut recent_reject) = tx_pool.recent_reject {
411            if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
412                error!(
413                    "Failed to record recent_reject {} {} {}",
414                    tx_hash, reject, e
415                );
416            }
417        }
418    }
419
420    pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
421        let id = ProposalShortId::from_tx_hash(&tx_hash);
422        {
423            let mut queue = self.verify_queue.write().await;
424            if queue.remove_tx(&id).is_some() {
425                return true;
426            }
427        }
428        {
429            let mut orphan = self.orphan.write().await;
430            if orphan.remove_orphan_tx(&id).is_some() {
431                return true;
432            }
433        }
434        let mut tx_pool = self.tx_pool.write().await;
435        tx_pool.remove_tx(&id)
436    }
437
438    pub(crate) async fn after_process(
439        &self,
440        tx: TransactionView,
441        remote: Option<(Cycle, PeerIndex)>,
442        _snapshot: &Snapshot,
443        ret: &Result<Completed, Reject>,
444    ) {
445        let tx_hash = tx.hash();
446
447        // log tx verification result for monitor node
448        if log_enabled_target!("ckb_tx_monitor", Trace) {
449            if let Ok(c) = ret {
450                trace_target!(
451                    "ckb_tx_monitor",
452                    r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
453                    tx_hash,
454                    c.cycles
455                );
456            }
457        }
458
459        if matches!(
460            ret,
461            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
462        ) {
463            let mut tx_pool = self.tx_pool.write().await;
464            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
465                tx_pool.record_conflict(tx.clone());
466            }
467        }
468
469        match remote {
470            Some((declared_cycle, peer)) => match ret {
471                Ok(_) => {
472                    debug!(
473                        "after_process remote send_result_to_relayer {} {}",
474                        tx_hash, peer
475                    );
476                    self.send_result_to_relayer(TxVerificationResult::Ok {
477                        original_peer: Some(peer),
478                        tx_hash,
479                    });
480                    self.process_orphan_tx(&tx).await;
481                }
482                Err(reject) => {
483                    info!(
484                        "after_process {} {} remote reject: {} ",
485                        tx_hash, peer, reject
486                    );
487                    if is_missing_input(reject) {
488                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
489                            peer,
490                            parents: tx.unique_parents(),
491                        });
492                        self.add_orphan(tx, peer, declared_cycle).await;
493                    } else {
494                        if reject.is_malformed_tx() {
495                            self.ban_malformed(peer, format!("reject {reject}")).await;
496                        }
497                        if reject.is_allowed_relay() {
498                            self.send_result_to_relayer(TxVerificationResult::Reject {
499                                tx_hash: tx_hash.clone(),
500                            });
501                        }
502                        if reject.should_recorded() {
503                            self.put_recent_reject(&tx_hash, reject).await;
504                        }
505                    }
506                }
507            },
508            None => {
509                match ret {
510                    Ok(_) => {
511                        debug!("after_process local send_result_to_relayer {}", tx_hash);
512                        self.send_result_to_relayer(TxVerificationResult::Ok {
513                            original_peer: None,
514                            tx_hash,
515                        });
516                        self.process_orphan_tx(&tx).await;
517                    }
518                    Err(Reject::Duplicated(_)) => {
519                        debug!("after_process {} duplicated", tx_hash);
520                        // re-broadcast tx when it's duplicated and submitted through local rpc
521                        self.send_result_to_relayer(TxVerificationResult::Ok {
522                            original_peer: None,
523                            tx_hash,
524                        });
525                    }
526                    Err(reject) => {
527                        debug!("after_process {} reject: {} ", tx_hash, reject);
528                        if reject.should_recorded() {
529                            self.put_recent_reject(&tx_hash, reject).await;
530                        }
531                    }
532                }
533            }
534        }
535    }
536
537    pub(crate) async fn add_orphan(
538        &self,
539        tx: TransactionView,
540        peer: PeerIndex,
541        declared_cycle: Cycle,
542    ) {
543        let evicted_txs = self
544            .orphan
545            .write()
546            .await
547            .add_orphan_tx(tx, peer, declared_cycle);
548        // for any evicted orphan tx, we should send reject to relayer
549        // so that we mark it as `unknown` in filter
550        for tx_hash in evicted_txs {
551            self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
552        }
553    }
554
555    pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
556        let orphan = self.orphan.read().await;
557        orphan
558            .find_by_previous(tx)
559            .iter()
560            .filter_map(|id| orphan.get(id).cloned())
561            .collect::<Vec<_>>()
562    }
563
564    pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
565        self.orphan.write().await.remove_orphan_tx(id);
566    }
567
568    /// Remove all orphans which are resolved by the given transaction
569    /// the process is like a breath first search, if there is a cycle in `orphan_queue`,
570    /// `_process_tx` will return `Reject` since we have checked duplicated tx
571    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
572        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
573        orphan_queue.push_back(tx.clone());
574
575        while let Some(previous) = orphan_queue.pop_front() {
576            let orphans = self.find_orphan_by_previous(&previous).await;
577            for orphan in orphans.into_iter() {
578                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
579                    debug!(
580                        "process_orphan {} added to verify queue; find previous from {}",
581                        orphan.tx.hash(),
582                        tx.hash(),
583                    );
584                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
585                    self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
586                        .await
587                        .expect("enqueue suspended tx");
588                } else if let Some((ret, _snapshot)) = self
589                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
590                    .await
591                {
592                    match ret {
593                        Ok(_) => {
594                            self.send_result_to_relayer(TxVerificationResult::Ok {
595                                original_peer: Some(orphan.peer),
596                                tx_hash: orphan.tx.hash(),
597                            });
598                            debug!(
599                                "process_orphan {} success, find previous from {}",
600                                orphan.tx.hash(),
601                                tx.hash()
602                            );
603                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
604                            orphan_queue.push_back(orphan.tx);
605                        }
606                        Err(reject) => {
607                            debug!(
608                                "process_orphan {} reject {}, find previous from {}",
609                                orphan.tx.hash(),
610                                reject,
611                                tx.hash(),
612                            );
613
614                            if !is_missing_input(&reject) {
615                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
616                                if reject.is_malformed_tx() {
617                                    self.ban_malformed(orphan.peer, format!("reject {reject}"))
618                                        .await;
619                                }
620                                if reject.is_allowed_relay() {
621                                    self.send_result_to_relayer(TxVerificationResult::Reject {
622                                        tx_hash: orphan.tx.hash(),
623                                    });
624                                }
625                                if reject.should_recorded() {
626                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
627                                }
628                            }
629                        }
630                    }
631                }
632            }
633        }
634    }
635
636    pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
637        if let Err(e) = self.tx_relay_sender.send(result) {
638            error!("tx-pool tx_relay_sender internal error {}", e);
639        }
640    }
641
642    async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
643        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
644
645        #[cfg(feature = "with_sentry")]
646        use sentry::{Level, capture_message, with_scope};
647
648        #[cfg(feature = "with_sentry")]
649        with_scope(
650            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
651            || {
652                capture_message(
653                    &format!(
654                        "Ban peer {} for {} seconds, reason: \
655                        {}",
656                        peer,
657                        DEFAULT_BAN_TIME.as_secs(),
658                        reason
659                    ),
660                    Level::Info,
661                )
662            },
663        );
664        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
665        self.verify_queue.write().await.remove_txs_by_peer(&peer);
666    }
667
668    pub(crate) async fn _process_tx(
669        &self,
670        tx: TransactionView,
671        declared_cycles: Option<Cycle>,
672        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
673    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
674        let wtx_hash = tx.witness_hash();
675        let instant = Instant::now();
676        let is_sync_process = command_rx.is_none();
677
678        let (ret, snapshot) = self.pre_check(&tx).await;
679
680        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);
681
682        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
683        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
684        let tip_header = snapshot.tip_header();
685        let tx_env = Arc::new(status.with_env(tip_header));
686
687        let verified_ret = verify_rtx(
688            Arc::clone(&snapshot),
689            Arc::clone(&rtx),
690            tx_env,
691            &verify_cache,
692            max_cycles,
693            command_rx,
694        )
695        .await;
696
697        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);
698
699        if let Some(declared) = declared_cycles {
700            if declared != verified.cycles {
701                info!(
702                    "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
703                    declared, verified.cycles, tx
704                );
705                return Some((
706                    Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
707                    snapshot,
708                ));
709            }
710        }
711
712        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
713
714        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
715        try_or_return_with_snapshot!(ret, submit_snapshot);
716
717        self.notify_block_assembler(status).await;
718
719        if verify_cache.is_none() {
720            // update cache
721            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
722            tokio::spawn(async move {
723                let mut guard = txs_verify_cache.write().await;
724                guard.put(wtx_hash, verified);
725            });
726        }
727
728        if let Some(metrics) = ckb_metrics::handle() {
729            let elapsed = instant.elapsed().as_secs_f64();
730            if is_sync_process {
731                metrics.ckb_tx_pool_sync_process.observe(elapsed);
732            } else {
733                metrics.ckb_tx_pool_async_process.observe(elapsed);
734            }
735        }
736
737        Some((Ok(verified), submit_snapshot))
738    }
739
740    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
741        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
742
743        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
744
745        // skip check the delay window
746
747        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
748        let max_cycles = self.consensus.max_block_cycles();
749        let tip_header = snapshot.tip_header();
750        let tx_env = Arc::new(status.with_env(tip_header));
751
752        verify_rtx(
753            Arc::clone(&snapshot),
754            Arc::clone(&rtx),
755            tx_env,
756            &verify_cache,
757            max_cycles,
758            None,
759        )
760        .await
761    }
762
763    pub(crate) async fn update_tx_pool_for_reorg(
764        &self,
765        detached_blocks: VecDeque<BlockView>,
766        attached_blocks: VecDeque<BlockView>,
767        detached_proposal_id: HashSet<ProposalShortId>,
768        snapshot: Arc<Snapshot>,
769    ) {
770        let mine_mode = self.block_assembler.is_some();
771        let mut detached = LinkedHashSet::default();
772        let mut attached = LinkedHashSet::default();
773
774        let detached_headers: HashSet<Byte32> = detached_blocks
775            .iter()
776            .map(|blk| blk.header().hash())
777            .collect();
778
779        for blk in detached_blocks {
780            detached.extend(blk.transactions().into_iter().skip(1))
781        }
782
783        for blk in attached_blocks {
784            self.fee_estimator.commit_block(&blk);
785            attached.extend(blk.transactions().into_iter().skip(1));
786        }
787        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();
788
789        let fetched_cache = self.fetch_txs_verify_cache(retain.iter()).await;
790
791        // If there are any transactions requires re-process, return them.
792        //
793        // At present, there is only one situation:
794        // - If the hardfork was happened, then re-process all transactions.
795        {
796            // This closure is used to limit the lifetime of mutable tx_pool.
797            let mut tx_pool = self.tx_pool.write().await;
798
799            _update_tx_pool_for_reorg(
800                &mut tx_pool,
801                &attached,
802                &detached_headers,
803                detached_proposal_id,
804                snapshot,
805                &self.callbacks,
806                mine_mode,
807            );
808
809            // notice: readd_detached_tx don't update cache
810            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
811                .await;
812        }
813
814        self.remove_orphan_txs_by_attach(&attached).await;
815        {
816            let mut queue = self.verify_queue.write().await;
817            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
818        }
819    }
820
821    async fn enqueue_verify_queue(
822        &self,
823        tx: TransactionView,
824        remote: Option<(Cycle, PeerIndex)>,
825    ) -> Result<bool, Reject> {
826        let mut queue = self.verify_queue.write().await;
827        queue.add_tx(tx, remote)
828    }
829
830    async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
831        for tx in txs.iter() {
832            self.process_orphan_tx(tx).await;
833        }
834        let mut orphan = self.orphan.write().await;
835        orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
836    }
837
838    async fn readd_detached_tx(
839        &self,
840        tx_pool: &mut TxPool,
841        txs: Vec<TransactionView>,
842        fetched_cache: HashMap<Byte32, CacheEntry>,
843    ) {
844        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
845        for tx in txs {
846            let tx_size = tx.data().serialized_size_in_block();
847            let tx_hash = tx.hash();
848            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
849                if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
850                    let verify_cache = fetched_cache.get(&tx_hash).cloned();
851                    let snapshot = tx_pool.cloned_snapshot();
852                    let tip_header = snapshot.tip_header();
853                    let tx_env = Arc::new(status.with_env(tip_header));
854                    if let Ok(verified) = verify_rtx(
855                        snapshot,
856                        Arc::clone(&rtx),
857                        tx_env,
858                        &verify_cache,
859                        max_cycles,
860                        None,
861                    )
862                    .await
863                    {
864                        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
865                        if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
866                            error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
867                        } else {
868                            debug!("readd_detached_tx submit_entry {}", tx_hash);
869                        }
870                    }
871                }
872            }
873        }
874    }
875
876    pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
877        {
878            let mut tx_pool = self.tx_pool.write().await;
879            tx_pool.clear(Arc::clone(&new_snapshot));
880        }
881        // reset block_assembler
882        if self
883            .block_assembler_sender
884            .send(BlockAssemblerMessage::Reset(new_snapshot))
885            .await
886            .is_err()
887        {
888            error!("block_assembler receiver dropped");
889        }
890    }
891
892    pub(crate) async fn save_pool(&self) {
893        let mut tx_pool = self.tx_pool.write().await;
894        if let Err(err) = tx_pool.save_into_file() {
895            error!("failed to save pool, error: {:?}", err)
896        } else {
897            info!("TxPool saved successfully")
898        }
899    }
900
901    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
902        self.fee_estimator.update_ibd_state(in_ibd);
903    }
904
905    pub(crate) async fn estimate_fee_rate(
906        &self,
907        estimate_mode: EstimateMode,
908        enable_fallback: bool,
909    ) -> Result<FeeRate, AnyError> {
910        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
911        match self
912            .fee_estimator
913            .estimate_fee_rate(estimate_mode, all_entry_info)
914        {
915            Ok(fee_rate) => Ok(fee_rate),
916            Err(err) => {
917                if enable_fallback {
918                    let target_blocks =
919                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
920                    self.tx_pool
921                        .read()
922                        .await
923                        .estimate_fee_rate(target_blocks)
924                        .map_err(Into::into)
925                } else {
926                    Err(err.into())
927                }
928            }
929        }
930    }
931}
932
933type PreCheckedTx = (
934    Byte32,                   // tip_hash
935    Arc<ResolvedTransaction>, // rtx
936    TxStatus,                 // status
937    Capacity,                 // tx fee
938    usize,                    // tx size
939);
940
941type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
942
943fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
944    if snapshot.proposals().contains_proposed(short_id) {
945        TxStatus::Proposed
946    } else if snapshot.proposals().contains_gap(short_id) {
947        TxStatus::Gap
948    } else {
949        TxStatus::Fresh
950    }
951}
952
953fn check_rtx(
954    tx_pool: &TxPool,
955    snapshot: &Snapshot,
956    rtx: &ResolvedTransaction,
957) -> Result<TxStatus, Reject> {
958    let short_id = rtx.transaction.proposal_short_id();
959    let tx_status = get_tx_status(snapshot, &short_id);
960    tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
961}
962
963fn resolve_tx(
964    tx_pool: &TxPool,
965    snapshot: &Snapshot,
966    tx: TransactionView,
967    rbf: bool,
968) -> ResolveResult {
969    let short_id = tx.proposal_short_id();
970    let tx_status = get_tx_status(snapshot, &short_id);
971    tx_pool
972        .resolve_tx_from_pool(tx, rbf)
973        .map(|rtx| (rtx, tx_status))
974}
975
976fn _submit_entry(
977    tx_pool: &mut TxPool,
978    status: TxStatus,
979    entry: TxEntry,
980    callbacks: &Callbacks,
981) -> Result<HashSet<TxEntry>, Reject> {
982    let tx_hash = entry.transaction().hash();
983    debug!("submit_entry {:?} {}", status, tx_hash);
984    let (succ, evicts) = match status {
985        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
986        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
987        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
988    };
989    if succ {
990        match status {
991            TxStatus::Fresh => callbacks.call_pending(&entry),
992            TxStatus::Gap => callbacks.call_pending(&entry),
993            TxStatus::Proposed => callbacks.call_proposed(&entry),
994        }
995    }
996    Ok(evicts)
997}
998
999fn _update_tx_pool_for_reorg(
1000    tx_pool: &mut TxPool,
1001    attached: &LinkedHashSet<TransactionView>,
1002    detached_headers: &HashSet<Byte32>,
1003    detached_proposal_id: HashSet<ProposalShortId>,
1004    snapshot: Arc<Snapshot>,
1005    callbacks: &Callbacks,
1006    mine_mode: bool,
1007) {
1008    tx_pool.snapshot = Arc::clone(&snapshot);
1009
1010    // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
1011    // pending-pool if they can be found within txpool. As for a transaction
1012    // which is both expired and committed at the one time(commit at its end of commit-window),
1013    // we should treat it as a committed and not re-put into pending-pool. So we should ensure
1014    // that involves `remove_committed_txs` before `remove_expired`.
1015    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1016    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1017
1018    // mine mode:
1019    // pending ---> gap ----> proposed
1020    // try move gap to proposed
1021    if mine_mode {
1022        let mut proposals = Vec::new();
1023        let mut gaps = Vec::new();
1024
1025        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1026            let short_id = entry.inner.proposal_short_id();
1027            if snapshot.proposals().contains_proposed(&short_id) {
1028                proposals.push((short_id, entry.inner.clone()));
1029            }
1030        }
1031
1032        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1033            let short_id = entry.inner.proposal_short_id();
1034            let elem = (short_id.clone(), entry.inner.clone());
1035            if snapshot.proposals().contains_proposed(&short_id) {
1036                proposals.push(elem);
1037            } else if snapshot.proposals().contains_gap(&short_id) {
1038                gaps.push(elem);
1039            }
1040        }
1041
1042        for (id, entry) in proposals {
1043            debug!("begin to proposed: {:x}", id);
1044            if let Err(e) = tx_pool.proposed_rtx(&id) {
1045                debug!(
1046                    "Failed to add proposed tx {}, reason: {}",
1047                    entry.transaction().hash(),
1048                    e
1049                );
1050                callbacks.call_reject(tx_pool, &entry, e);
1051            } else {
1052                callbacks.call_proposed(&entry)
1053            }
1054        }
1055
1056        for (id, entry) in gaps {
1057            debug!("begin to gap: {:x}", id);
1058            if let Err(e) = tx_pool.gap_rtx(&id) {
1059                debug!(
1060                    "Failed to add tx to gap {}, reason: {}",
1061                    entry.transaction().hash(),
1062                    e
1063                );
1064                callbacks.call_reject(tx_pool, &entry, e.clone());
1065            }
1066        }
1067    }
1068
1069    // Remove expired transaction from pending
1070    tx_pool.remove_expired(callbacks);
1071
1072    // Remove transactions from the pool until its size <= size_limit.
1073    let _ = tx_pool.limit_size(callbacks, None);
1074}