ckb_tx_pool/
pool.rs

1//! Top-level Pool type, methods, and tests
2extern crate rustc_hash;
3extern crate slab;
4use super::component::{TxEntry, commit_txs_scanner::CommitTxsScanner};
5use crate::callback::Callbacks;
6use crate::component::pool_map::{PoolEntry, PoolMap, Status};
7use crate::component::recent_reject::RecentReject;
8use crate::error::Reject;
9use crate::pool_cell::PoolCell;
10use ckb_app_config::TxPoolConfig;
11use ckb_fee_estimator::Error as FeeEstimatorError;
12use ckb_logger::{debug, error, warn};
13use ckb_snapshot::Snapshot;
14use ckb_store::ChainStore;
15use ckb_types::core::tx_pool::PoolTxDetailInfo;
16use ckb_types::core::{BlockNumber, CapacityError, FeeRate};
17use ckb_types::packed::OutPoint;
18use ckb_types::{
19    core::{
20        Capacity, Cycle, TransactionView, UncleBlockView,
21        cell::{OverlayCellChecker, OverlayCellProvider, ResolvedTransaction, resolve_transaction},
22        tx_pool::{TxPoolEntryInfo, TxPoolIds},
23    },
24    packed::{Byte32, ProposalShortId},
25};
26use lru::LruCache;
27use std::collections::{HashMap, HashSet};
28use std::sync::Arc;
29
/// Capacity of `TxPool::committed_txs_hash_cache` (proposal short id -> tx hash).
const COMMITTED_HASH_CACHE_SIZE: usize = 100_000;
/// Capacity of `TxPool::conflicts_cache` (proposal short id -> conflicted transaction).
const CONFLICTES_CACHE_SIZE: usize = 10_000;
/// Capacity of `TxPool::conflicts_outputs_cache` (input out point -> proposal short id).
const CONFLICTES_INPUTS_CACHE_SIZE: usize = 30_000;
/// Maximum number of pool transactions (direct conflicts plus their descendants)
/// that a single replacement may evict; enforced in `TxPool::check_rbf`.
const MAX_REPLACEMENT_CANDIDATES: usize = 100;
34
/// Tx-pool implementation
///
/// Owns the in-memory pool of transactions (`pool_map`) together with a few
/// bounded LRU caches and the chain snapshot the pool validates against.
pub struct TxPool {
    /// Tx-pool configuration (fee rates, size limit, expiry, ...).
    pub(crate) config: TxPoolConfig,
    /// The pooled entries, each tagged with a pending / gap / proposed status.
    pub(crate) pool_map: PoolMap,
    /// cache for committed transactions hash, keyed by proposal short id
    pub(crate) committed_txs_hash_cache: LruCache<ProposalShortId, Byte32>,
    /// storage snapshot reference
    pub(crate) snapshot: Arc<Snapshot>,
    /// record recent reject; `None` when the database is disabled or failed to open
    pub recent_reject: Option<RecentReject>,
    /// expiration in milliseconds (derived from `config.expiry_hours`)
    pub(crate) expiry: u64,
    /// conflicted transaction cache, keyed by proposal short id
    pub(crate) conflicts_cache: lru::LruCache<ProposalShortId, TransactionView>,
    /// index over the conflicted transactions' inputs: input out point -> tx short id
    /// (despite the field name, the keys are the *inputs* of the cached transactions)
    pub(crate) conflicts_outputs_cache: lru::LruCache<OutPoint, ProposalShortId>,
}
52
53impl TxPool {
54    /// Create new TxPool
55    pub fn new(config: TxPoolConfig, snapshot: Arc<Snapshot>) -> TxPool {
56        let recent_reject = Self::build_recent_reject(&config);
57        let expiry = config.expiry_hours as u64 * 60 * 60 * 1000;
58        TxPool {
59            pool_map: PoolMap::new(config.max_ancestors_count),
60            committed_txs_hash_cache: LruCache::new(COMMITTED_HASH_CACHE_SIZE),
61            config,
62            snapshot,
63            recent_reject,
64            expiry,
65            conflicts_cache: LruCache::new(CONFLICTES_CACHE_SIZE),
66            conflicts_outputs_cache: lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE),
67        }
68    }
69
70    /// Tx-pool owned snapshot, it may not consistent with chain cause tx-pool update snapshot asynchronously
71    pub(crate) fn snapshot(&self) -> &Snapshot {
72        &self.snapshot
73    }
74
    /// Makes a clone of the `Arc<Snapshot>`
    ///
    /// Only the reference count is bumped; the returned handle shares the
    /// same snapshot the pool currently holds.
    pub(crate) fn cloned_snapshot(&self) -> Arc<Snapshot> {
        Arc::clone(&self.snapshot)
    }
79
80    /// Check whether tx-pool enable RBF
81    pub fn enable_rbf(&self) -> bool {
82        self.config.min_rbf_rate > self.config.min_fee_rate
83    }
84
85    /// The least required fee rate to allow tx to be replaced
86    pub fn min_replace_fee(&self, tx: &TxEntry) -> Option<Capacity> {
87        if !self.enable_rbf() {
88            return None;
89        }
90
91        let mut conflicts = vec![self.get_pool_entry(&tx.proposal_short_id()).unwrap()];
92        let descendants = self.pool_map.calc_descendants(&tx.proposal_short_id());
93        let descendants = descendants
94            .iter()
95            .filter_map(|id| self.get_pool_entry(id))
96            .collect::<Vec<_>>();
97        conflicts.extend(descendants);
98        self.calculate_min_replace_fee(&conflicts, tx.size)
99    }
100
101    /// min_replace_fee = sum(replaced_txs.fee) + extra_rbf_fee
102    fn calculate_min_replace_fee(&self, conflicts: &[&PoolEntry], size: usize) -> Option<Capacity> {
103        let extra_rbf_fee = self.config.min_rbf_rate.fee(size as u64);
104        // don't account for duplicate txs
105        let replaced_fees: HashMap<_, _> = conflicts
106            .iter()
107            .map(|c| (c.id.clone(), c.inner.fee))
108            .collect();
109        let replaced_sum_fee = replaced_fees
110            .values()
111            .try_fold(Capacity::zero(), |acc, x| acc.safe_add(*x));
112        let res = replaced_sum_fee.map_or(Err(CapacityError::Overflow), |sum| {
113            sum.safe_add(extra_rbf_fee)
114        });
115        if let Ok(res) = res {
116            Some(res)
117        } else {
118            let fees = conflicts.iter().map(|c| c.inner.fee).collect::<Vec<_>>();
119            error!(
120                "conflicts: {:?} replaced_sum_fee {:?} overflow by add {}",
121                conflicts.iter().map(|e| e.id.clone()).collect::<Vec<_>>(),
122                fees,
123                extra_rbf_fee
124            );
125            None
126        }
127    }
128
    /// Add tx with pending status
    ///
    /// Delegates to `PoolMap::add_entry` with `Status::Pending`.
    /// Per the original note, the returned boolean is `false` if the entry was
    /// already present. The meaning of the returned set is defined by
    /// `PoolMap::add_entry`, which is not visible here.
    ///
    /// # Errors
    /// Propagates the `Reject` produced by `PoolMap::add_entry`.
    pub(crate) fn add_pending(
        &mut self,
        entry: TxEntry,
    ) -> Result<(bool, HashSet<TxEntry>), Reject> {
        self.pool_map.add_entry(entry, Status::Pending)
    }
137
    /// Add tx which proposed but still uncommittable to gap
    ///
    /// Delegates to `PoolMap::add_entry` with `Status::Gap`; the return value
    /// and errors are whatever that call yields.
    pub(crate) fn add_gap(&mut self, entry: TxEntry) -> Result<(bool, HashSet<TxEntry>), Reject> {
        self.pool_map.add_entry(entry, Status::Gap)
    }
142
    /// Add tx with proposed status
    ///
    /// Delegates to `PoolMap::add_entry` with `Status::Proposed`; the return
    /// value and errors are whatever that call yields.
    pub(crate) fn add_proposed(
        &mut self,
        entry: TxEntry,
    ) -> Result<(bool, HashSet<TxEntry>), Reject> {
        self.pool_map.add_entry(entry, Status::Proposed)
    }
150
    /// Returns true if the tx-pool contains a tx with specified id.
    ///
    /// Only `pool_map` is consulted; the conflict and committed caches are
    /// not searched.
    pub(crate) fn contains_proposal_id(&self, id: &ProposalShortId) -> bool {
        self.pool_map.get_by_id(id).is_some()
    }
155
    /// Asks the pool map to set the status of entry `short_id` to `Status::Proposed`.
    ///
    /// How an unknown id is handled is up to `PoolMap::set_entry` (not visible here).
    pub(crate) fn set_entry_proposed(&mut self, short_id: &ProposalShortId) {
        self.pool_map.set_entry(short_id, Status::Proposed)
    }
159
    /// Asks the pool map to set the status of entry `short_id` to `Status::Gap`.
    ///
    /// How an unknown id is handled is up to `PoolMap::set_entry` (not visible here).
    pub(crate) fn set_entry_gap(&mut self, short_id: &ProposalShortId) {
        self.pool_map.set_entry(short_id, Status::Gap)
    }
163
164    pub(crate) fn record_conflict(&mut self, tx: TransactionView) {
165        let short_id = tx.proposal_short_id();
166        for inputs in tx.input_pts_iter() {
167            self.conflicts_outputs_cache.put(inputs, short_id.clone());
168        }
169        self.conflicts_cache.put(short_id.clone(), tx);
170        debug!(
171            "record_conflict {:?} now cache size: {}",
172            short_id,
173            self.conflicts_cache.len()
174        );
175    }
176
177    pub(crate) fn remove_conflict(&mut self, short_id: &ProposalShortId) {
178        if let Some(tx) = self.conflicts_cache.pop(short_id) {
179            for inputs in tx.input_pts_iter() {
180                self.conflicts_outputs_cache.pop(&inputs);
181            }
182        }
183        debug!(
184            "remove_conflict {:?} now cache size: {}",
185            short_id,
186            self.conflicts_cache.len()
187        );
188    }
189
190    pub(crate) fn get_conflicted_txs_from_inputs(
191        &self,
192        inputs: impl Iterator<Item = OutPoint>,
193    ) -> Vec<TransactionView> {
194        inputs
195            .filter_map(|input| {
196                self.conflicts_outputs_cache
197                    .peek(&input)
198                    .and_then(|id| self.conflicts_cache.peek(id).cloned())
199            })
200            .collect()
201    }
202
203    /// Returns tx with cycles corresponding to the id.
204    pub(crate) fn get_tx_with_cycles(
205        &self,
206        id: &ProposalShortId,
207    ) -> Option<(TransactionView, Cycle)> {
208        self.pool_map
209            .get_by_id(id)
210            .map(|entry| (entry.inner.transaction().clone(), entry.inner.cycles))
211    }
212
    /// Returns the pool entry stored under `id`, or `None` if the pool map
    /// holds no such entry.
    pub(crate) fn get_pool_entry(&self, id: &ProposalShortId) -> Option<&PoolEntry> {
        self.pool_map.get_by_id(id)
    }
216
217    pub(crate) fn get_tx_from_pool(&self, id: &ProposalShortId) -> Option<&TransactionView> {
218        self.pool_map
219            .get_by_id(id)
220            .map(|entry| entry.inner.transaction())
221    }
222
223    pub(crate) fn remove_committed_txs<'a>(
224        &mut self,
225        txs: impl Iterator<Item = &'a TransactionView>,
226        callbacks: &Callbacks,
227        detached_headers: &HashSet<Byte32>,
228    ) {
229        for tx in txs {
230            let tx_hash = tx.hash();
231            debug!("try remove_committed_tx {}", tx_hash);
232            self.remove_committed_tx(tx, callbacks);
233
234            self.committed_txs_hash_cache
235                .put(tx.proposal_short_id(), tx_hash);
236        }
237
238        if !detached_headers.is_empty() {
239            self.resolve_conflict_header_dep(detached_headers, callbacks)
240        }
241    }
242
243    fn resolve_conflict_header_dep(
244        &mut self,
245        detached_headers: &HashSet<Byte32>,
246        callbacks: &Callbacks,
247    ) {
248        for (entry, reject) in self.pool_map.resolve_conflict_header_dep(detached_headers) {
249            callbacks.call_reject(self, &entry, reject);
250        }
251    }
252
253    fn remove_committed_tx(&mut self, tx: &TransactionView, callbacks: &Callbacks) {
254        let short_id = tx.proposal_short_id();
255        if let Some(_entry) = self.pool_map.remove_entry(&short_id) {
256            debug!("remove_committed_tx for {}", tx.hash());
257        }
258        {
259            for (entry, reject) in self.pool_map.resolve_conflict(tx) {
260                debug!(
261                    "removed {} for committed: {}",
262                    entry.transaction().hash(),
263                    tx.hash()
264                );
265                callbacks.call_reject(self, &entry, reject);
266            }
267        }
268    }
269
270    // Expire all transaction (and their dependencies) in the pool.
271    pub(crate) fn remove_expired(&mut self, callbacks: &Callbacks) {
272        let now_ms = ckb_systemtime::unix_time_as_millis();
273
274        let removed: Vec<_> = self
275            .pool_map
276            .iter()
277            .filter(|&entry| self.expiry + entry.inner.timestamp < now_ms)
278            .map(|entry| entry.inner.clone())
279            .collect();
280
281        for entry in removed {
282            let tx_hash = entry.transaction().hash();
283            debug!("remove_expired {} timestamp({})", tx_hash, entry.timestamp);
284            self.pool_map.remove_entry(&entry.proposal_short_id());
285            let reject = Reject::Expiry(entry.timestamp);
286            callbacks.call_reject(self, &entry, reject);
287        }
288    }
289
290    // Remove transactions from the pool until total size <= size_limit.
291    // Return a `Reject` for current inserting entry if it's removed
292    pub(crate) fn limit_size(
293        &mut self,
294        callbacks: &Callbacks,
295        current_entry_id: Option<&ProposalShortId>,
296    ) -> Option<Reject> {
297        let mut ret = None;
298        while self.pool_map.total_tx_size > self.config.max_tx_pool_size {
299            let next_evict_entry = || {
300                self.pool_map
301                    .next_evict_entry(Status::Pending)
302                    .or_else(|| self.pool_map.next_evict_entry(Status::Gap))
303                    .or_else(|| self.pool_map.next_evict_entry(Status::Proposed))
304            };
305
306            if let Some(id) = next_evict_entry() {
307                let removed = self.pool_map.remove_entry_and_descendants(&id);
308                for entry in removed {
309                    let tx_hash = entry.transaction().hash();
310                    debug!(
311                        "Removed by size limit {} timestamp({})",
312                        tx_hash, entry.timestamp
313                    );
314                    let reject = Reject::Full(format!(
315                        "the fee_rate for this transaction is: {}",
316                        entry.fee_rate()
317                    ));
318                    if let Some(short_id) = current_entry_id {
319                        if entry.proposal_short_id() == *short_id {
320                            ret = Some(reject.clone());
321                        }
322                    }
323                    callbacks.call_reject(self, &entry, reject);
324                }
325            }
326        }
327        self.pool_map.entries.shrink_to_fit();
328        ret
329    }
330
331    // remove transaction with detached proposal from gap and proposed
332    // try re-put to pending
333    pub(crate) fn remove_by_detached_proposal<'a>(
334        &mut self,
335        ids: impl Iterator<Item = &'a ProposalShortId>,
336    ) {
337        for id in ids {
338            if let Some(e) = self.pool_map.get_by_id(id) {
339                let status = e.status;
340                if status == Status::Pending {
341                    continue;
342                }
343                let mut entries = self.pool_map.remove_entry_and_descendants(id);
344                entries.sort_unstable_by_key(|entry| entry.ancestors_count);
345                for mut entry in entries {
346                    let tx_hash = entry.transaction().hash();
347                    entry.reset_statistic_state();
348                    let ret = self.add_pending(entry);
349                    debug!(
350                        "remove_by_detached_proposal from {:?} {} add_pending {:?}",
351                        status, tx_hash, ret
352                    );
353                }
354            }
355        }
356    }
357
358    pub(crate) fn remove_tx(&mut self, id: &ProposalShortId) -> bool {
359        let entries = self.pool_map.remove_entry_and_descendants(id);
360        !entries.is_empty()
361    }
362
363    pub(crate) fn check_rtx_from_pool(&self, rtx: &ResolvedTransaction) -> Result<(), Reject> {
364        let snapshot = self.snapshot();
365        let pool_cell = PoolCell::new(&self.pool_map, false);
366        let checker = OverlayCellChecker::new(&pool_cell, snapshot);
367        let mut seen_inputs = HashSet::new();
368        rtx.check(&mut seen_inputs, &checker, snapshot)
369            .map_err(Reject::Resolve)
370    }
371
372    pub(crate) fn resolve_tx_from_pool(
373        &self,
374        tx: TransactionView,
375        rbf: bool,
376    ) -> Result<Arc<ResolvedTransaction>, Reject> {
377        let snapshot = self.snapshot();
378        let pool_cell = PoolCell::new(&self.pool_map, rbf);
379        let provider = OverlayCellProvider::new(&pool_cell, snapshot);
380        let mut seen_inputs = HashSet::new();
381        resolve_transaction(tx, &mut seen_inputs, &provider, snapshot)
382            .map(Arc::new)
383            .map_err(Reject::Resolve)
384    }
385
386    pub(crate) fn gap_rtx(&mut self, short_id: &ProposalShortId) -> Result<(), Reject> {
387        match self.get_pool_entry(short_id) {
388            Some(entry) => {
389                let tx_hash = entry.inner.transaction().hash();
390                if entry.status == Status::Gap {
391                    Err(Reject::Duplicated(tx_hash))
392                } else {
393                    debug!("gap_rtx: {:?} => {:?}", tx_hash, short_id);
394                    self.set_entry_gap(short_id);
395                    Ok(())
396                }
397            }
398            None => Err(Reject::Malformed(
399                String::from("invalid short_id"),
400                Default::default(),
401            )),
402        }
403    }
404
405    pub(crate) fn proposed_rtx(&mut self, short_id: &ProposalShortId) -> Result<(), Reject> {
406        match self.get_pool_entry(short_id) {
407            Some(entry) => {
408                let tx_hash = entry.inner.transaction().hash();
409                if entry.status == Status::Proposed {
410                    Err(Reject::Duplicated(tx_hash))
411                } else {
412                    debug!("proposed_rtx: {:?} => {:?}", tx_hash, short_id);
413                    self.set_entry_proposed(short_id);
414                    Ok(())
415                }
416            }
417            None => Err(Reject::Malformed(
418                String::from("invalid short_id"),
419                Default::default(),
420            )),
421        }
422    }
423
    /// Get to-be-proposal transactions that may be included in the next block.
    ///
    /// Thin wrapper over `PoolMap::get_proposals`: `limit` and `exclusion`
    /// are forwarded unchanged (presumably a cap on the result size and a set
    /// of ids to leave out — see `PoolMap::get_proposals` for the exact rules).
    pub(crate) fn get_proposals(
        &self,
        limit: usize,
        exclusion: &HashSet<ProposalShortId>,
    ) -> HashSet<ProposalShortId> {
        self.pool_map.get_proposals(limit, exclusion)
    }
432
433    /// Returns tx from tx-pool or storage corresponding to the id.
434    pub(crate) fn get_tx_from_pool_or_store(
435        &self,
436        proposal_id: &ProposalShortId,
437    ) -> Option<TransactionView> {
438        self.get_tx_from_pool(proposal_id)
439            .cloned()
440            .or_else(|| self.conflicts_cache.peek(proposal_id).cloned())
441            .or_else(|| {
442                self.committed_txs_hash_cache
443                    .peek(proposal_id)
444                    .and_then(|tx_hash| self.snapshot().get_transaction(tx_hash).map(|(tx, _)| tx))
445            })
446    }
447
448    pub(crate) fn get_ids(&self) -> TxPoolIds {
449        let pending = self
450            .pool_map
451            .score_sorted_iter_by_statuses(vec![Status::Pending, Status::Gap])
452            .map(|entry| entry.transaction().hash())
453            .collect();
454
455        let proposed = self
456            .pool_map
457            .sorted_proposed_iter()
458            .map(|entry| entry.transaction().hash())
459            .collect();
460
461        TxPoolIds { pending, proposed }
462    }
463
464    pub(crate) fn get_all_entry_info(&self) -> TxPoolEntryInfo {
465        let pending = self
466            .pool_map
467            .score_sorted_iter_by_statuses(vec![Status::Pending, Status::Gap])
468            .map(|entry| (entry.transaction().hash(), entry.to_info()))
469            .collect();
470
471        let proposed = self
472            .pool_map
473            .sorted_proposed_iter()
474            .map(|entry| (entry.transaction().hash(), entry.to_info()))
475            .collect();
476
477        let conflicted = self
478            .conflicts_cache
479            .iter()
480            .map(|(_id, tx)| tx.hash())
481            .collect();
482        TxPoolEntryInfo {
483            pending,
484            proposed,
485            conflicted,
486        }
487    }
488
489    pub(crate) fn drain_all_transactions(&mut self) -> Vec<TransactionView> {
490        let mut txs = CommitTxsScanner::new(&self.pool_map)
491            .txs_to_commit(usize::MAX, Cycle::MAX)
492            .0
493            .into_iter()
494            .map(|tx_entry| tx_entry.into_transaction())
495            .collect::<Vec<_>>();
496        let mut pending = self
497            .pool_map
498            .entries
499            .remove_by_status(&Status::Pending)
500            .into_iter()
501            .map(|e| e.inner.into_transaction())
502            .collect::<Vec<_>>();
503        txs.append(&mut pending);
504        let mut gap = self
505            .pool_map
506            .entries
507            .remove_by_status(&Status::Gap)
508            .into_iter()
509            .map(|e| e.inner.into_transaction())
510            .collect::<Vec<_>>();
511        txs.append(&mut gap);
512        self.pool_map.clear();
513        txs
514    }
515
516    pub(crate) fn clear(&mut self, snapshot: Arc<Snapshot>) {
517        self.pool_map.clear();
518        self.snapshot = snapshot;
519        self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE);
520        self.conflicts_cache = LruCache::new(CONFLICTES_CACHE_SIZE);
521        self.conflicts_outputs_cache = lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE);
522    }
523
524    pub(crate) fn package_proposals(
525        &self,
526        proposals_limit: u64,
527        uncles: &[UncleBlockView],
528    ) -> HashSet<ProposalShortId> {
529        let uncle_proposals = uncles
530            .iter()
531            .flat_map(|u| u.data().proposals().into_iter())
532            .collect();
533        self.get_proposals(proposals_limit as usize, &uncle_proposals)
534    }
535
536    pub(crate) fn package_txs(
537        &self,
538        max_block_cycles: Cycle,
539        txs_size_limit: usize,
540    ) -> (Vec<TxEntry>, usize, Cycle) {
541        let (entries, size, cycles) =
542            CommitTxsScanner::new(&self.pool_map).txs_to_commit(txs_size_limit, max_block_cycles);
543
544        if !entries.is_empty() {
545            ckb_logger::info!(
546                "[get_block_template] candidate txs count: {}, size: {}/{}, cycles:{}/{}",
547                entries.len(),
548                size,
549                txs_size_limit,
550                cycles,
551                max_block_cycles
552            );
553        }
554        (entries, size, cycles)
555    }
556
557    pub(crate) fn estimate_fee_rate(
558        &self,
559        target_to_be_committed: BlockNumber,
560    ) -> Result<FeeRate, FeeEstimatorError> {
561        if !(3..=131).contains(&target_to_be_committed) {
562            return Err(FeeEstimatorError::NoProperFeeRate);
563        }
564        let fee_rate = self.pool_map.estimate_fee_rate(
565            (target_to_be_committed - self.snapshot.consensus().tx_proposal_window().closest())
566                as usize,
567            self.snapshot.consensus().max_block_bytes() as usize,
568            self.snapshot.consensus().max_block_cycles(),
569            self.config.min_fee_rate,
570        );
571        Ok(fee_rate)
572    }
573
574    pub(crate) fn check_rbf(
575        &self,
576        snapshot: &Snapshot,
577        entry: &TxEntry,
578    ) -> Result<HashSet<ProposalShortId>, Reject> {
579        assert!(self.enable_rbf());
580        let tx_inputs: Vec<OutPoint> = entry.transaction().input_pts_iter().collect();
581        let conflict_ids = self.pool_map.find_conflict_tx(entry.transaction());
582
583        if conflict_ids.is_empty() {
584            return Ok(HashSet::new());
585        }
586
587        let short_id = entry.proposal_short_id();
588
589        // Rule #1, the node has enabled RBF, which is checked by caller
590        let conflicts = conflict_ids
591            .iter()
592            .filter_map(|id| self.get_pool_entry(id))
593            .collect::<Vec<_>>();
594        assert!(conflicts.len() == conflict_ids.len());
595
596        // Rule #2, new tx don't contain any new unconfirmed inputs
597        let mut inputs = HashSet::new();
598        for c in conflicts.iter() {
599            inputs.extend(c.inner.transaction().input_pts_iter());
600        }
601
602        if tx_inputs
603            .iter()
604            .any(|pt| !inputs.contains(pt) && !snapshot.transaction_exists(&pt.tx_hash()))
605        {
606            return Err(Reject::RBFRejected(
607                "new Tx contains unconfirmed inputs".to_string(),
608            ));
609        }
610
611        // Rule #5, the replaced tx's descendants can not more than 100
612        // and the ancestor of the new tx don't have common set with the replaced tx's descendants
613        let mut replace_count: usize = 0;
614        let mut all_conflicted = conflicts.clone();
615        let ancestors = self.pool_map.calc_ancestors(&short_id);
616        for conflict in conflicts.iter() {
617            let descendants = self.pool_map.calc_descendants(&conflict.id);
618            replace_count += descendants.len() + 1;
619            if replace_count > MAX_REPLACEMENT_CANDIDATES {
620                return Err(Reject::RBFRejected(format!(
621                    "Tx conflict with too many txs, conflict txs count: {}, expect <= {}",
622                    replace_count, MAX_REPLACEMENT_CANDIDATES,
623                )));
624            }
625
626            if !descendants.is_disjoint(&ancestors) {
627                return Err(Reject::RBFRejected(
628                    "Tx ancestors have common with conflict Tx descendants".to_string(),
629                ));
630            }
631
632            let entries = descendants
633                .iter()
634                .filter_map(|id| self.get_pool_entry(id))
635                .collect::<Vec<_>>();
636
637            for entry in entries.iter() {
638                let hash = entry.inner.transaction().hash();
639                if tx_inputs.iter().any(|pt| pt.tx_hash() == hash) {
640                    return Err(Reject::RBFRejected(
641                        "new Tx contains inputs in descendants of to be replaced Tx".to_string(),
642                    ));
643                }
644            }
645            all_conflicted.extend(entries);
646        }
647
648        let tx_cells_deps: Vec<OutPoint> = entry
649            .transaction()
650            .cell_deps_iter()
651            .map(|c| c.out_point())
652            .collect();
653        for entry in all_conflicted.iter() {
654            let hash = entry.inner.transaction().hash();
655            if tx_cells_deps.iter().any(|pt| pt.tx_hash() == hash) {
656                return Err(Reject::RBFRejected(
657                    "new Tx contains cell deps from conflicts".to_string(),
658                ));
659            }
660        }
661
662        // Rule #4, new tx's fee need to higher than min_rbf_fee computed from the tx_pool configuration
663        // Rule #3, new tx's fee need to higher than conflicts, here we only check the all conflicted txs fee
664        let fee = entry.fee;
665        if let Some(min_replace_fee) = self.calculate_min_replace_fee(&all_conflicted, entry.size) {
666            if fee < min_replace_fee {
667                return Err(Reject::RBFRejected(format!(
668                    "Tx's current fee is {}, expect it to >= {} to replace old txs",
669                    fee, min_replace_fee,
670                )));
671            }
672        } else {
673            return Err(Reject::RBFRejected(
674                "calculate_min_replace_fee failed".to_string(),
675            ));
676        }
677
678        Ok(conflict_ids)
679    }
680
681    /// query the details of a transaction in the pool, only for trouble shooting
682    pub(crate) fn get_tx_detail(&self, id: &ProposalShortId) -> Option<PoolTxDetailInfo> {
683        if let Some(entry) = self.pool_map.get_by_id(id) {
684            let ids = self.get_ids();
685            let rank_in_pending = if entry.status == Status::Proposed {
686                0
687            } else {
688                let tx_hash = entry.inner.transaction().hash();
689                ids.pending
690                    .iter()
691                    .enumerate()
692                    .find(|(_, hash)| &tx_hash == *hash)
693                    .map(|r| r.0)
694                    .unwrap_or_default()
695                    + 1
696            };
697            let res = PoolTxDetailInfo {
698                timestamp: entry.inner.timestamp,
699                entry_status: entry.status.to_string(),
700                pending_count: self.pool_map.pending_size(),
701                rank_in_pending,
702                proposed_count: ids.proposed.len(),
703                descendants_count: self.pool_map.calc_descendants(id).len(),
704                ancestors_count: self.pool_map.calc_ancestors(id).len(),
705                score_sortkey: entry.inner.as_score_key().into(),
706            };
707            Some(res)
708        } else {
709            None
710        }
711    }
712
713    fn build_recent_reject(config: &TxPoolConfig) -> Option<RecentReject> {
714        if !config.recent_reject.as_os_str().is_empty() {
715            let recent_reject_ttl =
716                u8::max(1, config.keep_rejected_tx_hashes_days) as i32 * 24 * 60 * 60;
717            match RecentReject::new(
718                &config.recent_reject,
719                config.keep_rejected_tx_hashes_count,
720                recent_reject_ttl,
721            ) {
722                Ok(recent_reject) => Some(recent_reject),
723                Err(err) => {
724                    error!(
725                        "Failed to open the recent reject database {:?} {}",
726                        config.recent_reject, err
727                    );
728                    None
729                }
730            }
731        } else {
732            warn!("Recent reject database is disabled!");
733            None
734        }
735    }
736}