Skip to main content

ckb_tx_pool/
service.rs

1//! Tx-pool background service
2
3use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::verify_mgr::VerifyMgr;
11use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
12use ckb_async_runtime::Handle;
13use ckb_chain_spec::consensus::Consensus;
14use ckb_channel::oneshot;
15use ckb_error::AnyError;
16use ckb_fee_estimator::FeeEstimator;
17use ckb_jsonrpc_types::BlockTemplate;
18use ckb_logger::error;
19use ckb_logger::info;
20use ckb_network::{NetworkController, PeerIndex};
21use ckb_script::ChunkCommand;
22use ckb_snapshot::Snapshot;
23use ckb_stop_handler::new_tokio_exit_rx;
24use ckb_store::ChainStore;
25use ckb_types::{
26    core::{
27        BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
28        cell::{CellProvider, CellStatus, OverlayCellProvider},
29        tx_pool::{
30            EntryCompleted, PoolTxDetailInfo, Reject, TRANSACTION_SIZE_LIMIT,
31            TransactionWithStatus, TxPoolEntryInfo, TxPoolIds, TxPoolInfo, TxStatus,
32        },
33    },
34    packed::{Byte32, OutPoint, ProposalShortId},
35};
36use ckb_util::LinkedHashSet;
37use ckb_verification::cache::TxVerificationCache;
38use std::collections::{HashMap, HashSet, VecDeque};
39use std::sync::{
40    Arc,
41    atomic::{AtomicBool, Ordering},
42};
43use std::time::Duration;
44use tokio::sync::watch;
45use tokio::sync::{RwLock, mpsc};
46use tokio::task::block_in_place;
47use tokio_util::sync::CancellationToken;
48
49use crate::pool_cell::PoolCell;
50#[cfg(feature = "internal")]
51use crate::{component::entry::TxEntry, process::PlugTarget};
52
53pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
54pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
55
56pub(crate) struct Request<A, R> {
57    pub responder: oneshot::Sender<R>,
58    pub arguments: A,
59}
60
61impl<A, R> Request<A, R> {
62    pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
63        Request {
64            responder,
65            arguments,
66        }
67    }
68}
69
70pub(crate) struct AsyncRequest<A, R> {
71    pub responder: tokio::sync::oneshot::Sender<R>,
72    pub arguments: A,
73}
74
75impl<A, R> AsyncRequest<A, R> {
76    pub(crate) fn call(
77        arguments: A,
78        responder: tokio::sync::oneshot::Sender<R>,
79    ) -> AsyncRequest<A, R> {
80        AsyncRequest {
81            responder,
82            arguments,
83        }
84    }
85}
86
87pub(crate) struct Notify<A> {
88    pub arguments: A,
89}
90
91impl<A> Notify<A> {
92    pub(crate) fn new(arguments: A) -> Notify<A> {
93        Notify { arguments }
94    }
95}
96
97pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
98type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);
99
100pub(crate) type SubmitTxResult = Result<(), Reject>;
101
102pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;
103
104type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
105type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
106type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;
107
108pub(crate) type ChainReorgArgs = (
109    VecDeque<BlockView>,
110    VecDeque<BlockView>,
111    HashSet<ProposalShortId>,
112    Arc<Snapshot>,
113);
114
115pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
116
117pub(crate) enum Message {
118    BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
119    SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
120    RemoveLocalTx(Request<Byte32, bool>),
121    TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
122    SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
123    NotifyTxs(Notify<Vec<TransactionView>>),
124    FreshProposalsFilter(AsyncRequest<Vec<ProposalShortId>, Vec<ProposalShortId>>),
125    FetchTxs(AsyncRequest<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
126    FetchTxsWithCycles(AsyncRequest<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
127    GetTxPoolInfo(Request<(), TxPoolInfo>),
128    GetLiveCell(Request<(OutPoint, bool), CellStatus>),
129    GetTxStatus(Request<Byte32, GetTxStatusResult>),
130    GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
131    NewUncle(Notify<UncleBlockView>),
132    ClearPool(Request<Arc<Snapshot>, ()>),
133    ClearVerifyQueue(Request<(), ()>),
134    GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
135    GetAllIds(Request<(), TxPoolIds>),
136    SavePool(Request<(), ()>),
137    GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),
138    GetTotalRecentRejectNum(Request<(), Option<u64>>),
139
140    UpdateIBDState(Request<bool, ()>),
141    EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),
142
143    // test
144    #[cfg(feature = "internal")]
145    PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
146    #[cfg(feature = "internal")]
147    PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
148    SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
149}
150
151#[derive(Debug, Hash, Eq, PartialEq)]
152pub(crate) enum BlockAssemblerMessage {
153    Pending,
154    Proposed,
155    Uncle,
156    Reset(Arc<Snapshot>),
157}
158
159/// Controller to the tx-pool service.
160///
161/// The Controller is internally reference-counted and can be freely cloned. A Controller can be obtained when tx-pool service construct.
162#[derive(Clone)]
163pub struct TxPoolController {
164    sender: mpsc::Sender<Message>,
165    reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
166    chunk_tx: Arc<watch::Sender<ChunkCommand>>,
167    handle: Handle,
168    started: Arc<AtomicBool>,
169}
170
171macro_rules! send_message {
172    ($self:ident, $msg_type:ident, $args:expr) => {{
173        let (responder, response) = oneshot::channel();
174        let request = Request::call($args, responder);
175        $self
176            .sender
177            .try_send(Message::$msg_type(request))
178            .map_err(|e| {
179                let (_m, e) = handle_try_send_error(e);
180                e
181            })?;
182        block_in_place(|| response.recv())
183            .map_err(handle_recv_error)
184            .map_err(Into::into)
185    }};
186}
187
188macro_rules! send_notify {
189    ($self:ident, $msg_type:ident, $args:expr) => {{
190        let notify = Notify::new($args);
191        $self
192            .sender
193            .try_send(Message::$msg_type(notify))
194            .map_err(|e| {
195                let (_m, e) = handle_try_send_error(e);
196                e.into()
197            })
198    }};
199}
200
201impl TxPoolController {
202    /// Return whether tx-pool service is started
203    pub fn service_started(&self) -> bool {
204        self.started.load(Ordering::Acquire)
205    }
206
207    /// Set tx-pool service started, should only used for test
208    #[cfg(feature = "internal")]
209    pub fn set_service_started(&self, v: bool) {
210        self.started.store(v, Ordering::Release);
211    }
212
213    /// Return reference of tokio runtime handle
214    pub fn handle(&self) -> &Handle {
215        &self.handle
216    }
217
218    /// Generate and return block_template
219    pub fn get_block_template(
220        &self,
221        bytes_limit: Option<u64>,
222        proposals_limit: Option<u64>,
223        max_version: Option<Version>,
224    ) -> Result<BlockTemplateResult, AnyError> {
225        send_message!(
226            self,
227            BlockTemplate,
228            (bytes_limit, proposals_limit, max_version)
229        )
230    }
231
232    /// Notify new uncle
233    pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
234        send_notify!(self, NewUncle, uncle)
235    }
236
237    /// Make tx-pool consistent after a reorg, by re-adding or recursively erasing
238    /// detached block transactions from the tx-pool, and also removing any
239    /// other transactions from the tx-pool that are no longer valid given the new
240    /// tip/height.
241    pub fn update_tx_pool_for_reorg(
242        &self,
243        detached_blocks: VecDeque<BlockView>,
244        attached_blocks: VecDeque<BlockView>,
245        detached_proposal_id: HashSet<ProposalShortId>,
246        snapshot: Arc<Snapshot>,
247    ) -> Result<(), AnyError> {
248        let notify = Notify::new((
249            detached_blocks,
250            attached_blocks,
251            detached_proposal_id,
252            snapshot,
253        ));
254        self.reorg_sender.try_send(notify).map_err(|e| {
255            let (_m, e) = handle_try_send_error(e);
256            e.into()
257        })
258    }
259
260    /// Submit local tx to tx-pool
261    pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
262        send_message!(self, SubmitLocalTx, tx)
263    }
264
265    /// test if a tx can be accepted by tx-pool
266    /// Won't be broadcasted to network
267    /// won't be insert to tx-pool
268    pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
269        send_message!(self, TestAcceptTx, tx)
270    }
271
272    /// Remove tx from tx-pool
273    pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
274        send_message!(self, RemoveLocalTx, tx_hash)
275    }
276
277    /// Submit remote tx with declared cycles and origin to tx-pool
278    pub async fn submit_remote_tx(
279        &self,
280        tx: TransactionView,
281        declared_cycles: Cycle,
282        peer: PeerIndex,
283    ) -> Result<(), AnyError> {
284        send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
285    }
286
287    /// Receive txs from network, try to add txs to tx-pool
288    pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
289        send_notify!(self, NotifyTxs, txs)
290    }
291
292    /// Receive txs from network, try to add txs to tx-pool
293    pub async fn notify_txs_async(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
294        let notify = Notify::new(txs);
295        self.sender
296            .send(Message::NotifyTxs(notify))
297            .await
298            .map_err(|e| {
299                let e = ckb_error::OtherError::new(format!("SendError {e}"));
300                e.into()
301            })
302    }
303
304    /// Return tx-pool information
305    pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
306        send_message!(self, GetTxPoolInfo, ())
307    }
308
309    /// Return tx-pool information
310    pub fn get_live_cell(
311        &self,
312        out_point: OutPoint,
313        with_data: bool,
314    ) -> Result<CellStatus, AnyError> {
315        send_message!(self, GetLiveCell, (out_point, with_data))
316    }
317
318    /// Return fresh proposals
319    pub async fn fresh_proposals_filter(
320        &self,
321        proposals: Vec<ProposalShortId>,
322    ) -> Result<Vec<ProposalShortId>, AnyError> {
323        let (responder, response) = tokio::sync::oneshot::channel();
324        let request = AsyncRequest::call(proposals, responder);
325        self.sender
326            .send(Message::FreshProposalsFilter(request))
327            .await?;
328        response.await.map_err(Into::into)
329    }
330
331    /// Return tx_status for rpc (get_transaction verbosity = 1)
332    pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
333        send_message!(self, GetTxStatus, hash)
334    }
335
336    /// Return transaction_with_status for rpc (get_transaction verbosity = 2)
337    pub fn get_transaction_with_status(
338        &self,
339        hash: Byte32,
340    ) -> Result<GetTransactionWithStatusResult, AnyError> {
341        send_message!(self, GetTransactionWithStatus, hash)
342    }
343
344    /// Mainly used for compact block reconstruction and block proposal pre-broadcasting
345    /// Orphan/conflicted/etc transactions that are returned for compact block reconstruction.
346    pub async fn fetch_txs(
347        &self,
348        short_ids: HashSet<ProposalShortId>,
349    ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
350        let (responder, response) = tokio::sync::oneshot::channel();
351        let request = AsyncRequest::call(short_ids, responder);
352        self.sender.send(Message::FetchTxs(request)).await?;
353        response.await.map_err(Into::into)
354    }
355
356    /// Return txs with cycles
357    /// Mainly for relay transactions
358    pub async fn fetch_txs_with_cycles(
359        &self,
360        short_ids: HashSet<ProposalShortId>,
361    ) -> Result<FetchTxsWithCyclesResult, AnyError> {
362        let (responder, response) = tokio::sync::oneshot::channel();
363        let request = AsyncRequest::call(short_ids, responder);
364        self.sender
365            .send(Message::FetchTxsWithCycles(request))
366            .await?;
367        response.await.map_err(Into::into)
368    }
369
370    /// Clears the tx-pool, removing all txs, update snapshot.
371    pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
372        send_message!(self, ClearPool, new_snapshot)
373    }
374
375    /// Clears the tx-verify-queue.
376    pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
377        send_message!(self, ClearVerifyQueue, ())
378    }
379
380    /// Returns information about all transactions in the pool.
381    pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
382        send_message!(self, GetAllEntryInfo, ())
383    }
384
385    /// Returns the IDs of all transactions in the pool.
386    pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
387        send_message!(self, GetAllIds, ())
388    }
389
390    /// query the details of a transaction in the pool
391    pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
392        send_message!(self, GetPoolTxDetails, tx_hash)
393    }
394
395    /// Saves tx pool into disk.
396    pub fn save_pool(&self) -> Result<(), AnyError> {
397        info!("Please be patient, tx-pool are saving data into disk ...");
398        send_message!(self, SavePool, ())
399    }
400
401    /// Updates IBD state.
402    pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
403        send_message!(self, UpdateIBDState, in_ibd)
404    }
405
406    /// Estimates fee rate.
407    pub fn estimate_fee_rate(
408        &self,
409        estimate_mode: EstimateMode,
410        enable_fallback: bool,
411    ) -> Result<FeeEstimatesResult, AnyError> {
412        send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
413    }
414
415    /// Sends suspend chunk process cmd
416    pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
417        //debug!("[verify-test] run suspend_chunk_process");
418        self.chunk_tx
419            .send(ChunkCommand::Suspend)
420            .map_err(handle_send_cmd_error)
421            .map_err(Into::into)
422    }
423
424    /// Sends continue chunk process cmd
425    pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
426        //debug!("[verify-test] run continue_chunk_process");
427        self.chunk_tx
428            .send(ChunkCommand::Resume)
429            .map_err(handle_send_cmd_error)
430            .map_err(Into::into)
431    }
432
433    /// Load persisted txs into pool, assume that all txs are sorted
434    fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
435        if !txs.is_empty() {
436            info!("Loading persistent tx-pool data, total {} txs", txs.len());
437            let mut failed_txs = 0;
438            for tx in txs {
439                if self.submit_local_tx(tx)?.is_err() {
440                    failed_txs += 1;
441                }
442            }
443            if failed_txs == 0 {
444                info!("Persistent tx-pool data is loaded");
445            } else {
446                info!(
447                    "Persistent tx-pool data is loaded, {} stale txs are ignored",
448                    failed_txs
449                );
450            }
451        }
452        Ok(())
453    }
454
455    /// Plug tx-pool entry to tx-pool, skip verification. only for test
456    #[cfg(feature = "internal")]
457    pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
458        send_message!(self, PlugEntry, (entries, target))
459    }
460
461    /// Package txs with specified bytes_limit. for test
462    #[cfg(feature = "internal")]
463    pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
464        send_message!(self, PackageTxs, bytes_limit)
465    }
466
467    /// Submit local test tx to tx-pool, this tx will be put into verify queue directly.
468    pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
469        send_message!(self, SubmitLocalTestTx, tx)
470    }
471
472    /// get total recent reject num
473    pub fn get_total_recent_reject_num(&self) -> Result<Option<u64>, AnyError> {
474        send_message!(self, GetTotalRecentRejectNum, ())
475    }
476}
477
478/// A builder used to create TxPoolService.
479pub struct TxPoolServiceBuilder {
480    pub(crate) tx_pool_config: TxPoolConfig,
481    pub(crate) tx_pool_controller: TxPoolController,
482    pub(crate) snapshot: Arc<Snapshot>,
483    pub(crate) block_assembler: Option<BlockAssembler>,
484    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
485    pub(crate) callbacks: Callbacks,
486    pub(crate) receiver: mpsc::Receiver<Message>,
487    pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
488    pub(crate) signal_receiver: CancellationToken,
489    pub(crate) handle: Handle,
490    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
491    pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
492    pub(crate) started: Arc<AtomicBool>,
493    pub(crate) block_assembler_channel: (
494        mpsc::Sender<BlockAssemblerMessage>,
495        mpsc::Receiver<BlockAssemblerMessage>,
496    ),
497    pub(crate) fee_estimator: FeeEstimator,
498}
499
500impl TxPoolServiceBuilder {
501    /// Creates a new TxPoolServiceBuilder.
502    pub fn new(
503        tx_pool_config: TxPoolConfig,
504        snapshot: Arc<Snapshot>,
505        block_assembler_config: Option<BlockAssemblerConfig>,
506        txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
507        handle: &Handle,
508        tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
509        fee_estimator: FeeEstimator,
510    ) -> (TxPoolServiceBuilder, TxPoolController) {
511        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
512        let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
513        let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
514        let signal_receiver: CancellationToken = new_tokio_exit_rx();
515        let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
516        let started = Arc::new(AtomicBool::new(false));
517
518        let controller = TxPoolController {
519            sender,
520            reorg_sender,
521            handle: handle.clone(),
522            chunk_tx: Arc::new(chunk_tx),
523            started: Arc::clone(&started),
524        };
525
526        let block_assembler = block_assembler_config.and_then(|config| {
527            BlockAssembler::new(config, Arc::clone(&snapshot))
528                .inspect_err(|err| error!("failed to initialize block assembler: {}", err))
529                .ok()
530        });
531        let builder = TxPoolServiceBuilder {
532            tx_pool_config,
533            tx_pool_controller: controller.clone(),
534            snapshot,
535            block_assembler,
536            txs_verify_cache,
537            callbacks: Callbacks::new(),
538            receiver,
539            reorg_receiver,
540            signal_receiver,
541            handle: handle.clone(),
542            tx_relay_sender,
543            chunk_rx,
544            started,
545            block_assembler_channel,
546            fee_estimator,
547        };
548
549        (builder, controller)
550    }
551
552    /// Register new pending callback
553    pub fn register_pending(&mut self, callback: PendingCallback) {
554        self.callbacks.register_pending(callback);
555    }
556
557    /// Return cloned tx relayer sender
558    pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
559        self.tx_relay_sender.clone()
560    }
561
562    /// Register new proposed callback
563    pub fn register_proposed(&mut self, callback: ProposedCallback) {
564        self.callbacks.register_proposed(callback);
565    }
566
567    /// Register new abandon callback
568    pub fn register_reject(&mut self, callback: RejectCallback) {
569        self.callbacks.register_reject(callback);
570    }
571
572    /// Start a background thread tx-pool service by taking ownership of the Builder, and returns a TxPoolController.
573    pub fn start(self, network: NetworkController) {
574        let consensus = self.snapshot.cloned_consensus();
575
576        let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
577            self.tx_pool_config.max_tx_verify_cycles,
578        )));
579
580        let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
581        let txs = match tx_pool.load_from_file() {
582            Ok(txs) => txs,
583            Err(e) => {
584                error!("{}", e.to_string());
585                error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
586                Vec::new()
587            }
588        };
589
590        let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
591        let service = TxPoolService {
592            tx_pool_config: Arc::new(tx_pool.config.clone()),
593            tx_pool: Arc::new(RwLock::new(tx_pool)),
594            orphan: Arc::new(RwLock::new(OrphanPool::new())),
595            block_assembler: self.block_assembler,
596            txs_verify_cache: self.txs_verify_cache,
597            callbacks: Arc::new(self.callbacks),
598            tx_relay_sender: self.tx_relay_sender,
599            block_assembler_sender,
600            verify_queue: Arc::clone(&verify_queue),
601            network,
602            consensus,
603            fee_estimator: self.fee_estimator,
604        };
605
606        let mut verify_mgr =
607            VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
608        self.handle.spawn(async move { verify_mgr.run().await });
609
610        let mut receiver = self.receiver;
611        let mut reorg_receiver = self.reorg_receiver;
612        let handle_clone = self.handle.clone();
613
614        let process_service = service.clone();
615        let signal_receiver = self.signal_receiver.clone();
616        self.handle.spawn(async move {
617            loop {
618                tokio::select! {
619                    Some(message) = receiver.recv() => {
620                        let service_clone = process_service.clone();
621                        handle_clone.spawn(process(service_clone, message));
622                    },
623                    _ = signal_receiver.cancelled() => {
624                        info!("TxPool is saving, please wait...");
625                        process_service.save_pool().await;
626                        info!("TxPool process_service exit now");
627                        break
628                    },
629                    else => break,
630                }
631            }
632        });
633
634        let process_service = service.clone();
635        if let Some(ref block_assembler) = service.block_assembler {
636            let signal_receiver = self.signal_receiver.clone();
637            let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
638            if interval.is_zero() {
639                // block_assembler.update_interval_millis set zero interval should only be used for tests,
640                // external notification will be disabled.
641                ckb_logger::warn!(
642                    "block_assembler.update_interval_millis set to zero interval. \
643                    This should only be used for tests, as external notification will be disabled."
644                );
645                self.handle.spawn(async move {
646                    loop {
647                        tokio::select! {
648                            Some(message) = block_assembler_receiver.recv() => {
649                                let service_clone = process_service.clone();
650                                block_assembler::process(service_clone, &message).await;
651                            },
652                            _ = signal_receiver.cancelled() => {
653                                info!("TxPool block_assembler process service received exit signal, exit now");
654                                break
655                            },
656                            else => break,
657                        }
658                    }
659                });
660            } else {
661                self.handle.spawn(async move {
662                    let mut interval = tokio::time::interval(interval);
663                    let mut queue = LinkedHashSet::new();
664                    loop {
665                        tokio::select! {
666                            Some(message) = block_assembler_receiver.recv() => {
667                                if let BlockAssemblerMessage::Reset(..) = message {
668                                    let service_clone = process_service.clone();
669                                    queue.clear();
670                                    block_assembler::process(service_clone, &message).await;
671                                } else {
672                                    queue.insert(message);
673                                }
674                            },
675                            _ = interval.tick() => {
676                                for message in &queue {
677                                    let service_clone = process_service.clone();
678                                    block_assembler::process(service_clone, message).await;
679                                }
680                                if !queue.is_empty()
681                                    && let Some(ref block_assembler) = process_service.block_assembler {
682                                        block_assembler.notify().await;
683                                    }
684                                queue.clear();
685                            }
686                            _ = signal_receiver.cancelled() => {
687                                info!("TxPool block_assembler process service received exit signal, exit now");
688                                break
689                            },
690                            else => break,
691                        }
692                    }
693                });
694            }
695        }
696
697        let signal_receiver = self.signal_receiver;
698        self.handle.spawn(async move {
699            loop {
700                tokio::select! {
701                    Some(message) = reorg_receiver.recv() => {
702                        let Notify {
703                            arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
704                        } = message;
705                        let snapshot_clone = Arc::clone(&snapshot);
706                        let detached_blocks_clone = detached_blocks.clone();
707                        service.update_block_assembler_before_tx_pool_reorg(
708                            detached_blocks_clone,
709                            snapshot_clone
710                        ).await;
711
712                        let snapshot_clone = Arc::clone(&snapshot);
713                        service
714                        .update_tx_pool_for_reorg(
715                            detached_blocks,
716                            attached_blocks,
717                            detached_proposal_id,
718                            snapshot_clone,
719                        )
720                        .await;
721
722                        service.update_block_assembler_after_tx_pool_reorg().await;
723                    },
724                    _ = signal_receiver.cancelled() => {
725                        info!("TxPool reorg process service received exit signal, exit now");
726                        break
727                    },
728                    else => break,
729                }
730            }
731        });
732        self.started.store(true, Ordering::Release);
733        if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
734            error!("Failed to import persistent txs, cause: {}", err);
735        }
736    }
737}
738
739#[derive(Clone)]
740pub(crate) struct TxPoolService {
741    pub(crate) tx_pool: Arc<RwLock<TxPool>>,
742    pub(crate) orphan: Arc<RwLock<OrphanPool>>,
743    pub(crate) consensus: Arc<Consensus>,
744    pub(crate) tx_pool_config: Arc<TxPoolConfig>,
745    pub(crate) block_assembler: Option<BlockAssembler>,
746    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
747    pub(crate) callbacks: Arc<Callbacks>,
748    pub(crate) network: NetworkController,
749    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
750    pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
751    pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
752    pub(crate) fee_estimator: FeeEstimator,
753}
754
755/// tx verification result
756pub enum TxVerificationResult {
757    /// tx is verified
758    Ok {
759        /// original peer
760        original_peer: Option<PeerIndex>,
761        /// transaction hash
762        tx_hash: Byte32,
763    },
764    /// tx parent is unknown
765    UnknownParents {
766        /// original peer
767        peer: PeerIndex,
768        /// parents hashes
769        parents: HashSet<Byte32>,
770    },
771    /// tx is rejected
772    Reject {
773        /// transaction hash
774        tx_hash: Byte32,
775    },
776}
777
778#[allow(clippy::cognitive_complexity)]
779async fn process(mut service: TxPoolService, message: Message) {
780    match message {
781        Message::GetTxPoolInfo(Request { responder, .. }) => {
782            let info = service.info().await;
783            if let Err(e) = responder.send(info) {
784                error!("Responder sending get_tx_pool_info failed {:?}", e);
785            };
786        }
787        Message::GetLiveCell(Request {
788            responder,
789            arguments: (out_point, with_data),
790        }) => {
791            let live_cell_status = service.get_live_cell(out_point, with_data).await;
792            if let Err(e) = responder.send(live_cell_status) {
793                error!("Responder sending get_live_cell failed {:?}", e);
794            };
795        }
796        Message::BlockTemplate(Request {
797            responder,
798            arguments: (_bytes_limit, _proposals_limit, _max_version),
799        }) => {
800            let block_template_result = service.get_block_template().await;
801            if let Err(e) = responder.send(block_template_result) {
802                error!("Responder sending block_template_result failed {:?}", e);
803            };
804        }
805        Message::SubmitLocalTx(Request {
806            responder,
807            arguments: tx,
808        }) => {
809            let result = service.process_tx(tx, None).await.map(|_| ());
810            if let Err(e) = responder.send(result) {
811                error!("Responder sending submit_tx result failed {:?}", e);
812            };
813        }
814        Message::SubmitLocalTestTx(Request {
815            responder,
816            arguments: tx,
817        }) => {
818            let result = service
819                .resumeble_process_tx(tx, false, None)
820                .await
821                .map(|_| ());
822            if let Err(e) = responder.send(result) {
823                error!("Responder sending submit_tx result failed {:?}", e);
824            };
825        }
826        Message::RemoveLocalTx(Request {
827            responder,
828            arguments: tx_hash,
829        }) => {
830            let result = service.remove_tx(tx_hash).await;
831            if let Err(e) = responder.send(result) {
832                error!("Responder sending remove_tx result failed {:?}", e);
833            };
834        }
835        Message::TestAcceptTx(Request {
836            responder,
837            arguments: tx,
838        }) => {
839            let result = service.test_accept_tx(tx).await;
840            if let Err(e) = responder.send(result.map(|r| r.into())) {
841                error!("Responder sending test_accept_tx result failed {:?}", e);
842            };
843        }
844        Message::SubmitRemoteTx(Request {
845            responder,
846            arguments: (tx, declared_cycles, peer),
847        }) => {
848            let _result = service
849                .resumeble_process_tx(tx, false, Some((declared_cycles, peer)))
850                .await;
851            if let Err(e) = responder.send(()) {
852                error!("Responder sending submit_tx result failed {:?}", e);
853            };
854        }
855        Message::NotifyTxs(Notify { arguments: txs }) => {
856            for tx in txs {
857                let _ret = service.resumeble_process_tx(tx, true, None).await;
858            }
859        }
860        Message::FreshProposalsFilter(AsyncRequest {
861            responder,
862            arguments: proposals,
863        }) => {
864            let new_proposals = service.exclude_existing_proposal(proposals).await;
865            if let Err(e) = responder.send(new_proposals) {
866                error!("Responder sending fresh_proposals_filter failed {:?}", e);
867            };
868        }
869        Message::GetTxStatus(Request {
870            responder,
871            arguments: hash,
872        }) => {
873            let id = ProposalShortId::from_tx_hash(&hash);
874            let tx_pool = service.tx_pool.read().await;
875            let ret = if let Some(PoolEntry {
876                status,
877                inner: entry,
878                ..
879            }) = tx_pool.pool_map.get_by_id(&id)
880            {
881                let status = if status == &Status::Proposed {
882                    TxStatus::Proposed
883                } else {
884                    TxStatus::Pending
885                };
886                Ok((status, Some(entry.cycles)))
887            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
888                let recent_reject_result = recent_reject_db.get(&hash);
889                if let Ok(recent_reject) = recent_reject_result {
890                    if let Some(record) = recent_reject {
891                        Ok((TxStatus::Rejected(record), None))
892                    } else {
893                        Ok((TxStatus::Unknown, None))
894                    }
895                } else {
896                    Err(recent_reject_result.unwrap_err())
897                }
898            } else {
899                Ok((TxStatus::Unknown, None))
900            };
901
902            if let Err(e) = responder.send(ret) {
903                error!("Responder sending get_tx_status failed {:?}", e)
904            };
905        }
906        Message::GetTransactionWithStatus(Request {
907            responder,
908            arguments: hash,
909        }) => {
910            let id = ProposalShortId::from_tx_hash(&hash);
911            let tx_pool = service.tx_pool.read().await;
912            let ret = if let Some(PoolEntry {
913                status,
914                inner: entry,
915                ..
916            }) = tx_pool.pool_map.get_by_id(&id)
917            {
918                let (tx_status, min_replace_fee) = if status == &Status::Proposed {
919                    (TxStatus::Proposed, None)
920                } else {
921                    (TxStatus::Pending, tx_pool.min_replace_fee(entry))
922                };
923                Ok(TransactionWithStatus::with_status(
924                    Some(entry.transaction().clone()),
925                    entry.cycles,
926                    entry.timestamp,
927                    tx_status,
928                    Some(entry.fee),
929                    min_replace_fee,
930                ))
931            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
932                match recent_reject_db.get(&hash) {
933                    Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
934                    Ok(_) => Ok(TransactionWithStatus::with_unknown()),
935                    Err(err) => Err(err),
936                }
937            } else {
938                Ok(TransactionWithStatus::with_unknown())
939            };
940
941            if let Err(e) = responder.send(ret) {
942                error!("Responder sending get_tx_status failed {:?}", e)
943            };
944        }
945        Message::FetchTxs(AsyncRequest {
946            responder,
947            arguments: short_ids,
948        }) => {
949            let txs_map = service.get_tx_for_compact_block(short_ids).await;
950            if let Err(e) = responder.send(txs_map) {
951                error!("Responder sending fetch_txs failed {:?}", e);
952            };
953        }
954        Message::FetchTxsWithCycles(AsyncRequest {
955            responder,
956            arguments: short_ids,
957        }) => {
958            let tx_pool = service.tx_pool.read().await;
959            let txs = short_ids
960                .into_iter()
961                .filter_map(|short_id| {
962                    tx_pool
963                        .get_tx_with_cycles(&short_id)
964                        .map(|(tx, cycles)| (short_id, (tx, cycles)))
965                })
966                .collect();
967            if let Err(e) = responder.send(txs) {
968                error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
969            };
970        }
971        Message::NewUncle(Notify { arguments: uncle }) => {
972            service.receive_candidate_uncle(uncle).await;
973        }
974        Message::ClearPool(Request {
975            responder,
976            arguments: new_snapshot,
977        }) => {
978            service.clear_pool(new_snapshot).await;
979            if let Err(e) = responder.send(()) {
980                error!("Responder sending clear_pool failed {:?}", e)
981            };
982        }
983        Message::ClearVerifyQueue(Request { responder, .. }) => {
984            service.verify_queue.write().await.clear();
985            if let Err(e) = responder.send(()) {
986                error!("Responder sending clear_verify_queue failed {:?}", e)
987            };
988        }
989        Message::GetPoolTxDetails(Request {
990            responder,
991            arguments: tx_hash,
992        }) => {
993            let tx_pool = service.tx_pool.read().await;
994            let id = ProposalShortId::from_tx_hash(&tx_hash);
995            let tx_details = tx_pool
996                .get_tx_detail(&id)
997                .unwrap_or(PoolTxDetailInfo::with_unknown());
998            if let Err(e) = responder.send(tx_details) {
999                error!("responder send get_pool_tx_details failed {:?}", e)
1000            };
1001        }
1002        Message::GetAllEntryInfo(Request { responder, .. }) => {
1003            let tx_pool = service.tx_pool.read().await;
1004            let info = tx_pool.get_all_entry_info();
1005            if let Err(e) = responder.send(info) {
1006                error!("Responder sending get_all_entry_info failed {:?}", e)
1007            };
1008        }
1009        Message::GetAllIds(Request { responder, .. }) => {
1010            let tx_pool = service.tx_pool.read().await;
1011            let ids = tx_pool.get_ids();
1012            if let Err(e) = responder.send(ids) {
1013                error!("Responder sending get_ids failed {:?}", e)
1014            };
1015        }
1016        Message::SavePool(Request { responder, .. }) => {
1017            service.save_pool().await;
1018            if let Err(e) = responder.send(()) {
1019                error!("Responder sending save_pool failed {:?}", e)
1020            };
1021        }
1022        Message::UpdateIBDState(Request {
1023            responder,
1024            arguments: in_ibd,
1025        }) => {
1026            service.update_ibd_state(in_ibd).await;
1027            if let Err(e) = responder.send(()) {
1028                error!("Responder sending update_ibd_state failed {:?}", e)
1029            };
1030        }
1031        Message::EstimateFeeRate(Request {
1032            responder,
1033            arguments: (estimate_mode, enable_fallback),
1034        }) => {
1035            let fee_estimates_result = service
1036                .estimate_fee_rate(estimate_mode, enable_fallback)
1037                .await;
1038            if let Err(e) = responder.send(fee_estimates_result) {
1039                error!("Responder sending fee_estimates_result failed {:?}", e)
1040            };
1041        }
1042        #[cfg(feature = "internal")]
1043        Message::PlugEntry(Request {
1044            responder,
1045            arguments: (entries, target),
1046        }) => {
1047            service.plug_entry(entries, target).await;
1048
1049            if let Err(e) = responder.send(()) {
1050                error!("Responder sending plug_entry failed {:?}", e);
1051            };
1052        }
1053        #[cfg(feature = "internal")]
1054        Message::PackageTxs(Request {
1055            responder,
1056            arguments: bytes_limit,
1057        }) => {
1058            let max_block_cycles = service.consensus.max_block_cycles();
1059            let max_block_bytes = service.consensus.max_block_bytes();
1060            let tx_pool = service.tx_pool.read().await;
1061            let (txs, _size, _cycles) = tx_pool.package_txs(
1062                max_block_cycles,
1063                bytes_limit.unwrap_or(max_block_bytes) as usize,
1064            );
1065            if let Err(e) = responder.send(txs) {
1066                error!("Responder sending plug_entry failed {:?}", e);
1067            };
1068        }
1069        Message::GetTotalRecentRejectNum(Request { responder, .. }) => {
1070            let total_recent_reject_num = service.get_total_recent_reject_num().await;
1071            if let Err(e) = responder.send(total_recent_reject_num) {
1072                error!("Responder sending total_recent_reject_num failed {:?}", e)
1073            };
1074        }
1075    }
1076}
1077
1078impl TxPoolService {
1079    /// Tx-pool information
1080    async fn info(&self) -> TxPoolInfo {
1081        let tx_pool = self.tx_pool.read().await;
1082        let orphan = self.orphan.read().await;
1083        let verify_queue = self.verify_queue.read().await;
1084        let tip_header = tx_pool.snapshot.tip_header();
1085        TxPoolInfo {
1086            tip_hash: tip_header.hash(),
1087            tip_number: tip_header.number(),
1088            pending_size: tx_pool.pool_map.pending_size(),
1089            proposed_size: tx_pool.pool_map.proposed_size(),
1090            orphan_size: orphan.len(),
1091            total_tx_size: tx_pool.pool_map.total_tx_size,
1092            total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
1093            min_fee_rate: self.tx_pool_config.min_fee_rate,
1094            min_rbf_rate: self.tx_pool_config.min_rbf_rate,
1095            last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
1096            tx_size_limit: TRANSACTION_SIZE_LIMIT,
1097            max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
1098            verify_queue_size: verify_queue.len(),
1099        }
1100    }
1101
1102    async fn get_total_recent_reject_num(&self) -> Option<u64> {
1103        let tx_pool = self.tx_pool.read().await;
1104        tx_pool
1105            .recent_reject
1106            .as_ref()
1107            .map(|r| r.get_estimate_total_keys_num())
1108    }
1109
1110    /// Get Live Cell Status
1111    async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
1112        let tx_pool = self.tx_pool.read().await;
1113        let snapshot = tx_pool.snapshot();
1114        let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
1115        let provider = OverlayCellProvider::new(&pool_cell, snapshot);
1116
1117        match provider.cell(&out_point, false) {
1118            CellStatus::Live(mut cell_meta) => {
1119                if eager_load && let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
1120                    cell_meta.mem_cell_data = Some(data);
1121                    cell_meta.mem_cell_data_hash = Some(data_hash);
1122                }
1123                CellStatus::live_cell(cell_meta)
1124            }
1125            _ => CellStatus::Unknown,
1126        }
1127    }
1128
1129    pub fn should_notify_block_assembler(&self) -> bool {
1130        self.block_assembler.is_some()
1131    }
1132
1133    /// Excludes proposals that already exist in either the proposal pool or the verification queue.
1134    ///
1135    /// Any proposal that appears in **either** of these two structures is considered "already exists"
1136    /// and will be filtered out.
1137    /// - already accepted and stored in the main pool (`pool_map`), or
1138    /// - orphan_pool that are waiting for missing parents
1139    /// - currently being verified (`verify_queue`).
1140    ///
1141    /// /// # Returns
1142    ///
1143    /// A new `Vec<ProposalShortId> ` containing only the proposals that are **completely new**
1144    /// (not present in `pool_map` nor in `verify_queue`).
1145    pub async fn exclude_existing_proposal(
1146        &self,
1147        mut proposals: Vec<ProposalShortId>,
1148    ) -> Vec<ProposalShortId> {
1149        {
1150            let verify_queue = self.verify_queue.read().await;
1151            proposals.retain(|id| !verify_queue.contains_key(id));
1152        }
1153        {
1154            let orphan = self.orphan.read().await;
1155            proposals.retain(|id| !orphan.contains_key(id));
1156        }
1157        {
1158            let tx_pool = self.tx_pool.read().await;
1159            proposals.retain(|id| !tx_pool.contains_proposal_id(id));
1160        }
1161        proposals
1162    }
1163
1164    /// Retrieves transactions required for compact block reconstruction.
1165    ///
1166    /// During compact block relay, a node may receive a block that contains transactions
1167    /// still being verified and not yet present in the main mempool. This method searches
1168    /// **both** primary locations where a transaction can reside when its short ID is known:
1169    ///
1170    /// 1. `pool_map` – the main mempool (already accepted transactions)
1171    /// 2. `verify_queue` – transactions currently undergoing background validation
1172    /// 3. `orphan_pool`   – Orphan transactions that are waiting for missing parents
1173    ///
1174    /// # Returns
1175    /// A map containing only the transactions that were found, keyed by their short ID.
1176    /// Missing entries are simply omitted (caller should treat absence as "need to request")
1177    /// Returning a `HashMap` allows the caller (compact block reconstructor) to:
1178    /// - Immediately obtain all locally-available transactions in a single call
1179    /// - Quickly identify which short IDs are missing
1180    pub async fn get_tx_for_compact_block(
1181        &self,
1182        short_ids: HashSet<ProposalShortId>,
1183    ) -> HashMap<ProposalShortId, TransactionView> {
1184        let mut txs = HashMap::with_capacity(short_ids.len());
1185        {
1186            let verify_queue = self.verify_queue.read().await;
1187            txs.extend(short_ids.iter().filter_map(|short_id| {
1188                verify_queue
1189                    .get_tx_by_id(short_id)
1190                    .map(|entry| (short_id.to_owned(), entry.tx.to_owned()))
1191            }));
1192        }
1193        {
1194            let orphan = self.orphan.read().await;
1195            txs.extend(short_ids.iter().filter_map(|short_id| {
1196                orphan
1197                    .get(short_id)
1198                    .map(|entry| (short_id.to_owned(), entry.tx.to_owned()))
1199            }));
1200        }
1201        {
1202            let tx_pool = self.tx_pool.read().await;
1203            txs.extend(short_ids.iter().filter_map(|short_id| {
1204                tx_pool
1205                    .get_tx_from_pool_or_store(short_id)
1206                    .map(|tx| (short_id.to_owned(), tx))
1207            }));
1208        }
1209        txs
1210    }
1211
1212    pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
1213        if let Some(ref block_assembler) = self.block_assembler {
1214            {
1215                block_assembler.candidate_uncles.lock().await.insert(uncle);
1216            }
1217            if self
1218                .block_assembler_sender
1219                .send(BlockAssemblerMessage::Uncle)
1220                .await
1221                .is_err()
1222            {
1223                error!("block_assembler receiver dropped");
1224            }
1225        }
1226    }
1227
1228    pub async fn update_block_assembler_before_tx_pool_reorg(
1229        &self,
1230        detached_blocks: VecDeque<BlockView>,
1231        snapshot: Arc<Snapshot>,
1232    ) {
1233        if let Some(ref block_assembler) = self.block_assembler {
1234            {
1235                let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
1236                for detached_block in detached_blocks {
1237                    candidate_uncles.insert(detached_block.as_uncle());
1238                }
1239            }
1240
1241            if let Err(e) = block_assembler.update_blank(snapshot).await {
1242                error!("block_assembler update_blank error {}", e);
1243            }
1244            block_assembler.notify().await;
1245        }
1246    }
1247
1248    pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
1249        if let Some(ref block_assembler) = self.block_assembler {
1250            if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
1251                error!("block_assembler update failed {:?}", e);
1252            }
1253            block_assembler.notify().await;
1254        }
1255    }
1256
1257    #[cfg(feature = "internal")]
1258    pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
1259        {
1260            let mut tx_pool = self.tx_pool.write().await;
1261            match target {
1262                PlugTarget::Pending => {
1263                    for entry in entries {
1264                        tx_pool
1265                            .add_pending(entry)
1266                            .expect("Plug entry add_pending error");
1267                    }
1268                }
1269                PlugTarget::Proposed => {
1270                    for entry in entries {
1271                        tx_pool
1272                            .add_proposed(entry)
1273                            .expect("Plug entry add_proposed error");
1274                    }
1275                }
1276            };
1277        }
1278
1279        if self.should_notify_block_assembler() {
1280            let msg = match target {
1281                PlugTarget::Pending => BlockAssemblerMessage::Pending,
1282                PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
1283            };
1284            if self.block_assembler_sender.send(msg).await.is_err() {
1285                error!("block_assembler receiver dropped");
1286            }
1287        }
1288    }
1289}