1use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::verify_mgr::VerifyMgr;
11use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
12use ckb_async_runtime::Handle;
13use ckb_chain_spec::consensus::Consensus;
14use ckb_channel::oneshot;
15use ckb_error::AnyError;
16use ckb_fee_estimator::FeeEstimator;
17use ckb_jsonrpc_types::BlockTemplate;
18use ckb_logger::error;
19use ckb_logger::info;
20use ckb_network::{NetworkController, PeerIndex};
21use ckb_script::ChunkCommand;
22use ckb_snapshot::Snapshot;
23use ckb_stop_handler::new_tokio_exit_rx;
24use ckb_store::ChainStore;
25use ckb_types::{
26 core::{
27 BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
28 cell::{CellProvider, CellStatus, OverlayCellProvider},
29 tx_pool::{
30 EntryCompleted, PoolTxDetailInfo, Reject, TRANSACTION_SIZE_LIMIT,
31 TransactionWithStatus, TxPoolEntryInfo, TxPoolIds, TxPoolInfo, TxStatus,
32 },
33 },
34 packed::{Byte32, OutPoint, ProposalShortId},
35};
36use ckb_util::LinkedHashSet;
37use ckb_verification::cache::TxVerificationCache;
38use std::collections::{HashMap, HashSet, VecDeque};
39use std::sync::{
40 Arc,
41 atomic::{AtomicBool, Ordering},
42};
43use std::time::Duration;
44use tokio::sync::watch;
45use tokio::sync::{RwLock, mpsc};
46use tokio::task::block_in_place;
47use tokio_util::sync::CancellationToken;
48
49use crate::pool_cell::PoolCell;
50#[cfg(feature = "internal")]
51use crate::{component::entry::TxEntry, process::PlugTarget};
52
53pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
54pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
55
56pub(crate) struct Request<A, R> {
57 pub responder: oneshot::Sender<R>,
58 pub arguments: A,
59}
60
61impl<A, R> Request<A, R> {
62 pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
63 Request {
64 responder,
65 arguments,
66 }
67 }
68}
69
70pub(crate) struct AsyncRequest<A, R> {
71 pub responder: tokio::sync::oneshot::Sender<R>,
72 pub arguments: A,
73}
74
75impl<A, R> AsyncRequest<A, R> {
76 pub(crate) fn call(
77 arguments: A,
78 responder: tokio::sync::oneshot::Sender<R>,
79 ) -> AsyncRequest<A, R> {
80 AsyncRequest {
81 responder,
82 arguments,
83 }
84 }
85}
86
87pub(crate) struct Notify<A> {
88 pub arguments: A,
89}
90
91impl<A> Notify<A> {
92 pub(crate) fn new(arguments: A) -> Notify<A> {
93 Notify { arguments }
94 }
95}
96
97pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
98type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);
99
100pub(crate) type SubmitTxResult = Result<(), Reject>;
101
102pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;
103
104type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
105type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
106type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;
107
108pub(crate) type ChainReorgArgs = (
109 VecDeque<BlockView>,
110 VecDeque<BlockView>,
111 HashSet<ProposalShortId>,
112 Arc<Snapshot>,
113);
114
115pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
116
117pub(crate) enum Message {
118 BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
119 SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
120 RemoveLocalTx(Request<Byte32, bool>),
121 TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
122 SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
123 NotifyTxs(Notify<Vec<TransactionView>>),
124 FreshProposalsFilter(AsyncRequest<Vec<ProposalShortId>, Vec<ProposalShortId>>),
125 FetchTxs(AsyncRequest<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
126 FetchTxsWithCycles(AsyncRequest<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
127 GetTxPoolInfo(Request<(), TxPoolInfo>),
128 GetLiveCell(Request<(OutPoint, bool), CellStatus>),
129 GetTxStatus(Request<Byte32, GetTxStatusResult>),
130 GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
131 NewUncle(Notify<UncleBlockView>),
132 ClearPool(Request<Arc<Snapshot>, ()>),
133 ClearVerifyQueue(Request<(), ()>),
134 GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
135 GetAllIds(Request<(), TxPoolIds>),
136 SavePool(Request<(), ()>),
137 GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),
138 GetTotalRecentRejectNum(Request<(), Option<u64>>),
139
140 UpdateIBDState(Request<bool, ()>),
141 EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),
142
143 #[cfg(feature = "internal")]
145 PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
146 #[cfg(feature = "internal")]
147 PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
148 SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
149}
150
151#[derive(Debug, Hash, Eq, PartialEq)]
152pub(crate) enum BlockAssemblerMessage {
153 Pending,
154 Proposed,
155 Uncle,
156 Reset(Arc<Snapshot>),
157}
158
159#[derive(Clone)]
163pub struct TxPoolController {
164 sender: mpsc::Sender<Message>,
165 reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
166 chunk_tx: Arc<watch::Sender<ChunkCommand>>,
167 handle: Handle,
168 started: Arc<AtomicBool>,
169}
170
171macro_rules! send_message {
172 ($self:ident, $msg_type:ident, $args:expr) => {{
173 let (responder, response) = oneshot::channel();
174 let request = Request::call($args, responder);
175 $self
176 .sender
177 .try_send(Message::$msg_type(request))
178 .map_err(|e| {
179 let (_m, e) = handle_try_send_error(e);
180 e
181 })?;
182 block_in_place(|| response.recv())
183 .map_err(handle_recv_error)
184 .map_err(Into::into)
185 }};
186}
187
188macro_rules! send_notify {
189 ($self:ident, $msg_type:ident, $args:expr) => {{
190 let notify = Notify::new($args);
191 $self
192 .sender
193 .try_send(Message::$msg_type(notify))
194 .map_err(|e| {
195 let (_m, e) = handle_try_send_error(e);
196 e.into()
197 })
198 }};
199}
200
201impl TxPoolController {
202 pub fn service_started(&self) -> bool {
204 self.started.load(Ordering::Acquire)
205 }
206
207 #[cfg(feature = "internal")]
209 pub fn set_service_started(&self, v: bool) {
210 self.started.store(v, Ordering::Release);
211 }
212
213 pub fn handle(&self) -> &Handle {
215 &self.handle
216 }
217
218 pub fn get_block_template(
220 &self,
221 bytes_limit: Option<u64>,
222 proposals_limit: Option<u64>,
223 max_version: Option<Version>,
224 ) -> Result<BlockTemplateResult, AnyError> {
225 send_message!(
226 self,
227 BlockTemplate,
228 (bytes_limit, proposals_limit, max_version)
229 )
230 }
231
232 pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
234 send_notify!(self, NewUncle, uncle)
235 }
236
237 pub fn update_tx_pool_for_reorg(
242 &self,
243 detached_blocks: VecDeque<BlockView>,
244 attached_blocks: VecDeque<BlockView>,
245 detached_proposal_id: HashSet<ProposalShortId>,
246 snapshot: Arc<Snapshot>,
247 ) -> Result<(), AnyError> {
248 let notify = Notify::new((
249 detached_blocks,
250 attached_blocks,
251 detached_proposal_id,
252 snapshot,
253 ));
254 self.reorg_sender.try_send(notify).map_err(|e| {
255 let (_m, e) = handle_try_send_error(e);
256 e.into()
257 })
258 }
259
260 pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
262 send_message!(self, SubmitLocalTx, tx)
263 }
264
265 pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
269 send_message!(self, TestAcceptTx, tx)
270 }
271
272 pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
274 send_message!(self, RemoveLocalTx, tx_hash)
275 }
276
277 pub async fn submit_remote_tx(
279 &self,
280 tx: TransactionView,
281 declared_cycles: Cycle,
282 peer: PeerIndex,
283 ) -> Result<(), AnyError> {
284 send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
285 }
286
287 pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
289 send_notify!(self, NotifyTxs, txs)
290 }
291
292 pub async fn notify_txs_async(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
294 let notify = Notify::new(txs);
295 self.sender
296 .send(Message::NotifyTxs(notify))
297 .await
298 .map_err(|e| {
299 let e = ckb_error::OtherError::new(format!("SendError {e}"));
300 e.into()
301 })
302 }
303
304 pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
306 send_message!(self, GetTxPoolInfo, ())
307 }
308
309 pub fn get_live_cell(
311 &self,
312 out_point: OutPoint,
313 with_data: bool,
314 ) -> Result<CellStatus, AnyError> {
315 send_message!(self, GetLiveCell, (out_point, with_data))
316 }
317
318 pub async fn fresh_proposals_filter(
320 &self,
321 proposals: Vec<ProposalShortId>,
322 ) -> Result<Vec<ProposalShortId>, AnyError> {
323 let (responder, response) = tokio::sync::oneshot::channel();
324 let request = AsyncRequest::call(proposals, responder);
325 self.sender
326 .send(Message::FreshProposalsFilter(request))
327 .await?;
328 response.await.map_err(Into::into)
329 }
330
331 pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
333 send_message!(self, GetTxStatus, hash)
334 }
335
336 pub fn get_transaction_with_status(
338 &self,
339 hash: Byte32,
340 ) -> Result<GetTransactionWithStatusResult, AnyError> {
341 send_message!(self, GetTransactionWithStatus, hash)
342 }
343
344 pub async fn fetch_txs(
347 &self,
348 short_ids: HashSet<ProposalShortId>,
349 ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
350 let (responder, response) = tokio::sync::oneshot::channel();
351 let request = AsyncRequest::call(short_ids, responder);
352 self.sender.send(Message::FetchTxs(request)).await?;
353 response.await.map_err(Into::into)
354 }
355
356 pub async fn fetch_txs_with_cycles(
359 &self,
360 short_ids: HashSet<ProposalShortId>,
361 ) -> Result<FetchTxsWithCyclesResult, AnyError> {
362 let (responder, response) = tokio::sync::oneshot::channel();
363 let request = AsyncRequest::call(short_ids, responder);
364 self.sender
365 .send(Message::FetchTxsWithCycles(request))
366 .await?;
367 response.await.map_err(Into::into)
368 }
369
370 pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
372 send_message!(self, ClearPool, new_snapshot)
373 }
374
375 pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
377 send_message!(self, ClearVerifyQueue, ())
378 }
379
380 pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
382 send_message!(self, GetAllEntryInfo, ())
383 }
384
385 pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
387 send_message!(self, GetAllIds, ())
388 }
389
390 pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
392 send_message!(self, GetPoolTxDetails, tx_hash)
393 }
394
395 pub fn save_pool(&self) -> Result<(), AnyError> {
397 info!("Please be patient, tx-pool are saving data into disk ...");
398 send_message!(self, SavePool, ())
399 }
400
401 pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
403 send_message!(self, UpdateIBDState, in_ibd)
404 }
405
406 pub fn estimate_fee_rate(
408 &self,
409 estimate_mode: EstimateMode,
410 enable_fallback: bool,
411 ) -> Result<FeeEstimatesResult, AnyError> {
412 send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
413 }
414
415 pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
417 self.chunk_tx
419 .send(ChunkCommand::Suspend)
420 .map_err(handle_send_cmd_error)
421 .map_err(Into::into)
422 }
423
424 pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
426 self.chunk_tx
428 .send(ChunkCommand::Resume)
429 .map_err(handle_send_cmd_error)
430 .map_err(Into::into)
431 }
432
433 fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
435 if !txs.is_empty() {
436 info!("Loading persistent tx-pool data, total {} txs", txs.len());
437 let mut failed_txs = 0;
438 for tx in txs {
439 if self.submit_local_tx(tx)?.is_err() {
440 failed_txs += 1;
441 }
442 }
443 if failed_txs == 0 {
444 info!("Persistent tx-pool data is loaded");
445 } else {
446 info!(
447 "Persistent tx-pool data is loaded, {} stale txs are ignored",
448 failed_txs
449 );
450 }
451 }
452 Ok(())
453 }
454
455 #[cfg(feature = "internal")]
457 pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
458 send_message!(self, PlugEntry, (entries, target))
459 }
460
461 #[cfg(feature = "internal")]
463 pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
464 send_message!(self, PackageTxs, bytes_limit)
465 }
466
467 pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
469 send_message!(self, SubmitLocalTestTx, tx)
470 }
471
472 pub fn get_total_recent_reject_num(&self) -> Result<Option<u64>, AnyError> {
474 send_message!(self, GetTotalRecentRejectNum, ())
475 }
476}
477
478pub struct TxPoolServiceBuilder {
480 pub(crate) tx_pool_config: TxPoolConfig,
481 pub(crate) tx_pool_controller: TxPoolController,
482 pub(crate) snapshot: Arc<Snapshot>,
483 pub(crate) block_assembler: Option<BlockAssembler>,
484 pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
485 pub(crate) callbacks: Callbacks,
486 pub(crate) receiver: mpsc::Receiver<Message>,
487 pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
488 pub(crate) signal_receiver: CancellationToken,
489 pub(crate) handle: Handle,
490 pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
491 pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
492 pub(crate) started: Arc<AtomicBool>,
493 pub(crate) block_assembler_channel: (
494 mpsc::Sender<BlockAssemblerMessage>,
495 mpsc::Receiver<BlockAssemblerMessage>,
496 ),
497 pub(crate) fee_estimator: FeeEstimator,
498}
499
500impl TxPoolServiceBuilder {
501 pub fn new(
503 tx_pool_config: TxPoolConfig,
504 snapshot: Arc<Snapshot>,
505 block_assembler_config: Option<BlockAssemblerConfig>,
506 txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
507 handle: &Handle,
508 tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
509 fee_estimator: FeeEstimator,
510 ) -> (TxPoolServiceBuilder, TxPoolController) {
511 let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
512 let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
513 let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
514 let signal_receiver: CancellationToken = new_tokio_exit_rx();
515 let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
516 let started = Arc::new(AtomicBool::new(false));
517
518 let controller = TxPoolController {
519 sender,
520 reorg_sender,
521 handle: handle.clone(),
522 chunk_tx: Arc::new(chunk_tx),
523 started: Arc::clone(&started),
524 };
525
526 let block_assembler = block_assembler_config.and_then(|config| {
527 BlockAssembler::new(config, Arc::clone(&snapshot))
528 .inspect_err(|err| error!("failed to initialize block assembler: {}", err))
529 .ok()
530 });
531 let builder = TxPoolServiceBuilder {
532 tx_pool_config,
533 tx_pool_controller: controller.clone(),
534 snapshot,
535 block_assembler,
536 txs_verify_cache,
537 callbacks: Callbacks::new(),
538 receiver,
539 reorg_receiver,
540 signal_receiver,
541 handle: handle.clone(),
542 tx_relay_sender,
543 chunk_rx,
544 started,
545 block_assembler_channel,
546 fee_estimator,
547 };
548
549 (builder, controller)
550 }
551
552 pub fn register_pending(&mut self, callback: PendingCallback) {
554 self.callbacks.register_pending(callback);
555 }
556
557 pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
559 self.tx_relay_sender.clone()
560 }
561
562 pub fn register_proposed(&mut self, callback: ProposedCallback) {
564 self.callbacks.register_proposed(callback);
565 }
566
567 pub fn register_reject(&mut self, callback: RejectCallback) {
569 self.callbacks.register_reject(callback);
570 }
571
572 pub fn start(self, network: NetworkController) {
574 let consensus = self.snapshot.cloned_consensus();
575
576 let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
577 self.tx_pool_config.max_tx_verify_cycles,
578 )));
579
580 let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
581 let txs = match tx_pool.load_from_file() {
582 Ok(txs) => txs,
583 Err(e) => {
584 error!("{}", e.to_string());
585 error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
586 Vec::new()
587 }
588 };
589
590 let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
591 let service = TxPoolService {
592 tx_pool_config: Arc::new(tx_pool.config.clone()),
593 tx_pool: Arc::new(RwLock::new(tx_pool)),
594 orphan: Arc::new(RwLock::new(OrphanPool::new())),
595 block_assembler: self.block_assembler,
596 txs_verify_cache: self.txs_verify_cache,
597 callbacks: Arc::new(self.callbacks),
598 tx_relay_sender: self.tx_relay_sender,
599 block_assembler_sender,
600 verify_queue: Arc::clone(&verify_queue),
601 network,
602 consensus,
603 fee_estimator: self.fee_estimator,
604 };
605
606 let mut verify_mgr =
607 VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
608 self.handle.spawn(async move { verify_mgr.run().await });
609
610 let mut receiver = self.receiver;
611 let mut reorg_receiver = self.reorg_receiver;
612 let handle_clone = self.handle.clone();
613
614 let process_service = service.clone();
615 let signal_receiver = self.signal_receiver.clone();
616 self.handle.spawn(async move {
617 loop {
618 tokio::select! {
619 Some(message) = receiver.recv() => {
620 let service_clone = process_service.clone();
621 handle_clone.spawn(process(service_clone, message));
622 },
623 _ = signal_receiver.cancelled() => {
624 info!("TxPool is saving, please wait...");
625 process_service.save_pool().await;
626 info!("TxPool process_service exit now");
627 break
628 },
629 else => break,
630 }
631 }
632 });
633
634 let process_service = service.clone();
635 if let Some(ref block_assembler) = service.block_assembler {
636 let signal_receiver = self.signal_receiver.clone();
637 let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
638 if interval.is_zero() {
639 ckb_logger::warn!(
642 "block_assembler.update_interval_millis set to zero interval. \
643 This should only be used for tests, as external notification will be disabled."
644 );
645 self.handle.spawn(async move {
646 loop {
647 tokio::select! {
648 Some(message) = block_assembler_receiver.recv() => {
649 let service_clone = process_service.clone();
650 block_assembler::process(service_clone, &message).await;
651 },
652 _ = signal_receiver.cancelled() => {
653 info!("TxPool block_assembler process service received exit signal, exit now");
654 break
655 },
656 else => break,
657 }
658 }
659 });
660 } else {
661 self.handle.spawn(async move {
662 let mut interval = tokio::time::interval(interval);
663 let mut queue = LinkedHashSet::new();
664 loop {
665 tokio::select! {
666 Some(message) = block_assembler_receiver.recv() => {
667 if let BlockAssemblerMessage::Reset(..) = message {
668 let service_clone = process_service.clone();
669 queue.clear();
670 block_assembler::process(service_clone, &message).await;
671 } else {
672 queue.insert(message);
673 }
674 },
675 _ = interval.tick() => {
676 for message in &queue {
677 let service_clone = process_service.clone();
678 block_assembler::process(service_clone, message).await;
679 }
680 if !queue.is_empty()
681 && let Some(ref block_assembler) = process_service.block_assembler {
682 block_assembler.notify().await;
683 }
684 queue.clear();
685 }
686 _ = signal_receiver.cancelled() => {
687 info!("TxPool block_assembler process service received exit signal, exit now");
688 break
689 },
690 else => break,
691 }
692 }
693 });
694 }
695 }
696
697 let signal_receiver = self.signal_receiver;
698 self.handle.spawn(async move {
699 loop {
700 tokio::select! {
701 Some(message) = reorg_receiver.recv() => {
702 let Notify {
703 arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
704 } = message;
705 let snapshot_clone = Arc::clone(&snapshot);
706 let detached_blocks_clone = detached_blocks.clone();
707 service.update_block_assembler_before_tx_pool_reorg(
708 detached_blocks_clone,
709 snapshot_clone
710 ).await;
711
712 let snapshot_clone = Arc::clone(&snapshot);
713 service
714 .update_tx_pool_for_reorg(
715 detached_blocks,
716 attached_blocks,
717 detached_proposal_id,
718 snapshot_clone,
719 )
720 .await;
721
722 service.update_block_assembler_after_tx_pool_reorg().await;
723 },
724 _ = signal_receiver.cancelled() => {
725 info!("TxPool reorg process service received exit signal, exit now");
726 break
727 },
728 else => break,
729 }
730 }
731 });
732 self.started.store(true, Ordering::Release);
733 if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
734 error!("Failed to import persistent txs, cause: {}", err);
735 }
736 }
737}
738
739#[derive(Clone)]
740pub(crate) struct TxPoolService {
741 pub(crate) tx_pool: Arc<RwLock<TxPool>>,
742 pub(crate) orphan: Arc<RwLock<OrphanPool>>,
743 pub(crate) consensus: Arc<Consensus>,
744 pub(crate) tx_pool_config: Arc<TxPoolConfig>,
745 pub(crate) block_assembler: Option<BlockAssembler>,
746 pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
747 pub(crate) callbacks: Arc<Callbacks>,
748 pub(crate) network: NetworkController,
749 pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
750 pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
751 pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
752 pub(crate) fee_estimator: FeeEstimator,
753}
754
755pub enum TxVerificationResult {
757 Ok {
759 original_peer: Option<PeerIndex>,
761 tx_hash: Byte32,
763 },
764 UnknownParents {
766 peer: PeerIndex,
768 parents: HashSet<Byte32>,
770 },
771 Reject {
773 tx_hash: Byte32,
775 },
776}
777
778#[allow(clippy::cognitive_complexity)]
779async fn process(mut service: TxPoolService, message: Message) {
780 match message {
781 Message::GetTxPoolInfo(Request { responder, .. }) => {
782 let info = service.info().await;
783 if let Err(e) = responder.send(info) {
784 error!("Responder sending get_tx_pool_info failed {:?}", e);
785 };
786 }
787 Message::GetLiveCell(Request {
788 responder,
789 arguments: (out_point, with_data),
790 }) => {
791 let live_cell_status = service.get_live_cell(out_point, with_data).await;
792 if let Err(e) = responder.send(live_cell_status) {
793 error!("Responder sending get_live_cell failed {:?}", e);
794 };
795 }
796 Message::BlockTemplate(Request {
797 responder,
798 arguments: (_bytes_limit, _proposals_limit, _max_version),
799 }) => {
800 let block_template_result = service.get_block_template().await;
801 if let Err(e) = responder.send(block_template_result) {
802 error!("Responder sending block_template_result failed {:?}", e);
803 };
804 }
805 Message::SubmitLocalTx(Request {
806 responder,
807 arguments: tx,
808 }) => {
809 let result = service.process_tx(tx, None).await.map(|_| ());
810 if let Err(e) = responder.send(result) {
811 error!("Responder sending submit_tx result failed {:?}", e);
812 };
813 }
814 Message::SubmitLocalTestTx(Request {
815 responder,
816 arguments: tx,
817 }) => {
818 let result = service
819 .resumeble_process_tx(tx, false, None)
820 .await
821 .map(|_| ());
822 if let Err(e) = responder.send(result) {
823 error!("Responder sending submit_tx result failed {:?}", e);
824 };
825 }
826 Message::RemoveLocalTx(Request {
827 responder,
828 arguments: tx_hash,
829 }) => {
830 let result = service.remove_tx(tx_hash).await;
831 if let Err(e) = responder.send(result) {
832 error!("Responder sending remove_tx result failed {:?}", e);
833 };
834 }
835 Message::TestAcceptTx(Request {
836 responder,
837 arguments: tx,
838 }) => {
839 let result = service.test_accept_tx(tx).await;
840 if let Err(e) = responder.send(result.map(|r| r.into())) {
841 error!("Responder sending test_accept_tx result failed {:?}", e);
842 };
843 }
844 Message::SubmitRemoteTx(Request {
845 responder,
846 arguments: (tx, declared_cycles, peer),
847 }) => {
848 let _result = service
849 .resumeble_process_tx(tx, false, Some((declared_cycles, peer)))
850 .await;
851 if let Err(e) = responder.send(()) {
852 error!("Responder sending submit_tx result failed {:?}", e);
853 };
854 }
855 Message::NotifyTxs(Notify { arguments: txs }) => {
856 for tx in txs {
857 let _ret = service.resumeble_process_tx(tx, true, None).await;
858 }
859 }
860 Message::FreshProposalsFilter(AsyncRequest {
861 responder,
862 arguments: proposals,
863 }) => {
864 let new_proposals = service.exclude_existing_proposal(proposals).await;
865 if let Err(e) = responder.send(new_proposals) {
866 error!("Responder sending fresh_proposals_filter failed {:?}", e);
867 };
868 }
869 Message::GetTxStatus(Request {
870 responder,
871 arguments: hash,
872 }) => {
873 let id = ProposalShortId::from_tx_hash(&hash);
874 let tx_pool = service.tx_pool.read().await;
875 let ret = if let Some(PoolEntry {
876 status,
877 inner: entry,
878 ..
879 }) = tx_pool.pool_map.get_by_id(&id)
880 {
881 let status = if status == &Status::Proposed {
882 TxStatus::Proposed
883 } else {
884 TxStatus::Pending
885 };
886 Ok((status, Some(entry.cycles)))
887 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
888 let recent_reject_result = recent_reject_db.get(&hash);
889 if let Ok(recent_reject) = recent_reject_result {
890 if let Some(record) = recent_reject {
891 Ok((TxStatus::Rejected(record), None))
892 } else {
893 Ok((TxStatus::Unknown, None))
894 }
895 } else {
896 Err(recent_reject_result.unwrap_err())
897 }
898 } else {
899 Ok((TxStatus::Unknown, None))
900 };
901
902 if let Err(e) = responder.send(ret) {
903 error!("Responder sending get_tx_status failed {:?}", e)
904 };
905 }
906 Message::GetTransactionWithStatus(Request {
907 responder,
908 arguments: hash,
909 }) => {
910 let id = ProposalShortId::from_tx_hash(&hash);
911 let tx_pool = service.tx_pool.read().await;
912 let ret = if let Some(PoolEntry {
913 status,
914 inner: entry,
915 ..
916 }) = tx_pool.pool_map.get_by_id(&id)
917 {
918 let (tx_status, min_replace_fee) = if status == &Status::Proposed {
919 (TxStatus::Proposed, None)
920 } else {
921 (TxStatus::Pending, tx_pool.min_replace_fee(entry))
922 };
923 Ok(TransactionWithStatus::with_status(
924 Some(entry.transaction().clone()),
925 entry.cycles,
926 entry.timestamp,
927 tx_status,
928 Some(entry.fee),
929 min_replace_fee,
930 ))
931 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
932 match recent_reject_db.get(&hash) {
933 Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
934 Ok(_) => Ok(TransactionWithStatus::with_unknown()),
935 Err(err) => Err(err),
936 }
937 } else {
938 Ok(TransactionWithStatus::with_unknown())
939 };
940
941 if let Err(e) = responder.send(ret) {
942 error!("Responder sending get_tx_status failed {:?}", e)
943 };
944 }
945 Message::FetchTxs(AsyncRequest {
946 responder,
947 arguments: short_ids,
948 }) => {
949 let txs_map = service.get_tx_for_compact_block(short_ids).await;
950 if let Err(e) = responder.send(txs_map) {
951 error!("Responder sending fetch_txs failed {:?}", e);
952 };
953 }
954 Message::FetchTxsWithCycles(AsyncRequest {
955 responder,
956 arguments: short_ids,
957 }) => {
958 let tx_pool = service.tx_pool.read().await;
959 let txs = short_ids
960 .into_iter()
961 .filter_map(|short_id| {
962 tx_pool
963 .get_tx_with_cycles(&short_id)
964 .map(|(tx, cycles)| (short_id, (tx, cycles)))
965 })
966 .collect();
967 if let Err(e) = responder.send(txs) {
968 error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
969 };
970 }
971 Message::NewUncle(Notify { arguments: uncle }) => {
972 service.receive_candidate_uncle(uncle).await;
973 }
974 Message::ClearPool(Request {
975 responder,
976 arguments: new_snapshot,
977 }) => {
978 service.clear_pool(new_snapshot).await;
979 if let Err(e) = responder.send(()) {
980 error!("Responder sending clear_pool failed {:?}", e)
981 };
982 }
983 Message::ClearVerifyQueue(Request { responder, .. }) => {
984 service.verify_queue.write().await.clear();
985 if let Err(e) = responder.send(()) {
986 error!("Responder sending clear_verify_queue failed {:?}", e)
987 };
988 }
989 Message::GetPoolTxDetails(Request {
990 responder,
991 arguments: tx_hash,
992 }) => {
993 let tx_pool = service.tx_pool.read().await;
994 let id = ProposalShortId::from_tx_hash(&tx_hash);
995 let tx_details = tx_pool
996 .get_tx_detail(&id)
997 .unwrap_or(PoolTxDetailInfo::with_unknown());
998 if let Err(e) = responder.send(tx_details) {
999 error!("responder send get_pool_tx_details failed {:?}", e)
1000 };
1001 }
1002 Message::GetAllEntryInfo(Request { responder, .. }) => {
1003 let tx_pool = service.tx_pool.read().await;
1004 let info = tx_pool.get_all_entry_info();
1005 if let Err(e) = responder.send(info) {
1006 error!("Responder sending get_all_entry_info failed {:?}", e)
1007 };
1008 }
1009 Message::GetAllIds(Request { responder, .. }) => {
1010 let tx_pool = service.tx_pool.read().await;
1011 let ids = tx_pool.get_ids();
1012 if let Err(e) = responder.send(ids) {
1013 error!("Responder sending get_ids failed {:?}", e)
1014 };
1015 }
1016 Message::SavePool(Request { responder, .. }) => {
1017 service.save_pool().await;
1018 if let Err(e) = responder.send(()) {
1019 error!("Responder sending save_pool failed {:?}", e)
1020 };
1021 }
1022 Message::UpdateIBDState(Request {
1023 responder,
1024 arguments: in_ibd,
1025 }) => {
1026 service.update_ibd_state(in_ibd).await;
1027 if let Err(e) = responder.send(()) {
1028 error!("Responder sending update_ibd_state failed {:?}", e)
1029 };
1030 }
1031 Message::EstimateFeeRate(Request {
1032 responder,
1033 arguments: (estimate_mode, enable_fallback),
1034 }) => {
1035 let fee_estimates_result = service
1036 .estimate_fee_rate(estimate_mode, enable_fallback)
1037 .await;
1038 if let Err(e) = responder.send(fee_estimates_result) {
1039 error!("Responder sending fee_estimates_result failed {:?}", e)
1040 };
1041 }
1042 #[cfg(feature = "internal")]
1043 Message::PlugEntry(Request {
1044 responder,
1045 arguments: (entries, target),
1046 }) => {
1047 service.plug_entry(entries, target).await;
1048
1049 if let Err(e) = responder.send(()) {
1050 error!("Responder sending plug_entry failed {:?}", e);
1051 };
1052 }
1053 #[cfg(feature = "internal")]
1054 Message::PackageTxs(Request {
1055 responder,
1056 arguments: bytes_limit,
1057 }) => {
1058 let max_block_cycles = service.consensus.max_block_cycles();
1059 let max_block_bytes = service.consensus.max_block_bytes();
1060 let tx_pool = service.tx_pool.read().await;
1061 let (txs, _size, _cycles) = tx_pool.package_txs(
1062 max_block_cycles,
1063 bytes_limit.unwrap_or(max_block_bytes) as usize,
1064 );
1065 if let Err(e) = responder.send(txs) {
1066 error!("Responder sending plug_entry failed {:?}", e);
1067 };
1068 }
1069 Message::GetTotalRecentRejectNum(Request { responder, .. }) => {
1070 let total_recent_reject_num = service.get_total_recent_reject_num().await;
1071 if let Err(e) = responder.send(total_recent_reject_num) {
1072 error!("Responder sending total_recent_reject_num failed {:?}", e)
1073 };
1074 }
1075 }
1076}
1077
1078impl TxPoolService {
1079 async fn info(&self) -> TxPoolInfo {
1081 let tx_pool = self.tx_pool.read().await;
1082 let orphan = self.orphan.read().await;
1083 let verify_queue = self.verify_queue.read().await;
1084 let tip_header = tx_pool.snapshot.tip_header();
1085 TxPoolInfo {
1086 tip_hash: tip_header.hash(),
1087 tip_number: tip_header.number(),
1088 pending_size: tx_pool.pool_map.pending_size(),
1089 proposed_size: tx_pool.pool_map.proposed_size(),
1090 orphan_size: orphan.len(),
1091 total_tx_size: tx_pool.pool_map.total_tx_size,
1092 total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
1093 min_fee_rate: self.tx_pool_config.min_fee_rate,
1094 min_rbf_rate: self.tx_pool_config.min_rbf_rate,
1095 last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
1096 tx_size_limit: TRANSACTION_SIZE_LIMIT,
1097 max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
1098 verify_queue_size: verify_queue.len(),
1099 }
1100 }
1101
1102 async fn get_total_recent_reject_num(&self) -> Option<u64> {
1103 let tx_pool = self.tx_pool.read().await;
1104 tx_pool
1105 .recent_reject
1106 .as_ref()
1107 .map(|r| r.get_estimate_total_keys_num())
1108 }
1109
1110 async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
1112 let tx_pool = self.tx_pool.read().await;
1113 let snapshot = tx_pool.snapshot();
1114 let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
1115 let provider = OverlayCellProvider::new(&pool_cell, snapshot);
1116
1117 match provider.cell(&out_point, false) {
1118 CellStatus::Live(mut cell_meta) => {
1119 if eager_load && let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
1120 cell_meta.mem_cell_data = Some(data);
1121 cell_meta.mem_cell_data_hash = Some(data_hash);
1122 }
1123 CellStatus::live_cell(cell_meta)
1124 }
1125 _ => CellStatus::Unknown,
1126 }
1127 }
1128
1129 pub fn should_notify_block_assembler(&self) -> bool {
1130 self.block_assembler.is_some()
1131 }
1132
1133 pub async fn exclude_existing_proposal(
1146 &self,
1147 mut proposals: Vec<ProposalShortId>,
1148 ) -> Vec<ProposalShortId> {
1149 {
1150 let verify_queue = self.verify_queue.read().await;
1151 proposals.retain(|id| !verify_queue.contains_key(id));
1152 }
1153 {
1154 let orphan = self.orphan.read().await;
1155 proposals.retain(|id| !orphan.contains_key(id));
1156 }
1157 {
1158 let tx_pool = self.tx_pool.read().await;
1159 proposals.retain(|id| !tx_pool.contains_proposal_id(id));
1160 }
1161 proposals
1162 }
1163
1164 pub async fn get_tx_for_compact_block(
1181 &self,
1182 short_ids: HashSet<ProposalShortId>,
1183 ) -> HashMap<ProposalShortId, TransactionView> {
1184 let mut txs = HashMap::with_capacity(short_ids.len());
1185 {
1186 let verify_queue = self.verify_queue.read().await;
1187 txs.extend(short_ids.iter().filter_map(|short_id| {
1188 verify_queue
1189 .get_tx_by_id(short_id)
1190 .map(|entry| (short_id.to_owned(), entry.tx.to_owned()))
1191 }));
1192 }
1193 {
1194 let orphan = self.orphan.read().await;
1195 txs.extend(short_ids.iter().filter_map(|short_id| {
1196 orphan
1197 .get(short_id)
1198 .map(|entry| (short_id.to_owned(), entry.tx.to_owned()))
1199 }));
1200 }
1201 {
1202 let tx_pool = self.tx_pool.read().await;
1203 txs.extend(short_ids.iter().filter_map(|short_id| {
1204 tx_pool
1205 .get_tx_from_pool_or_store(short_id)
1206 .map(|tx| (short_id.to_owned(), tx))
1207 }));
1208 }
1209 txs
1210 }
1211
1212 pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
1213 if let Some(ref block_assembler) = self.block_assembler {
1214 {
1215 block_assembler.candidate_uncles.lock().await.insert(uncle);
1216 }
1217 if self
1218 .block_assembler_sender
1219 .send(BlockAssemblerMessage::Uncle)
1220 .await
1221 .is_err()
1222 {
1223 error!("block_assembler receiver dropped");
1224 }
1225 }
1226 }
1227
1228 pub async fn update_block_assembler_before_tx_pool_reorg(
1229 &self,
1230 detached_blocks: VecDeque<BlockView>,
1231 snapshot: Arc<Snapshot>,
1232 ) {
1233 if let Some(ref block_assembler) = self.block_assembler {
1234 {
1235 let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
1236 for detached_block in detached_blocks {
1237 candidate_uncles.insert(detached_block.as_uncle());
1238 }
1239 }
1240
1241 if let Err(e) = block_assembler.update_blank(snapshot).await {
1242 error!("block_assembler update_blank error {}", e);
1243 }
1244 block_assembler.notify().await;
1245 }
1246 }
1247
1248 pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
1249 if let Some(ref block_assembler) = self.block_assembler {
1250 if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
1251 error!("block_assembler update failed {:?}", e);
1252 }
1253 block_assembler.notify().await;
1254 }
1255 }
1256
1257 #[cfg(feature = "internal")]
1258 pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
1259 {
1260 let mut tx_pool = self.tx_pool.write().await;
1261 match target {
1262 PlugTarget::Pending => {
1263 for entry in entries {
1264 tx_pool
1265 .add_pending(entry)
1266 .expect("Plug entry add_pending error");
1267 }
1268 }
1269 PlugTarget::Proposed => {
1270 for entry in entries {
1271 tx_pool
1272 .add_proposed(entry)
1273 .expect("Plug entry add_proposed error");
1274 }
1275 }
1276 };
1277 }
1278
1279 if self.should_notify_block_assembler() {
1280 let msg = match target {
1281 PlugTarget::Pending => BlockAssemblerMessage::Pending,
1282 PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
1283 };
1284 if self.block_assembler_sender.send(msg).await.is_err() {
1285 error!("block_assembler receiver dropped");
1286 }
1287 }
1288 }
1289}