// ckb_tx_pool/service.rs

1//! Tx-pool background service
2
3use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::util::after_delay_window;
11use crate::verify_mgr::VerifyMgr;
12use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
13use ckb_async_runtime::Handle;
14use ckb_chain_spec::consensus::Consensus;
15use ckb_channel::oneshot;
16use ckb_error::AnyError;
17use ckb_fee_estimator::FeeEstimator;
18use ckb_jsonrpc_types::BlockTemplate;
19use ckb_logger::error;
20use ckb_logger::info;
21use ckb_network::{NetworkController, PeerIndex};
22use ckb_script::ChunkCommand;
23use ckb_snapshot::Snapshot;
24use ckb_stop_handler::new_tokio_exit_rx;
25use ckb_store::ChainStore;
26use ckb_types::{
27    core::{
28        cell::{CellProvider, CellStatus, OverlayCellProvider},
29        tx_pool::{
30            EntryCompleted, PoolTxDetailInfo, Reject, TransactionWithStatus, TxPoolEntryInfo,
31            TxPoolIds, TxPoolInfo, TxStatus, TRANSACTION_SIZE_LIMIT,
32        },
33        BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
34    },
35    packed::{Byte32, OutPoint, ProposalShortId},
36};
37use ckb_util::{LinkedHashMap, LinkedHashSet};
38use ckb_verification::cache::TxVerificationCache;
39use std::collections::{HashMap, HashSet, VecDeque};
40use std::sync::{
41    atomic::{AtomicBool, Ordering},
42    Arc,
43};
44use std::time::Duration;
45use tokio::sync::watch;
46use tokio::sync::{mpsc, RwLock};
47use tokio::task::block_in_place;
48use tokio_util::sync::CancellationToken;
49
50use crate::pool_cell::PoolCell;
51#[cfg(feature = "internal")]
52use crate::{component::entry::TxEntry, process::PlugTarget};
53
54pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
55pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
56
/// A request envelope pairing the call arguments with a one-shot channel
/// on which the service delivers the response.
pub(crate) struct Request<A, R> {
    /// One-shot sender used by the service to reply to the caller.
    pub responder: oneshot::Sender<R>,
    /// Arguments carried by this request.
    pub arguments: A,
}
61
62impl<A, R> Request<A, R> {
63    pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
64        Request {
65            responder,
66            arguments,
67        }
68    }
69}
70
/// A fire-and-forget message envelope: like [`Request`] but without a
/// response channel, so the sender does not wait for a reply.
pub(crate) struct Notify<A> {
    /// Arguments carried by this notification.
    pub arguments: A,
}
74
75impl<A> Notify<A> {
76    pub(crate) fn new(arguments: A) -> Notify<A> {
77        Notify { arguments }
78    }
79}
80
/// Result of building a block template.
pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
/// `(bytes_limit, proposals_limit, max_version)` arguments of a
/// block-template request.
type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);

/// Result of submitting a transaction to the pool; `Err` carries the reject reason.
pub(crate) type SubmitTxResult = Result<(), Reject>;

/// Result of a dry-run acceptance test of a transaction.
pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;

/// Transaction status plus cycles if known (RPC `get_transaction` verbosity = 1).
type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
/// Full transaction-with-status payload (RPC `get_transaction` verbosity = 2).
type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
/// Fetched transactions keyed by short id, each paired with its verified cycles.
type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;

/// `(detached_blocks, attached_blocks, detached_proposal_id, snapshot)`
/// delivered to the pool when the chain reorganizes.
pub(crate) type ChainReorgArgs = (
    VecDeque<BlockView>,
    VecDeque<BlockView>,
    HashSet<ProposalShortId>,
    Arc<Snapshot>,
);

/// Result of a fee-rate estimation.
pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
100
/// Messages handled by the tx-pool service loop: request/response pairs
/// ([`Request`]) and fire-and-forget notifications ([`Notify`]).
pub(crate) enum Message {
    /// Build a block template with optional (bytes, proposals, version) limits.
    BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
    /// Submit a locally originated transaction.
    SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
    /// Remove a transaction by hash; responds with whether it was removed.
    RemoveLocalTx(Request<Byte32, bool>),
    /// Dry-run acceptance check; the tx is neither inserted nor relayed.
    TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
    /// Submit a transaction received from a peer, with its declared cycles.
    SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
    /// Batch of transactions from the network; no response expected.
    NotifyTxs(Notify<Vec<TransactionView>>),
    /// Filter out proposal ids already known to the pool.
    FreshProposalsFilter(Request<Vec<ProposalShortId>, Vec<ProposalShortId>>),
    /// Fetch transactions by short id (e.g. compact-block reconstruction).
    FetchTxs(Request<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
    /// Fetch transactions together with their verified cycles (for relay).
    FetchTxsWithCycles(Request<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
    /// Query overall tx-pool information.
    GetTxPoolInfo(Request<(), TxPoolInfo>),
    /// Query the status of a cell; the bool selects whether data is included.
    GetTxStatus(Request<Byte32, GetTxStatusResult>) has been moved — see below.
    GetLiveCell(Request<(OutPoint, bool), CellStatus>),
    /// Query a transaction's status (RPC verbosity = 1).
    GetTxStatus(Request<Byte32, GetTxStatusResult>),
    /// Query a transaction with full status (RPC verbosity = 2).
    GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
    /// A new uncle block became available.
    NewUncle(Notify<UncleBlockView>),
    /// Clear the pool, switching to the given snapshot.
    ClearPool(Request<Arc<Snapshot>, ()>),
    /// Clear the tx-verify-queue.
    ClearVerifyQueue(Request<(), ()>),
    /// Query info of all entries in the pool.
    GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
    /// Query the ids of all transactions in the pool.
    GetAllIds(Request<(), TxPoolIds>),
    /// Persist the pool to disk.
    SavePool(Request<(), ()>),
    /// Query pool-internal details of one transaction.
    GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),

    /// Update the initial-block-download state flag.
    UpdateIBDState(Request<bool, ()>),
    /// Estimate a fee rate; the bool enables the fallback strategy.
    EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),

    // test
    #[cfg(feature = "internal")]
    PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
    #[cfg(feature = "internal")]
    PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
    /// Submit a local test tx that is placed into the verify queue directly.
    SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
}
133
/// Events that trigger block-template work in the block assembler task.
/// Non-`Reset` variants are deduplicated in a set and processed on a timer
/// when `update_interval_millis` is non-zero.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) enum BlockAssemblerMessage {
    // Presumably signals a change to the pending set — confirm at senders.
    Pending,
    // Presumably signals a change to the proposed set — confirm at senders.
    Proposed,
    // A new uncle is available.
    Uncle,
    /// Chain state switched: clear queued work and rebuild from this snapshot.
    Reset(Arc<Snapshot>),
}
141
/// Controller to the tx-pool service.
///
/// The Controller is internally reference-counted and can be freely cloned.
/// A Controller can be obtained when the tx-pool service is constructed.
#[derive(Clone)]
pub struct TxPoolController {
    // Request/notify channel into the service message loop.
    sender: mpsc::Sender<Message>,
    // Dedicated channel for chain-reorg notifications.
    reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
    // Broadcasts suspend/resume commands to the verification manager.
    chunk_tx: Arc<watch::Sender<ChunkCommand>>,
    // Tokio runtime handle the service tasks run on.
    handle: Handle,
    // Whether the background service has been started (see `service_started`).
    started: Arc<AtomicBool>,
}
153
/// Sends `Message::$msg_type` to the service and synchronously waits for the
/// reply on a fresh one-shot channel.
///
/// `try_send` fails fast (without blocking) when the channel is full or
/// closed; `block_in_place` tells tokio the current worker thread is about to
/// block on the one-shot receive, so it can hand off other tasks.
macro_rules! send_message {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let (responder, response) = oneshot::channel();
        let request = Request::call($args, responder);
        $self
            .sender
            .try_send(Message::$msg_type(request))
            .map_err(|e| {
                let (_m, e) = handle_try_send_error(e);
                e
            })?;
        block_in_place(|| response.recv())
            .map_err(handle_recv_error)
            .map_err(Into::into)
    }};
}
170
/// Sends `Message::$msg_type` as a fire-and-forget [`Notify`]; does not wait
/// for any response. `try_send` fails fast if the channel is full or closed.
macro_rules! send_notify {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let notify = Notify::new($args);
        $self
            .sender
            .try_send(Message::$msg_type(notify))
            .map_err(|e| {
                let (_m, e) = handle_try_send_error(e);
                e.into()
            })
    }};
}
183
impl TxPoolController {
    /// Return whether tx-pool service is started
    pub fn service_started(&self) -> bool {
        self.started.load(Ordering::Acquire)
    }

    /// Set tx-pool service started, should only be used for tests
    #[cfg(feature = "internal")]
    pub fn set_service_started(&self, v: bool) {
        self.started.store(v, Ordering::Release);
    }

    /// Return reference of tokio runtime handle
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Generate and return block_template
    ///
    /// NOTE(review): the service side currently ignores the limit/version
    /// arguments when handling `Message::BlockTemplate` (see `process`) —
    /// confirm whether they are intended to take effect.
    pub fn get_block_template(
        &self,
        bytes_limit: Option<u64>,
        proposals_limit: Option<u64>,
        max_version: Option<Version>,
    ) -> Result<BlockTemplateResult, AnyError> {
        send_message!(
            self,
            BlockTemplate,
            (bytes_limit, proposals_limit, max_version)
        )
    }

    /// Notify new uncle
    pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
        send_notify!(self, NewUncle, uncle)
    }

    /// Make tx-pool consistent after a reorg, by re-adding or recursively erasing
    /// detached block transactions from the tx-pool, and also removing any
    /// other transactions from the tx-pool that are no longer valid given the new
    /// tip/height.
    pub fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) -> Result<(), AnyError> {
        let notify = Notify::new((
            detached_blocks,
            attached_blocks,
            detached_proposal_id,
            snapshot,
        ));
        // Reorgs go through their own channel so they are not queued behind
        // ordinary pool requests.
        self.reorg_sender.try_send(notify).map_err(|e| {
            let (_m, e) = handle_try_send_error(e);
            e.into()
        })
    }

    /// Submit local tx to tx-pool
    pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTx, tx)
    }

    /// Test if a tx can be accepted by tx-pool.
    /// Won't be broadcast to the network,
    /// won't be inserted into the tx-pool.
    pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
        send_message!(self, TestAcceptTx, tx)
    }

    /// Remove tx from tx-pool
    pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
        send_message!(self, RemoveLocalTx, tx_hash)
    }

    /// Submit remote tx with declared cycles and origin to tx-pool
    pub async fn submit_remote_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Cycle,
        peer: PeerIndex,
    ) -> Result<(), AnyError> {
        send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
    }

    /// Receive txs from network, try to add txs to tx-pool
    pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        send_notify!(self, NotifyTxs, txs)
    }

    /// Return tx-pool information
    pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
        send_message!(self, GetTxPoolInfo, ())
    }

    /// Return the live-cell status of `out_point`; `with_data` controls
    /// whether the cell's data is included in the result.
    pub fn get_live_cell(
        &self,
        out_point: OutPoint,
        with_data: bool,
    ) -> Result<CellStatus, AnyError> {
        send_message!(self, GetLiveCell, (out_point, with_data))
    }

    /// Return fresh proposals (ids not already known to the pool)
    pub fn fresh_proposals_filter(
        &self,
        proposals: Vec<ProposalShortId>,
    ) -> Result<Vec<ProposalShortId>, AnyError> {
        send_message!(self, FreshProposalsFilter, proposals)
    }

    /// Return tx_status for rpc (get_transaction verbosity = 1)
    pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
        send_message!(self, GetTxStatus, hash)
    }

    /// Return transaction_with_status for rpc (get_transaction verbosity = 2)
    pub fn get_transaction_with_status(
        &self,
        hash: Byte32,
    ) -> Result<GetTransactionWithStatusResult, AnyError> {
        send_message!(self, GetTransactionWithStatus, hash)
    }

    /// Mainly used for compact block reconstruction and block proposal pre-broadcasting
    /// Orphan/conflicted/etc transactions that are returned for compact block reconstruction.
    pub fn fetch_txs(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
        send_message!(self, FetchTxs, short_ids)
    }

    /// Return txs with cycles
    /// Mainly for relay transactions
    pub fn fetch_txs_with_cycles(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<FetchTxsWithCyclesResult, AnyError> {
        send_message!(self, FetchTxsWithCycles, short_ids)
    }

    /// Clears the tx-pool, removing all txs, update snapshot.
    pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
        send_message!(self, ClearPool, new_snapshot)
    }

    /// Clears the tx-verify-queue.
    pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
        send_message!(self, ClearVerifyQueue, ())
    }

    /// Return info of all entries currently in the tx-pool.
    pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
        send_message!(self, GetAllEntryInfo, ())
    }

    /// Return the ids of all transactions currently in the tx-pool.
    pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
        send_message!(self, GetAllIds, ())
    }

    /// query the details of a transaction in the pool
    pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
        send_message!(self, GetPoolTxDetails, tx_hash)
    }

    /// Saves tx pool into disk.
    pub fn save_pool(&self) -> Result<(), AnyError> {
        info!("Please be patient, tx-pool are saving data into disk ...");
        send_message!(self, SavePool, ())
    }

    /// Updates IBD state.
    pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
        send_message!(self, UpdateIBDState, in_ibd)
    }

    /// Estimates fee rate.
    pub fn estimate_fee_rate(
        &self,
        estimate_mode: EstimateMode,
        enable_fallback: bool,
    ) -> Result<FeeEstimatesResult, AnyError> {
        send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
    }

    /// Sends suspend chunk process cmd
    pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
        self.chunk_tx
            .send(ChunkCommand::Suspend)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Sends continue chunk process cmd
    pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
        self.chunk_tx
            .send(ChunkCommand::Resume)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Load persisted txs into pool, assume that all txs are sorted
    fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        if !txs.is_empty() {
            info!("Loading persistent tx-pool data, total {} txs", txs.len());
            let mut failed_txs = 0;
            for tx in txs {
                // `?` aborts on channel/service errors; per-tx rejects
                // (stale txs) are only counted and reported below.
                if self.submit_local_tx(tx)?.is_err() {
                    failed_txs += 1;
                }
            }
            if failed_txs == 0 {
                info!("Persistent tx-pool data is loaded");
            } else {
                info!(
                    "Persistent tx-pool data is loaded, {} stale txs are ignored",
                    failed_txs
                );
            }
        }
        Ok(())
    }

    /// Plug tx-pool entry to tx-pool, skip verification. only for test
    #[cfg(feature = "internal")]
    pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
        send_message!(self, PlugEntry, (entries, target))
    }

    /// Package txs with specified bytes_limit. for test
    #[cfg(feature = "internal")]
    pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
        send_message!(self, PackageTxs, bytes_limit)
    }

    /// Submit local test tx to tx-pool, this tx will be put into verify queue directly.
    pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTestTx, tx)
    }
}
430
/// A builder used to create TxPoolService.
pub struct TxPoolServiceBuilder {
    pub(crate) tx_pool_config: TxPoolConfig,
    // Controller paired with this builder (created in `new`).
    pub(crate) tx_pool_controller: TxPoolController,
    pub(crate) snapshot: Arc<Snapshot>,
    // Present only when a block-assembler config was supplied.
    pub(crate) block_assembler: Option<BlockAssembler>,
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    pub(crate) callbacks: Callbacks,
    // Receiving ends of the controller's channels.
    pub(crate) receiver: mpsc::Receiver<Message>,
    pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
    // Global exit token; cancellation shuts all spawned tasks down.
    pub(crate) signal_receiver: CancellationToken,
    pub(crate) handle: Handle,
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    // Receiving end of the suspend/resume command channel.
    pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
    pub(crate) started: Arc<AtomicBool>,
    pub(crate) block_assembler_channel: (
        mpsc::Sender<BlockAssemblerMessage>,
        mpsc::Receiver<BlockAssemblerMessage>,
    ),
    pub(crate) fee_estimator: FeeEstimator,
}
452
impl TxPoolServiceBuilder {
    /// Creates a new TxPoolServiceBuilder together with its paired
    /// TxPoolController.
    pub fn new(
        tx_pool_config: TxPoolConfig,
        snapshot: Arc<Snapshot>,
        block_assembler_config: Option<BlockAssemblerConfig>,
        txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
        handle: &Handle,
        tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
        fee_estimator: FeeEstimator,
    ) -> (TxPoolServiceBuilder, TxPoolController) {
        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
        let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let signal_receiver: CancellationToken = new_tokio_exit_rx();
        let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
        let started = Arc::new(AtomicBool::new(false));

        let controller = TxPoolController {
            sender,
            reorg_sender,
            handle: handle.clone(),
            chunk_tx: Arc::new(chunk_tx),
            started: Arc::clone(&started),
        };

        let block_assembler =
            block_assembler_config.map(|config| BlockAssembler::new(config, Arc::clone(&snapshot)));
        let builder = TxPoolServiceBuilder {
            tx_pool_config,
            tx_pool_controller: controller.clone(),
            snapshot,
            block_assembler,
            txs_verify_cache,
            callbacks: Callbacks::new(),
            receiver,
            reorg_receiver,
            signal_receiver,
            handle: handle.clone(),
            tx_relay_sender,
            chunk_rx,
            started,
            block_assembler_channel,
            fee_estimator,
        };

        (builder, controller)
    }

    /// Register new pending callback
    pub fn register_pending(&mut self, callback: PendingCallback) {
        self.callbacks.register_pending(callback);
    }

    /// Return cloned tx relayer sender
    pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
        self.tx_relay_sender.clone()
    }

    /// Register new proposed callback
    pub fn register_proposed(&mut self, callback: ProposedCallback) {
        self.callbacks.register_proposed(callback);
    }

    /// Register new abandon callback
    pub fn register_reject(&mut self, callback: RejectCallback) {
        self.callbacks.register_reject(callback);
    }

    /// Consume the builder and spawn the tx-pool background tasks onto the
    /// runtime: the verify manager, the main message loop, the (optional)
    /// block-assembler loop and the reorg loop. The paired controller was
    /// already returned by [`TxPoolServiceBuilder::new`].
    pub fn start(self, network: NetworkController) {
        let consensus = self.snapshot.cloned_consensus();
        let after_delay_window = after_delay_window(&self.snapshot);

        let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
            self.tx_pool_config.max_tx_verify_cycles,
        )));

        // Best-effort restore of persisted txs; on failure start empty.
        let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
        let txs = match tx_pool.load_from_file() {
            Ok(txs) => txs,
            Err(e) => {
                error!("{}", e.to_string());
                error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
                Vec::new()
            }
        };

        let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
        let service = TxPoolService {
            tx_pool_config: Arc::new(tx_pool.config.clone()),
            tx_pool: Arc::new(RwLock::new(tx_pool)),
            orphan: Arc::new(RwLock::new(OrphanPool::new())),
            block_assembler: self.block_assembler,
            txs_verify_cache: self.txs_verify_cache,
            callbacks: Arc::new(self.callbacks),
            tx_relay_sender: self.tx_relay_sender,
            block_assembler_sender,
            verify_queue: Arc::clone(&verify_queue),
            network,
            consensus,
            delay: Arc::new(RwLock::new(LinkedHashMap::new())),
            after_delay: Arc::new(AtomicBool::new(after_delay_window)),
            fee_estimator: self.fee_estimator,
        };

        // Task 1: transaction verification manager.
        let mut verify_mgr =
            VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
        self.handle.spawn(async move { verify_mgr.run().await });

        let mut receiver = self.receiver;
        let mut reorg_receiver = self.reorg_receiver;
        let handle_clone = self.handle.clone();

        // Task 2: main message loop. Each message is handled in its own
        // spawned task; on shutdown the pool is persisted before exiting.
        let process_service = service.clone();
        let signal_receiver = self.signal_receiver.clone();
        self.handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(message) = receiver.recv() => {
                        let service_clone = process_service.clone();
                        handle_clone.spawn(process(service_clone, message));
                    },
                    _ = signal_receiver.cancelled() => {
                        info!("TxPool is saving, please wait...");
                        process_service.save_pool().await;
                        info!("TxPool process_service exit now");
                        break
                    },
                    else => break,
                }
            }
        });

        // Task 3 (optional): block-assembler loop, only when an assembler
        // is configured.
        let process_service = service.clone();
        if let Some(ref block_assembler) = service.block_assembler {
            let signal_receiver = self.signal_receiver.clone();
            let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
            if interval.is_zero() {
                // block_assembler.update_interval_millis set zero interval should only be used for tests,
                // external notification will be disabled.
                ckb_logger::warn!(
                    "block_assembler.update_interval_millis set to zero interval. \
                    This should only be used for tests, as external notification will be disabled."
                );
                // Zero interval: process every message immediately, no batching.
                self.handle.spawn(async move {
                    loop {
                        tokio::select! {
                            Some(message) = block_assembler_receiver.recv() => {
                                let service_clone = process_service.clone();
                                block_assembler::process(service_clone, &message).await;
                            },
                            _ = signal_receiver.cancelled() => {
                                info!("TxPool block_assembler process service received exit signal, exit now");
                                break
                            },
                            else => break,
                        }
                    }
                });
            } else {
                // Non-zero interval: dedupe messages in a set and drain it on
                // each tick; `Reset` bypasses the queue and is applied at once.
                self.handle.spawn(async move {
                    let mut interval = tokio::time::interval(interval);
                    let mut queue = LinkedHashSet::new();
                    loop {
                        tokio::select! {
                            Some(message) = block_assembler_receiver.recv() => {
                                if let BlockAssemblerMessage::Reset(..) = message {
                                    let service_clone = process_service.clone();
                                    queue.clear();
                                    block_assembler::process(service_clone, &message).await;
                                } else {
                                    queue.insert(message);
                                }
                            },
                            _ = interval.tick() => {
                                for message in &queue {
                                    let service_clone = process_service.clone();
                                    block_assembler::process(service_clone, message).await;
                                }
                                // Only notify externally when work was done.
                                if !queue.is_empty() {
                                    if let Some(ref block_assembler) = process_service.block_assembler {
                                        block_assembler.notify().await;
                                    }
                                }
                                queue.clear();
                            }
                            _ = signal_receiver.cancelled() => {
                                info!("TxPool block_assembler process service received exit signal, exit now");
                                break
                            },
                            else => break,
                        }
                    }
                });
            }
        }

        // Task 4: reorg loop — block assembler is updated before and after
        // the pool itself is reorganized.
        let signal_receiver = self.signal_receiver;
        self.handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(message) = reorg_receiver.recv() => {
                        let Notify {
                            arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
                        } = message;
                        let snapshot_clone = Arc::clone(&snapshot);
                        let detached_blocks_clone = detached_blocks.clone();
                        service.update_block_assembler_before_tx_pool_reorg(
                            detached_blocks_clone,
                            snapshot_clone
                        ).await;

                        let snapshot_clone = Arc::clone(&snapshot);
                        service
                        .update_tx_pool_for_reorg(
                            detached_blocks,
                            attached_blocks,
                            detached_proposal_id,
                            snapshot_clone,
                        )
                        .await;

                        service.update_block_assembler_after_tx_pool_reorg().await;
                    },
                    _ = signal_receiver.cancelled() => {
                        info!("TxPool reorg process service received exit signal, exit now");
                        break
                    },
                    else => break,
                }
            }
        });
        // Mark the service as started, then replay persisted txs through the
        // normal submit path.
        self.started.store(true, Ordering::Release);
        if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
            error!("Failed to import persistent txs, cause: {}", err);
        }
    }
}
692
/// Shared state of the tx-pool background service; cheap to clone
/// (every field is an `Arc`, a channel handle, or a clonable handle).
#[derive(Clone)]
pub(crate) struct TxPoolService {
    // The pool proper, guarded by an async RwLock.
    pub(crate) tx_pool: Arc<RwLock<TxPool>>,
    // Transactions whose parents are not yet known.
    pub(crate) orphan: Arc<RwLock<OrphanPool>>,
    pub(crate) consensus: Arc<Consensus>,
    pub(crate) tx_pool_config: Arc<TxPoolConfig>,
    // Present only when a block-assembler config was supplied.
    pub(crate) block_assembler: Option<BlockAssembler>,
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    // Pending/proposed/reject callbacks registered via the builder.
    pub(crate) callbacks: Arc<Callbacks>,
    pub(crate) network: NetworkController,
    // Channel used to report verification outcomes for relay.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
    // Channel feeding the block-assembler task.
    pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
    // Delayed transactions keyed by proposal short id.
    pub(crate) delay: Arc<RwLock<LinkedHashMap<ProposalShortId, TransactionView>>>,
    // Whether the delay window has passed (see `after_delay_window`).
    pub(crate) after_delay: Arc<AtomicBool>,
    pub(crate) fee_estimator: FeeEstimator,
}
710
/// Outcome of verifying a transaction, reported over `tx_relay_sender`.
pub enum TxVerificationResult {
    /// tx is verified
    Ok {
        /// original peer the tx came from, `None` for locally submitted txs
        original_peer: Option<PeerIndex>,
        /// whether it was verified by the 2023 CKB VM version
        with_vm_2023: bool,
        /// transaction hash
        tx_hash: Byte32,
    },
    /// tx parent is unknown
    UnknownParents {
        /// original peer
        peer: PeerIndex,
        /// parents hashes
        parents: HashSet<Byte32>,
    },
    /// tx is rejected
    Reject {
        /// transaction hash
        tx_hash: Byte32,
    },
}
735
736#[allow(clippy::cognitive_complexity)]
737async fn process(mut service: TxPoolService, message: Message) {
738    match message {
739        Message::GetTxPoolInfo(Request { responder, .. }) => {
740            let info = service.info().await;
741            if let Err(e) = responder.send(info) {
742                error!("Responder sending get_tx_pool_info failed {:?}", e);
743            };
744        }
745        Message::GetLiveCell(Request {
746            responder,
747            arguments: (out_point, with_data),
748        }) => {
749            let live_cell_status = service.get_live_cell(out_point, with_data).await;
750            if let Err(e) = responder.send(live_cell_status) {
751                error!("Responder sending get_live_cell failed {:?}", e);
752            };
753        }
        // Build and return the current block template. The bytes/proposals/
        // version limit arguments are currently unused by this handler.
        Message::BlockTemplate(Request {
            responder,
            arguments: (_bytes_limit, _proposals_limit, _max_version),
        }) => {
            let block_template_result = service.get_block_template().await;
            if let Err(e) = responder.send(block_template_result) {
                error!("Responder sending block_template_result failed {:?}", e);
            };
        }
        // Submit a locally originated transaction; only success/failure is
        // reported back (entry details are discarded via `map(|_| ())`).
        Message::SubmitLocalTx(Request {
            responder,
            arguments: tx,
        }) => {
            let result = service.process_tx(tx, None).await.map(|_| ());
            if let Err(e) = responder.send(result) {
                error!("Responder sending submit_tx result failed {:?}", e);
            };
        }
        // Like SubmitLocalTx, but goes through the resumable processing path.
        Message::SubmitLocalTestTx(Request {
            responder,
            arguments: tx,
        }) => {
            let result = service.resumeble_process_tx(tx, None).await.map(|_| ());
            if let Err(e) = responder.send(result) {
                error!("Responder sending submit_tx result failed {:?}", e);
            };
        }
        // Remove a transaction from the pool by its hash.
        Message::RemoveLocalTx(Request {
            responder,
            arguments: tx_hash,
        }) => {
            let result = service.remove_tx(tx_hash).await;
            if let Err(e) = responder.send(result) {
                error!("Responder sending remove_tx result failed {:?}", e);
            };
        }
        // Dry-run acceptance check; the successful result is converted with
        // `into()` before being sent back.
        Message::TestAcceptTx(Request {
            responder,
            arguments: tx,
        }) => {
            let result = service.test_accept_tx(tx).await;
            if let Err(e) = responder.send(result.map(|r| r.into())) {
                error!("Responder sending test_accept_tx result failed {:?}", e);
            };
        }
        // Transaction relayed from a peer with its declared cycles. The
        // processing outcome is intentionally ignored here; the responder
        // only acknowledges receipt with `()`.
        Message::SubmitRemoteTx(Request {
            responder,
            arguments: (tx, declared_cycles, peer),
        }) => {
            let _result = service
                .resumeble_process_tx(tx, Some((declared_cycles, peer)))
                .await;
            if let Err(e) = responder.send(()) {
                error!("Responder sending submit_tx result failed {:?}", e);
            };
        }
        // Fire-and-forget batch submission: process each tx in order and
        // discard each result.
        Message::NotifyTxs(Notify { arguments: txs }) => {
            for tx in txs {
                let _ret = service.resumeble_process_tx(tx, None).await;
            }
        }
        // Filter a proposal-id list down to the ids the pool does not
        // already contain.
        Message::FreshProposalsFilter(Request {
            responder,
            arguments: mut proposals,
        }) => {
            let tx_pool = service.tx_pool.read().await;
            proposals.retain(|id| !tx_pool.contains_proposal_id(id));
            if let Err(e) = responder.send(proposals) {
                error!("Responder sending fresh_proposals_filter failed {:?}", e);
            };
        }
825        Message::GetTxStatus(Request {
826            responder,
827            arguments: hash,
828        }) => {
829            let id = ProposalShortId::from_tx_hash(&hash);
830            let tx_pool = service.tx_pool.read().await;
831            let ret = if let Some(PoolEntry {
832                status,
833                inner: entry,
834                ..
835            }) = tx_pool.pool_map.get_by_id(&id)
836            {
837                let status = if status == &Status::Proposed {
838                    TxStatus::Proposed
839                } else {
840                    TxStatus::Pending
841                };
842                Ok((status, Some(entry.cycles)))
843            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
844                let recent_reject_result = recent_reject_db.get(&hash);
845                if let Ok(recent_reject) = recent_reject_result {
846                    if let Some(record) = recent_reject {
847                        Ok((TxStatus::Rejected(record), None))
848                    } else {
849                        Ok((TxStatus::Unknown, None))
850                    }
851                } else {
852                    Err(recent_reject_result.unwrap_err())
853                }
854            } else {
855                Ok((TxStatus::Unknown, None))
856            };
857
858            if let Err(e) = responder.send(ret) {
859                error!("Responder sending get_tx_status failed {:?}", e)
860            };
861        }
862        Message::GetTransactionWithStatus(Request {
863            responder,
864            arguments: hash,
865        }) => {
866            let id = ProposalShortId::from_tx_hash(&hash);
867            let tx_pool = service.tx_pool.read().await;
868            let ret = if let Some(PoolEntry {
869                status,
870                inner: entry,
871                ..
872            }) = tx_pool.pool_map.get_by_id(&id)
873            {
874                let (tx_status, min_replace_fee) = if status == &Status::Proposed {
875                    (TxStatus::Proposed, None)
876                } else {
877                    (TxStatus::Pending, tx_pool.min_replace_fee(entry))
878                };
879                Ok(TransactionWithStatus::with_status(
880                    Some(entry.transaction().clone()),
881                    entry.cycles,
882                    entry.timestamp,
883                    tx_status,
884                    Some(entry.fee),
885                    min_replace_fee,
886                ))
887            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
888                match recent_reject_db.get(&hash) {
889                    Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
890                    Ok(_) => Ok(TransactionWithStatus::with_unknown()),
891                    Err(err) => Err(err),
892                }
893            } else {
894                Ok(TransactionWithStatus::with_unknown())
895            };
896
897            if let Err(e) = responder.send(ret) {
898                error!("Responder sending get_tx_status failed {:?}", e)
899            };
900        }
        // Fetch transactions by short id, looking in the pool/store first and
        // then in the orphan pool; ids found nowhere are silently skipped.
        Message::FetchTxs(Request {
            responder,
            arguments: short_ids,
        }) => {
            let tx_pool = service.tx_pool.read().await;
            let orphan = service.orphan.read().await;
            let txs = short_ids
                .into_iter()
                .filter_map(|short_id| {
                    tx_pool
                        .get_tx_from_pool_or_store(&short_id)
                        .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
                        .map(|tx| (short_id, tx))
                })
                .collect();
            if let Err(e) = responder.send(txs) {
                error!("Responder sending fetch_txs failed {:?}", e);
            };
        }
        // Fetch transactions together with their cycle counts; only pool
        // entries (not orphans) are consulted here.
        Message::FetchTxsWithCycles(Request {
            responder,
            arguments: short_ids,
        }) => {
            let tx_pool = service.tx_pool.read().await;
            let txs = short_ids
                .into_iter()
                .filter_map(|short_id| {
                    tx_pool
                        .get_tx_with_cycles(&short_id)
                        .map(|(tx, cycles)| (short_id, (tx, cycles)))
                })
                .collect();
            if let Err(e) = responder.send(txs) {
                error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
            };
        }
        // A new candidate uncle arrived; hand it to the block assembler.
        Message::NewUncle(Notify { arguments: uncle }) => {
            service.receive_candidate_uncle(uncle).await;
        }
        // Clear the whole pool and rebase it on the supplied snapshot.
        Message::ClearPool(Request {
            responder,
            arguments: new_snapshot,
        }) => {
            service.clear_pool(new_snapshot).await;
            if let Err(e) = responder.send(()) {
                error!("Responder sending clear_pool failed {:?}", e)
            };
        }
        // Drop every transaction waiting in the verify queue.
        Message::ClearVerifyQueue(Request { responder, .. }) => {
            service.verify_queue.write().await.clear();
            if let Err(e) = responder.send(()) {
                error!("Responder sending clear_verify_queue failed {:?}", e)
            };
        }
955        Message::GetPoolTxDetails(Request {
956            responder,
957            arguments: tx_hash,
958        }) => {
959            let tx_pool = service.tx_pool.read().await;
960            let id = ProposalShortId::from_tx_hash(&tx_hash);
961            let tx_details = tx_pool
962                .get_tx_detail(&id)
963                .unwrap_or(PoolTxDetailInfo::with_unknown());
964            if let Err(e) = responder.send(tx_details) {
965                error!("responder send get_pool_tx_details failed {:?}", e)
966            };
967        }
        // Dump entry info for everything currently in the pool.
        Message::GetAllEntryInfo(Request { responder, .. }) => {
            let tx_pool = service.tx_pool.read().await;
            let info = tx_pool.get_all_entry_info();
            if let Err(e) = responder.send(info) {
                error!("Responder sending get_all_entry_info failed {:?}", e)
            };
        }
        // Dump the ids of everything currently in the pool.
        Message::GetAllIds(Request { responder, .. }) => {
            let tx_pool = service.tx_pool.read().await;
            let ids = tx_pool.get_ids();
            if let Err(e) = responder.send(ids) {
                error!("Responder sending get_ids failed {:?}", e)
            };
        }
        // Persist the pool contents (e.g. so they survive a restart).
        Message::SavePool(Request { responder, .. }) => {
            service.save_pool().await;
            if let Err(e) = responder.send(()) {
                error!("Responder sending save_pool failed {:?}", e)
            };
        }
        // Update the pool's initial-block-download state flag.
        Message::UpdateIBDState(Request {
            responder,
            arguments: in_ibd,
        }) => {
            service.update_ibd_state(in_ibd).await;
            if let Err(e) = responder.send(()) {
                error!("Responder sending update_ibd_state failed {:?}", e)
            };
        }
        // Estimate a fee rate for the requested mode, optionally allowing a
        // fallback estimate.
        Message::EstimateFeeRate(Request {
            responder,
            arguments: (estimate_mode, enable_fallback),
        }) => {
            let fee_estimates_result = service
                .estimate_fee_rate(estimate_mode, enable_fallback)
                .await;
            if let Err(e) = responder.send(fee_estimates_result) {
                error!("Responder sending fee_estimates_result failed {:?}", e)
            };
        }
        // (internal feature only) Insert pre-built entries directly into the
        // pool, bypassing normal submission.
        #[cfg(feature = "internal")]
        Message::PlugEntry(Request {
            responder,
            arguments: (entries, target),
        }) => {
            service.plug_entry(entries, target).await;

            if let Err(e) = responder.send(()) {
                error!("Responder sending plug_entry failed {:?}", e);
            };
        }
1019        #[cfg(feature = "internal")]
1020        Message::PackageTxs(Request {
1021            responder,
1022            arguments: bytes_limit,
1023        }) => {
1024            let max_block_cycles = service.consensus.max_block_cycles();
1025            let max_block_bytes = service.consensus.max_block_bytes();
1026            let tx_pool = service.tx_pool.read().await;
1027            let (txs, _size, _cycles) = tx_pool.package_txs(
1028                max_block_cycles,
1029                bytes_limit.unwrap_or(max_block_bytes) as usize,
1030            );
1031            if let Err(e) = responder.send(txs) {
1032                error!("Responder sending plug_entry failed {:?}", e);
1033            };
1034        }
1035    }
1036}
1037
1038impl TxPoolService {
1039    /// Tx-pool information
1040    async fn info(&self) -> TxPoolInfo {
1041        let tx_pool = self.tx_pool.read().await;
1042        let orphan = self.orphan.read().await;
1043        let verify_queue = self.verify_queue.read().await;
1044        let tip_header = tx_pool.snapshot.tip_header();
1045        TxPoolInfo {
1046            tip_hash: tip_header.hash(),
1047            tip_number: tip_header.number(),
1048            pending_size: tx_pool.pool_map.pending_size(),
1049            proposed_size: tx_pool.pool_map.proposed_size(),
1050            orphan_size: orphan.len(),
1051            total_tx_size: tx_pool.pool_map.total_tx_size,
1052            total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
1053            min_fee_rate: self.tx_pool_config.min_fee_rate,
1054            min_rbf_rate: self.tx_pool_config.min_rbf_rate,
1055            last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
1056            tx_size_limit: TRANSACTION_SIZE_LIMIT,
1057            max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
1058            verify_queue_size: verify_queue.len(),
1059        }
1060    }
1061
1062    /// Get Live Cell Status
1063    async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
1064        let tx_pool = self.tx_pool.read().await;
1065        let snapshot = tx_pool.snapshot();
1066        let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
1067        let provider = OverlayCellProvider::new(&pool_cell, snapshot);
1068
1069        match provider.cell(&out_point, false) {
1070            CellStatus::Live(mut cell_meta) => {
1071                if eager_load {
1072                    if let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
1073                        cell_meta.mem_cell_data = Some(data);
1074                        cell_meta.mem_cell_data_hash = Some(data_hash);
1075                    }
1076                }
1077                CellStatus::live_cell(cell_meta)
1078            }
1079            _ => CellStatus::Unknown,
1080        }
1081    }
1082
1083    pub fn should_notify_block_assembler(&self) -> bool {
1084        self.block_assembler.is_some()
1085    }
1086
1087    pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
1088        if let Some(ref block_assembler) = self.block_assembler {
1089            {
1090                block_assembler.candidate_uncles.lock().await.insert(uncle);
1091            }
1092            if self
1093                .block_assembler_sender
1094                .send(BlockAssemblerMessage::Uncle)
1095                .await
1096                .is_err()
1097            {
1098                error!("block_assembler receiver dropped");
1099            }
1100        }
1101    }
1102
1103    pub async fn update_block_assembler_before_tx_pool_reorg(
1104        &self,
1105        detached_blocks: VecDeque<BlockView>,
1106        snapshot: Arc<Snapshot>,
1107    ) {
1108        if let Some(ref block_assembler) = self.block_assembler {
1109            {
1110                let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
1111                for detached_block in detached_blocks {
1112                    candidate_uncles.insert(detached_block.as_uncle());
1113                }
1114            }
1115
1116            if let Err(e) = block_assembler.update_blank(snapshot).await {
1117                error!("block_assembler update_blank error {}", e);
1118            }
1119            block_assembler.notify().await;
1120        }
1121    }
1122
1123    pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
1124        if let Some(ref block_assembler) = self.block_assembler {
1125            if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
1126                error!("block_assembler update failed {:?}", e);
1127            }
1128            block_assembler.notify().await;
1129        }
1130    }
1131
1132    #[cfg(feature = "internal")]
1133    pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
1134        {
1135            let mut tx_pool = self.tx_pool.write().await;
1136            match target {
1137                PlugTarget::Pending => {
1138                    for entry in entries {
1139                        tx_pool
1140                            .add_pending(entry)
1141                            .expect("Plug entry add_pending error");
1142                    }
1143                }
1144                PlugTarget::Proposed => {
1145                    for entry in entries {
1146                        tx_pool
1147                            .add_proposed(entry)
1148                            .expect("Plug entry add_proposed error");
1149                    }
1150                }
1151            };
1152        }
1153
1154        if self.should_notify_block_assembler() {
1155            let msg = match target {
1156                PlugTarget::Pending => BlockAssemblerMessage::Pending,
1157                PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
1158            };
1159            if self.block_assembler_sender.send(msg).await.is_err() {
1160                error!("block_assembler receiver dropped");
1161            }
1162        }
1163    }
1164
1165    pub fn after_delay(&self) -> bool {
1166        self.after_delay.load(Ordering::Acquire)
1167    }
1168
1169    pub fn set_after_delay_true(&self) {
1170        self.after_delay.store(true, Ordering::Release);
1171    }
1172}