// ckb_tx_pool/service.rs

1//! Tx-pool background service
2
3use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::verify_mgr::VerifyMgr;
11use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
12use ckb_async_runtime::Handle;
13use ckb_chain_spec::consensus::Consensus;
14use ckb_channel::oneshot;
15use ckb_error::AnyError;
16use ckb_fee_estimator::FeeEstimator;
17use ckb_jsonrpc_types::BlockTemplate;
18use ckb_logger::error;
19use ckb_logger::info;
20use ckb_network::{NetworkController, PeerIndex};
21use ckb_script::ChunkCommand;
22use ckb_snapshot::Snapshot;
23use ckb_stop_handler::new_tokio_exit_rx;
24use ckb_store::ChainStore;
25use ckb_types::{
26    core::{
27        BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
28        cell::{CellProvider, CellStatus, OverlayCellProvider},
29        tx_pool::{
30            EntryCompleted, PoolTxDetailInfo, Reject, TRANSACTION_SIZE_LIMIT,
31            TransactionWithStatus, TxPoolEntryInfo, TxPoolIds, TxPoolInfo, TxStatus,
32        },
33    },
34    packed::{Byte32, OutPoint, ProposalShortId},
35};
36use ckb_util::LinkedHashSet;
37use ckb_verification::cache::TxVerificationCache;
38use std::collections::{HashMap, HashSet, VecDeque};
39use std::sync::{
40    Arc,
41    atomic::{AtomicBool, Ordering},
42};
43use std::time::Duration;
44use tokio::sync::watch;
45use tokio::sync::{RwLock, mpsc};
46use tokio::task::block_in_place;
47use tokio_util::sync::CancellationToken;
48
49use crate::pool_cell::PoolCell;
50#[cfg(feature = "internal")]
51use crate::{component::entry::TxEntry, process::PlugTarget};
52
/// Capacity of the main service channel and the reorg channel.
pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
/// Capacity of the block-assembler message channel.
pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
55
56pub(crate) struct Request<A, R> {
57    pub responder: oneshot::Sender<R>,
58    pub arguments: A,
59}
60
61impl<A, R> Request<A, R> {
62    pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
63        Request {
64            responder,
65            arguments,
66        }
67    }
68}
69
70pub(crate) struct AsyncRequest<A, R> {
71    pub responder: tokio::sync::oneshot::Sender<R>,
72    pub arguments: A,
73}
74
75impl<A, R> AsyncRequest<A, R> {
76    pub(crate) fn call(
77        arguments: A,
78        responder: tokio::sync::oneshot::Sender<R>,
79    ) -> AsyncRequest<A, R> {
80        AsyncRequest {
81            responder,
82            arguments,
83        }
84    }
85}
86
/// A fire-and-forget message: carries only `arguments`, no reply channel.
pub(crate) struct Notify<A> {
    pub arguments: A,
}

impl<A> Notify<A> {
    /// Wraps `arguments` into a notification message.
    pub(crate) fn new(arguments: A) -> Notify<A> {
        Self { arguments }
    }
}
96
/// Reply type for block-template requests.
pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
/// `(bytes_limit, proposals_limit, max_version)` arguments for template generation.
type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);

/// Reply type for transaction submission: `Err(Reject)` on rejection.
pub(crate) type SubmitTxResult = Result<(), Reject>;

/// Reply type for the dry-run acceptance check.
pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;

/// Reply type for a tx-status query: status plus cycles when known.
type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
/// Reply type for a transaction-with-status query.
type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
/// Reply type for fetching txs together with their verified cycles.
type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;

/// Arguments for a chain reorg notification:
/// `(detached_blocks, attached_blocks, detached_proposal_ids, new_snapshot)`.
pub(crate) type ChainReorgArgs = (
    VecDeque<BlockView>,
    VecDeque<BlockView>,
    HashSet<ProposalShortId>,
    Arc<Snapshot>,
);

/// Reply type for fee-rate estimation.
pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
116
/// Messages handled by the tx-pool service loop.
///
/// `Request`-carrying variants expect exactly one reply through the embedded
/// responder; `AsyncRequest` variants reply via a tokio oneshot; `Notify`
/// variants are fire-and-forget.
pub(crate) enum Message {
    /// Build a block template with optional bytes/proposals/version limits.
    BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
    /// Submit a locally-originated transaction.
    SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
    /// Remove a transaction by hash; replies whether it was removed.
    RemoveLocalTx(Request<Byte32, bool>),
    /// Dry-run acceptance check for a transaction.
    TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
    /// Submit a transaction relayed from a peer, with its declared cycles.
    SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
    /// A batch of transactions received from the network.
    NotifyTxs(Notify<Vec<TransactionView>>),
    /// Filter the given proposals down to those not yet known to the pool.
    FreshProposalsFilter(AsyncRequest<Vec<ProposalShortId>, Vec<ProposalShortId>>),
    /// Fetch transactions by short id (compact-block reconstruction).
    FetchTxs(AsyncRequest<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
    /// Fetch transactions together with their verified cycles (tx relay).
    FetchTxsWithCycles(AsyncRequest<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
    /// Query aggregate tx-pool information.
    GetTxPoolInfo(Request<(), TxPoolInfo>),
    /// Query a cell's live status, optionally including its data.
    GetLiveCell(Request<(OutPoint, bool), CellStatus>),
    /// Query the status of a transaction by hash.
    GetTxStatus(Request<Byte32, GetTxStatusResult>),
    /// Query a transaction together with its status by hash.
    GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
    /// A new uncle block is available for the block assembler.
    NewUncle(Notify<UncleBlockView>),
    /// Clear the pool and install a new snapshot.
    ClearPool(Request<Arc<Snapshot>, ()>),
    /// Clear the tx-verify queue.
    ClearVerifyQueue(Request<(), ()>),
    /// Query detailed info for every entry in the pool.
    GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
    /// Query the ids of all transactions in the pool.
    GetAllIds(Request<(), TxPoolIds>),
    /// Persist the pool to disk.
    SavePool(Request<(), ()>),
    /// Query pool-internal details for a single transaction.
    GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),

    /// Update the initial-block-download state flag.
    UpdateIBDState(Request<bool, ()>),
    /// Estimate a fee rate for the given mode (with optional fallback).
    EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),

    // test
    #[cfg(feature = "internal")]
    PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
    #[cfg(feature = "internal")]
    PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
    /// Submit a local test tx straight into the verify queue (test helper).
    SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
}
149
/// Events forwarded to the block-assembler task.
///
/// `Hash`/`Eq` are derived so pending events can be deduplicated in a set
/// before being processed on the update interval.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) enum BlockAssemblerMessage {
    /// Pending-pool contents changed.
    Pending,
    /// Proposed-pool contents changed.
    Proposed,
    /// A new uncle arrived.
    Uncle,
    /// Chain state was reset to the given snapshot; processed immediately
    /// and flushes any queued events.
    Reset(Arc<Snapshot>),
}
157
/// Controller to the tx-pool service.
///
/// The Controller is internally reference-counted and can be freely cloned. A Controller can be obtained when tx-pool service construct.
#[derive(Clone)]
pub struct TxPoolController {
    /// Channel to the main service message loop.
    sender: mpsc::Sender<Message>,
    /// Dedicated channel for chain-reorg notifications.
    reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
    /// Watch channel used to suspend/resume chunk (verify) processing.
    chunk_tx: Arc<watch::Sender<ChunkCommand>>,
    /// Tokio runtime handle the service tasks run on.
    handle: Handle,
    /// Set to `true` once `TxPoolServiceBuilder::start` has spawned the service.
    started: Arc<AtomicBool>,
}
169
/// Sends `Message::$msg_type(Request { $args, .. })` to the service and
/// synchronously waits for the single reply.
///
/// Uses `try_send`, so a full or closed channel becomes an error instead of
/// blocking; the `?` propagates it to the caller. The reply wait runs inside
/// `block_in_place`.
/// NOTE(review): `block_in_place` appears to assume a multi-threaded tokio
/// runtime when called from async context — confirm callers never invoke
/// these sync methods on a current-thread runtime.
macro_rules! send_message {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        // Reply channel: the service answers through `responder`.
        let (responder, response) = oneshot::channel();
        let request = Request::call($args, responder);
        $self
            .sender
            .try_send(Message::$msg_type(request))
            .map_err(|e| {
                let (_m, e) = handle_try_send_error(e);
                e
            })?;
        // Block the current thread until the service replies.
        block_in_place(|| response.recv())
            .map_err(handle_recv_error)
            .map_err(Into::into)
    }};
}
186
/// Sends `Message::$msg_type(Notify::new($args))` to the service without
/// waiting for any reply.
///
/// Uses `try_send`: a full or closed channel is reported as an error rather
/// than blocking the caller.
macro_rules! send_notify {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let notify = Notify::new($args);
        $self
            .sender
            .try_send(Message::$msg_type(notify))
            .map_err(|e| {
                let (_m, e) = handle_try_send_error(e);
                e.into()
            })
    }};
}
199
impl TxPoolController {
    /// Return whether tx-pool service is started
    pub fn service_started(&self) -> bool {
        self.started.load(Ordering::Acquire)
    }

    /// Set tx-pool service started, should only used for test
    #[cfg(feature = "internal")]
    pub fn set_service_started(&self, v: bool) {
        self.started.store(v, Ordering::Release);
    }

    /// Return reference of tokio runtime handle
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Generate and return block_template
    ///
    /// The outer `Result` reports messaging failures (channel full or
    /// disconnected); the inner `BlockTemplateResult` is the service's answer.
    /// The same double-`Result` pattern applies to every `send_message!`-based
    /// method below.
    pub fn get_block_template(
        &self,
        bytes_limit: Option<u64>,
        proposals_limit: Option<u64>,
        max_version: Option<Version>,
    ) -> Result<BlockTemplateResult, AnyError> {
        send_message!(
            self,
            BlockTemplate,
            (bytes_limit, proposals_limit, max_version)
        )
    }

    /// Notify new uncle
    pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
        send_notify!(self, NewUncle, uncle)
    }

    /// Make tx-pool consistent after a reorg, by re-adding or recursively erasing
    /// detached block transactions from the tx-pool, and also removing any
    /// other transactions from the tx-pool that are no longer valid given the new
    /// tip/height.
    pub fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) -> Result<(), AnyError> {
        // Reorgs go through their own channel so they are not queued behind
        // ordinary pool messages.
        let notify = Notify::new((
            detached_blocks,
            attached_blocks,
            detached_proposal_id,
            snapshot,
        ));
        self.reorg_sender.try_send(notify).map_err(|e| {
            let (_m, e) = handle_try_send_error(e);
            e.into()
        })
    }

    /// Submit local tx to tx-pool
    pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTx, tx)
    }

    /// test if a tx can be accepted by tx-pool
    /// Won't be broadcasted to network
    /// won't be insert to tx-pool
    pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
        send_message!(self, TestAcceptTx, tx)
    }

    /// Remove tx from tx-pool
    pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
        send_message!(self, RemoveLocalTx, tx_hash)
    }

    /// Submit remote tx with declared cycles and origin to tx-pool
    ///
    /// NOTE(review): although this fn is `async`, it uses the synchronous
    /// `send_message!` path (`try_send` + `block_in_place`-guarded recv),
    /// which blocks the worker thread while waiting for the reply — confirm
    /// this is intentional rather than using the async send path.
    pub async fn submit_remote_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Cycle,
        peer: PeerIndex,
    ) -> Result<(), AnyError> {
        send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
    }

    /// Receive txs from network, try to add txs to tx-pool
    pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        send_notify!(self, NotifyTxs, txs)
    }

    /// Receive txs from network, try to add txs to tx-pool
    ///
    /// Unlike `notify_txs`, this awaits channel capacity instead of failing
    /// when the channel is full.
    pub async fn notify_txs_async(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        let notify = Notify::new(txs);
        self.sender
            .send(Message::NotifyTxs(notify))
            .await
            .map_err(|e| {
                let e = ckb_error::OtherError::new(format!("SendError {e}"));
                e.into()
            })
    }

    /// Return tx-pool information
    pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
        send_message!(self, GetTxPoolInfo, ())
    }

    /// Return the live-cell status for `out_point`, optionally including
    /// the cell data when `with_data` is true.
    pub fn get_live_cell(
        &self,
        out_point: OutPoint,
        with_data: bool,
    ) -> Result<CellStatus, AnyError> {
        send_message!(self, GetLiveCell, (out_point, with_data))
    }

    /// Return fresh proposals
    pub async fn fresh_proposals_filter(
        &self,
        proposals: Vec<ProposalShortId>,
    ) -> Result<Vec<ProposalShortId>, AnyError> {
        let (responder, response) = tokio::sync::oneshot::channel();
        let request = AsyncRequest::call(proposals, responder);
        self.sender
            .send(Message::FreshProposalsFilter(request))
            .await?;
        response.await.map_err(Into::into)
    }

    /// Return tx_status for rpc (get_transaction verbosity = 1)
    pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
        send_message!(self, GetTxStatus, hash)
    }

    /// Return transaction_with_status for rpc (get_transaction verbosity = 2)
    pub fn get_transaction_with_status(
        &self,
        hash: Byte32,
    ) -> Result<GetTransactionWithStatusResult, AnyError> {
        send_message!(self, GetTransactionWithStatus, hash)
    }

    /// Mainly used for compact block reconstruction and block proposal pre-broadcasting
    /// Orphan/conflicted/etc transactions that are returned for compact block reconstruction.
    pub async fn fetch_txs(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
        let (responder, response) = tokio::sync::oneshot::channel();
        let request = AsyncRequest::call(short_ids, responder);
        self.sender.send(Message::FetchTxs(request)).await?;
        response.await.map_err(Into::into)
    }

    /// Return txs with cycles
    /// Mainly for relay transactions
    pub async fn fetch_txs_with_cycles(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<FetchTxsWithCyclesResult, AnyError> {
        let (responder, response) = tokio::sync::oneshot::channel();
        let request = AsyncRequest::call(short_ids, responder);
        self.sender
            .send(Message::FetchTxsWithCycles(request))
            .await?;
        response.await.map_err(Into::into)
    }

    /// Clears the tx-pool, removing all txs, update snapshot.
    pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
        send_message!(self, ClearPool, new_snapshot)
    }

    /// Clears the tx-verify-queue.
    pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
        send_message!(self, ClearVerifyQueue, ())
    }

    /// Returns information about all transactions in the pool.
    pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
        send_message!(self, GetAllEntryInfo, ())
    }

    /// Returns the IDs of all transactions in the pool.
    pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
        send_message!(self, GetAllIds, ())
    }

    /// query the details of a transaction in the pool
    pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
        send_message!(self, GetPoolTxDetails, tx_hash)
    }

    /// Saves tx pool into disk.
    pub fn save_pool(&self) -> Result<(), AnyError> {
        info!("Please be patient, tx-pool are saving data into disk ...");
        send_message!(self, SavePool, ())
    }

    /// Updates IBD state.
    pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
        send_message!(self, UpdateIBDState, in_ibd)
    }

    /// Estimates fee rate.
    pub fn estimate_fee_rate(
        &self,
        estimate_mode: EstimateMode,
        enable_fallback: bool,
    ) -> Result<FeeEstimatesResult, AnyError> {
        send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
    }

    /// Sends suspend chunk process cmd
    pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
        //debug!("[verify-test] run suspend_chunk_process");
        self.chunk_tx
            .send(ChunkCommand::Suspend)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Sends continue chunk process cmd
    pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
        //debug!("[verify-test] run continue_chunk_process");
        self.chunk_tx
            .send(ChunkCommand::Resume)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Load persisted txs into pool, assume that all txs are sorted
    ///
    /// A messaging failure aborts loading via `?`; an inner `Reject` only
    /// counts the tx as stale and loading continues.
    fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        if !txs.is_empty() {
            info!("Loading persistent tx-pool data, total {} txs", txs.len());
            let mut failed_txs = 0;
            for tx in txs {
                if self.submit_local_tx(tx)?.is_err() {
                    failed_txs += 1;
                }
            }
            if failed_txs == 0 {
                info!("Persistent tx-pool data is loaded");
            } else {
                info!(
                    "Persistent tx-pool data is loaded, {} stale txs are ignored",
                    failed_txs
                );
            }
        }
        Ok(())
    }

    /// Plug tx-pool entry to tx-pool, skip verification. only for test
    #[cfg(feature = "internal")]
    pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
        send_message!(self, PlugEntry, (entries, target))
    }

    /// Package txs with specified bytes_limit. for test
    #[cfg(feature = "internal")]
    pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
        send_message!(self, PackageTxs, bytes_limit)
    }

    /// Submit local test tx to tx-pool, this tx will be put into verify queue directly.
    pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTestTx, tx)
    }
}
471
/// A builder used to create TxPoolService.
pub struct TxPoolServiceBuilder {
    /// Tx-pool configuration, moved into the `TxPool` on `start`.
    pub(crate) tx_pool_config: TxPoolConfig,
    /// Controller clone kept so `start` can replay persisted txs.
    pub(crate) tx_pool_controller: TxPoolController,
    /// Chain snapshot the service starts from.
    pub(crate) snapshot: Arc<Snapshot>,
    /// Optional block assembler (mining support).
    pub(crate) block_assembler: Option<BlockAssembler>,
    /// Shared tx verification cache.
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    /// Callbacks fired on pending/proposed/reject events.
    pub(crate) callbacks: Callbacks,
    /// Receiving half of the main service channel.
    pub(crate) receiver: mpsc::Receiver<Message>,
    /// Receiving half of the reorg channel.
    pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
    /// Exit token; cancellation triggers shutdown of all spawned tasks.
    pub(crate) signal_receiver: CancellationToken,
    /// Tokio runtime handle used to spawn the service tasks.
    pub(crate) handle: Handle,
    /// Channel on which verification results are relayed to the network layer.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Watch receiver for suspend/resume of chunk verification.
    pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
    /// Flag flipped to `true` once `start` completes.
    pub(crate) started: Arc<AtomicBool>,
    /// Both halves of the block-assembler event channel.
    pub(crate) block_assembler_channel: (
        mpsc::Sender<BlockAssemblerMessage>,
        mpsc::Receiver<BlockAssemblerMessage>,
    ),
    /// Fee estimator shared with the service.
    pub(crate) fee_estimator: FeeEstimator,
}
493
impl TxPoolServiceBuilder {
    /// Creates a new TxPoolServiceBuilder.
    ///
    /// Constructs all channels and returns both the builder (to be `start`ed)
    /// and the controller that talks to the service once started.
    pub fn new(
        tx_pool_config: TxPoolConfig,
        snapshot: Arc<Snapshot>,
        block_assembler_config: Option<BlockAssemblerConfig>,
        txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
        handle: &Handle,
        tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
        fee_estimator: FeeEstimator,
    ) -> (TxPoolServiceBuilder, TxPoolController) {
        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
        let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let signal_receiver: CancellationToken = new_tokio_exit_rx();
        let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
        let started = Arc::new(AtomicBool::new(false));

        let controller = TxPoolController {
            sender,
            reorg_sender,
            handle: handle.clone(),
            chunk_tx: Arc::new(chunk_tx),
            started: Arc::clone(&started),
        };

        let block_assembler =
            block_assembler_config.map(|config| BlockAssembler::new(config, Arc::clone(&snapshot)));
        let builder = TxPoolServiceBuilder {
            tx_pool_config,
            tx_pool_controller: controller.clone(),
            snapshot,
            block_assembler,
            txs_verify_cache,
            callbacks: Callbacks::new(),
            receiver,
            reorg_receiver,
            signal_receiver,
            handle: handle.clone(),
            tx_relay_sender,
            chunk_rx,
            started,
            block_assembler_channel,
            fee_estimator,
        };

        (builder, controller)
    }

    /// Register new pending callback
    pub fn register_pending(&mut self, callback: PendingCallback) {
        self.callbacks.register_pending(callback);
    }

    /// Return cloned tx relayer sender
    pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
        self.tx_relay_sender.clone()
    }

    /// Register new proposed callback
    pub fn register_proposed(&mut self, callback: ProposedCallback) {
        self.callbacks.register_proposed(callback);
    }

    /// Register new abandon callback
    pub fn register_reject(&mut self, callback: RejectCallback) {
        self.callbacks.register_reject(callback);
    }

    /// Start a background thread tx-pool service by taking ownership of the Builder, and returns a TxPoolController.
    ///
    /// Spawns four cooperating tasks: the verify manager, the main message
    /// loop, an optional block-assembler loop, and the reorg loop. All of
    /// them exit when `signal_receiver` is cancelled. Finally the `started`
    /// flag is set and any persisted txs are re-submitted.
    pub fn start(self, network: NetworkController) {
        let consensus = self.snapshot.cloned_consensus();

        let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
            self.tx_pool_config.max_tx_verify_cycles,
        )));

        // Load persisted txs now; they are re-submitted at the very end of
        // `start`, once the service tasks are running.
        let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
        let txs = match tx_pool.load_from_file() {
            Ok(txs) => txs,
            Err(e) => {
                error!("{}", e.to_string());
                error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
                Vec::new()
            }
        };

        let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
        let service = TxPoolService {
            tx_pool_config: Arc::new(tx_pool.config.clone()),
            tx_pool: Arc::new(RwLock::new(tx_pool)),
            orphan: Arc::new(RwLock::new(OrphanPool::new())),
            block_assembler: self.block_assembler,
            txs_verify_cache: self.txs_verify_cache,
            callbacks: Arc::new(self.callbacks),
            tx_relay_sender: self.tx_relay_sender,
            block_assembler_sender,
            verify_queue: Arc::clone(&verify_queue),
            network,
            consensus,
            fee_estimator: self.fee_estimator,
        };

        // Task 1: verification manager, suspended/resumed via `chunk_rx`.
        let mut verify_mgr =
            VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
        self.handle.spawn(async move { verify_mgr.run().await });

        let mut receiver = self.receiver;
        let mut reorg_receiver = self.reorg_receiver;
        let handle_clone = self.handle.clone();

        // Task 2: main message loop. Each message is handled on its own
        // spawned task; on shutdown the pool is saved before exiting.
        let process_service = service.clone();
        let signal_receiver = self.signal_receiver.clone();
        self.handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(message) = receiver.recv() => {
                        let service_clone = process_service.clone();
                        handle_clone.spawn(process(service_clone, message));
                    },
                    _ = signal_receiver.cancelled() => {
                        info!("TxPool is saving, please wait...");
                        process_service.save_pool().await;
                        info!("TxPool process_service exit now");
                        break
                    },
                    else => break,
                }
            }
        });

        // Task 3 (optional): block-assembler loop, only when an assembler
        // is configured.
        let process_service = service.clone();
        if let Some(ref block_assembler) = service.block_assembler {
            let signal_receiver = self.signal_receiver.clone();
            let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
            if interval.is_zero() {
                // block_assembler.update_interval_millis set zero interval should only be used for tests,
                // external notification will be disabled.
                ckb_logger::warn!(
                    "block_assembler.update_interval_millis set to zero interval. \
                    This should only be used for tests, as external notification will be disabled."
                );
                // Zero interval: process each event immediately as it arrives.
                self.handle.spawn(async move {
                    loop {
                        tokio::select! {
                            Some(message) = block_assembler_receiver.recv() => {
                                let service_clone = process_service.clone();
                                block_assembler::process(service_clone, &message).await;
                            },
                            _ = signal_receiver.cancelled() => {
                                info!("TxPool block_assembler process service received exit signal, exit now");
                                break
                            },
                            else => break,
                        }
                    }
                });
            } else {
                // Non-zero interval: batch events in a set (dedup) and flush
                // them on each tick; `Reset` bypasses the queue and flushes
                // immediately.
                self.handle.spawn(async move {
                    let mut interval = tokio::time::interval(interval);
                    let mut queue = LinkedHashSet::new();
                    loop {
                        tokio::select! {
                            Some(message) = block_assembler_receiver.recv() => {
                                if let BlockAssemblerMessage::Reset(..) = message {
                                    let service_clone = process_service.clone();
                                    queue.clear();
                                    block_assembler::process(service_clone, &message).await;
                                } else {
                                    queue.insert(message);
                                }
                            },
                            _ = interval.tick() => {
                                for message in &queue {
                                    let service_clone = process_service.clone();
                                    block_assembler::process(service_clone, message).await;
                                }
                                if !queue.is_empty() {
                                    if let Some(ref block_assembler) = process_service.block_assembler {
                                        block_assembler.notify().await;
                                    }
                                }
                                queue.clear();
                            }
                            _ = signal_receiver.cancelled() => {
                                info!("TxPool block_assembler process service received exit signal, exit now");
                                break
                            },
                            else => break,
                        }
                    }
                });
            }
        }

        // Task 4: reorg loop — block-assembler update before the pool reorg,
        // the pool reorg itself, then the block-assembler update after.
        let signal_receiver = self.signal_receiver;
        self.handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(message) = reorg_receiver.recv() => {
                        let Notify {
                            arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
                        } = message;
                        let snapshot_clone = Arc::clone(&snapshot);
                        let detached_blocks_clone = detached_blocks.clone();
                        service.update_block_assembler_before_tx_pool_reorg(
                            detached_blocks_clone,
                            snapshot_clone
                        ).await;

                        let snapshot_clone = Arc::clone(&snapshot);
                        service
                        .update_tx_pool_for_reorg(
                            detached_blocks,
                            attached_blocks,
                            detached_proposal_id,
                            snapshot_clone,
                        )
                        .await;

                        service.update_block_assembler_after_tx_pool_reorg().await;
                    },
                    _ = signal_receiver.cancelled() => {
                        info!("TxPool reorg process service received exit signal, exit now");
                        break
                    },
                    else => break,
                }
            }
        });
        self.started.store(true, Ordering::Release);
        // Replay txs persisted by a previous run through the normal
        // submission path.
        if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
            error!("Failed to import persistent txs, cause: {}", err);
        }
    }
}
730
/// The tx-pool service state shared (via `Arc`s) across all spawned tasks.
///
/// Cloning is cheap: every field is either an `Arc`, a channel handle, or a
/// clonable controller.
#[derive(Clone)]
pub(crate) struct TxPoolService {
    /// The pool itself, behind an async RwLock.
    pub(crate) tx_pool: Arc<RwLock<TxPool>>,
    /// Orphan transactions awaiting their parents.
    pub(crate) orphan: Arc<RwLock<OrphanPool>>,
    /// Chain consensus parameters.
    pub(crate) consensus: Arc<Consensus>,
    /// Pool configuration.
    pub(crate) tx_pool_config: Arc<TxPoolConfig>,
    /// Optional block assembler (mining support).
    pub(crate) block_assembler: Option<BlockAssembler>,
    /// Shared tx verification cache.
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    /// Callbacks fired on pool events.
    pub(crate) callbacks: Arc<Callbacks>,
    /// Network controller used for tx relay.
    pub(crate) network: NetworkController,
    /// Channel relaying verification results to the network layer.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Queue of transactions pending verification.
    pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
    /// Channel feeding events to the block-assembler task.
    pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
    /// Fee estimator.
    pub(crate) fee_estimator: FeeEstimator,
}
746
/// tx verification result
///
/// Sent over `tx_relay_sender` so the network layer learns the outcome of
/// each verification.
pub enum TxVerificationResult {
    /// tx is verified
    Ok {
        /// original peer; `None` when the tx originated locally
        original_peer: Option<PeerIndex>,
        /// transaction hash
        tx_hash: Byte32,
    },
    /// tx parent is unknown
    UnknownParents {
        /// original peer
        peer: PeerIndex,
        /// parents hashes
        parents: HashSet<Byte32>,
    },
    /// tx is rejected
    Reject {
        /// transaction hash
        tx_hash: Byte32,
    },
}
769
770#[allow(clippy::cognitive_complexity)]
771async fn process(mut service: TxPoolService, message: Message) {
772    match message {
773        Message::GetTxPoolInfo(Request { responder, .. }) => {
774            let info = service.info().await;
775            if let Err(e) = responder.send(info) {
776                error!("Responder sending get_tx_pool_info failed {:?}", e);
777            };
778        }
779        Message::GetLiveCell(Request {
780            responder,
781            arguments: (out_point, with_data),
782        }) => {
783            let live_cell_status = service.get_live_cell(out_point, with_data).await;
784            if let Err(e) = responder.send(live_cell_status) {
785                error!("Responder sending get_live_cell failed {:?}", e);
786            };
787        }
788        Message::BlockTemplate(Request {
789            responder,
790            arguments: (_bytes_limit, _proposals_limit, _max_version),
791        }) => {
792            let block_template_result = service.get_block_template().await;
793            if let Err(e) = responder.send(block_template_result) {
794                error!("Responder sending block_template_result failed {:?}", e);
795            };
796        }
797        Message::SubmitLocalTx(Request {
798            responder,
799            arguments: tx,
800        }) => {
801            let result = service.process_tx(tx, None).await.map(|_| ());
802            if let Err(e) = responder.send(result) {
803                error!("Responder sending submit_tx result failed {:?}", e);
804            };
805        }
806        Message::SubmitLocalTestTx(Request {
807            responder,
808            arguments: tx,
809        }) => {
810            let result = service.resumeble_process_tx(tx, None).await.map(|_| ());
811            if let Err(e) = responder.send(result) {
812                error!("Responder sending submit_tx result failed {:?}", e);
813            };
814        }
815        Message::RemoveLocalTx(Request {
816            responder,
817            arguments: tx_hash,
818        }) => {
819            let result = service.remove_tx(tx_hash).await;
820            if let Err(e) = responder.send(result) {
821                error!("Responder sending remove_tx result failed {:?}", e);
822            };
823        }
824        Message::TestAcceptTx(Request {
825            responder,
826            arguments: tx,
827        }) => {
828            let result = service.test_accept_tx(tx).await;
829            if let Err(e) = responder.send(result.map(|r| r.into())) {
830                error!("Responder sending test_accept_tx result failed {:?}", e);
831            };
832        }
833        Message::SubmitRemoteTx(Request {
834            responder,
835            arguments: (tx, declared_cycles, peer),
836        }) => {
837            let _result = service
838                .resumeble_process_tx(tx, Some((declared_cycles, peer)))
839                .await;
840            if let Err(e) = responder.send(()) {
841                error!("Responder sending submit_tx result failed {:?}", e);
842            };
843        }
844        Message::NotifyTxs(Notify { arguments: txs }) => {
845            for tx in txs {
846                let _ret = service.resumeble_process_tx(tx, None).await;
847            }
848        }
849        Message::FreshProposalsFilter(AsyncRequest {
850            responder,
851            arguments: mut proposals,
852        }) => {
853            let tx_pool = service.tx_pool.read().await;
854            proposals.retain(|id| !tx_pool.contains_proposal_id(id));
855            if let Err(e) = responder.send(proposals) {
856                error!("Responder sending fresh_proposals_filter failed {:?}", e);
857            };
858        }
859        Message::GetTxStatus(Request {
860            responder,
861            arguments: hash,
862        }) => {
863            let id = ProposalShortId::from_tx_hash(&hash);
864            let tx_pool = service.tx_pool.read().await;
865            let ret = if let Some(PoolEntry {
866                status,
867                inner: entry,
868                ..
869            }) = tx_pool.pool_map.get_by_id(&id)
870            {
871                let status = if status == &Status::Proposed {
872                    TxStatus::Proposed
873                } else {
874                    TxStatus::Pending
875                };
876                Ok((status, Some(entry.cycles)))
877            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
878                let recent_reject_result = recent_reject_db.get(&hash);
879                if let Ok(recent_reject) = recent_reject_result {
880                    if let Some(record) = recent_reject {
881                        Ok((TxStatus::Rejected(record), None))
882                    } else {
883                        Ok((TxStatus::Unknown, None))
884                    }
885                } else {
886                    Err(recent_reject_result.unwrap_err())
887                }
888            } else {
889                Ok((TxStatus::Unknown, None))
890            };
891
892            if let Err(e) = responder.send(ret) {
893                error!("Responder sending get_tx_status failed {:?}", e)
894            };
895        }
896        Message::GetTransactionWithStatus(Request {
897            responder,
898            arguments: hash,
899        }) => {
900            let id = ProposalShortId::from_tx_hash(&hash);
901            let tx_pool = service.tx_pool.read().await;
902            let ret = if let Some(PoolEntry {
903                status,
904                inner: entry,
905                ..
906            }) = tx_pool.pool_map.get_by_id(&id)
907            {
908                let (tx_status, min_replace_fee) = if status == &Status::Proposed {
909                    (TxStatus::Proposed, None)
910                } else {
911                    (TxStatus::Pending, tx_pool.min_replace_fee(entry))
912                };
913                Ok(TransactionWithStatus::with_status(
914                    Some(entry.transaction().clone()),
915                    entry.cycles,
916                    entry.timestamp,
917                    tx_status,
918                    Some(entry.fee),
919                    min_replace_fee,
920                ))
921            } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
922                match recent_reject_db.get(&hash) {
923                    Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
924                    Ok(_) => Ok(TransactionWithStatus::with_unknown()),
925                    Err(err) => Err(err),
926                }
927            } else {
928                Ok(TransactionWithStatus::with_unknown())
929            };
930
931            if let Err(e) = responder.send(ret) {
932                error!("Responder sending get_tx_status failed {:?}", e)
933            };
934        }
935        Message::FetchTxs(AsyncRequest {
936            responder,
937            arguments: short_ids,
938        }) => {
939            let tx_pool = service.tx_pool.read().await;
940            let orphan = service.orphan.read().await;
941            let txs = short_ids
942                .into_iter()
943                .filter_map(|short_id| {
944                    tx_pool
945                        .get_tx_from_pool_or_store(&short_id)
946                        .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
947                        .map(|tx| (short_id, tx))
948                })
949                .collect();
950            if let Err(e) = responder.send(txs) {
951                error!("Responder sending fetch_txs failed {:?}", e);
952            };
953        }
954        Message::FetchTxsWithCycles(AsyncRequest {
955            responder,
956            arguments: short_ids,
957        }) => {
958            let tx_pool = service.tx_pool.read().await;
959            let txs = short_ids
960                .into_iter()
961                .filter_map(|short_id| {
962                    tx_pool
963                        .get_tx_with_cycles(&short_id)
964                        .map(|(tx, cycles)| (short_id, (tx, cycles)))
965                })
966                .collect();
967            if let Err(e) = responder.send(txs) {
968                error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
969            };
970        }
971        Message::NewUncle(Notify { arguments: uncle }) => {
972            service.receive_candidate_uncle(uncle).await;
973        }
974        Message::ClearPool(Request {
975            responder,
976            arguments: new_snapshot,
977        }) => {
978            service.clear_pool(new_snapshot).await;
979            if let Err(e) = responder.send(()) {
980                error!("Responder sending clear_pool failed {:?}", e)
981            };
982        }
983        Message::ClearVerifyQueue(Request { responder, .. }) => {
984            service.verify_queue.write().await.clear();
985            if let Err(e) = responder.send(()) {
986                error!("Responder sending clear_verify_queue failed {:?}", e)
987            };
988        }
989        Message::GetPoolTxDetails(Request {
990            responder,
991            arguments: tx_hash,
992        }) => {
993            let tx_pool = service.tx_pool.read().await;
994            let id = ProposalShortId::from_tx_hash(&tx_hash);
995            let tx_details = tx_pool
996                .get_tx_detail(&id)
997                .unwrap_or(PoolTxDetailInfo::with_unknown());
998            if let Err(e) = responder.send(tx_details) {
999                error!("responder send get_pool_tx_details failed {:?}", e)
1000            };
1001        }
1002        Message::GetAllEntryInfo(Request { responder, .. }) => {
1003            let tx_pool = service.tx_pool.read().await;
1004            let info = tx_pool.get_all_entry_info();
1005            if let Err(e) = responder.send(info) {
1006                error!("Responder sending get_all_entry_info failed {:?}", e)
1007            };
1008        }
1009        Message::GetAllIds(Request { responder, .. }) => {
1010            let tx_pool = service.tx_pool.read().await;
1011            let ids = tx_pool.get_ids();
1012            if let Err(e) = responder.send(ids) {
1013                error!("Responder sending get_ids failed {:?}", e)
1014            };
1015        }
1016        Message::SavePool(Request { responder, .. }) => {
1017            service.save_pool().await;
1018            if let Err(e) = responder.send(()) {
1019                error!("Responder sending save_pool failed {:?}", e)
1020            };
1021        }
1022        Message::UpdateIBDState(Request {
1023            responder,
1024            arguments: in_ibd,
1025        }) => {
1026            service.update_ibd_state(in_ibd).await;
1027            if let Err(e) = responder.send(()) {
1028                error!("Responder sending update_ibd_state failed {:?}", e)
1029            };
1030        }
1031        Message::EstimateFeeRate(Request {
1032            responder,
1033            arguments: (estimate_mode, enable_fallback),
1034        }) => {
1035            let fee_estimates_result = service
1036                .estimate_fee_rate(estimate_mode, enable_fallback)
1037                .await;
1038            if let Err(e) = responder.send(fee_estimates_result) {
1039                error!("Responder sending fee_estimates_result failed {:?}", e)
1040            };
1041        }
1042        #[cfg(feature = "internal")]
1043        Message::PlugEntry(Request {
1044            responder,
1045            arguments: (entries, target),
1046        }) => {
1047            service.plug_entry(entries, target).await;
1048
1049            if let Err(e) = responder.send(()) {
1050                error!("Responder sending plug_entry failed {:?}", e);
1051            };
1052        }
1053        #[cfg(feature = "internal")]
1054        Message::PackageTxs(Request {
1055            responder,
1056            arguments: bytes_limit,
1057        }) => {
1058            let max_block_cycles = service.consensus.max_block_cycles();
1059            let max_block_bytes = service.consensus.max_block_bytes();
1060            let tx_pool = service.tx_pool.read().await;
1061            let (txs, _size, _cycles) = tx_pool.package_txs(
1062                max_block_cycles,
1063                bytes_limit.unwrap_or(max_block_bytes) as usize,
1064            );
1065            if let Err(e) = responder.send(txs) {
1066                error!("Responder sending plug_entry failed {:?}", e);
1067            };
1068        }
1069    }
1070}
1071
impl TxPoolService {
    /// Builds a [`TxPoolInfo`] snapshot of the current pool state.
    ///
    /// Takes read locks on the pool, the orphan pool and the verify queue
    /// simultaneously so the reported sizes are mutually consistent.
    async fn info(&self) -> TxPoolInfo {
        let tx_pool = self.tx_pool.read().await;
        let orphan = self.orphan.read().await;
        let verify_queue = self.verify_queue.read().await;
        let tip_header = tx_pool.snapshot.tip_header();
        TxPoolInfo {
            tip_hash: tip_header.hash(),
            tip_number: tip_header.number(),
            pending_size: tx_pool.pool_map.pending_size(),
            proposed_size: tx_pool.pool_map.proposed_size(),
            orphan_size: orphan.len(),
            total_tx_size: tx_pool.pool_map.total_tx_size,
            total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
            min_fee_rate: self.tx_pool_config.min_fee_rate,
            min_rbf_rate: self.tx_pool_config.min_rbf_rate,
            last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
            tx_size_limit: TRANSACTION_SIZE_LIMIT,
            max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
            verify_queue_size: verify_queue.len(),
        }
    }

    /// Resolves `out_point` against the pool overlaid on the chain snapshot.
    ///
    /// When `eager_load` is set, the cell data (and its hash) is loaded from
    /// the store into the returned `cell_meta`. Any non-live resolution
    /// (including dead cells) is collapsed into `CellStatus::Unknown`.
    async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
        let tx_pool = self.tx_pool.read().await;
        let snapshot = tx_pool.snapshot();
        let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
        let provider = OverlayCellProvider::new(&pool_cell, snapshot);

        match provider.cell(&out_point, false) {
            CellStatus::Live(mut cell_meta) => {
                if eager_load {
                    if let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
                        cell_meta.mem_cell_data = Some(data);
                        cell_meta.mem_cell_data_hash = Some(data_hash);
                    }
                }
                CellStatus::live_cell(cell_meta)
            }
            _ => CellStatus::Unknown,
        }
    }

    /// Returns true when a block assembler is configured, i.e. pool changes
    /// should be forwarded to it.
    pub fn should_notify_block_assembler(&self) -> bool {
        self.block_assembler.is_some()
    }

    /// Records `uncle` as a candidate uncle and nudges the block assembler.
    ///
    /// No-op when no block assembler is configured.
    pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
        if let Some(ref block_assembler) = self.block_assembler {
            {
                // Inner scope drops the candidate_uncles lock before the send.
                block_assembler.candidate_uncles.lock().await.insert(uncle);
            }
            if self
                .block_assembler_sender
                .send(BlockAssemblerMessage::Uncle)
                .await
                .is_err()
            {
                error!("block_assembler receiver dropped");
            }
        }
    }

    /// Prepares the block assembler for a chain reorg, before the pool itself
    /// is updated.
    ///
    /// Detached blocks become candidate uncles, then the assembler is reset
    /// to a blank template on the new snapshot and notified.
    pub async fn update_block_assembler_before_tx_pool_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        snapshot: Arc<Snapshot>,
    ) {
        if let Some(ref block_assembler) = self.block_assembler {
            {
                // Inner scope drops the candidate_uncles lock before the
                // update/notify calls below.
                let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
                for detached_block in detached_blocks {
                    candidate_uncles.insert(detached_block.as_uncle());
                }
            }

            if let Err(e) = block_assembler.update_blank(snapshot).await {
                error!("block_assembler update_blank error {}", e);
            }
            block_assembler.notify().await;
        }
    }

    /// Rebuilds the block assembler template from the reorganized pool and
    /// notifies it. No-op when no block assembler is configured.
    pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
        if let Some(ref block_assembler) = self.block_assembler {
            if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
                error!("block_assembler update failed {:?}", e);
            }
            block_assembler.notify().await;
        }
    }

    /// Test-only helper (feature "internal"): force-inserts `entries` into the
    /// pending or proposed section of the pool, then notifies the block
    /// assembler if one is configured.
    ///
    /// Panics (via `expect`) if an insertion fails — acceptable because this
    /// path is only compiled for internal/testing builds.
    #[cfg(feature = "internal")]
    pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
        {
            // Inner scope releases the pool write lock before notifying the
            // block assembler.
            let mut tx_pool = self.tx_pool.write().await;
            match target {
                PlugTarget::Pending => {
                    for entry in entries {
                        tx_pool
                            .add_pending(entry)
                            .expect("Plug entry add_pending error");
                    }
                }
                PlugTarget::Proposed => {
                    for entry in entries {
                        tx_pool
                            .add_proposed(entry)
                            .expect("Plug entry add_proposed error");
                    }
                }
            };
        }

        if self.should_notify_block_assembler() {
            let msg = match target {
                PlugTarget::Pending => BlockAssemblerMessage::Pending,
                PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
            };
            if self.block_assembler_sender.send(msg).await.is_err() {
                error!("block_assembler receiver dropped");
            }
        }
    }
}