ckb_tx_pool/
service.rs

1//! Tx-pool background service
2
3use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::verify_mgr::VerifyMgr;
11use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
12use ckb_async_runtime::Handle;
13use ckb_chain_spec::consensus::Consensus;
14use ckb_channel::oneshot;
15use ckb_error::AnyError;
16use ckb_fee_estimator::FeeEstimator;
17use ckb_jsonrpc_types::BlockTemplate;
18use ckb_logger::error;
19use ckb_logger::info;
20use ckb_network::{NetworkController, PeerIndex};
21use ckb_script::ChunkCommand;
22use ckb_snapshot::Snapshot;
23use ckb_stop_handler::new_tokio_exit_rx;
24use ckb_store::ChainStore;
25use ckb_types::{
26    core::{
27        BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
28        cell::{CellProvider, CellStatus, OverlayCellProvider},
29        tx_pool::{
30            EntryCompleted, PoolTxDetailInfo, Reject, TRANSACTION_SIZE_LIMIT,
31            TransactionWithStatus, TxPoolEntryInfo, TxPoolIds, TxPoolInfo, TxStatus,
32        },
33    },
34    packed::{Byte32, OutPoint, ProposalShortId},
35};
36use ckb_util::LinkedHashSet;
37use ckb_verification::cache::TxVerificationCache;
38use std::collections::{HashMap, HashSet, VecDeque};
39use std::sync::{
40    Arc,
41    atomic::{AtomicBool, Ordering},
42};
43use std::time::Duration;
44use tokio::sync::watch;
45use tokio::sync::{RwLock, mpsc};
46use tokio::task::block_in_place;
47use tokio_util::sync::CancellationToken;
48
49use crate::pool_cell::PoolCell;
50#[cfg(feature = "internal")]
51use crate::{component::entry::TxEntry, process::PlugTarget};
52
/// Capacity of the bounded channels created with this size in
/// `TxPoolServiceBuilder::new`: the controller message channel and the reorg
/// notification channel.
pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
/// Capacity of the bounded channel that carries `BlockAssemblerMessage`s.
pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
55
56pub(crate) struct Request<A, R> {
57    pub responder: oneshot::Sender<R>,
58    pub arguments: A,
59}
60
61impl<A, R> Request<A, R> {
62    pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
63        Request {
64            responder,
65            arguments,
66        }
67    }
68}
69
/// A one-way message: it carries a payload and has no reply channel.
pub(crate) struct Notify<A> {
    /// Payload of the notification.
    pub arguments: A,
}

impl<A> Notify<A> {
    /// Wraps `arguments` into a notification.
    pub(crate) fn new(arguments: A) -> Self {
        Self { arguments }
    }
}
79
/// Reply type of a `BlockTemplate` request.
pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
/// Arguments of a `BlockTemplate` request, in the order
/// `(bytes_limit, proposals_limit, max_version)`.
type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);

/// Reply type of a transaction submission: `Ok(())` or the `Reject` reason.
pub(crate) type SubmitTxResult = Result<(), Reject>;

/// Reply type of a `TestAcceptTx` request.
pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;

/// Reply type of a `GetTxStatus` request: the status plus the entry's cycles
/// when the transaction is found in the pool.
type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
/// Reply type of a `GetTransactionWithStatus` request.
type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
/// Reply type of a `FetchTxsWithCycles` request.
type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;

/// Payload of a chain-reorg notification, in the order
/// `(detached_blocks, attached_blocks, detached_proposal_id, snapshot)`.
pub(crate) type ChainReorgArgs = (
    VecDeque<BlockView>,
    VecDeque<BlockView>,
    HashSet<ProposalShortId>,
    Arc<Snapshot>,
);

/// Reply type of an `EstimateFeeRate` request.
pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
99
/// Commands sent from `TxPoolController` to the tx-pool service.
///
/// `Request` variants carry a responder on which the reply is sent back;
/// `Notify` variants have no reply.
pub(crate) enum Message {
    /// Ask for a block template; arguments are `(bytes_limit, proposals_limit, max_version)`.
    BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
    /// Submit a local transaction to the pool.
    SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
    /// Remove the transaction with the given hash; the reply is a `bool`.
    RemoveLocalTx(Request<Byte32, bool>),
    /// Test whether a transaction would be accepted by the pool.
    TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
    /// Submit a remote transaction; arguments are `(tx, declared_cycles, peer)`.
    SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
    /// Transactions received from the network, to be tried against the pool.
    NotifyTxs(Notify<Vec<TransactionView>>),
    /// Filter a list of proposal ids, keeping those the pool does not contain.
    FreshProposalsFilter(Request<Vec<ProposalShortId>, Vec<ProposalShortId>>),
    /// Fetch transactions by proposal short id.
    FetchTxs(Request<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
    /// Fetch transactions together with their cycles by proposal short id.
    FetchTxsWithCycles(Request<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
    /// Ask for tx-pool information.
    GetTxPoolInfo(Request<(), TxPoolInfo>),
    /// Ask for the status of a cell; arguments are `(out_point, with_data)`.
    GetLiveCell(Request<(OutPoint, bool), CellStatus>),
    /// Ask for the status of the transaction with the given hash.
    GetTxStatus(Request<Byte32, GetTxStatusResult>),
    /// Ask for the transaction with the given hash together with its status.
    GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
    /// A new uncle block.
    NewUncle(Notify<UncleBlockView>),
    /// Clear the pool and switch to the given snapshot.
    ClearPool(Request<Arc<Snapshot>, ()>),
    /// Clear the tx verify queue.
    ClearVerifyQueue(Request<(), ()>),
    /// Ask for the `TxPoolEntryInfo` of the pool.
    GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
    /// Ask for the `TxPoolIds` of the pool.
    GetAllIds(Request<(), TxPoolIds>),
    /// Save the pool to disk.
    SavePool(Request<(), ()>),
    /// Ask for the details of a transaction in the pool.
    GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),

    /// Update the IBD state; the argument is `in_ibd`.
    UpdateIBDState(Request<bool, ()>),
    /// Estimate the fee rate; arguments are `(estimate_mode, enable_fallback)`.
    EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),

    // test
    /// Plug entries into the pool, skipping verification (test only).
    #[cfg(feature = "internal")]
    PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
    /// Package transactions with an optional bytes limit (test only).
    #[cfg(feature = "internal")]
    PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
    /// Submit a local test transaction that goes through the resumable verify path.
    SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
}
132
/// Messages consumed by the block-assembler task.
///
/// When an update interval is configured, non-`Reset` messages are collected in
/// a `LinkedHashSet` between ticks (hence the `Hash`/`Eq` derives), while
/// `Reset` is processed immediately and clears that pending set.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) enum BlockAssemblerMessage {
    // NOTE(review): presumably signals a change in the pending set — confirm in `block_assembler::process`.
    Pending,
    // NOTE(review): presumably signals a change in the proposed set — confirm in `block_assembler::process`.
    Proposed,
    // NOTE(review): presumably signals a new uncle — confirm in `block_assembler::process`.
    Uncle,
    /// Reset request carrying a snapshot.
    Reset(Arc<Snapshot>),
}
140
/// Controller to the tx-pool service.
///
/// The Controller is internally reference-counted and can be freely cloned. A Controller is
/// obtained when the tx-pool service builder is constructed.
#[derive(Clone)]
pub struct TxPoolController {
    /// Sends `Message`s to the service's main processing loop.
    sender: mpsc::Sender<Message>,
    /// Dedicated channel for chain-reorg notifications.
    reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
    /// Publishes `ChunkCommand`s (suspend / resume) for chunk processing.
    chunk_tx: Arc<watch::Sender<ChunkCommand>>,
    /// Handle of the async runtime.
    handle: Handle,
    /// Flag shared with the builder; set to `true` by `TxPoolServiceBuilder::start`.
    started: Arc<AtomicBool>,
}
152
// Sends a `Message::$msg_type` request through `$self.sender` and blocks until
// the reply arrives.
//
// A failed `try_send` makes the *enclosing function* return early through `?`;
// a failed receive is converted into the enclosing function's error type.
macro_rules! send_message {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let (reply_tx, reply_rx) = oneshot::channel();
        let outgoing = Message::$msg_type(Request::call($args, reply_tx));
        $self.sender.try_send(outgoing).map_err(|send_err| {
            let (_undelivered, cause) = handle_try_send_error(send_err);
            cause
        })?;
        let reply = block_in_place(|| reply_rx.recv());
        reply.map_err(|recv_err| handle_recv_error(recv_err).into())
    }};
}
169
// Sends a `Message::$msg_type` notification through `$self.sender` without
// waiting for any reply; evaluates to a `Result` whose error is the converted
// `try_send` failure.
macro_rules! send_notify {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let outgoing = Message::$msg_type(Notify::new($args));
        match $self.sender.try_send(outgoing) {
            Ok(()) => Ok(()),
            Err(send_err) => {
                let (_undelivered, cause) = handle_try_send_error(send_err);
                Err(cause.into())
            }
        }
    }};
}
182
183impl TxPoolController {
184    /// Return whether tx-pool service is started
185    pub fn service_started(&self) -> bool {
186        self.started.load(Ordering::Acquire)
187    }
188
189    /// Set tx-pool service started, should only used for test
190    #[cfg(feature = "internal")]
191    pub fn set_service_started(&self, v: bool) {
192        self.started.store(v, Ordering::Release);
193    }
194
195    /// Return reference of tokio runtime handle
196    pub fn handle(&self) -> &Handle {
197        &self.handle
198    }
199
200    /// Generate and return block_template
201    pub fn get_block_template(
202        &self,
203        bytes_limit: Option<u64>,
204        proposals_limit: Option<u64>,
205        max_version: Option<Version>,
206    ) -> Result<BlockTemplateResult, AnyError> {
207        send_message!(
208            self,
209            BlockTemplate,
210            (bytes_limit, proposals_limit, max_version)
211        )
212    }
213
214    /// Notify new uncle
215    pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
216        send_notify!(self, NewUncle, uncle)
217    }
218
219    /// Make tx-pool consistent after a reorg, by re-adding or recursively erasing
220    /// detached block transactions from the tx-pool, and also removing any
221    /// other transactions from the tx-pool that are no longer valid given the new
222    /// tip/height.
223    pub fn update_tx_pool_for_reorg(
224        &self,
225        detached_blocks: VecDeque<BlockView>,
226        attached_blocks: VecDeque<BlockView>,
227        detached_proposal_id: HashSet<ProposalShortId>,
228        snapshot: Arc<Snapshot>,
229    ) -> Result<(), AnyError> {
230        let notify = Notify::new((
231            detached_blocks,
232            attached_blocks,
233            detached_proposal_id,
234            snapshot,
235        ));
236        self.reorg_sender.try_send(notify).map_err(|e| {
237            let (_m, e) = handle_try_send_error(e);
238            e.into()
239        })
240    }
241
242    /// Submit local tx to tx-pool
243    pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
244        send_message!(self, SubmitLocalTx, tx)
245    }
246
247    /// test if a tx can be accepted by tx-pool
248    /// Won't be broadcasted to network
249    /// won't be insert to tx-pool
250    pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
251        send_message!(self, TestAcceptTx, tx)
252    }
253
254    /// Remove tx from tx-pool
255    pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
256        send_message!(self, RemoveLocalTx, tx_hash)
257    }
258
259    /// Submit remote tx with declared cycles and origin to tx-pool
260    pub async fn submit_remote_tx(
261        &self,
262        tx: TransactionView,
263        declared_cycles: Cycle,
264        peer: PeerIndex,
265    ) -> Result<(), AnyError> {
266        send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
267    }
268
269    /// Receive txs from network, try to add txs to tx-pool
270    pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
271        send_notify!(self, NotifyTxs, txs)
272    }
273
274    /// Return tx-pool information
275    pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
276        send_message!(self, GetTxPoolInfo, ())
277    }
278
279    /// Return tx-pool information
280    pub fn get_live_cell(
281        &self,
282        out_point: OutPoint,
283        with_data: bool,
284    ) -> Result<CellStatus, AnyError> {
285        send_message!(self, GetLiveCell, (out_point, with_data))
286    }
287
288    /// Return fresh proposals
289    pub fn fresh_proposals_filter(
290        &self,
291        proposals: Vec<ProposalShortId>,
292    ) -> Result<Vec<ProposalShortId>, AnyError> {
293        send_message!(self, FreshProposalsFilter, proposals)
294    }
295
296    /// Return tx_status for rpc (get_transaction verbosity = 1)
297    pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
298        send_message!(self, GetTxStatus, hash)
299    }
300
301    /// Return transaction_with_status for rpc (get_transaction verbosity = 2)
302    pub fn get_transaction_with_status(
303        &self,
304        hash: Byte32,
305    ) -> Result<GetTransactionWithStatusResult, AnyError> {
306        send_message!(self, GetTransactionWithStatus, hash)
307    }
308
309    /// Mainly used for compact block reconstruction and block proposal pre-broadcasting
310    /// Orphan/conflicted/etc transactions that are returned for compact block reconstruction.
311    pub fn fetch_txs(
312        &self,
313        short_ids: HashSet<ProposalShortId>,
314    ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
315        send_message!(self, FetchTxs, short_ids)
316    }
317
318    /// Return txs with cycles
319    /// Mainly for relay transactions
320    pub fn fetch_txs_with_cycles(
321        &self,
322        short_ids: HashSet<ProposalShortId>,
323    ) -> Result<FetchTxsWithCyclesResult, AnyError> {
324        send_message!(self, FetchTxsWithCycles, short_ids)
325    }
326
327    /// Clears the tx-pool, removing all txs, update snapshot.
328    pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
329        send_message!(self, ClearPool, new_snapshot)
330    }
331
332    /// Clears the tx-verify-queue.
333    pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
334        send_message!(self, ClearVerifyQueue, ())
335    }
336
337    /// TODO(doc): @zhangsoledad
338    pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
339        send_message!(self, GetAllEntryInfo, ())
340    }
341
342    /// TODO(doc): @zhangsoledad
343    pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
344        send_message!(self, GetAllIds, ())
345    }
346
347    /// query the details of a transaction in the pool
348    pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
349        send_message!(self, GetPoolTxDetails, tx_hash)
350    }
351
352    /// Saves tx pool into disk.
353    pub fn save_pool(&self) -> Result<(), AnyError> {
354        info!("Please be patient, tx-pool are saving data into disk ...");
355        send_message!(self, SavePool, ())
356    }
357
358    /// Updates IBD state.
359    pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
360        send_message!(self, UpdateIBDState, in_ibd)
361    }
362
363    /// Estimates fee rate.
364    pub fn estimate_fee_rate(
365        &self,
366        estimate_mode: EstimateMode,
367        enable_fallback: bool,
368    ) -> Result<FeeEstimatesResult, AnyError> {
369        send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
370    }
371
372    /// Sends suspend chunk process cmd
373    pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
374        //debug!("[verify-test] run suspend_chunk_process");
375        self.chunk_tx
376            .send(ChunkCommand::Suspend)
377            .map_err(handle_send_cmd_error)
378            .map_err(Into::into)
379    }
380
381    /// Sends continue chunk process cmd
382    pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
383        //debug!("[verify-test] run continue_chunk_process");
384        self.chunk_tx
385            .send(ChunkCommand::Resume)
386            .map_err(handle_send_cmd_error)
387            .map_err(Into::into)
388    }
389
390    /// Load persisted txs into pool, assume that all txs are sorted
391    fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
392        if !txs.is_empty() {
393            info!("Loading persistent tx-pool data, total {} txs", txs.len());
394            let mut failed_txs = 0;
395            for tx in txs {
396                if self.submit_local_tx(tx)?.is_err() {
397                    failed_txs += 1;
398                }
399            }
400            if failed_txs == 0 {
401                info!("Persistent tx-pool data is loaded");
402            } else {
403                info!(
404                    "Persistent tx-pool data is loaded, {} stale txs are ignored",
405                    failed_txs
406                );
407            }
408        }
409        Ok(())
410    }
411
412    /// Plug tx-pool entry to tx-pool, skip verification. only for test
413    #[cfg(feature = "internal")]
414    pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
415        send_message!(self, PlugEntry, (entries, target))
416    }
417
418    /// Package txs with specified bytes_limit. for test
419    #[cfg(feature = "internal")]
420    pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
421        send_message!(self, PackageTxs, bytes_limit)
422    }
423
424    /// Submit local test tx to tx-pool, this tx will be put into verify queue directly.
425    pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
426        send_message!(self, SubmitLocalTestTx, tx)
427    }
428}
429
/// A builder used to create TxPoolService.
///
/// It owns the receiving ends of the channels whose sending ends live in the
/// `TxPoolController`, plus everything the service needs once started.
pub struct TxPoolServiceBuilder {
    /// Tx-pool configuration, handed to `TxPool::new` on start.
    pub(crate) tx_pool_config: TxPoolConfig,
    /// Controller clone used on start to import persisted transactions.
    pub(crate) tx_pool_controller: TxPoolController,
    /// Chain snapshot the pool is created with.
    pub(crate) snapshot: Arc<Snapshot>,
    /// Block assembler; `None` when no block assembler config was supplied.
    pub(crate) block_assembler: Option<BlockAssembler>,
    /// Shared transaction verification cache.
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    /// Callbacks registered through the `register_*` methods.
    pub(crate) callbacks: Callbacks,
    /// Receiving end of the controller's message channel.
    pub(crate) receiver: mpsc::Receiver<Message>,
    /// Receiving end of the reorg notification channel.
    pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
    /// Cancellation token observed by the spawned tasks to exit.
    pub(crate) signal_receiver: CancellationToken,
    /// Runtime handle the service tasks are spawned on.
    pub(crate) handle: Handle,
    /// Sender of `TxVerificationResult`s.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Receiving end of the chunk command watch channel.
    pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
    /// Flag shared with the controller; set to `true` by `start`.
    pub(crate) started: Arc<AtomicBool>,
    /// Both ends of the block-assembler message channel.
    pub(crate) block_assembler_channel: (
        mpsc::Sender<BlockAssemblerMessage>,
        mpsc::Receiver<BlockAssemblerMessage>,
    ),
    /// Fee estimator handed to the service.
    pub(crate) fee_estimator: FeeEstimator,
}
451
452impl TxPoolServiceBuilder {
453    /// Creates a new TxPoolServiceBuilder.
454    pub fn new(
455        tx_pool_config: TxPoolConfig,
456        snapshot: Arc<Snapshot>,
457        block_assembler_config: Option<BlockAssemblerConfig>,
458        txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
459        handle: &Handle,
460        tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
461        fee_estimator: FeeEstimator,
462    ) -> (TxPoolServiceBuilder, TxPoolController) {
463        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
464        let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
465        let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
466        let signal_receiver: CancellationToken = new_tokio_exit_rx();
467        let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
468        let started = Arc::new(AtomicBool::new(false));
469
470        let controller = TxPoolController {
471            sender,
472            reorg_sender,
473            handle: handle.clone(),
474            chunk_tx: Arc::new(chunk_tx),
475            started: Arc::clone(&started),
476        };
477
478        let block_assembler =
479            block_assembler_config.map(|config| BlockAssembler::new(config, Arc::clone(&snapshot)));
480        let builder = TxPoolServiceBuilder {
481            tx_pool_config,
482            tx_pool_controller: controller.clone(),
483            snapshot,
484            block_assembler,
485            txs_verify_cache,
486            callbacks: Callbacks::new(),
487            receiver,
488            reorg_receiver,
489            signal_receiver,
490            handle: handle.clone(),
491            tx_relay_sender,
492            chunk_rx,
493            started,
494            block_assembler_channel,
495            fee_estimator,
496        };
497
498        (builder, controller)
499    }
500
501    /// Register new pending callback
502    pub fn register_pending(&mut self, callback: PendingCallback) {
503        self.callbacks.register_pending(callback);
504    }
505
506    /// Return cloned tx relayer sender
507    pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
508        self.tx_relay_sender.clone()
509    }
510
511    /// Register new proposed callback
512    pub fn register_proposed(&mut self, callback: ProposedCallback) {
513        self.callbacks.register_proposed(callback);
514    }
515
516    /// Register new abandon callback
517    pub fn register_reject(&mut self, callback: RejectCallback) {
518        self.callbacks.register_reject(callback);
519    }
520
521    /// Start a background thread tx-pool service by taking ownership of the Builder, and returns a TxPoolController.
522    pub fn start(self, network: NetworkController) {
523        let consensus = self.snapshot.cloned_consensus();
524
525        let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
526            self.tx_pool_config.max_tx_verify_cycles,
527        )));
528
529        let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
530        let txs = match tx_pool.load_from_file() {
531            Ok(txs) => txs,
532            Err(e) => {
533                error!("{}", e.to_string());
534                error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
535                Vec::new()
536            }
537        };
538
539        let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
540        let service = TxPoolService {
541            tx_pool_config: Arc::new(tx_pool.config.clone()),
542            tx_pool: Arc::new(RwLock::new(tx_pool)),
543            orphan: Arc::new(RwLock::new(OrphanPool::new())),
544            block_assembler: self.block_assembler,
545            txs_verify_cache: self.txs_verify_cache,
546            callbacks: Arc::new(self.callbacks),
547            tx_relay_sender: self.tx_relay_sender,
548            block_assembler_sender,
549            verify_queue: Arc::clone(&verify_queue),
550            network,
551            consensus,
552            fee_estimator: self.fee_estimator,
553        };
554
555        let mut verify_mgr =
556            VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
557        self.handle.spawn(async move { verify_mgr.run().await });
558
559        let mut receiver = self.receiver;
560        let mut reorg_receiver = self.reorg_receiver;
561        let handle_clone = self.handle.clone();
562
563        let process_service = service.clone();
564        let signal_receiver = self.signal_receiver.clone();
565        self.handle.spawn(async move {
566            loop {
567                tokio::select! {
568                    Some(message) = receiver.recv() => {
569                        let service_clone = process_service.clone();
570                        handle_clone.spawn(process(service_clone, message));
571                    },
572                    _ = signal_receiver.cancelled() => {
573                        info!("TxPool is saving, please wait...");
574                        process_service.save_pool().await;
575                        info!("TxPool process_service exit now");
576                        break
577                    },
578                    else => break,
579                }
580            }
581        });
582
583        let process_service = service.clone();
584        if let Some(ref block_assembler) = service.block_assembler {
585            let signal_receiver = self.signal_receiver.clone();
586            let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
587            if interval.is_zero() {
588                // block_assembler.update_interval_millis set zero interval should only be used for tests,
589                // external notification will be disabled.
590                ckb_logger::warn!(
591                    "block_assembler.update_interval_millis set to zero interval. \
592                    This should only be used for tests, as external notification will be disabled."
593                );
594                self.handle.spawn(async move {
595                    loop {
596                        tokio::select! {
597                            Some(message) = block_assembler_receiver.recv() => {
598                                let service_clone = process_service.clone();
599                                block_assembler::process(service_clone, &message).await;
600                            },
601                            _ = signal_receiver.cancelled() => {
602                                info!("TxPool block_assembler process service received exit signal, exit now");
603                                break
604                            },
605                            else => break,
606                        }
607                    }
608                });
609            } else {
610                self.handle.spawn(async move {
611                    let mut interval = tokio::time::interval(interval);
612                    let mut queue = LinkedHashSet::new();
613                    loop {
614                        tokio::select! {
615                            Some(message) = block_assembler_receiver.recv() => {
616                                if let BlockAssemblerMessage::Reset(..) = message {
617                                    let service_clone = process_service.clone();
618                                    queue.clear();
619                                    block_assembler::process(service_clone, &message).await;
620                                } else {
621                                    queue.insert(message);
622                                }
623                            },
624                            _ = interval.tick() => {
625                                for message in &queue {
626                                    let service_clone = process_service.clone();
627                                    block_assembler::process(service_clone, message).await;
628                                }
629                                if !queue.is_empty() {
630                                    if let Some(ref block_assembler) = process_service.block_assembler {
631                                        block_assembler.notify().await;
632                                    }
633                                }
634                                queue.clear();
635                            }
636                            _ = signal_receiver.cancelled() => {
637                                info!("TxPool block_assembler process service received exit signal, exit now");
638                                break
639                            },
640                            else => break,
641                        }
642                    }
643                });
644            }
645        }
646
647        let signal_receiver = self.signal_receiver;
648        self.handle.spawn(async move {
649            loop {
650                tokio::select! {
651                    Some(message) = reorg_receiver.recv() => {
652                        let Notify {
653                            arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
654                        } = message;
655                        let snapshot_clone = Arc::clone(&snapshot);
656                        let detached_blocks_clone = detached_blocks.clone();
657                        service.update_block_assembler_before_tx_pool_reorg(
658                            detached_blocks_clone,
659                            snapshot_clone
660                        ).await;
661
662                        let snapshot_clone = Arc::clone(&snapshot);
663                        service
664                        .update_tx_pool_for_reorg(
665                            detached_blocks,
666                            attached_blocks,
667                            detached_proposal_id,
668                            snapshot_clone,
669                        )
670                        .await;
671
672                        service.update_block_assembler_after_tx_pool_reorg().await;
673                    },
674                    _ = signal_receiver.cancelled() => {
675                        info!("TxPool reorg process service received exit signal, exit now");
676                        break
677                    },
678                    else => break,
679                }
680            }
681        });
682        self.started.store(true, Ordering::Release);
683        if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
684            error!("Failed to import persistent txs, cause: {}", err);
685        }
686    }
687}
688
/// State shared by all tx-pool service tasks.
///
/// The struct is `Clone`; `TxPoolServiceBuilder::start` hands a clone to each
/// spawned task.
#[derive(Clone)]
pub(crate) struct TxPoolService {
    /// The transaction pool, behind an async read-write lock.
    pub(crate) tx_pool: Arc<RwLock<TxPool>>,
    /// The orphan pool, behind an async read-write lock.
    pub(crate) orphan: Arc<RwLock<OrphanPool>>,
    /// Consensus obtained from the snapshot the service was started with.
    pub(crate) consensus: Arc<Consensus>,
    /// Copy of the pool's configuration.
    pub(crate) tx_pool_config: Arc<TxPoolConfig>,
    /// Block assembler; `None` when block assembling is not configured.
    pub(crate) block_assembler: Option<BlockAssembler>,
    /// Shared transaction verification cache.
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    /// Callbacks registered on the builder before start.
    pub(crate) callbacks: Arc<Callbacks>,
    /// Network controller passed to `TxPoolServiceBuilder::start`.
    pub(crate) network: NetworkController,
    /// Sender of `TxVerificationResult`s.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Verify queue, created from the configured `max_tx_verify_cycles`.
    pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
    /// Sending end of the block-assembler message channel.
    pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
    /// Fee estimator.
    pub(crate) fee_estimator: FeeEstimator,
}
704
/// Result of a transaction verification, sent over the tx relay channel.
pub enum TxVerificationResult {
    /// The transaction is verified.
    Ok {
        /// Peer the transaction originally came from, if any.
        original_peer: Option<PeerIndex>,
        /// Hash of the verified transaction.
        tx_hash: Byte32,
    },
    /// The transaction's parents are unknown.
    UnknownParents {
        /// Peer the transaction originally came from.
        peer: PeerIndex,
        /// Hashes of the unknown parents.
        parents: HashSet<Byte32>,
    },
    /// The transaction is rejected.
    Reject {
        /// Hash of the rejected transaction.
        tx_hash: Byte32,
    },
}
727
728#[allow(clippy::cognitive_complexity)]
729async fn process(mut service: TxPoolService, message: Message) {
730    match message {
731        Message::GetTxPoolInfo(Request { responder, .. }) => {
732            let info = service.info().await;
733            if let Err(e) = responder.send(info) {
734                error!("Responder sending get_tx_pool_info failed {:?}", e);
735            };
736        }
737        Message::GetLiveCell(Request {
738            responder,
739            arguments: (out_point, with_data),
740        }) => {
741            let live_cell_status = service.get_live_cell(out_point, with_data).await;
742            if let Err(e) = responder.send(live_cell_status) {
743                error!("Responder sending get_live_cell failed {:?}", e);
744            };
745        }
746        Message::BlockTemplate(Request {
747            responder,
748            arguments: (_bytes_limit, _proposals_limit, _max_version),
749        }) => {
750            let block_template_result = service.get_block_template().await;
751            if let Err(e) = responder.send(block_template_result) {
752                error!("Responder sending block_template_result failed {:?}", e);
753            };
754        }
755        Message::SubmitLocalTx(Request {
756            responder,
757            arguments: tx,
758        }) => {
759            let result = service.process_tx(tx, None).await.map(|_| ());
760            if let Err(e) = responder.send(result) {
761                error!("Responder sending submit_tx result failed {:?}", e);
762            };
763        }
764        Message::SubmitLocalTestTx(Request {
765            responder,
766            arguments: tx,
767        }) => {
768            let result = service.resumeble_process_tx(tx, None).await.map(|_| ());
769            if let Err(e) = responder.send(result) {
770                error!("Responder sending submit_tx result failed {:?}", e);
771            };
772        }
773        Message::RemoveLocalTx(Request {
774            responder,
775            arguments: tx_hash,
776        }) => {
777            let result = service.remove_tx(tx_hash).await;
778            if let Err(e) = responder.send(result) {
779                error!("Responder sending remove_tx result failed {:?}", e);
780            };
781        }
782        Message::TestAcceptTx(Request {
783            responder,
784            arguments: tx,
785        }) => {
786            let result = service.test_accept_tx(tx).await;
787            if let Err(e) = responder.send(result.map(|r| r.into())) {
788                error!("Responder sending test_accept_tx result failed {:?}", e);
789            };
790        }
791        Message::SubmitRemoteTx(Request {
792            responder,
793            arguments: (tx, declared_cycles, peer),
794        }) => {
795            let _result = service
796                .resumeble_process_tx(tx, Some((declared_cycles, peer)))
797                .await;
798            if let Err(e) = responder.send(()) {
799                error!("Responder sending submit_tx result failed {:?}", e);
800            };
801        }
802        Message::NotifyTxs(Notify { arguments: txs }) => {
803            for tx in txs {
804                let _ret = service.resumeble_process_tx(tx, None).await;
805            }
806        }
807        Message::FreshProposalsFilter(Request {
808            responder,
809            arguments: mut proposals,
810        }) => {
811            let tx_pool = service.tx_pool.read().await;
812            proposals.retain(|id| !tx_pool.contains_proposal_id(id));
813            if let Err(e) = responder.send(proposals) {
814                error!("Responder sending fresh_proposals_filter failed {:?}", e);
815            };
816        }
817        Message::GetTxStatus(Request {
818            responder,
819            arguments: hash,
820        }) => {
821            let id = ProposalShortId::from_tx_hash(&hash);
822            let tx_pool = service.tx_pool.read().await;
823            let ret = if let Some(PoolEntry {
824                status,
825                inner: entry,
826                ..
827            }) = tx_pool.pool_map.get_by_id(&id)
828            {
829                let status = if status == &Status::Proposed {
830                    TxStatus::Proposed
831                } else {
832                    TxStatus::Pending
833                };
834                Ok((status, Some(entry.cycles)))
835            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
836                let recent_reject_result = recent_reject_db.get(&hash);
837                if let Ok(recent_reject) = recent_reject_result {
838                    if let Some(record) = recent_reject {
839                        Ok((TxStatus::Rejected(record), None))
840                    } else {
841                        Ok((TxStatus::Unknown, None))
842                    }
843                } else {
844                    Err(recent_reject_result.unwrap_err())
845                }
846            } else {
847                Ok((TxStatus::Unknown, None))
848            };
849
850            if let Err(e) = responder.send(ret) {
851                error!("Responder sending get_tx_status failed {:?}", e)
852            };
853        }
854        Message::GetTransactionWithStatus(Request {
855            responder,
856            arguments: hash,
857        }) => {
858            let id = ProposalShortId::from_tx_hash(&hash);
859            let tx_pool = service.tx_pool.read().await;
860            let ret = if let Some(PoolEntry {
861                status,
862                inner: entry,
863                ..
864            }) = tx_pool.pool_map.get_by_id(&id)
865            {
866                let (tx_status, min_replace_fee) = if status == &Status::Proposed {
867                    (TxStatus::Proposed, None)
868                } else {
869                    (TxStatus::Pending, tx_pool.min_replace_fee(entry))
870                };
871                Ok(TransactionWithStatus::with_status(
872                    Some(entry.transaction().clone()),
873                    entry.cycles,
874                    entry.timestamp,
875                    tx_status,
876                    Some(entry.fee),
877                    min_replace_fee,
878                ))
879            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
880                match recent_reject_db.get(&hash) {
881                    Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
882                    Ok(_) => Ok(TransactionWithStatus::with_unknown()),
883                    Err(err) => Err(err),
884                }
885            } else {
886                Ok(TransactionWithStatus::with_unknown())
887            };
888
889            if let Err(e) = responder.send(ret) {
890                error!("Responder sending get_tx_status failed {:?}", e)
891            };
892        }
893        Message::FetchTxs(Request {
894            responder,
895            arguments: short_ids,
896        }) => {
897            let tx_pool = service.tx_pool.read().await;
898            let orphan = service.orphan.read().await;
899            let txs = short_ids
900                .into_iter()
901                .filter_map(|short_id| {
902                    tx_pool
903                        .get_tx_from_pool_or_store(&short_id)
904                        .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
905                        .map(|tx| (short_id, tx))
906                })
907                .collect();
908            if let Err(e) = responder.send(txs) {
909                error!("Responder sending fetch_txs failed {:?}", e);
910            };
911        }
912        Message::FetchTxsWithCycles(Request {
913            responder,
914            arguments: short_ids,
915        }) => {
916            let tx_pool = service.tx_pool.read().await;
917            let txs = short_ids
918                .into_iter()
919                .filter_map(|short_id| {
920                    tx_pool
921                        .get_tx_with_cycles(&short_id)
922                        .map(|(tx, cycles)| (short_id, (tx, cycles)))
923                })
924                .collect();
925            if let Err(e) = responder.send(txs) {
926                error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
927            };
928        }
929        Message::NewUncle(Notify { arguments: uncle }) => {
930            service.receive_candidate_uncle(uncle).await;
931        }
932        Message::ClearPool(Request {
933            responder,
934            arguments: new_snapshot,
935        }) => {
936            service.clear_pool(new_snapshot).await;
937            if let Err(e) = responder.send(()) {
938                error!("Responder sending clear_pool failed {:?}", e)
939            };
940        }
941        Message::ClearVerifyQueue(Request { responder, .. }) => {
942            service.verify_queue.write().await.clear();
943            if let Err(e) = responder.send(()) {
944                error!("Responder sending clear_verify_queue failed {:?}", e)
945            };
946        }
947        Message::GetPoolTxDetails(Request {
948            responder,
949            arguments: tx_hash,
950        }) => {
951            let tx_pool = service.tx_pool.read().await;
952            let id = ProposalShortId::from_tx_hash(&tx_hash);
953            let tx_details = tx_pool
954                .get_tx_detail(&id)
955                .unwrap_or(PoolTxDetailInfo::with_unknown());
956            if let Err(e) = responder.send(tx_details) {
957                error!("responder send get_pool_tx_details failed {:?}", e)
958            };
959        }
960        Message::GetAllEntryInfo(Request { responder, .. }) => {
961            let tx_pool = service.tx_pool.read().await;
962            let info = tx_pool.get_all_entry_info();
963            if let Err(e) = responder.send(info) {
964                error!("Responder sending get_all_entry_info failed {:?}", e)
965            };
966        }
967        Message::GetAllIds(Request { responder, .. }) => {
968            let tx_pool = service.tx_pool.read().await;
969            let ids = tx_pool.get_ids();
970            if let Err(e) = responder.send(ids) {
971                error!("Responder sending get_ids failed {:?}", e)
972            };
973        }
974        Message::SavePool(Request { responder, .. }) => {
975            service.save_pool().await;
976            if let Err(e) = responder.send(()) {
977                error!("Responder sending save_pool failed {:?}", e)
978            };
979        }
980        Message::UpdateIBDState(Request {
981            responder,
982            arguments: in_ibd,
983        }) => {
984            service.update_ibd_state(in_ibd).await;
985            if let Err(e) = responder.send(()) {
986                error!("Responder sending update_ibd_state failed {:?}", e)
987            };
988        }
989        Message::EstimateFeeRate(Request {
990            responder,
991            arguments: (estimate_mode, enable_fallback),
992        }) => {
993            let fee_estimates_result = service
994                .estimate_fee_rate(estimate_mode, enable_fallback)
995                .await;
996            if let Err(e) = responder.send(fee_estimates_result) {
997                error!("Responder sending fee_estimates_result failed {:?}", e)
998            };
999        }
1000        #[cfg(feature = "internal")]
1001        Message::PlugEntry(Request {
1002            responder,
1003            arguments: (entries, target),
1004        }) => {
1005            service.plug_entry(entries, target).await;
1006
1007            if let Err(e) = responder.send(()) {
1008                error!("Responder sending plug_entry failed {:?}", e);
1009            };
1010        }
1011        #[cfg(feature = "internal")]
1012        Message::PackageTxs(Request {
1013            responder,
1014            arguments: bytes_limit,
1015        }) => {
1016            let max_block_cycles = service.consensus.max_block_cycles();
1017            let max_block_bytes = service.consensus.max_block_bytes();
1018            let tx_pool = service.tx_pool.read().await;
1019            let (txs, _size, _cycles) = tx_pool.package_txs(
1020                max_block_cycles,
1021                bytes_limit.unwrap_or(max_block_bytes) as usize,
1022            );
1023            if let Err(e) = responder.send(txs) {
1024                error!("Responder sending plug_entry failed {:?}", e);
1025            };
1026        }
1027    }
1028}
1029
1030impl TxPoolService {
1031    /// Tx-pool information
1032    async fn info(&self) -> TxPoolInfo {
1033        let tx_pool = self.tx_pool.read().await;
1034        let orphan = self.orphan.read().await;
1035        let verify_queue = self.verify_queue.read().await;
1036        let tip_header = tx_pool.snapshot.tip_header();
1037        TxPoolInfo {
1038            tip_hash: tip_header.hash(),
1039            tip_number: tip_header.number(),
1040            pending_size: tx_pool.pool_map.pending_size(),
1041            proposed_size: tx_pool.pool_map.proposed_size(),
1042            orphan_size: orphan.len(),
1043            total_tx_size: tx_pool.pool_map.total_tx_size,
1044            total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
1045            min_fee_rate: self.tx_pool_config.min_fee_rate,
1046            min_rbf_rate: self.tx_pool_config.min_rbf_rate,
1047            last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
1048            tx_size_limit: TRANSACTION_SIZE_LIMIT,
1049            max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
1050            verify_queue_size: verify_queue.len(),
1051        }
1052    }
1053
1054    /// Get Live Cell Status
1055    async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
1056        let tx_pool = self.tx_pool.read().await;
1057        let snapshot = tx_pool.snapshot();
1058        let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
1059        let provider = OverlayCellProvider::new(&pool_cell, snapshot);
1060
1061        match provider.cell(&out_point, false) {
1062            CellStatus::Live(mut cell_meta) => {
1063                if eager_load {
1064                    if let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
1065                        cell_meta.mem_cell_data = Some(data);
1066                        cell_meta.mem_cell_data_hash = Some(data_hash);
1067                    }
1068                }
1069                CellStatus::live_cell(cell_meta)
1070            }
1071            _ => CellStatus::Unknown,
1072        }
1073    }
1074
1075    pub fn should_notify_block_assembler(&self) -> bool {
1076        self.block_assembler.is_some()
1077    }
1078
1079    pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
1080        if let Some(ref block_assembler) = self.block_assembler {
1081            {
1082                block_assembler.candidate_uncles.lock().await.insert(uncle);
1083            }
1084            if self
1085                .block_assembler_sender
1086                .send(BlockAssemblerMessage::Uncle)
1087                .await
1088                .is_err()
1089            {
1090                error!("block_assembler receiver dropped");
1091            }
1092        }
1093    }
1094
1095    pub async fn update_block_assembler_before_tx_pool_reorg(
1096        &self,
1097        detached_blocks: VecDeque<BlockView>,
1098        snapshot: Arc<Snapshot>,
1099    ) {
1100        if let Some(ref block_assembler) = self.block_assembler {
1101            {
1102                let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
1103                for detached_block in detached_blocks {
1104                    candidate_uncles.insert(detached_block.as_uncle());
1105                }
1106            }
1107
1108            if let Err(e) = block_assembler.update_blank(snapshot).await {
1109                error!("block_assembler update_blank error {}", e);
1110            }
1111            block_assembler.notify().await;
1112        }
1113    }
1114
1115    pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
1116        if let Some(ref block_assembler) = self.block_assembler {
1117            if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
1118                error!("block_assembler update failed {:?}", e);
1119            }
1120            block_assembler.notify().await;
1121        }
1122    }
1123
1124    #[cfg(feature = "internal")]
1125    pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
1126        {
1127            let mut tx_pool = self.tx_pool.write().await;
1128            match target {
1129                PlugTarget::Pending => {
1130                    for entry in entries {
1131                        tx_pool
1132                            .add_pending(entry)
1133                            .expect("Plug entry add_pending error");
1134                    }
1135                }
1136                PlugTarget::Proposed => {
1137                    for entry in entries {
1138                        tx_pool
1139                            .add_proposed(entry)
1140                            .expect("Plug entry add_proposed error");
1141                    }
1142                }
1143            };
1144        }
1145
1146        if self.should_notify_block_assembler() {
1147            let msg = match target {
1148                PlugTarget::Pending => BlockAssemblerMessage::Pending,
1149                PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
1150            };
1151            if self.block_assembler_sender.send(msg).await.is_err() {
1152                error!("block_assembler receiver dropped");
1153            }
1154        }
1155    }
1156}