1use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::util::after_delay_window;
11use crate::verify_mgr::VerifyMgr;
12use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
13use ckb_async_runtime::Handle;
14use ckb_chain_spec::consensus::Consensus;
15use ckb_channel::oneshot;
16use ckb_error::AnyError;
17use ckb_fee_estimator::FeeEstimator;
18use ckb_jsonrpc_types::BlockTemplate;
19use ckb_logger::error;
20use ckb_logger::info;
21use ckb_network::{NetworkController, PeerIndex};
22use ckb_script::ChunkCommand;
23use ckb_snapshot::Snapshot;
24use ckb_stop_handler::new_tokio_exit_rx;
25use ckb_store::ChainStore;
26use ckb_types::{
27 core::{
28 cell::{CellProvider, CellStatus, OverlayCellProvider},
29 tx_pool::{
30 EntryCompleted, PoolTxDetailInfo, Reject, TransactionWithStatus, TxPoolEntryInfo,
31 TxPoolIds, TxPoolInfo, TxStatus, TRANSACTION_SIZE_LIMIT,
32 },
33 BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
34 },
35 packed::{Byte32, OutPoint, ProposalShortId},
36};
37use ckb_util::{LinkedHashMap, LinkedHashSet};
38use ckb_verification::cache::TxVerificationCache;
39use std::collections::{HashMap, HashSet, VecDeque};
40use std::sync::{
41 atomic::{AtomicBool, Ordering},
42 Arc,
43};
44use std::time::Duration;
45use tokio::sync::watch;
46use tokio::sync::{mpsc, RwLock};
47use tokio::task::block_in_place;
48use tokio_util::sync::CancellationToken;
49
50use crate::pool_cell::PoolCell;
51#[cfg(feature = "internal")]
52use crate::{component::entry::TxEntry, process::PlugTarget};
53
/// Capacity used for the bounded `mpsc` channels that carry [`Message`]s and
/// chain-reorg notifications to the tx-pool service.
pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
/// Capacity of the bounded `mpsc` channel that carries
/// [`BlockAssemblerMessage`]s.
pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
56
57pub(crate) struct Request<A, R> {
58 pub responder: oneshot::Sender<R>,
59 pub arguments: A,
60}
61
62impl<A, R> Request<A, R> {
63 pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
64 Request {
65 responder,
66 arguments,
67 }
68 }
69}
70
/// A fire-and-forget message: it carries `arguments` only and has no
/// channel for sending a response back.
pub(crate) struct Notify<A> {
    /// Payload of the notification.
    pub arguments: A,
}

impl<A> Notify<A> {
    /// Wraps `arguments` in a [`Notify`].
    pub(crate) fn new(arguments: A) -> Self {
        Self { arguments }
    }
}
80
81pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
82type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);
83
84pub(crate) type SubmitTxResult = Result<(), Reject>;
85
86pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;
87
88type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
89type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
90type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;
91
92pub(crate) type ChainReorgArgs = (
93 VecDeque<BlockView>,
94 VecDeque<BlockView>,
95 HashSet<ProposalShortId>,
96 Arc<Snapshot>,
97);
98
99pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
100
101pub(crate) enum Message {
102 BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
103 SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
104 RemoveLocalTx(Request<Byte32, bool>),
105 TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
106 SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
107 NotifyTxs(Notify<Vec<TransactionView>>),
108 FreshProposalsFilter(Request<Vec<ProposalShortId>, Vec<ProposalShortId>>),
109 FetchTxs(Request<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
110 FetchTxsWithCycles(Request<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
111 GetTxPoolInfo(Request<(), TxPoolInfo>),
112 GetLiveCell(Request<(OutPoint, bool), CellStatus>),
113 GetTxStatus(Request<Byte32, GetTxStatusResult>),
114 GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
115 NewUncle(Notify<UncleBlockView>),
116 ClearPool(Request<Arc<Snapshot>, ()>),
117 ClearVerifyQueue(Request<(), ()>),
118 GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
119 GetAllIds(Request<(), TxPoolIds>),
120 SavePool(Request<(), ()>),
121 GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),
122
123 UpdateIBDState(Request<bool, ()>),
124 EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),
125
126 #[cfg(feature = "internal")]
128 PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
129 #[cfg(feature = "internal")]
130 PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
131 SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
132}
133
134#[derive(Debug, Hash, Eq, PartialEq)]
135pub(crate) enum BlockAssemblerMessage {
136 Pending,
137 Proposed,
138 Uncle,
139 Reset(Arc<Snapshot>),
140}
141
142#[derive(Clone)]
146pub struct TxPoolController {
147 sender: mpsc::Sender<Message>,
148 reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
149 chunk_tx: Arc<watch::Sender<ChunkCommand>>,
150 handle: Handle,
151 started: Arc<AtomicBool>,
152}
153
// Queues `Message::$msg_type` carrying `$args` on `$self.sender` without
// waiting for channel capacity, then blocks (via `block_in_place`) until the
// service answers on the one-shot channel.
//
// Must be expanded inside a function returning `Result<_, E>` where `E` can
// be built from both the send error and the receive error.
macro_rules! send_message {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let (responder, response) = oneshot::channel();
        let request = Request::call($args, responder);
        $self
            .sender
            .try_send(Message::$msg_type(request))
            // The rejected message itself is dropped; only the error is kept.
            .map_err(|err| handle_try_send_error(err).1)?;
        block_in_place(|| response.recv()).map_err(|err| handle_recv_error(err).into())
    }};
}
170
// Queues `Message::$msg_type` carrying `$args` on `$self.sender` without
// waiting for channel capacity. No response is awaited; the expression
// evaluates to a `Result` whose error is converted with `Into`.
macro_rules! send_notify {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let notify = Notify::new($args);
        $self
            .sender
            .try_send(Message::$msg_type(notify))
            // The rejected message itself is dropped; only the error is kept.
            .map_err(|err| handle_try_send_error(err).1.into())
    }};
}
183
184impl TxPoolController {
185 pub fn service_started(&self) -> bool {
187 self.started.load(Ordering::Acquire)
188 }
189
190 #[cfg(feature = "internal")]
192 pub fn set_service_started(&self, v: bool) {
193 self.started.store(v, Ordering::Release);
194 }
195
196 pub fn handle(&self) -> &Handle {
198 &self.handle
199 }
200
201 pub fn get_block_template(
203 &self,
204 bytes_limit: Option<u64>,
205 proposals_limit: Option<u64>,
206 max_version: Option<Version>,
207 ) -> Result<BlockTemplateResult, AnyError> {
208 send_message!(
209 self,
210 BlockTemplate,
211 (bytes_limit, proposals_limit, max_version)
212 )
213 }
214
215 pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
217 send_notify!(self, NewUncle, uncle)
218 }
219
220 pub fn update_tx_pool_for_reorg(
225 &self,
226 detached_blocks: VecDeque<BlockView>,
227 attached_blocks: VecDeque<BlockView>,
228 detached_proposal_id: HashSet<ProposalShortId>,
229 snapshot: Arc<Snapshot>,
230 ) -> Result<(), AnyError> {
231 let notify = Notify::new((
232 detached_blocks,
233 attached_blocks,
234 detached_proposal_id,
235 snapshot,
236 ));
237 self.reorg_sender.try_send(notify).map_err(|e| {
238 let (_m, e) = handle_try_send_error(e);
239 e.into()
240 })
241 }
242
243 pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
245 send_message!(self, SubmitLocalTx, tx)
246 }
247
248 pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
252 send_message!(self, TestAcceptTx, tx)
253 }
254
255 pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
257 send_message!(self, RemoveLocalTx, tx_hash)
258 }
259
260 pub async fn submit_remote_tx(
262 &self,
263 tx: TransactionView,
264 declared_cycles: Cycle,
265 peer: PeerIndex,
266 ) -> Result<(), AnyError> {
267 send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
268 }
269
270 pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
272 send_notify!(self, NotifyTxs, txs)
273 }
274
275 pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
277 send_message!(self, GetTxPoolInfo, ())
278 }
279
280 pub fn get_live_cell(
282 &self,
283 out_point: OutPoint,
284 with_data: bool,
285 ) -> Result<CellStatus, AnyError> {
286 send_message!(self, GetLiveCell, (out_point, with_data))
287 }
288
289 pub fn fresh_proposals_filter(
291 &self,
292 proposals: Vec<ProposalShortId>,
293 ) -> Result<Vec<ProposalShortId>, AnyError> {
294 send_message!(self, FreshProposalsFilter, proposals)
295 }
296
297 pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
299 send_message!(self, GetTxStatus, hash)
300 }
301
302 pub fn get_transaction_with_status(
304 &self,
305 hash: Byte32,
306 ) -> Result<GetTransactionWithStatusResult, AnyError> {
307 send_message!(self, GetTransactionWithStatus, hash)
308 }
309
310 pub fn fetch_txs(
313 &self,
314 short_ids: HashSet<ProposalShortId>,
315 ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
316 send_message!(self, FetchTxs, short_ids)
317 }
318
319 pub fn fetch_txs_with_cycles(
322 &self,
323 short_ids: HashSet<ProposalShortId>,
324 ) -> Result<FetchTxsWithCyclesResult, AnyError> {
325 send_message!(self, FetchTxsWithCycles, short_ids)
326 }
327
328 pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
330 send_message!(self, ClearPool, new_snapshot)
331 }
332
333 pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
335 send_message!(self, ClearVerifyQueue, ())
336 }
337
338 pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
340 send_message!(self, GetAllEntryInfo, ())
341 }
342
343 pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
345 send_message!(self, GetAllIds, ())
346 }
347
348 pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
350 send_message!(self, GetPoolTxDetails, tx_hash)
351 }
352
353 pub fn save_pool(&self) -> Result<(), AnyError> {
355 info!("Please be patient, tx-pool are saving data into disk ...");
356 send_message!(self, SavePool, ())
357 }
358
359 pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
361 send_message!(self, UpdateIBDState, in_ibd)
362 }
363
364 pub fn estimate_fee_rate(
366 &self,
367 estimate_mode: EstimateMode,
368 enable_fallback: bool,
369 ) -> Result<FeeEstimatesResult, AnyError> {
370 send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
371 }
372
373 pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
375 self.chunk_tx
377 .send(ChunkCommand::Suspend)
378 .map_err(handle_send_cmd_error)
379 .map_err(Into::into)
380 }
381
382 pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
384 self.chunk_tx
386 .send(ChunkCommand::Resume)
387 .map_err(handle_send_cmd_error)
388 .map_err(Into::into)
389 }
390
391 fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
393 if !txs.is_empty() {
394 info!("Loading persistent tx-pool data, total {} txs", txs.len());
395 let mut failed_txs = 0;
396 for tx in txs {
397 if self.submit_local_tx(tx)?.is_err() {
398 failed_txs += 1;
399 }
400 }
401 if failed_txs == 0 {
402 info!("Persistent tx-pool data is loaded");
403 } else {
404 info!(
405 "Persistent tx-pool data is loaded, {} stale txs are ignored",
406 failed_txs
407 );
408 }
409 }
410 Ok(())
411 }
412
413 #[cfg(feature = "internal")]
415 pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
416 send_message!(self, PlugEntry, (entries, target))
417 }
418
419 #[cfg(feature = "internal")]
421 pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
422 send_message!(self, PackageTxs, bytes_limit)
423 }
424
425 pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
427 send_message!(self, SubmitLocalTestTx, tx)
428 }
429}
430
431pub struct TxPoolServiceBuilder {
433 pub(crate) tx_pool_config: TxPoolConfig,
434 pub(crate) tx_pool_controller: TxPoolController,
435 pub(crate) snapshot: Arc<Snapshot>,
436 pub(crate) block_assembler: Option<BlockAssembler>,
437 pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
438 pub(crate) callbacks: Callbacks,
439 pub(crate) receiver: mpsc::Receiver<Message>,
440 pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
441 pub(crate) signal_receiver: CancellationToken,
442 pub(crate) handle: Handle,
443 pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
444 pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
445 pub(crate) started: Arc<AtomicBool>,
446 pub(crate) block_assembler_channel: (
447 mpsc::Sender<BlockAssemblerMessage>,
448 mpsc::Receiver<BlockAssemblerMessage>,
449 ),
450 pub(crate) fee_estimator: FeeEstimator,
451}
452
453impl TxPoolServiceBuilder {
454 pub fn new(
456 tx_pool_config: TxPoolConfig,
457 snapshot: Arc<Snapshot>,
458 block_assembler_config: Option<BlockAssemblerConfig>,
459 txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
460 handle: &Handle,
461 tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
462 fee_estimator: FeeEstimator,
463 ) -> (TxPoolServiceBuilder, TxPoolController) {
464 let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
465 let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
466 let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
467 let signal_receiver: CancellationToken = new_tokio_exit_rx();
468 let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
469 let started = Arc::new(AtomicBool::new(false));
470
471 let controller = TxPoolController {
472 sender,
473 reorg_sender,
474 handle: handle.clone(),
475 chunk_tx: Arc::new(chunk_tx),
476 started: Arc::clone(&started),
477 };
478
479 let block_assembler =
480 block_assembler_config.map(|config| BlockAssembler::new(config, Arc::clone(&snapshot)));
481 let builder = TxPoolServiceBuilder {
482 tx_pool_config,
483 tx_pool_controller: controller.clone(),
484 snapshot,
485 block_assembler,
486 txs_verify_cache,
487 callbacks: Callbacks::new(),
488 receiver,
489 reorg_receiver,
490 signal_receiver,
491 handle: handle.clone(),
492 tx_relay_sender,
493 chunk_rx,
494 started,
495 block_assembler_channel,
496 fee_estimator,
497 };
498
499 (builder, controller)
500 }
501
502 pub fn register_pending(&mut self, callback: PendingCallback) {
504 self.callbacks.register_pending(callback);
505 }
506
507 pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
509 self.tx_relay_sender.clone()
510 }
511
512 pub fn register_proposed(&mut self, callback: ProposedCallback) {
514 self.callbacks.register_proposed(callback);
515 }
516
517 pub fn register_reject(&mut self, callback: RejectCallback) {
519 self.callbacks.register_reject(callback);
520 }
521
522 pub fn start(self, network: NetworkController) {
524 let consensus = self.snapshot.cloned_consensus();
525 let after_delay_window = after_delay_window(&self.snapshot);
526
527 let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
528 self.tx_pool_config.max_tx_verify_cycles,
529 )));
530
531 let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
532 let txs = match tx_pool.load_from_file() {
533 Ok(txs) => txs,
534 Err(e) => {
535 error!("{}", e.to_string());
536 error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
537 Vec::new()
538 }
539 };
540
541 let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
542 let service = TxPoolService {
543 tx_pool_config: Arc::new(tx_pool.config.clone()),
544 tx_pool: Arc::new(RwLock::new(tx_pool)),
545 orphan: Arc::new(RwLock::new(OrphanPool::new())),
546 block_assembler: self.block_assembler,
547 txs_verify_cache: self.txs_verify_cache,
548 callbacks: Arc::new(self.callbacks),
549 tx_relay_sender: self.tx_relay_sender,
550 block_assembler_sender,
551 verify_queue: Arc::clone(&verify_queue),
552 network,
553 consensus,
554 delay: Arc::new(RwLock::new(LinkedHashMap::new())),
555 after_delay: Arc::new(AtomicBool::new(after_delay_window)),
556 fee_estimator: self.fee_estimator,
557 };
558
559 let mut verify_mgr =
560 VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
561 self.handle.spawn(async move { verify_mgr.run().await });
562
563 let mut receiver = self.receiver;
564 let mut reorg_receiver = self.reorg_receiver;
565 let handle_clone = self.handle.clone();
566
567 let process_service = service.clone();
568 let signal_receiver = self.signal_receiver.clone();
569 self.handle.spawn(async move {
570 loop {
571 tokio::select! {
572 Some(message) = receiver.recv() => {
573 let service_clone = process_service.clone();
574 handle_clone.spawn(process(service_clone, message));
575 },
576 _ = signal_receiver.cancelled() => {
577 info!("TxPool is saving, please wait...");
578 process_service.save_pool().await;
579 info!("TxPool process_service exit now");
580 break
581 },
582 else => break,
583 }
584 }
585 });
586
587 let process_service = service.clone();
588 if let Some(ref block_assembler) = service.block_assembler {
589 let signal_receiver = self.signal_receiver.clone();
590 let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
591 if interval.is_zero() {
592 ckb_logger::warn!(
595 "block_assembler.update_interval_millis set to zero interval. \
596 This should only be used for tests, as external notification will be disabled."
597 );
598 self.handle.spawn(async move {
599 loop {
600 tokio::select! {
601 Some(message) = block_assembler_receiver.recv() => {
602 let service_clone = process_service.clone();
603 block_assembler::process(service_clone, &message).await;
604 },
605 _ = signal_receiver.cancelled() => {
606 info!("TxPool block_assembler process service received exit signal, exit now");
607 break
608 },
609 else => break,
610 }
611 }
612 });
613 } else {
614 self.handle.spawn(async move {
615 let mut interval = tokio::time::interval(interval);
616 let mut queue = LinkedHashSet::new();
617 loop {
618 tokio::select! {
619 Some(message) = block_assembler_receiver.recv() => {
620 if let BlockAssemblerMessage::Reset(..) = message {
621 let service_clone = process_service.clone();
622 queue.clear();
623 block_assembler::process(service_clone, &message).await;
624 } else {
625 queue.insert(message);
626 }
627 },
628 _ = interval.tick() => {
629 for message in &queue {
630 let service_clone = process_service.clone();
631 block_assembler::process(service_clone, message).await;
632 }
633 if !queue.is_empty() {
634 if let Some(ref block_assembler) = process_service.block_assembler {
635 block_assembler.notify().await;
636 }
637 }
638 queue.clear();
639 }
640 _ = signal_receiver.cancelled() => {
641 info!("TxPool block_assembler process service received exit signal, exit now");
642 break
643 },
644 else => break,
645 }
646 }
647 });
648 }
649 }
650
651 let signal_receiver = self.signal_receiver;
652 self.handle.spawn(async move {
653 loop {
654 tokio::select! {
655 Some(message) = reorg_receiver.recv() => {
656 let Notify {
657 arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
658 } = message;
659 let snapshot_clone = Arc::clone(&snapshot);
660 let detached_blocks_clone = detached_blocks.clone();
661 service.update_block_assembler_before_tx_pool_reorg(
662 detached_blocks_clone,
663 snapshot_clone
664 ).await;
665
666 let snapshot_clone = Arc::clone(&snapshot);
667 service
668 .update_tx_pool_for_reorg(
669 detached_blocks,
670 attached_blocks,
671 detached_proposal_id,
672 snapshot_clone,
673 )
674 .await;
675
676 service.update_block_assembler_after_tx_pool_reorg().await;
677 },
678 _ = signal_receiver.cancelled() => {
679 info!("TxPool reorg process service received exit signal, exit now");
680 break
681 },
682 else => break,
683 }
684 }
685 });
686 self.started.store(true, Ordering::Release);
687 if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
688 error!("Failed to import persistent txs, cause: {}", err);
689 }
690 }
691}
692
693#[derive(Clone)]
694pub(crate) struct TxPoolService {
695 pub(crate) tx_pool: Arc<RwLock<TxPool>>,
696 pub(crate) orphan: Arc<RwLock<OrphanPool>>,
697 pub(crate) consensus: Arc<Consensus>,
698 pub(crate) tx_pool_config: Arc<TxPoolConfig>,
699 pub(crate) block_assembler: Option<BlockAssembler>,
700 pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
701 pub(crate) callbacks: Arc<Callbacks>,
702 pub(crate) network: NetworkController,
703 pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
704 pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
705 pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
706 pub(crate) delay: Arc<RwLock<LinkedHashMap<ProposalShortId, TransactionView>>>,
707 pub(crate) after_delay: Arc<AtomicBool>,
708 pub(crate) fee_estimator: FeeEstimator,
709}
710
711pub enum TxVerificationResult {
713 Ok {
715 original_peer: Option<PeerIndex>,
717 with_vm_2023: bool,
719 tx_hash: Byte32,
721 },
722 UnknownParents {
724 peer: PeerIndex,
726 parents: HashSet<Byte32>,
728 },
729 Reject {
731 tx_hash: Byte32,
733 },
734}
735
736#[allow(clippy::cognitive_complexity)]
737async fn process(mut service: TxPoolService, message: Message) {
738 match message {
739 Message::GetTxPoolInfo(Request { responder, .. }) => {
740 let info = service.info().await;
741 if let Err(e) = responder.send(info) {
742 error!("Responder sending get_tx_pool_info failed {:?}", e);
743 };
744 }
745 Message::GetLiveCell(Request {
746 responder,
747 arguments: (out_point, with_data),
748 }) => {
749 let live_cell_status = service.get_live_cell(out_point, with_data).await;
750 if let Err(e) = responder.send(live_cell_status) {
751 error!("Responder sending get_live_cell failed {:?}", e);
752 };
753 }
754 Message::BlockTemplate(Request {
755 responder,
756 arguments: (_bytes_limit, _proposals_limit, _max_version),
757 }) => {
758 let block_template_result = service.get_block_template().await;
759 if let Err(e) = responder.send(block_template_result) {
760 error!("Responder sending block_template_result failed {:?}", e);
761 };
762 }
763 Message::SubmitLocalTx(Request {
764 responder,
765 arguments: tx,
766 }) => {
767 let result = service.process_tx(tx, None).await.map(|_| ());
768 if let Err(e) = responder.send(result) {
769 error!("Responder sending submit_tx result failed {:?}", e);
770 };
771 }
772 Message::SubmitLocalTestTx(Request {
773 responder,
774 arguments: tx,
775 }) => {
776 let result = service.resumeble_process_tx(tx, None).await.map(|_| ());
777 if let Err(e) = responder.send(result) {
778 error!("Responder sending submit_tx result failed {:?}", e);
779 };
780 }
781 Message::RemoveLocalTx(Request {
782 responder,
783 arguments: tx_hash,
784 }) => {
785 let result = service.remove_tx(tx_hash).await;
786 if let Err(e) = responder.send(result) {
787 error!("Responder sending remove_tx result failed {:?}", e);
788 };
789 }
790 Message::TestAcceptTx(Request {
791 responder,
792 arguments: tx,
793 }) => {
794 let result = service.test_accept_tx(tx).await;
795 if let Err(e) = responder.send(result.map(|r| r.into())) {
796 error!("Responder sending test_accept_tx result failed {:?}", e);
797 };
798 }
799 Message::SubmitRemoteTx(Request {
800 responder,
801 arguments: (tx, declared_cycles, peer),
802 }) => {
803 let _result = service
804 .resumeble_process_tx(tx, Some((declared_cycles, peer)))
805 .await;
806 if let Err(e) = responder.send(()) {
807 error!("Responder sending submit_tx result failed {:?}", e);
808 };
809 }
810 Message::NotifyTxs(Notify { arguments: txs }) => {
811 for tx in txs {
812 let _ret = service.resumeble_process_tx(tx, None).await;
813 }
814 }
815 Message::FreshProposalsFilter(Request {
816 responder,
817 arguments: mut proposals,
818 }) => {
819 let tx_pool = service.tx_pool.read().await;
820 proposals.retain(|id| !tx_pool.contains_proposal_id(id));
821 if let Err(e) = responder.send(proposals) {
822 error!("Responder sending fresh_proposals_filter failed {:?}", e);
823 };
824 }
825 Message::GetTxStatus(Request {
826 responder,
827 arguments: hash,
828 }) => {
829 let id = ProposalShortId::from_tx_hash(&hash);
830 let tx_pool = service.tx_pool.read().await;
831 let ret = if let Some(PoolEntry {
832 status,
833 inner: entry,
834 ..
835 }) = tx_pool.pool_map.get_by_id(&id)
836 {
837 let status = if status == &Status::Proposed {
838 TxStatus::Proposed
839 } else {
840 TxStatus::Pending
841 };
842 Ok((status, Some(entry.cycles)))
843 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
844 let recent_reject_result = recent_reject_db.get(&hash);
845 if let Ok(recent_reject) = recent_reject_result {
846 if let Some(record) = recent_reject {
847 Ok((TxStatus::Rejected(record), None))
848 } else {
849 Ok((TxStatus::Unknown, None))
850 }
851 } else {
852 Err(recent_reject_result.unwrap_err())
853 }
854 } else {
855 Ok((TxStatus::Unknown, None))
856 };
857
858 if let Err(e) = responder.send(ret) {
859 error!("Responder sending get_tx_status failed {:?}", e)
860 };
861 }
862 Message::GetTransactionWithStatus(Request {
863 responder,
864 arguments: hash,
865 }) => {
866 let id = ProposalShortId::from_tx_hash(&hash);
867 let tx_pool = service.tx_pool.read().await;
868 let ret = if let Some(PoolEntry {
869 status,
870 inner: entry,
871 ..
872 }) = tx_pool.pool_map.get_by_id(&id)
873 {
874 let (tx_status, min_replace_fee) = if status == &Status::Proposed {
875 (TxStatus::Proposed, None)
876 } else {
877 (TxStatus::Pending, tx_pool.min_replace_fee(entry))
878 };
879 Ok(TransactionWithStatus::with_status(
880 Some(entry.transaction().clone()),
881 entry.cycles,
882 entry.timestamp,
883 tx_status,
884 Some(entry.fee),
885 min_replace_fee,
886 ))
887 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
888 match recent_reject_db.get(&hash) {
889 Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
890 Ok(_) => Ok(TransactionWithStatus::with_unknown()),
891 Err(err) => Err(err),
892 }
893 } else {
894 Ok(TransactionWithStatus::with_unknown())
895 };
896
897 if let Err(e) = responder.send(ret) {
898 error!("Responder sending get_tx_status failed {:?}", e)
899 };
900 }
901 Message::FetchTxs(Request {
902 responder,
903 arguments: short_ids,
904 }) => {
905 let tx_pool = service.tx_pool.read().await;
906 let orphan = service.orphan.read().await;
907 let txs = short_ids
908 .into_iter()
909 .filter_map(|short_id| {
910 tx_pool
911 .get_tx_from_pool_or_store(&short_id)
912 .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
913 .map(|tx| (short_id, tx))
914 })
915 .collect();
916 if let Err(e) = responder.send(txs) {
917 error!("Responder sending fetch_txs failed {:?}", e);
918 };
919 }
920 Message::FetchTxsWithCycles(Request {
921 responder,
922 arguments: short_ids,
923 }) => {
924 let tx_pool = service.tx_pool.read().await;
925 let txs = short_ids
926 .into_iter()
927 .filter_map(|short_id| {
928 tx_pool
929 .get_tx_with_cycles(&short_id)
930 .map(|(tx, cycles)| (short_id, (tx, cycles)))
931 })
932 .collect();
933 if let Err(e) = responder.send(txs) {
934 error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
935 };
936 }
937 Message::NewUncle(Notify { arguments: uncle }) => {
938 service.receive_candidate_uncle(uncle).await;
939 }
940 Message::ClearPool(Request {
941 responder,
942 arguments: new_snapshot,
943 }) => {
944 service.clear_pool(new_snapshot).await;
945 if let Err(e) = responder.send(()) {
946 error!("Responder sending clear_pool failed {:?}", e)
947 };
948 }
949 Message::ClearVerifyQueue(Request { responder, .. }) => {
950 service.verify_queue.write().await.clear();
951 if let Err(e) = responder.send(()) {
952 error!("Responder sending clear_verify_queue failed {:?}", e)
953 };
954 }
955 Message::GetPoolTxDetails(Request {
956 responder,
957 arguments: tx_hash,
958 }) => {
959 let tx_pool = service.tx_pool.read().await;
960 let id = ProposalShortId::from_tx_hash(&tx_hash);
961 let tx_details = tx_pool
962 .get_tx_detail(&id)
963 .unwrap_or(PoolTxDetailInfo::with_unknown());
964 if let Err(e) = responder.send(tx_details) {
965 error!("responder send get_pool_tx_details failed {:?}", e)
966 };
967 }
968 Message::GetAllEntryInfo(Request { responder, .. }) => {
969 let tx_pool = service.tx_pool.read().await;
970 let info = tx_pool.get_all_entry_info();
971 if let Err(e) = responder.send(info) {
972 error!("Responder sending get_all_entry_info failed {:?}", e)
973 };
974 }
975 Message::GetAllIds(Request { responder, .. }) => {
976 let tx_pool = service.tx_pool.read().await;
977 let ids = tx_pool.get_ids();
978 if let Err(e) = responder.send(ids) {
979 error!("Responder sending get_ids failed {:?}", e)
980 };
981 }
982 Message::SavePool(Request { responder, .. }) => {
983 service.save_pool().await;
984 if let Err(e) = responder.send(()) {
985 error!("Responder sending save_pool failed {:?}", e)
986 };
987 }
988 Message::UpdateIBDState(Request {
989 responder,
990 arguments: in_ibd,
991 }) => {
992 service.update_ibd_state(in_ibd).await;
993 if let Err(e) = responder.send(()) {
994 error!("Responder sending update_ibd_state failed {:?}", e)
995 };
996 }
997 Message::EstimateFeeRate(Request {
998 responder,
999 arguments: (estimate_mode, enable_fallback),
1000 }) => {
1001 let fee_estimates_result = service
1002 .estimate_fee_rate(estimate_mode, enable_fallback)
1003 .await;
1004 if let Err(e) = responder.send(fee_estimates_result) {
1005 error!("Responder sending fee_estimates_result failed {:?}", e)
1006 };
1007 }
1008 #[cfg(feature = "internal")]
1009 Message::PlugEntry(Request {
1010 responder,
1011 arguments: (entries, target),
1012 }) => {
1013 service.plug_entry(entries, target).await;
1014
1015 if let Err(e) = responder.send(()) {
1016 error!("Responder sending plug_entry failed {:?}", e);
1017 };
1018 }
1019 #[cfg(feature = "internal")]
1020 Message::PackageTxs(Request {
1021 responder,
1022 arguments: bytes_limit,
1023 }) => {
1024 let max_block_cycles = service.consensus.max_block_cycles();
1025 let max_block_bytes = service.consensus.max_block_bytes();
1026 let tx_pool = service.tx_pool.read().await;
1027 let (txs, _size, _cycles) = tx_pool.package_txs(
1028 max_block_cycles,
1029 bytes_limit.unwrap_or(max_block_bytes) as usize,
1030 );
1031 if let Err(e) = responder.send(txs) {
1032 error!("Responder sending plug_entry failed {:?}", e);
1033 };
1034 }
1035 }
1036}
1037
1038impl TxPoolService {
1039 async fn info(&self) -> TxPoolInfo {
1041 let tx_pool = self.tx_pool.read().await;
1042 let orphan = self.orphan.read().await;
1043 let verify_queue = self.verify_queue.read().await;
1044 let tip_header = tx_pool.snapshot.tip_header();
1045 TxPoolInfo {
1046 tip_hash: tip_header.hash(),
1047 tip_number: tip_header.number(),
1048 pending_size: tx_pool.pool_map.pending_size(),
1049 proposed_size: tx_pool.pool_map.proposed_size(),
1050 orphan_size: orphan.len(),
1051 total_tx_size: tx_pool.pool_map.total_tx_size,
1052 total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
1053 min_fee_rate: self.tx_pool_config.min_fee_rate,
1054 min_rbf_rate: self.tx_pool_config.min_rbf_rate,
1055 last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
1056 tx_size_limit: TRANSACTION_SIZE_LIMIT,
1057 max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
1058 verify_queue_size: verify_queue.len(),
1059 }
1060 }
1061
1062 async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
1064 let tx_pool = self.tx_pool.read().await;
1065 let snapshot = tx_pool.snapshot();
1066 let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
1067 let provider = OverlayCellProvider::new(&pool_cell, snapshot);
1068
1069 match provider.cell(&out_point, false) {
1070 CellStatus::Live(mut cell_meta) => {
1071 if eager_load {
1072 if let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
1073 cell_meta.mem_cell_data = Some(data);
1074 cell_meta.mem_cell_data_hash = Some(data_hash);
1075 }
1076 }
1077 CellStatus::live_cell(cell_meta)
1078 }
1079 _ => CellStatus::Unknown,
1080 }
1081 }
1082
1083 pub fn should_notify_block_assembler(&self) -> bool {
1084 self.block_assembler.is_some()
1085 }
1086
1087 pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
1088 if let Some(ref block_assembler) = self.block_assembler {
1089 {
1090 block_assembler.candidate_uncles.lock().await.insert(uncle);
1091 }
1092 if self
1093 .block_assembler_sender
1094 .send(BlockAssemblerMessage::Uncle)
1095 .await
1096 .is_err()
1097 {
1098 error!("block_assembler receiver dropped");
1099 }
1100 }
1101 }
1102
1103 pub async fn update_block_assembler_before_tx_pool_reorg(
1104 &self,
1105 detached_blocks: VecDeque<BlockView>,
1106 snapshot: Arc<Snapshot>,
1107 ) {
1108 if let Some(ref block_assembler) = self.block_assembler {
1109 {
1110 let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
1111 for detached_block in detached_blocks {
1112 candidate_uncles.insert(detached_block.as_uncle());
1113 }
1114 }
1115
1116 if let Err(e) = block_assembler.update_blank(snapshot).await {
1117 error!("block_assembler update_blank error {}", e);
1118 }
1119 block_assembler.notify().await;
1120 }
1121 }
1122
1123 pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
1124 if let Some(ref block_assembler) = self.block_assembler {
1125 if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
1126 error!("block_assembler update failed {:?}", e);
1127 }
1128 block_assembler.notify().await;
1129 }
1130 }
1131
1132 #[cfg(feature = "internal")]
1133 pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
1134 {
1135 let mut tx_pool = self.tx_pool.write().await;
1136 match target {
1137 PlugTarget::Pending => {
1138 for entry in entries {
1139 tx_pool
1140 .add_pending(entry)
1141 .expect("Plug entry add_pending error");
1142 }
1143 }
1144 PlugTarget::Proposed => {
1145 for entry in entries {
1146 tx_pool
1147 .add_proposed(entry)
1148 .expect("Plug entry add_proposed error");
1149 }
1150 }
1151 };
1152 }
1153
1154 if self.should_notify_block_assembler() {
1155 let msg = match target {
1156 PlugTarget::Pending => BlockAssemblerMessage::Pending,
1157 PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
1158 };
1159 if self.block_assembler_sender.send(msg).await.is_err() {
1160 error!("block_assembler receiver dropped");
1161 }
1162 }
1163 }
1164
1165 pub fn after_delay(&self) -> bool {
1166 self.after_delay.load(Ordering::Acquire)
1167 }
1168
1169 pub fn set_after_delay_true(&self) {
1170 self.after_delay.store(true, Ordering::Release);
1171 }
1172}