1use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::verify_mgr::VerifyMgr;
11use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
12use ckb_async_runtime::Handle;
13use ckb_chain_spec::consensus::Consensus;
14use ckb_channel::oneshot;
15use ckb_error::AnyError;
16use ckb_fee_estimator::FeeEstimator;
17use ckb_jsonrpc_types::BlockTemplate;
18use ckb_logger::error;
19use ckb_logger::info;
20use ckb_network::{NetworkController, PeerIndex};
21use ckb_script::ChunkCommand;
22use ckb_snapshot::Snapshot;
23use ckb_stop_handler::new_tokio_exit_rx;
24use ckb_store::ChainStore;
25use ckb_types::{
26 core::{
27 BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
28 cell::{CellProvider, CellStatus, OverlayCellProvider},
29 tx_pool::{
30 EntryCompleted, PoolTxDetailInfo, Reject, TRANSACTION_SIZE_LIMIT,
31 TransactionWithStatus, TxPoolEntryInfo, TxPoolIds, TxPoolInfo, TxStatus,
32 },
33 },
34 packed::{Byte32, OutPoint, ProposalShortId},
35};
36use ckb_util::LinkedHashSet;
37use ckb_verification::cache::TxVerificationCache;
38use std::collections::{HashMap, HashSet, VecDeque};
39use std::sync::{
40 Arc,
41 atomic::{AtomicBool, Ordering},
42};
43use std::time::Duration;
44use tokio::sync::watch;
45use tokio::sync::{RwLock, mpsc};
46use tokio::task::block_in_place;
47use tokio_util::sync::CancellationToken;
48
49use crate::pool_cell::PoolCell;
50#[cfg(feature = "internal")]
51use crate::{component::entry::TxEntry, process::PlugTarget};
52
/// Capacity used for the bounded `mpsc` channels created in
/// `TxPoolServiceBuilder::new` for `Message`s and for chain-reorg notifications.
pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
/// Capacity of the bounded `mpsc` channel that carries `BlockAssemblerMessage`s.
pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
55
56pub(crate) struct Request<A, R> {
57 pub responder: oneshot::Sender<R>,
58 pub arguments: A,
59}
60
61impl<A, R> Request<A, R> {
62 pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
63 Request {
64 responder,
65 arguments,
66 }
67 }
68}
69
70pub(crate) struct AsyncRequest<A, R> {
71 pub responder: tokio::sync::oneshot::Sender<R>,
72 pub arguments: A,
73}
74
75impl<A, R> AsyncRequest<A, R> {
76 pub(crate) fn call(
77 arguments: A,
78 responder: tokio::sync::oneshot::Sender<R>,
79 ) -> AsyncRequest<A, R> {
80 AsyncRequest {
81 responder,
82 arguments,
83 }
84 }
85}
86
/// A fire-and-forget message: it carries a payload but no responder, so the
/// sender never receives a reply.
pub(crate) struct Notify<A> {
    /// Payload of the notification.
    pub arguments: A,
}

impl<A> Notify<A> {
    /// Wraps `arguments` in a notification.
    pub(crate) fn new(arguments: A) -> Notify<A> {
        Self { arguments }
    }
}
96
/// Response payload of `Message::BlockTemplate`.
pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
/// Arguments of `Message::BlockTemplate`, in the order
/// `(bytes_limit, proposals_limit, max_version)` used by
/// `TxPoolController::get_block_template`.
type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);

/// Response payload of `Message::SubmitLocalTx` and `Message::SubmitLocalTestTx`.
pub(crate) type SubmitTxResult = Result<(), Reject>;

/// Response payload of `Message::TestAcceptTx`.
pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;

/// Response payload of `Message::GetTxStatus`: the status plus the entry's
/// cycles when the transaction is found in the pool map (`None` otherwise).
type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
/// Response payload of `Message::GetTransactionWithStatus`.
type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
/// Response payload of `Message::FetchTxsWithCycles`.
type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;

/// Payload of a chain-reorg notification, in the order
/// `(detached_blocks, attached_blocks, detached_proposal_id, snapshot)`.
pub(crate) type ChainReorgArgs = (
    VecDeque<BlockView>,
    VecDeque<BlockView>,
    HashSet<ProposalShortId>,
    Arc<Snapshot>,
);

/// Response payload of `Message::EstimateFeeRate`.
pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
116
/// Messages accepted by the tx-pool service; each one is handled by `process`.
///
/// `Request` variants are answered over a blocking one-shot channel,
/// `AsyncRequest` variants over a tokio one-shot channel, and `Notify`
/// variants get no reply.
pub(crate) enum Message {
    /// Ask for a block template.
    BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
    /// Submit a transaction without a declared-cycles/peer origin.
    SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
    /// Remove a transaction by hash; the reply is the `bool` returned by the service.
    RemoveLocalTx(Request<Byte32, bool>),
    /// Test whether a transaction would be accepted.
    TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
    /// Submit a transaction together with its declared cycles and originating peer.
    SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
    /// Submit a batch of transactions without waiting for any result.
    NotifyTxs(Notify<Vec<TransactionView>>),
    /// Filter out the proposal ids the pool already contains.
    FreshProposalsFilter(AsyncRequest<Vec<ProposalShortId>, Vec<ProposalShortId>>),
    /// Look up transactions by short id (pool/store first, then the orphan pool).
    FetchTxs(AsyncRequest<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
    /// Look up transactions and their cycles by short id.
    FetchTxsWithCycles(AsyncRequest<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
    /// Ask for a `TxPoolInfo` summary.
    GetTxPoolInfo(Request<(), TxPoolInfo>),
    /// Ask for the status of a cell; the `bool` requests loading of the cell data.
    GetLiveCell(Request<(OutPoint, bool), CellStatus>),
    /// Ask for the status (and cycles, if pooled) of a transaction.
    GetTxStatus(Request<Byte32, GetTxStatusResult>),
    /// Ask for a transaction together with its status.
    GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
    /// Hand a candidate uncle block to the service.
    NewUncle(Notify<UncleBlockView>),
    /// Clear the pool, passing the snapshot to the service's `clear_pool`.
    ClearPool(Request<Arc<Snapshot>, ()>),
    /// Clear the verify queue.
    ClearVerifyQueue(Request<(), ()>),
    /// Ask for information about all pool entries.
    GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
    /// Ask for the ids of all pool entries.
    GetAllIds(Request<(), TxPoolIds>),
    /// Ask the service to save the pool.
    SavePool(Request<(), ()>),
    /// Ask for detail information about one pooled transaction.
    GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),

    /// Tell the service whether the node is in IBD.
    UpdateIBDState(Request<bool, ()>),
    /// Ask for a fee-rate estimate; arguments are `(estimate_mode, enable_fallback)`.
    EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),

    /// Insert entries directly into the pending or proposed pool
    /// (only with the `internal` feature).
    #[cfg(feature = "internal")]
    PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
    /// Package transactions under an optional bytes limit
    /// (only with the `internal` feature).
    #[cfg(feature = "internal")]
    PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
    /// Like `SubmitLocalTx`, but handled through `resumeble_process_tx`.
    SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
}
149
/// Messages sent to the block-assembler task over `block_assembler_sender`.
///
/// `Hash`/`Eq` are derived because the interval-driven task in
/// `TxPoolServiceBuilder::start` stores pending messages in a `LinkedHashSet`.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) enum BlockAssemblerMessage {
    /// Sent by `plug_entry` after entries were plugged into the pending pool.
    Pending,
    /// Sent by `plug_entry` after entries were plugged into the proposed pool.
    Proposed,
    /// Sent after a candidate uncle has been inserted.
    Uncle,
    /// Carries a snapshot; the interval-driven task clears its queue and
    /// processes this message immediately instead of queueing it.
    Reset(Arc<Snapshot>),
}
157
/// Cloneable handle used to talk to the tx-pool service over channels.
#[derive(Clone)]
pub struct TxPoolController {
    /// Sender side of the main `Message` channel.
    sender: mpsc::Sender<Message>,
    /// Sender side of the dedicated chain-reorg notification channel.
    reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
    /// Watch sender used to publish `ChunkCommand::Suspend` / `ChunkCommand::Resume`.
    chunk_tx: Arc<watch::Sender<ChunkCommand>>,
    /// Runtime handle, exposed through `handle()`.
    handle: Handle,
    /// Flag shared with the builder; set to `true` by `TxPoolServiceBuilder::start`.
    started: Arc<AtomicBool>,
}
169
/// Sends `Message::$msg_type(Request { .. })` on `$self.sender` and blocks
/// until the reply arrives.
///
/// The request is queued with `try_send`; on failure the error is converted
/// by `handle_try_send_error` and returned early with `?`. The reply is then
/// awaited with a blocking `recv()` wrapped in `block_in_place`, and a receive
/// failure is mapped through `handle_recv_error`.
macro_rules! send_message {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let (responder, response) = oneshot::channel();
        let message = Message::$msg_type(Request::call($args, responder));
        $self
            .sender
            .try_send(message)
            // `handle_try_send_error` yields `(message, error)`; only the error is kept.
            .map_err(|err| handle_try_send_error(err).1)?;
        block_in_place(|| response.recv())
            .map_err(handle_recv_error)
            .map_err(Into::into)
    }};
}
186
/// Sends `Message::$msg_type(Notify { .. })` on `$self.sender` without
/// waiting for any reply.
///
/// The notification is queued with `try_send`; on failure the error produced
/// by `handle_try_send_error` is converted with `Into` and returned.
macro_rules! send_notify {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let message = Message::$msg_type(Notify::new($args));
        $self
            .sender
            .try_send(message)
            // `handle_try_send_error` yields `(message, error)`; only the error is kept.
            .map_err(|err| handle_try_send_error(err).1.into())
    }};
}
199
// Unless noted otherwise, the synchronous methods below go through
// `send_message!`: the request is queued with `try_send` and the calling
// thread then blocks (inside `block_in_place`) until the service replies.
// The outer `Result` reports channel failures; any inner `Result` is the
// service's own answer.
impl TxPoolController {
    /// Returns the current value of the shared `started` flag.
    pub fn service_started(&self) -> bool {
        self.started.load(Ordering::Acquire)
    }

    /// Overwrites the shared `started` flag (only with the `internal` feature).
    #[cfg(feature = "internal")]
    pub fn set_service_started(&self, v: bool) {
        self.started.store(v, Ordering::Release);
    }

    /// Returns a reference to the runtime handle held by this controller.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Requests a block template from the service.
    ///
    /// NOTE(review): the `process` handler in this file binds the three
    /// arguments to `_`-prefixed names and calls `service.get_block_template()`
    /// without them, so they currently do not influence the result.
    pub fn get_block_template(
        &self,
        bytes_limit: Option<u64>,
        proposals_limit: Option<u64>,
        max_version: Option<Version>,
    ) -> Result<BlockTemplateResult, AnyError> {
        send_message!(
            self,
            BlockTemplate,
            (bytes_limit, proposals_limit, max_version)
        )
    }

    /// Notifies the service of a new uncle block; does not wait for a reply.
    pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
        send_notify!(self, NewUncle, uncle)
    }

    /// Queues a chain-reorg notification on the dedicated reorg channel.
    ///
    /// Uses `try_send` and does not wait for the reorg to be processed.
    ///
    /// # Errors
    /// Returns the error produced by `handle_try_send_error` when `try_send` fails.
    pub fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) -> Result<(), AnyError> {
        let notify = Notify::new((
            detached_blocks,
            attached_blocks,
            detached_proposal_id,
            snapshot,
        ));
        self.reorg_sender.try_send(notify).map_err(|e| {
            let (_m, e) = handle_try_send_error(e);
            e.into()
        })
    }

    /// Submits a local transaction and waits for the service's verdict.
    pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTx, tx)
    }

    /// Asks the service whether `tx` would be accepted and waits for the answer.
    pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
        send_message!(self, TestAcceptTx, tx)
    }

    /// Asks the service to remove the transaction with `tx_hash`; the reply
    /// is the `bool` returned by the service.
    pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
        send_message!(self, RemoveLocalTx, tx_hash)
    }

    /// Submits a remote transaction with its declared cycles and origin peer.
    ///
    /// NOTE(review): although this is an `async fn`, it goes through
    /// `send_message!`, which waits for the reply with a blocking `recv()`
    /// inside `block_in_place` rather than awaiting it.
    pub async fn submit_remote_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Cycle,
        peer: PeerIndex,
    ) -> Result<(), AnyError> {
        send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
    }

    /// Hands a batch of transactions to the service without waiting for a
    /// reply; fails if `try_send` cannot queue the message.
    pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        send_notify!(self, NotifyTxs, txs)
    }

    /// Async variant of `notify_txs`: awaits `send` on the channel instead of
    /// using `try_send`. A send failure is reported as an `OtherError` whose
    /// message starts with `SendError`.
    pub async fn notify_txs_async(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        let notify = Notify::new(txs);
        self.sender
            .send(Message::NotifyTxs(notify))
            .await
            .map_err(|e| {
                let e = ckb_error::OtherError::new(format!("SendError {e}"));
                e.into()
            })
    }

    /// Requests a `TxPoolInfo` summary.
    pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
        send_message!(self, GetTxPoolInfo, ())
    }

    /// Requests the status of the cell at `out_point`; `with_data` asks the
    /// service to also load the cell data.
    pub fn get_live_cell(
        &self,
        out_point: OutPoint,
        with_data: bool,
    ) -> Result<CellStatus, AnyError> {
        send_message!(self, GetLiveCell, (out_point, with_data))
    }

    /// Returns the subset of `proposals` for which the pool's
    /// `contains_proposal_id` is false. Both the send and the reply are awaited.
    pub async fn fresh_proposals_filter(
        &self,
        proposals: Vec<ProposalShortId>,
    ) -> Result<Vec<ProposalShortId>, AnyError> {
        let (responder, response) = tokio::sync::oneshot::channel();
        let request = AsyncRequest::call(proposals, responder);
        self.sender
            .send(Message::FreshProposalsFilter(request))
            .await?;
        response.await.map_err(Into::into)
    }

    /// Requests the status of the transaction with `hash`.
    pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
        send_message!(self, GetTxStatus, hash)
    }

    /// Requests the transaction with `hash` together with its status.
    pub fn get_transaction_with_status(
        &self,
        hash: Byte32,
    ) -> Result<GetTransactionWithStatusResult, AnyError> {
        send_message!(self, GetTransactionWithStatus, hash)
    }

    /// Looks up transactions by short id; ids that are not found are simply
    /// absent from the returned map. Both the send and the reply are awaited.
    pub async fn fetch_txs(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
        let (responder, response) = tokio::sync::oneshot::channel();
        let request = AsyncRequest::call(short_ids, responder);
        self.sender.send(Message::FetchTxs(request)).await?;
        response.await.map_err(Into::into)
    }

    /// Looks up transactions and their cycles by short id; ids that are not
    /// found are omitted. Both the send and the reply are awaited.
    pub async fn fetch_txs_with_cycles(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<FetchTxsWithCyclesResult, AnyError> {
        let (responder, response) = tokio::sync::oneshot::channel();
        let request = AsyncRequest::call(short_ids, responder);
        self.sender
            .send(Message::FetchTxsWithCycles(request))
            .await?;
        response.await.map_err(Into::into)
    }

    /// Asks the service to clear the pool, passing it `new_snapshot`.
    pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
        send_message!(self, ClearPool, new_snapshot)
    }

    /// Asks the service to clear the verify queue.
    pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
        send_message!(self, ClearVerifyQueue, ())
    }

    /// Requests information about all pool entries.
    pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
        send_message!(self, GetAllEntryInfo, ())
    }

    /// Requests the ids of all pool entries.
    pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
        send_message!(self, GetAllIds, ())
    }

    /// Requests detail information for the pooled transaction with `tx_hash`;
    /// the service answers with `PoolTxDetailInfo::with_unknown()` when it has none.
    pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
        send_message!(self, GetPoolTxDetails, tx_hash)
    }

    /// Logs a notice and asks the service to save the pool, waiting until it replies.
    pub fn save_pool(&self) -> Result<(), AnyError> {
        info!("Please be patient, tx-pool are saving data into disk ...");
        send_message!(self, SavePool, ())
    }

    /// Tells the service whether the node is in IBD.
    pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
        send_message!(self, UpdateIBDState, in_ibd)
    }

    /// Requests a fee-rate estimate for `estimate_mode`; `enable_fallback` is
    /// forwarded unchanged to the service.
    pub fn estimate_fee_rate(
        &self,
        estimate_mode: EstimateMode,
        enable_fallback: bool,
    ) -> Result<FeeEstimatesResult, AnyError> {
        send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
    }

    /// Publishes `ChunkCommand::Suspend` on the chunk watch channel.
    pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
        self.chunk_tx
            .send(ChunkCommand::Suspend)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Publishes `ChunkCommand::Resume` on the chunk watch channel.
    pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
        self.chunk_tx
            .send(ChunkCommand::Resume)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Re-submits previously persisted transactions one by one.
    ///
    /// A transaction rejected by the service is only counted and reported in
    /// the final log line; a channel failure aborts the loop and is returned.
    fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        if !txs.is_empty() {
            info!("Loading persistent tx-pool data, total {} txs", txs.len());
            let mut failed_txs = 0;
            for tx in txs {
                // `?` propagates channel errors; `is_err()` inspects the service's verdict.
                if self.submit_local_tx(tx)?.is_err() {
                    failed_txs += 1;
                }
            }
            if failed_txs == 0 {
                info!("Persistent tx-pool data is loaded");
            } else {
                info!(
                    "Persistent tx-pool data is loaded, {} stale txs are ignored",
                    failed_txs
                );
            }
        }
        Ok(())
    }

    /// Plugs `entries` directly into the pool selected by `target`
    /// (only with the `internal` feature).
    #[cfg(feature = "internal")]
    pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
        send_message!(self, PlugEntry, (entries, target))
    }

    /// Asks the service to package transactions under an optional bytes limit
    /// (only with the `internal` feature).
    #[cfg(feature = "internal")]
    pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
        send_message!(self, PackageTxs, bytes_limit)
    }

    /// Submits a local transaction through the `SubmitLocalTestTx` message,
    /// which the service handles via `resumeble_process_tx`.
    pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTestTx, tx)
    }
}
471
/// Collects everything needed to start the tx-pool service; consumed by `start`.
pub struct TxPoolServiceBuilder {
    /// Configuration handed to `TxPool::new` and used to size the verify queue.
    pub(crate) tx_pool_config: TxPoolConfig,
    /// Clone of the controller returned from `new`; used to re-submit persisted txs.
    pub(crate) tx_pool_controller: TxPoolController,
    /// Snapshot handed to `TxPool::new`; also the source of the consensus.
    pub(crate) snapshot: Arc<Snapshot>,
    /// Present only when a block-assembler config was supplied to `new`.
    pub(crate) block_assembler: Option<BlockAssembler>,
    /// Shared transaction verification cache.
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    /// Callbacks registered through the `register_*` methods.
    pub(crate) callbacks: Callbacks,
    /// Receiving side of the main `Message` channel.
    pub(crate) receiver: mpsc::Receiver<Message>,
    /// Receiving side of the chain-reorg notification channel.
    pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
    /// Cancellation token obtained from `new_tokio_exit_rx()`; stops the spawned loops.
    pub(crate) signal_receiver: CancellationToken,
    /// Runtime handle on which all service tasks are spawned.
    pub(crate) handle: Handle,
    /// Sender for `TxVerificationResult` values.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Receiving side of the chunk-command watch channel, handed to `VerifyMgr`.
    pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
    /// Flag shared with the controller; set to `true` at the end of `start`.
    pub(crate) started: Arc<AtomicBool>,
    /// Both ends of the block-assembler message channel.
    pub(crate) block_assembler_channel: (
        mpsc::Sender<BlockAssemblerMessage>,
        mpsc::Receiver<BlockAssemblerMessage>,
    ),
    /// Fee estimator handed to the service.
    pub(crate) fee_estimator: FeeEstimator,
}
493
494impl TxPoolServiceBuilder {
495 pub fn new(
497 tx_pool_config: TxPoolConfig,
498 snapshot: Arc<Snapshot>,
499 block_assembler_config: Option<BlockAssemblerConfig>,
500 txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
501 handle: &Handle,
502 tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
503 fee_estimator: FeeEstimator,
504 ) -> (TxPoolServiceBuilder, TxPoolController) {
505 let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
506 let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
507 let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
508 let signal_receiver: CancellationToken = new_tokio_exit_rx();
509 let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
510 let started = Arc::new(AtomicBool::new(false));
511
512 let controller = TxPoolController {
513 sender,
514 reorg_sender,
515 handle: handle.clone(),
516 chunk_tx: Arc::new(chunk_tx),
517 started: Arc::clone(&started),
518 };
519
520 let block_assembler =
521 block_assembler_config.map(|config| BlockAssembler::new(config, Arc::clone(&snapshot)));
522 let builder = TxPoolServiceBuilder {
523 tx_pool_config,
524 tx_pool_controller: controller.clone(),
525 snapshot,
526 block_assembler,
527 txs_verify_cache,
528 callbacks: Callbacks::new(),
529 receiver,
530 reorg_receiver,
531 signal_receiver,
532 handle: handle.clone(),
533 tx_relay_sender,
534 chunk_rx,
535 started,
536 block_assembler_channel,
537 fee_estimator,
538 };
539
540 (builder, controller)
541 }
542
543 pub fn register_pending(&mut self, callback: PendingCallback) {
545 self.callbacks.register_pending(callback);
546 }
547
548 pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
550 self.tx_relay_sender.clone()
551 }
552
553 pub fn register_proposed(&mut self, callback: ProposedCallback) {
555 self.callbacks.register_proposed(callback);
556 }
557
558 pub fn register_reject(&mut self, callback: RejectCallback) {
560 self.callbacks.register_reject(callback);
561 }
562
563 pub fn start(self, network: NetworkController) {
565 let consensus = self.snapshot.cloned_consensus();
566
567 let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
568 self.tx_pool_config.max_tx_verify_cycles,
569 )));
570
571 let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
572 let txs = match tx_pool.load_from_file() {
573 Ok(txs) => txs,
574 Err(e) => {
575 error!("{}", e.to_string());
576 error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
577 Vec::new()
578 }
579 };
580
581 let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
582 let service = TxPoolService {
583 tx_pool_config: Arc::new(tx_pool.config.clone()),
584 tx_pool: Arc::new(RwLock::new(tx_pool)),
585 orphan: Arc::new(RwLock::new(OrphanPool::new())),
586 block_assembler: self.block_assembler,
587 txs_verify_cache: self.txs_verify_cache,
588 callbacks: Arc::new(self.callbacks),
589 tx_relay_sender: self.tx_relay_sender,
590 block_assembler_sender,
591 verify_queue: Arc::clone(&verify_queue),
592 network,
593 consensus,
594 fee_estimator: self.fee_estimator,
595 };
596
597 let mut verify_mgr =
598 VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
599 self.handle.spawn(async move { verify_mgr.run().await });
600
601 let mut receiver = self.receiver;
602 let mut reorg_receiver = self.reorg_receiver;
603 let handle_clone = self.handle.clone();
604
605 let process_service = service.clone();
606 let signal_receiver = self.signal_receiver.clone();
607 self.handle.spawn(async move {
608 loop {
609 tokio::select! {
610 Some(message) = receiver.recv() => {
611 let service_clone = process_service.clone();
612 handle_clone.spawn(process(service_clone, message));
613 },
614 _ = signal_receiver.cancelled() => {
615 info!("TxPool is saving, please wait...");
616 process_service.save_pool().await;
617 info!("TxPool process_service exit now");
618 break
619 },
620 else => break,
621 }
622 }
623 });
624
625 let process_service = service.clone();
626 if let Some(ref block_assembler) = service.block_assembler {
627 let signal_receiver = self.signal_receiver.clone();
628 let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
629 if interval.is_zero() {
630 ckb_logger::warn!(
633 "block_assembler.update_interval_millis set to zero interval. \
634 This should only be used for tests, as external notification will be disabled."
635 );
636 self.handle.spawn(async move {
637 loop {
638 tokio::select! {
639 Some(message) = block_assembler_receiver.recv() => {
640 let service_clone = process_service.clone();
641 block_assembler::process(service_clone, &message).await;
642 },
643 _ = signal_receiver.cancelled() => {
644 info!("TxPool block_assembler process service received exit signal, exit now");
645 break
646 },
647 else => break,
648 }
649 }
650 });
651 } else {
652 self.handle.spawn(async move {
653 let mut interval = tokio::time::interval(interval);
654 let mut queue = LinkedHashSet::new();
655 loop {
656 tokio::select! {
657 Some(message) = block_assembler_receiver.recv() => {
658 if let BlockAssemblerMessage::Reset(..) = message {
659 let service_clone = process_service.clone();
660 queue.clear();
661 block_assembler::process(service_clone, &message).await;
662 } else {
663 queue.insert(message);
664 }
665 },
666 _ = interval.tick() => {
667 for message in &queue {
668 let service_clone = process_service.clone();
669 block_assembler::process(service_clone, message).await;
670 }
671 if !queue.is_empty() {
672 if let Some(ref block_assembler) = process_service.block_assembler {
673 block_assembler.notify().await;
674 }
675 }
676 queue.clear();
677 }
678 _ = signal_receiver.cancelled() => {
679 info!("TxPool block_assembler process service received exit signal, exit now");
680 break
681 },
682 else => break,
683 }
684 }
685 });
686 }
687 }
688
689 let signal_receiver = self.signal_receiver;
690 self.handle.spawn(async move {
691 loop {
692 tokio::select! {
693 Some(message) = reorg_receiver.recv() => {
694 let Notify {
695 arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
696 } = message;
697 let snapshot_clone = Arc::clone(&snapshot);
698 let detached_blocks_clone = detached_blocks.clone();
699 service.update_block_assembler_before_tx_pool_reorg(
700 detached_blocks_clone,
701 snapshot_clone
702 ).await;
703
704 let snapshot_clone = Arc::clone(&snapshot);
705 service
706 .update_tx_pool_for_reorg(
707 detached_blocks,
708 attached_blocks,
709 detached_proposal_id,
710 snapshot_clone,
711 )
712 .await;
713
714 service.update_block_assembler_after_tx_pool_reorg().await;
715 },
716 _ = signal_receiver.cancelled() => {
717 info!("TxPool reorg process service received exit signal, exit now");
718 break
719 },
720 else => break,
721 }
722 }
723 });
724 self.started.store(true, Ordering::Release);
725 if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
726 error!("Failed to import persistent txs, cause: {}", err);
727 }
728 }
729}
730
/// State shared by all tx-pool service tasks.
///
/// The struct derives `Clone`, and `TxPoolServiceBuilder::start` hands a clone
/// to each spawned task and to each message handler.
#[derive(Clone)]
pub(crate) struct TxPoolService {
    /// The transaction pool itself.
    pub(crate) tx_pool: Arc<RwLock<TxPool>>,
    /// The orphan pool, consulted by `FetchTxs` and counted in `TxPoolInfo`.
    pub(crate) orphan: Arc<RwLock<OrphanPool>>,
    /// Consensus obtained from the snapshot at start-up.
    pub(crate) consensus: Arc<Consensus>,
    /// Copy of the pool configuration taken from the pool at start-up.
    pub(crate) tx_pool_config: Arc<TxPoolConfig>,
    /// Block assembler, if one was configured.
    pub(crate) block_assembler: Option<BlockAssembler>,
    /// Shared transaction verification cache.
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    /// Callbacks registered on the builder.
    pub(crate) callbacks: Arc<Callbacks>,
    /// Network controller passed to `TxPoolServiceBuilder::start`.
    pub(crate) network: NetworkController,
    /// Sender for `TxVerificationResult` values.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Queue shared with the verify manager; its length is reported in `TxPoolInfo`.
    pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
    /// Sender side of the block-assembler message channel.
    pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
    /// Fee estimator passed in from the builder.
    pub(crate) fee_estimator: FeeEstimator,
}
746
/// Value type carried by the `tx_relay_sender` channel.
///
/// NOTE(review): the variants are not constructed in this part of the file;
/// the descriptions below are inferred from the names — confirm against the
/// code that sends them.
pub enum TxVerificationResult {
    /// Presumably: the transaction passed verification.
    Ok {
        /// Peer the transaction came from, if any.
        original_peer: Option<PeerIndex>,
        /// Hash of the transaction.
        tx_hash: Byte32,
    },
    /// Presumably: the transaction refers to parents that are not known yet.
    UnknownParents {
        /// Peer the transaction came from.
        peer: PeerIndex,
        /// Hashes of the unknown parent transactions.
        parents: HashSet<Byte32>,
    },
    /// Presumably: the transaction was rejected.
    Reject {
        /// Hash of the transaction.
        tx_hash: Byte32,
    },
}
769
770#[allow(clippy::cognitive_complexity)]
771async fn process(mut service: TxPoolService, message: Message) {
772 match message {
773 Message::GetTxPoolInfo(Request { responder, .. }) => {
774 let info = service.info().await;
775 if let Err(e) = responder.send(info) {
776 error!("Responder sending get_tx_pool_info failed {:?}", e);
777 };
778 }
779 Message::GetLiveCell(Request {
780 responder,
781 arguments: (out_point, with_data),
782 }) => {
783 let live_cell_status = service.get_live_cell(out_point, with_data).await;
784 if let Err(e) = responder.send(live_cell_status) {
785 error!("Responder sending get_live_cell failed {:?}", e);
786 };
787 }
788 Message::BlockTemplate(Request {
789 responder,
790 arguments: (_bytes_limit, _proposals_limit, _max_version),
791 }) => {
792 let block_template_result = service.get_block_template().await;
793 if let Err(e) = responder.send(block_template_result) {
794 error!("Responder sending block_template_result failed {:?}", e);
795 };
796 }
797 Message::SubmitLocalTx(Request {
798 responder,
799 arguments: tx,
800 }) => {
801 let result = service.process_tx(tx, None).await.map(|_| ());
802 if let Err(e) = responder.send(result) {
803 error!("Responder sending submit_tx result failed {:?}", e);
804 };
805 }
806 Message::SubmitLocalTestTx(Request {
807 responder,
808 arguments: tx,
809 }) => {
810 let result = service.resumeble_process_tx(tx, None).await.map(|_| ());
811 if let Err(e) = responder.send(result) {
812 error!("Responder sending submit_tx result failed {:?}", e);
813 };
814 }
815 Message::RemoveLocalTx(Request {
816 responder,
817 arguments: tx_hash,
818 }) => {
819 let result = service.remove_tx(tx_hash).await;
820 if let Err(e) = responder.send(result) {
821 error!("Responder sending remove_tx result failed {:?}", e);
822 };
823 }
824 Message::TestAcceptTx(Request {
825 responder,
826 arguments: tx,
827 }) => {
828 let result = service.test_accept_tx(tx).await;
829 if let Err(e) = responder.send(result.map(|r| r.into())) {
830 error!("Responder sending test_accept_tx result failed {:?}", e);
831 };
832 }
833 Message::SubmitRemoteTx(Request {
834 responder,
835 arguments: (tx, declared_cycles, peer),
836 }) => {
837 let _result = service
838 .resumeble_process_tx(tx, Some((declared_cycles, peer)))
839 .await;
840 if let Err(e) = responder.send(()) {
841 error!("Responder sending submit_tx result failed {:?}", e);
842 };
843 }
844 Message::NotifyTxs(Notify { arguments: txs }) => {
845 for tx in txs {
846 let _ret = service.resumeble_process_tx(tx, None).await;
847 }
848 }
849 Message::FreshProposalsFilter(AsyncRequest {
850 responder,
851 arguments: mut proposals,
852 }) => {
853 let tx_pool = service.tx_pool.read().await;
854 proposals.retain(|id| !tx_pool.contains_proposal_id(id));
855 if let Err(e) = responder.send(proposals) {
856 error!("Responder sending fresh_proposals_filter failed {:?}", e);
857 };
858 }
859 Message::GetTxStatus(Request {
860 responder,
861 arguments: hash,
862 }) => {
863 let id = ProposalShortId::from_tx_hash(&hash);
864 let tx_pool = service.tx_pool.read().await;
865 let ret = if let Some(PoolEntry {
866 status,
867 inner: entry,
868 ..
869 }) = tx_pool.pool_map.get_by_id(&id)
870 {
871 let status = if status == &Status::Proposed {
872 TxStatus::Proposed
873 } else {
874 TxStatus::Pending
875 };
876 Ok((status, Some(entry.cycles)))
877 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
878 let recent_reject_result = recent_reject_db.get(&hash);
879 if let Ok(recent_reject) = recent_reject_result {
880 if let Some(record) = recent_reject {
881 Ok((TxStatus::Rejected(record), None))
882 } else {
883 Ok((TxStatus::Unknown, None))
884 }
885 } else {
886 Err(recent_reject_result.unwrap_err())
887 }
888 } else {
889 Ok((TxStatus::Unknown, None))
890 };
891
892 if let Err(e) = responder.send(ret) {
893 error!("Responder sending get_tx_status failed {:?}", e)
894 };
895 }
896 Message::GetTransactionWithStatus(Request {
897 responder,
898 arguments: hash,
899 }) => {
900 let id = ProposalShortId::from_tx_hash(&hash);
901 let tx_pool = service.tx_pool.read().await;
902 let ret = if let Some(PoolEntry {
903 status,
904 inner: entry,
905 ..
906 }) = tx_pool.pool_map.get_by_id(&id)
907 {
908 let (tx_status, min_replace_fee) = if status == &Status::Proposed {
909 (TxStatus::Proposed, None)
910 } else {
911 (TxStatus::Pending, tx_pool.min_replace_fee(entry))
912 };
913 Ok(TransactionWithStatus::with_status(
914 Some(entry.transaction().clone()),
915 entry.cycles,
916 entry.timestamp,
917 tx_status,
918 Some(entry.fee),
919 min_replace_fee,
920 ))
921 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
922 match recent_reject_db.get(&hash) {
923 Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
924 Ok(_) => Ok(TransactionWithStatus::with_unknown()),
925 Err(err) => Err(err),
926 }
927 } else {
928 Ok(TransactionWithStatus::with_unknown())
929 };
930
931 if let Err(e) = responder.send(ret) {
932 error!("Responder sending get_tx_status failed {:?}", e)
933 };
934 }
935 Message::FetchTxs(AsyncRequest {
936 responder,
937 arguments: short_ids,
938 }) => {
939 let tx_pool = service.tx_pool.read().await;
940 let orphan = service.orphan.read().await;
941 let txs = short_ids
942 .into_iter()
943 .filter_map(|short_id| {
944 tx_pool
945 .get_tx_from_pool_or_store(&short_id)
946 .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
947 .map(|tx| (short_id, tx))
948 })
949 .collect();
950 if let Err(e) = responder.send(txs) {
951 error!("Responder sending fetch_txs failed {:?}", e);
952 };
953 }
954 Message::FetchTxsWithCycles(AsyncRequest {
955 responder,
956 arguments: short_ids,
957 }) => {
958 let tx_pool = service.tx_pool.read().await;
959 let txs = short_ids
960 .into_iter()
961 .filter_map(|short_id| {
962 tx_pool
963 .get_tx_with_cycles(&short_id)
964 .map(|(tx, cycles)| (short_id, (tx, cycles)))
965 })
966 .collect();
967 if let Err(e) = responder.send(txs) {
968 error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
969 };
970 }
971 Message::NewUncle(Notify { arguments: uncle }) => {
972 service.receive_candidate_uncle(uncle).await;
973 }
974 Message::ClearPool(Request {
975 responder,
976 arguments: new_snapshot,
977 }) => {
978 service.clear_pool(new_snapshot).await;
979 if let Err(e) = responder.send(()) {
980 error!("Responder sending clear_pool failed {:?}", e)
981 };
982 }
983 Message::ClearVerifyQueue(Request { responder, .. }) => {
984 service.verify_queue.write().await.clear();
985 if let Err(e) = responder.send(()) {
986 error!("Responder sending clear_verify_queue failed {:?}", e)
987 };
988 }
989 Message::GetPoolTxDetails(Request {
990 responder,
991 arguments: tx_hash,
992 }) => {
993 let tx_pool = service.tx_pool.read().await;
994 let id = ProposalShortId::from_tx_hash(&tx_hash);
995 let tx_details = tx_pool
996 .get_tx_detail(&id)
997 .unwrap_or(PoolTxDetailInfo::with_unknown());
998 if let Err(e) = responder.send(tx_details) {
999 error!("responder send get_pool_tx_details failed {:?}", e)
1000 };
1001 }
1002 Message::GetAllEntryInfo(Request { responder, .. }) => {
1003 let tx_pool = service.tx_pool.read().await;
1004 let info = tx_pool.get_all_entry_info();
1005 if let Err(e) = responder.send(info) {
1006 error!("Responder sending get_all_entry_info failed {:?}", e)
1007 };
1008 }
1009 Message::GetAllIds(Request { responder, .. }) => {
1010 let tx_pool = service.tx_pool.read().await;
1011 let ids = tx_pool.get_ids();
1012 if let Err(e) = responder.send(ids) {
1013 error!("Responder sending get_ids failed {:?}", e)
1014 };
1015 }
1016 Message::SavePool(Request { responder, .. }) => {
1017 service.save_pool().await;
1018 if let Err(e) = responder.send(()) {
1019 error!("Responder sending save_pool failed {:?}", e)
1020 };
1021 }
1022 Message::UpdateIBDState(Request {
1023 responder,
1024 arguments: in_ibd,
1025 }) => {
1026 service.update_ibd_state(in_ibd).await;
1027 if let Err(e) = responder.send(()) {
1028 error!("Responder sending update_ibd_state failed {:?}", e)
1029 };
1030 }
1031 Message::EstimateFeeRate(Request {
1032 responder,
1033 arguments: (estimate_mode, enable_fallback),
1034 }) => {
1035 let fee_estimates_result = service
1036 .estimate_fee_rate(estimate_mode, enable_fallback)
1037 .await;
1038 if let Err(e) = responder.send(fee_estimates_result) {
1039 error!("Responder sending fee_estimates_result failed {:?}", e)
1040 };
1041 }
1042 #[cfg(feature = "internal")]
1043 Message::PlugEntry(Request {
1044 responder,
1045 arguments: (entries, target),
1046 }) => {
1047 service.plug_entry(entries, target).await;
1048
1049 if let Err(e) = responder.send(()) {
1050 error!("Responder sending plug_entry failed {:?}", e);
1051 };
1052 }
1053 #[cfg(feature = "internal")]
1054 Message::PackageTxs(Request {
1055 responder,
1056 arguments: bytes_limit,
1057 }) => {
1058 let max_block_cycles = service.consensus.max_block_cycles();
1059 let max_block_bytes = service.consensus.max_block_bytes();
1060 let tx_pool = service.tx_pool.read().await;
1061 let (txs, _size, _cycles) = tx_pool.package_txs(
1062 max_block_cycles,
1063 bytes_limit.unwrap_or(max_block_bytes) as usize,
1064 );
1065 if let Err(e) = responder.send(txs) {
1066 error!("Responder sending plug_entry failed {:?}", e);
1067 };
1068 }
1069 }
1070}
1071
1072impl TxPoolService {
1073 async fn info(&self) -> TxPoolInfo {
1075 let tx_pool = self.tx_pool.read().await;
1076 let orphan = self.orphan.read().await;
1077 let verify_queue = self.verify_queue.read().await;
1078 let tip_header = tx_pool.snapshot.tip_header();
1079 TxPoolInfo {
1080 tip_hash: tip_header.hash(),
1081 tip_number: tip_header.number(),
1082 pending_size: tx_pool.pool_map.pending_size(),
1083 proposed_size: tx_pool.pool_map.proposed_size(),
1084 orphan_size: orphan.len(),
1085 total_tx_size: tx_pool.pool_map.total_tx_size,
1086 total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
1087 min_fee_rate: self.tx_pool_config.min_fee_rate,
1088 min_rbf_rate: self.tx_pool_config.min_rbf_rate,
1089 last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
1090 tx_size_limit: TRANSACTION_SIZE_LIMIT,
1091 max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
1092 verify_queue_size: verify_queue.len(),
1093 }
1094 }
1095
1096 async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
1098 let tx_pool = self.tx_pool.read().await;
1099 let snapshot = tx_pool.snapshot();
1100 let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
1101 let provider = OverlayCellProvider::new(&pool_cell, snapshot);
1102
1103 match provider.cell(&out_point, false) {
1104 CellStatus::Live(mut cell_meta) => {
1105 if eager_load {
1106 if let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
1107 cell_meta.mem_cell_data = Some(data);
1108 cell_meta.mem_cell_data_hash = Some(data_hash);
1109 }
1110 }
1111 CellStatus::live_cell(cell_meta)
1112 }
1113 _ => CellStatus::Unknown,
1114 }
1115 }
1116
1117 pub fn should_notify_block_assembler(&self) -> bool {
1118 self.block_assembler.is_some()
1119 }
1120
1121 pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
1122 if let Some(ref block_assembler) = self.block_assembler {
1123 {
1124 block_assembler.candidate_uncles.lock().await.insert(uncle);
1125 }
1126 if self
1127 .block_assembler_sender
1128 .send(BlockAssemblerMessage::Uncle)
1129 .await
1130 .is_err()
1131 {
1132 error!("block_assembler receiver dropped");
1133 }
1134 }
1135 }
1136
1137 pub async fn update_block_assembler_before_tx_pool_reorg(
1138 &self,
1139 detached_blocks: VecDeque<BlockView>,
1140 snapshot: Arc<Snapshot>,
1141 ) {
1142 if let Some(ref block_assembler) = self.block_assembler {
1143 {
1144 let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
1145 for detached_block in detached_blocks {
1146 candidate_uncles.insert(detached_block.as_uncle());
1147 }
1148 }
1149
1150 if let Err(e) = block_assembler.update_blank(snapshot).await {
1151 error!("block_assembler update_blank error {}", e);
1152 }
1153 block_assembler.notify().await;
1154 }
1155 }
1156
1157 pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
1158 if let Some(ref block_assembler) = self.block_assembler {
1159 if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
1160 error!("block_assembler update failed {:?}", e);
1161 }
1162 block_assembler.notify().await;
1163 }
1164 }
1165
1166 #[cfg(feature = "internal")]
1167 pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
1168 {
1169 let mut tx_pool = self.tx_pool.write().await;
1170 match target {
1171 PlugTarget::Pending => {
1172 for entry in entries {
1173 tx_pool
1174 .add_pending(entry)
1175 .expect("Plug entry add_pending error");
1176 }
1177 }
1178 PlugTarget::Proposed => {
1179 for entry in entries {
1180 tx_pool
1181 .add_proposed(entry)
1182 .expect("Plug entry add_proposed error");
1183 }
1184 }
1185 };
1186 }
1187
1188 if self.should_notify_block_assembler() {
1189 let msg = match target {
1190 PlugTarget::Pending => BlockAssemblerMessage::Pending,
1191 PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
1192 };
1193 if self.block_assembler_sender.send(msg).await.is_err() {
1194 error!("block_assembler receiver dropped");
1195 }
1196 }
1197 }
1198}