1use crate::block_assembler::{self, BlockAssembler};
4use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
5use crate::component::orphan::OrphanPool;
6use crate::component::pool_map::{PoolEntry, Status};
7use crate::component::verify_queue::VerifyQueue;
8use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
9use crate::pool::TxPool;
10use crate::verify_mgr::VerifyMgr;
11use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
12use ckb_async_runtime::Handle;
13use ckb_chain_spec::consensus::Consensus;
14use ckb_channel::oneshot;
15use ckb_error::AnyError;
16use ckb_fee_estimator::FeeEstimator;
17use ckb_jsonrpc_types::BlockTemplate;
18use ckb_logger::error;
19use ckb_logger::info;
20use ckb_network::{NetworkController, PeerIndex};
21use ckb_script::ChunkCommand;
22use ckb_snapshot::Snapshot;
23use ckb_stop_handler::new_tokio_exit_rx;
24use ckb_store::ChainStore;
25use ckb_types::{
26 core::{
27 BlockView, Cycle, EstimateMode, FeeRate, TransactionView, UncleBlockView, Version,
28 cell::{CellProvider, CellStatus, OverlayCellProvider},
29 tx_pool::{
30 EntryCompleted, PoolTxDetailInfo, Reject, TRANSACTION_SIZE_LIMIT,
31 TransactionWithStatus, TxPoolEntryInfo, TxPoolIds, TxPoolInfo, TxStatus,
32 },
33 },
34 packed::{Byte32, OutPoint, ProposalShortId},
35};
36use ckb_util::LinkedHashSet;
37use ckb_verification::cache::TxVerificationCache;
38use std::collections::{HashMap, HashSet, VecDeque};
39use std::sync::{
40 Arc,
41 atomic::{AtomicBool, Ordering},
42};
43use std::time::Duration;
44use tokio::sync::watch;
45use tokio::sync::{RwLock, mpsc};
46use tokio::task::block_in_place;
47use tokio_util::sync::CancellationToken;
48
49use crate::pool_cell::PoolCell;
50#[cfg(feature = "internal")]
51use crate::{component::entry::TxEntry, process::PlugTarget};
52
/// Capacity of the main service request channel and the reorg channel.
pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 512;
/// Capacity of the block-assembler message channel.
pub(crate) const BLOCK_ASSEMBLER_CHANNEL_SIZE: usize = 100;
55
/// A request message pairing its arguments with a oneshot channel on which
/// the service sends back exactly one response.
pub(crate) struct Request<A, R> {
    /// Sending half of the reply channel; the service answers through it.
    pub responder: oneshot::Sender<R>,
    /// Payload of the request.
    pub arguments: A,
}
60
61impl<A, R> Request<A, R> {
62 pub(crate) fn call(arguments: A, responder: oneshot::Sender<R>) -> Request<A, R> {
63 Request {
64 responder,
65 arguments,
66 }
67 }
68}
69
/// A fire-and-forget message: unlike `Request` it carries no reply channel.
pub(crate) struct Notify<A> {
    /// Payload of the notification.
    pub arguments: A,
}
73
74impl<A> Notify<A> {
75 pub(crate) fn new(arguments: A) -> Notify<A> {
76 Notify { arguments }
77 }
78}
79
/// Result of a block-template request.
pub(crate) type BlockTemplateResult = Result<BlockTemplate, AnyError>;
/// `(bytes_limit, proposals_limit, max_version)` as sent by
/// `TxPoolController::get_block_template`.
type BlockTemplateArgs = (Option<u64>, Option<u64>, Option<Version>);

/// Result of submitting a transaction to the pool.
pub(crate) type SubmitTxResult = Result<(), Reject>;

/// Result of a dry-run acceptance check.
pub(crate) type TestAcceptTxResult = Result<EntryCompleted, Reject>;

/// Status plus verified cycles (if known) for a queried transaction.
type GetTxStatusResult = Result<(TxStatus, Option<Cycle>), AnyError>;
/// Full transaction-with-status lookup result.
type GetTransactionWithStatusResult = Result<TransactionWithStatus, AnyError>;
/// Transactions fetched by short id, each with its verified cycle count.
type FetchTxsWithCyclesResult = Vec<(ProposalShortId, (TransactionView, Cycle))>;

/// `(detached_blocks, attached_blocks, detached_proposal_ids, snapshot)`
/// delivered on a chain reorganization; see
/// `TxPoolController::update_tx_pool_for_reorg`.
pub(crate) type ChainReorgArgs = (
    VecDeque<BlockView>,
    VecDeque<BlockView>,
    HashSet<ProposalShortId>,
    Arc<Snapshot>,
);

/// Result of a fee-rate estimation request.
pub(crate) type FeeEstimatesResult = Result<FeeRate, AnyError>;
99
/// Requests and notifications accepted by the tx-pool service loop;
/// each variant is dispatched by `process`.
pub(crate) enum Message {
    /// Build a block template. NOTE(review): the limit arguments are
    /// forwarded but ignored by the current `process` handler.
    BlockTemplate(Request<BlockTemplateArgs, BlockTemplateResult>),
    /// Submit a locally-originated transaction.
    SubmitLocalTx(Request<TransactionView, SubmitTxResult>),
    /// Remove a local transaction by hash; replies whether it was removed.
    RemoveLocalTx(Request<Byte32, bool>),
    /// Dry-run acceptance of a transaction without inserting it.
    TestAcceptTx(Request<TransactionView, TestAcceptTxResult>),
    /// Submit a tx relayed by a peer, with the cycles the peer declared.
    SubmitRemoteTx(Request<(TransactionView, Cycle, PeerIndex), ()>),
    /// Fire-and-forget submission of a batch of transactions.
    NotifyTxs(Notify<Vec<TransactionView>>),
    /// Filter out proposal ids already known to the pool.
    FreshProposalsFilter(Request<Vec<ProposalShortId>, Vec<ProposalShortId>>),
    /// Fetch transactions (pool, store, or orphan pool) by short id.
    FetchTxs(Request<HashSet<ProposalShortId>, HashMap<ProposalShortId, TransactionView>>),
    /// Fetch transactions with their verified cycle counts.
    FetchTxsWithCycles(Request<HashSet<ProposalShortId>, FetchTxsWithCyclesResult>),
    /// Query aggregate pool statistics.
    GetTxPoolInfo(Request<(), TxPoolInfo>),
    /// Resolve a live cell, optionally eagerly loading its data.
    GetLiveCell(Request<(OutPoint, bool), CellStatus>),
    /// Query the status (pending/proposed/rejected/unknown) of a tx.
    GetTxStatus(Request<Byte32, GetTxStatusResult>),
    /// Query a transaction together with its status.
    GetTransactionWithStatus(Request<Byte32, GetTransactionWithStatusResult>),
    /// A new candidate uncle block arrived.
    NewUncle(Notify<UncleBlockView>),
    /// Drop all pool content and rebase onto the given snapshot.
    ClearPool(Request<Arc<Snapshot>, ()>),
    /// Drop all entries queued for verification.
    ClearVerifyQueue(Request<(), ()>),
    /// Dump info for every pool entry.
    GetAllEntryInfo(Request<(), TxPoolEntryInfo>),
    /// Dump the ids of every pool entry.
    GetAllIds(Request<(), TxPoolIds>),
    /// Persist the pool to disk.
    SavePool(Request<(), ()>),
    /// Detailed diagnostics for a single pool transaction.
    GetPoolTxDetails(Request<Byte32, PoolTxDetailInfo>),

    /// Toggle initial-block-download mode.
    UpdateIBDState(Request<bool, ()>),
    /// Estimate a fee rate for the given mode.
    EstimateFeeRate(Request<(EstimateMode, bool), FeeEstimatesResult>),

    /// Test hook: insert prebuilt entries directly into the pool.
    #[cfg(feature = "internal")]
    PlugEntry(Request<(Vec<TxEntry>, PlugTarget), ()>),
    /// Test hook: package txs up to an optional byte limit.
    #[cfg(feature = "internal")]
    PackageTxs(Request<Option<u64>, Vec<TxEntry>>),
    /// Submit a local tx through the resumable verification path.
    SubmitLocalTestTx(Request<TransactionView, SubmitTxResult>),
}
132
/// Events that drive block-template maintenance. `Hash`/`Eq` are derived so
/// the interval loop in `TxPoolServiceBuilder::start` can deduplicate queued
/// events in a `LinkedHashSet`.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) enum BlockAssemblerMessage {
    /// Pending-set changed (e.g. a tx entered the pending pool).
    Pending,
    /// Proposed-set changed.
    Proposed,
    /// A new candidate uncle is available.
    Uncle,
    /// Rebuild the template from scratch against this snapshot.
    Reset(Arc<Snapshot>),
}
140
/// Cloneable client handle for the tx-pool service.
///
/// Regular requests go through `sender` and are answered over per-request
/// oneshot channels; chain reorgs use the dedicated `reorg_sender`; script
/// verification suspend/resume commands are broadcast via the `chunk_tx`
/// watch channel.
#[derive(Clone)]
pub struct TxPoolController {
    sender: mpsc::Sender<Message>,
    reorg_sender: mpsc::Sender<Notify<ChainReorgArgs>>,
    chunk_tx: Arc<watch::Sender<ChunkCommand>>,
    handle: Handle,
    started: Arc<AtomicBool>,
}
152
/// Sends `Message::$msg_type` carrying `$args` to the service and
/// synchronously waits for the reply.
///
/// Uses `try_send`, so a full channel errors out immediately instead of
/// back-pressuring the caller.
macro_rules! send_message {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let (responder, response) = oneshot::channel();
        let request = Request::call($args, responder);
        $self
            .sender
            .try_send(Message::$msg_type(request))
            .map_err(|e| {
                let (_m, e) = handle_try_send_error(e);
                e
            })?;
        // block_in_place tells tokio this worker will block on the sync
        // recv, so other tasks are moved off the thread first.
        block_in_place(|| response.recv())
            .map_err(handle_recv_error)
            .map_err(Into::into)
    }};
}
169
/// Sends `Message::$msg_type` as a fire-and-forget notification; no reply
/// is awaited. A full channel surfaces as an error via `try_send`.
macro_rules! send_notify {
    ($self:ident, $msg_type:ident, $args:expr) => {{
        let notify = Notify::new($args);
        $self
            .sender
            .try_send(Message::$msg_type(notify))
            .map_err(|e| {
                let (_m, e) = handle_try_send_error(e);
                e.into()
            })
    }};
}
182
impl TxPoolController {
    /// Returns whether the service has been started
    /// (set at the end of `TxPoolServiceBuilder::start`).
    pub fn service_started(&self) -> bool {
        self.started.load(Ordering::Acquire)
    }

    /// Test hook: overrides the started flag.
    #[cfg(feature = "internal")]
    pub fn set_service_started(&self, v: bool) {
        self.started.store(v, Ordering::Release);
    }

    /// Returns the tokio runtime handle the service was built with.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Requests a block template.
    ///
    /// NOTE(review): the three limit arguments are shipped to the service,
    /// but the `Message::BlockTemplate` handler in `process` currently
    /// ignores them — confirm whether that is intentional.
    pub fn get_block_template(
        &self,
        bytes_limit: Option<u64>,
        proposals_limit: Option<u64>,
        max_version: Option<Version>,
    ) -> Result<BlockTemplateResult, AnyError> {
        send_message!(
            self,
            BlockTemplate,
            (bytes_limit, proposals_limit, max_version)
        )
    }

    /// Notifies the service of a new candidate uncle (fire-and-forget).
    pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
        send_notify!(self, NewUncle, uncle)
    }

    /// Sends a chain-reorg notification over the dedicated reorg channel.
    /// Returns an error if the channel is full or disconnected.
    pub fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) -> Result<(), AnyError> {
        let notify = Notify::new((
            detached_blocks,
            attached_blocks,
            detached_proposal_id,
            snapshot,
        ));
        self.reorg_sender.try_send(notify).map_err(|e| {
            let (_m, e) = handle_try_send_error(e);
            e.into()
        })
    }

    /// Submits a locally-originated transaction and waits for the verdict.
    pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTx, tx)
    }

    /// Dry-runs acceptance of `tx` without inserting it into the pool.
    pub fn test_accept_tx(&self, tx: TransactionView) -> Result<TestAcceptTxResult, AnyError> {
        send_message!(self, TestAcceptTx, tx)
    }

    /// Removes a local transaction by hash; `Ok(true)` if it was removed.
    pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
        send_message!(self, RemoveLocalTx, tx_hash)
    }

    /// Submits a peer-relayed transaction with its declared cycles.
    ///
    /// NOTE(review): despite being `async`, this goes through the same
    /// blocking `send_message!`/`block_in_place` path as the sync methods.
    pub async fn submit_remote_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Cycle,
        peer: PeerIndex,
    ) -> Result<(), AnyError> {
        send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
    }

    /// Fire-and-forget submission of a batch of transactions.
    pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        send_notify!(self, NotifyTxs, txs)
    }

    /// Queries aggregate tx-pool statistics.
    pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
        send_message!(self, GetTxPoolInfo, ())
    }

    /// Resolves a live cell; `with_data` eagerly loads the cell data.
    pub fn get_live_cell(
        &self,
        out_point: OutPoint,
        with_data: bool,
    ) -> Result<CellStatus, AnyError> {
        send_message!(self, GetLiveCell, (out_point, with_data))
    }

    /// Returns the subset of `proposals` not already known to the pool.
    pub fn fresh_proposals_filter(
        &self,
        proposals: Vec<ProposalShortId>,
    ) -> Result<Vec<ProposalShortId>, AnyError> {
        send_message!(self, FreshProposalsFilter, proposals)
    }

    /// Queries the status of a transaction by hash.
    pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
        send_message!(self, GetTxStatus, hash)
    }

    /// Queries a transaction together with its status by hash.
    pub fn get_transaction_with_status(
        &self,
        hash: Byte32,
    ) -> Result<GetTransactionWithStatusResult, AnyError> {
        send_message!(self, GetTransactionWithStatus, hash)
    }

    /// Fetches transactions by short id from the pool, store, or orphan set.
    pub fn fetch_txs(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
        send_message!(self, FetchTxs, short_ids)
    }

    /// Fetches transactions and their verified cycle counts by short id.
    pub fn fetch_txs_with_cycles(
        &self,
        short_ids: HashSet<ProposalShortId>,
    ) -> Result<FetchTxsWithCyclesResult, AnyError> {
        send_message!(self, FetchTxsWithCycles, short_ids)
    }

    /// Clears the whole pool and rebases it onto `new_snapshot`.
    pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
        send_message!(self, ClearPool, new_snapshot)
    }

    /// Drops every entry queued for verification.
    pub fn clear_verify_queue(&self) -> Result<(), AnyError> {
        send_message!(self, ClearVerifyQueue, ())
    }

    /// Dumps info for every pool entry.
    pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
        send_message!(self, GetAllEntryInfo, ())
    }

    /// Dumps the ids of every pool entry.
    pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
        send_message!(self, GetAllIds, ())
    }

    /// Returns detailed diagnostics for a single pool transaction.
    pub fn get_tx_detail(&self, tx_hash: Byte32) -> Result<PoolTxDetailInfo, AnyError> {
        send_message!(self, GetPoolTxDetails, tx_hash)
    }

    /// Persists the pool to disk; blocks until the service has finished.
    pub fn save_pool(&self) -> Result<(), AnyError> {
        info!("Please be patient, tx-pool are saving data into disk ...");
        send_message!(self, SavePool, ())
    }

    /// Toggles initial-block-download mode.
    pub fn update_ibd_state(&self, in_ibd: bool) -> Result<(), AnyError> {
        send_message!(self, UpdateIBDState, in_ibd)
    }

    /// Estimates a fee rate; `enable_fallback` allows a fallback estimate
    /// when the primary estimator cannot answer.
    pub fn estimate_fee_rate(
        &self,
        estimate_mode: EstimateMode,
        enable_fallback: bool,
    ) -> Result<FeeEstimatesResult, AnyError> {
        send_message!(self, EstimateFeeRate, (estimate_mode, enable_fallback))
    }

    /// Broadcasts `ChunkCommand::Suspend` to pause chunked script
    /// verification (observed by `VerifyMgr` via the watch channel).
    pub fn suspend_chunk_process(&self) -> Result<(), AnyError> {
        self.chunk_tx
            .send(ChunkCommand::Suspend)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Broadcasts `ChunkCommand::Resume` to resume chunked script
    /// verification.
    pub fn continue_chunk_process(&self) -> Result<(), AnyError> {
        self.chunk_tx
            .send(ChunkCommand::Resume)
            .map_err(handle_send_cmd_error)
            .map_err(Into::into)
    }

    /// Re-submits transactions restored from the persistent pool file.
    /// Stale txs that fail resubmission are counted and logged, not
    /// treated as errors; only channel failures propagate.
    fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
        if !txs.is_empty() {
            info!("Loading persistent tx-pool data, total {} txs", txs.len());
            let mut failed_txs = 0;
            for tx in txs {
                if self.submit_local_tx(tx)?.is_err() {
                    failed_txs += 1;
                }
            }
            if failed_txs == 0 {
                info!("Persistent tx-pool data is loaded");
            } else {
                info!(
                    "Persistent tx-pool data is loaded, {} stale txs are ignored",
                    failed_txs
                );
            }
        }
        Ok(())
    }

    /// Test hook: inserts prebuilt entries directly into the given pool set.
    #[cfg(feature = "internal")]
    pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
        send_message!(self, PlugEntry, (entries, target))
    }

    /// Test hook: packages txs up to `bytes_limit` (block max when `None`).
    #[cfg(feature = "internal")]
    pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
        send_message!(self, PackageTxs, bytes_limit)
    }

    /// Submits a local tx through the resumable verification path.
    pub fn submit_local_test_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
        send_message!(self, SubmitLocalTestTx, tx)
    }
}
429
/// Collects everything needed to spin up the tx-pool service; consumed by
/// `start`, which spawns the service tasks onto the runtime.
pub struct TxPoolServiceBuilder {
    pub(crate) tx_pool_config: TxPoolConfig,
    /// Controller handed back to callers of `new`; also kept here so
    /// `start` can load persisted txs through it.
    pub(crate) tx_pool_controller: TxPoolController,
    pub(crate) snapshot: Arc<Snapshot>,
    /// Present only when a block-assembler config was supplied.
    pub(crate) block_assembler: Option<BlockAssembler>,
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    pub(crate) callbacks: Callbacks,
    /// Receiving half of the main request channel.
    pub(crate) receiver: mpsc::Receiver<Message>,
    /// Receiving half of the dedicated reorg channel.
    pub(crate) reorg_receiver: mpsc::Receiver<Notify<ChainReorgArgs>>,
    /// Process-wide shutdown token.
    pub(crate) signal_receiver: CancellationToken,
    pub(crate) handle: Handle,
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    /// Watch side of the suspend/resume command channel, consumed by VerifyMgr.
    pub(crate) chunk_rx: watch::Receiver<ChunkCommand>,
    /// Flipped to true at the end of `start`.
    pub(crate) started: Arc<AtomicBool>,
    pub(crate) block_assembler_channel: (
        mpsc::Sender<BlockAssemblerMessage>,
        mpsc::Receiver<BlockAssemblerMessage>,
    ),
    pub(crate) fee_estimator: FeeEstimator,
}
451
impl TxPoolServiceBuilder {
    /// Creates the builder plus the controller that clients will use to
    /// talk to the service once `start` has been called.
    pub fn new(
        tx_pool_config: TxPoolConfig,
        snapshot: Arc<Snapshot>,
        block_assembler_config: Option<BlockAssemblerConfig>,
        txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
        handle: &Handle,
        tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
        fee_estimator: FeeEstimator,
    ) -> (TxPoolServiceBuilder, TxPoolController) {
        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE);
        let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let signal_receiver: CancellationToken = new_tokio_exit_rx();
        let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume);
        let started = Arc::new(AtomicBool::new(false));

        let controller = TxPoolController {
            sender,
            reorg_sender,
            handle: handle.clone(),
            chunk_tx: Arc::new(chunk_tx),
            started: Arc::clone(&started),
        };

        let block_assembler =
            block_assembler_config.map(|config| BlockAssembler::new(config, Arc::clone(&snapshot)));
        let builder = TxPoolServiceBuilder {
            tx_pool_config,
            tx_pool_controller: controller.clone(),
            snapshot,
            block_assembler,
            txs_verify_cache,
            callbacks: Callbacks::new(),
            receiver,
            reorg_receiver,
            signal_receiver,
            handle: handle.clone(),
            tx_relay_sender,
            chunk_rx,
            started,
            block_assembler_channel,
            fee_estimator,
        };

        (builder, controller)
    }

    /// Registers a callback invoked when a tx enters the pending set.
    pub fn register_pending(&mut self, callback: PendingCallback) {
        self.callbacks.register_pending(callback);
    }

    /// Returns a clone of the channel used to relay verification results.
    pub fn tx_relay_sender(&self) -> ckb_channel::Sender<TxVerificationResult> {
        self.tx_relay_sender.clone()
    }

    /// Registers a callback invoked when txs are proposed.
    pub fn register_proposed(&mut self, callback: ProposedCallback) {
        self.callbacks.register_proposed(callback);
    }

    /// Registers a callback invoked when a tx is rejected.
    pub fn register_reject(&mut self, callback: RejectCallback) {
        self.callbacks.register_reject(callback);
    }

    /// Consumes the builder and spawns the service tasks:
    /// 1. the verification manager,
    /// 2. the main request-dispatch loop,
    /// 3. the block-assembler loop (only if an assembler is configured),
    /// 4. the reorg loop.
    /// Finally flips the `started` flag and replays persisted txs.
    pub fn start(self, network: NetworkController) {
        let consensus = self.snapshot.cloned_consensus();

        let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(
            self.tx_pool_config.max_tx_verify_cycles,
        )));

        // Load previously persisted txs before the service loops start;
        // they are re-submitted at the very end of this function.
        let tx_pool = TxPool::new(self.tx_pool_config, self.snapshot);
        let txs = match tx_pool.load_from_file() {
            Ok(txs) => txs,
            Err(e) => {
                error!("{}", e.to_string());
                error!("Failed to load txs from tx-pool persistent data file, all txs are ignored");
                Vec::new()
            }
        };

        let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
        let service = TxPoolService {
            tx_pool_config: Arc::new(tx_pool.config.clone()),
            tx_pool: Arc::new(RwLock::new(tx_pool)),
            orphan: Arc::new(RwLock::new(OrphanPool::new())),
            block_assembler: self.block_assembler,
            txs_verify_cache: self.txs_verify_cache,
            callbacks: Arc::new(self.callbacks),
            tx_relay_sender: self.tx_relay_sender,
            block_assembler_sender,
            verify_queue: Arc::clone(&verify_queue),
            network,
            consensus,
            fee_estimator: self.fee_estimator,
        };

        // Task 1: verification manager.
        let mut verify_mgr =
            VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
        self.handle.spawn(async move { verify_mgr.run().await });

        let mut receiver = self.receiver;
        let mut reorg_receiver = self.reorg_receiver;
        let handle_clone = self.handle.clone();

        // Task 2: main dispatch loop — each message is processed on its own
        // spawned task; on shutdown the pool is saved before exiting.
        let process_service = service.clone();
        let signal_receiver = self.signal_receiver.clone();
        self.handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(message) = receiver.recv() => {
                        let service_clone = process_service.clone();
                        handle_clone.spawn(process(service_clone, message));
                    },
                    _ = signal_receiver.cancelled() => {
                        info!("TxPool is saving, please wait...");
                        process_service.save_pool().await;
                        info!("TxPool process_service exit now");
                        break
                    },
                    else => break,
                }
            }
        });

        // Task 3: block-assembler loop. With a zero interval every message
        // is processed immediately (test mode); otherwise messages are
        // deduplicated in a LinkedHashSet and drained on each tick.
        let process_service = service.clone();
        if let Some(ref block_assembler) = service.block_assembler {
            let signal_receiver = self.signal_receiver.clone();
            let interval = Duration::from_millis(block_assembler.config.update_interval_millis);
            if interval.is_zero() {
                ckb_logger::warn!(
                    "block_assembler.update_interval_millis set to zero interval. \
                This should only be used for tests, as external notification will be disabled."
                );
                self.handle.spawn(async move {
                    loop {
                        tokio::select! {
                            Some(message) = block_assembler_receiver.recv() => {
                                let service_clone = process_service.clone();
                                block_assembler::process(service_clone, &message).await;
                            },
                            _ = signal_receiver.cancelled() => {
                                info!("TxPool block_assembler process service received exit signal, exit now");
                                break
                            },
                            else => break,
                        }
                    }
                });
            } else {
                self.handle.spawn(async move {
                    let mut interval = tokio::time::interval(interval);
                    let mut queue = LinkedHashSet::new();
                    loop {
                        tokio::select! {
                            Some(message) = block_assembler_receiver.recv() => {
                                // Reset flushes the queue and runs at once;
                                // everything else waits for the next tick.
                                if let BlockAssemblerMessage::Reset(..) = message {
                                    let service_clone = process_service.clone();
                                    queue.clear();
                                    block_assembler::process(service_clone, &message).await;
                                } else {
                                    queue.insert(message);
                                }
                            },
                            _ = interval.tick() => {
                                for message in &queue {
                                    let service_clone = process_service.clone();
                                    block_assembler::process(service_clone, message).await;
                                }
                                // Notify external listeners only when the
                                // template actually changed this tick.
                                if !queue.is_empty() {
                                    if let Some(ref block_assembler) = process_service.block_assembler {
                                        block_assembler.notify().await;
                                    }
                                }
                                queue.clear();
                            }
                            _ = signal_receiver.cancelled() => {
                                info!("TxPool block_assembler process service received exit signal, exit now");
                                break
                            },
                            else => break,
                        }
                    }
                });
            }
        }

        // Task 4: reorg loop — assembler is updated before and after the
        // pool itself is rebased, in that order.
        let signal_receiver = self.signal_receiver;
        self.handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(message) = reorg_receiver.recv() => {
                        let Notify {
                            arguments: (detached_blocks, attached_blocks, detached_proposal_id, snapshot),
                        } = message;
                        let snapshot_clone = Arc::clone(&snapshot);
                        let detached_blocks_clone = detached_blocks.clone();
                        service.update_block_assembler_before_tx_pool_reorg(
                            detached_blocks_clone,
                            snapshot_clone
                        ).await;

                        let snapshot_clone = Arc::clone(&snapshot);
                        service
                            .update_tx_pool_for_reorg(
                                detached_blocks,
                                attached_blocks,
                                detached_proposal_id,
                                snapshot_clone,
                            )
                            .await;

                        service.update_block_assembler_after_tx_pool_reorg().await;
                    },
                    _ = signal_receiver.cancelled() => {
                        info!("TxPool reorg process service received exit signal, exit now");
                        break
                    },
                    else => break,
                }
            }
        });
        self.started.store(true, Ordering::Release);
        if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) {
            error!("Failed to import persistent txs, cause: {}", err);
        }
    }
}
688
/// Shared service state; cheaply cloneable (all heavy fields are behind
/// `Arc`s) so each spawned task gets its own handle.
#[derive(Clone)]
pub(crate) struct TxPoolService {
    pub(crate) tx_pool: Arc<RwLock<TxPool>>,
    /// Txs whose parents are not yet known.
    pub(crate) orphan: Arc<RwLock<OrphanPool>>,
    pub(crate) consensus: Arc<Consensus>,
    pub(crate) tx_pool_config: Arc<TxPoolConfig>,
    /// Present only when block assembly is enabled.
    pub(crate) block_assembler: Option<BlockAssembler>,
    pub(crate) txs_verify_cache: Arc<RwLock<TxVerificationCache>>,
    pub(crate) callbacks: Arc<Callbacks>,
    pub(crate) network: NetworkController,
    /// Channel used to hand verification outcomes to the relay layer.
    pub(crate) tx_relay_sender: ckb_channel::Sender<TxVerificationResult>,
    pub(crate) verify_queue: Arc<RwLock<VerifyQueue>>,
    /// Sends events to the block-assembler loop spawned in `start`.
    pub(crate) block_assembler_sender: mpsc::Sender<BlockAssemblerMessage>,
    pub(crate) fee_estimator: FeeEstimator,
}
704
/// Outcome of transaction verification, sent to the relay layer via
/// `tx_relay_sender`.
pub enum TxVerificationResult {
    /// Verification succeeded.
    Ok {
        /// Peer the tx came from; `None` for a local submission.
        original_peer: Option<PeerIndex>,
        /// Hash of the verified tx.
        tx_hash: Byte32,
    },
    /// The tx references parent txs unknown to the pool
    /// (presumably so the relayer can fetch them from `peer` — confirm at
    /// the consumer).
    UnknownParents {
        /// Peer that sent the tx.
        peer: PeerIndex,
        /// Hashes of the missing parent transactions.
        parents: HashSet<Byte32>,
    },
    /// Verification rejected the tx.
    Reject {
        /// Hash of the rejected tx.
        tx_hash: Byte32,
    },
}
727
728#[allow(clippy::cognitive_complexity)]
729async fn process(mut service: TxPoolService, message: Message) {
730 match message {
731 Message::GetTxPoolInfo(Request { responder, .. }) => {
732 let info = service.info().await;
733 if let Err(e) = responder.send(info) {
734 error!("Responder sending get_tx_pool_info failed {:?}", e);
735 };
736 }
737 Message::GetLiveCell(Request {
738 responder,
739 arguments: (out_point, with_data),
740 }) => {
741 let live_cell_status = service.get_live_cell(out_point, with_data).await;
742 if let Err(e) = responder.send(live_cell_status) {
743 error!("Responder sending get_live_cell failed {:?}", e);
744 };
745 }
746 Message::BlockTemplate(Request {
747 responder,
748 arguments: (_bytes_limit, _proposals_limit, _max_version),
749 }) => {
750 let block_template_result = service.get_block_template().await;
751 if let Err(e) = responder.send(block_template_result) {
752 error!("Responder sending block_template_result failed {:?}", e);
753 };
754 }
755 Message::SubmitLocalTx(Request {
756 responder,
757 arguments: tx,
758 }) => {
759 let result = service.process_tx(tx, None).await.map(|_| ());
760 if let Err(e) = responder.send(result) {
761 error!("Responder sending submit_tx result failed {:?}", e);
762 };
763 }
764 Message::SubmitLocalTestTx(Request {
765 responder,
766 arguments: tx,
767 }) => {
768 let result = service.resumeble_process_tx(tx, None).await.map(|_| ());
769 if let Err(e) = responder.send(result) {
770 error!("Responder sending submit_tx result failed {:?}", e);
771 };
772 }
773 Message::RemoveLocalTx(Request {
774 responder,
775 arguments: tx_hash,
776 }) => {
777 let result = service.remove_tx(tx_hash).await;
778 if let Err(e) = responder.send(result) {
779 error!("Responder sending remove_tx result failed {:?}", e);
780 };
781 }
782 Message::TestAcceptTx(Request {
783 responder,
784 arguments: tx,
785 }) => {
786 let result = service.test_accept_tx(tx).await;
787 if let Err(e) = responder.send(result.map(|r| r.into())) {
788 error!("Responder sending test_accept_tx result failed {:?}", e);
789 };
790 }
791 Message::SubmitRemoteTx(Request {
792 responder,
793 arguments: (tx, declared_cycles, peer),
794 }) => {
795 let _result = service
796 .resumeble_process_tx(tx, Some((declared_cycles, peer)))
797 .await;
798 if let Err(e) = responder.send(()) {
799 error!("Responder sending submit_tx result failed {:?}", e);
800 };
801 }
802 Message::NotifyTxs(Notify { arguments: txs }) => {
803 for tx in txs {
804 let _ret = service.resumeble_process_tx(tx, None).await;
805 }
806 }
807 Message::FreshProposalsFilter(Request {
808 responder,
809 arguments: mut proposals,
810 }) => {
811 let tx_pool = service.tx_pool.read().await;
812 proposals.retain(|id| !tx_pool.contains_proposal_id(id));
813 if let Err(e) = responder.send(proposals) {
814 error!("Responder sending fresh_proposals_filter failed {:?}", e);
815 };
816 }
817 Message::GetTxStatus(Request {
818 responder,
819 arguments: hash,
820 }) => {
821 let id = ProposalShortId::from_tx_hash(&hash);
822 let tx_pool = service.tx_pool.read().await;
823 let ret = if let Some(PoolEntry {
824 status,
825 inner: entry,
826 ..
827 }) = tx_pool.pool_map.get_by_id(&id)
828 {
829 let status = if status == &Status::Proposed {
830 TxStatus::Proposed
831 } else {
832 TxStatus::Pending
833 };
834 Ok((status, Some(entry.cycles)))
835 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
836 let recent_reject_result = recent_reject_db.get(&hash);
837 if let Ok(recent_reject) = recent_reject_result {
838 if let Some(record) = recent_reject {
839 Ok((TxStatus::Rejected(record), None))
840 } else {
841 Ok((TxStatus::Unknown, None))
842 }
843 } else {
844 Err(recent_reject_result.unwrap_err())
845 }
846 } else {
847 Ok((TxStatus::Unknown, None))
848 };
849
850 if let Err(e) = responder.send(ret) {
851 error!("Responder sending get_tx_status failed {:?}", e)
852 };
853 }
854 Message::GetTransactionWithStatus(Request {
855 responder,
856 arguments: hash,
857 }) => {
858 let id = ProposalShortId::from_tx_hash(&hash);
859 let tx_pool = service.tx_pool.read().await;
860 let ret = if let Some(PoolEntry {
861 status,
862 inner: entry,
863 ..
864 }) = tx_pool.pool_map.get_by_id(&id)
865 {
866 let (tx_status, min_replace_fee) = if status == &Status::Proposed {
867 (TxStatus::Proposed, None)
868 } else {
869 (TxStatus::Pending, tx_pool.min_replace_fee(entry))
870 };
871 Ok(TransactionWithStatus::with_status(
872 Some(entry.transaction().clone()),
873 entry.cycles,
874 entry.timestamp,
875 tx_status,
876 Some(entry.fee),
877 min_replace_fee,
878 ))
879 } else if let Some(ref recent_reject_db) = tx_pool.recent_reject {
880 match recent_reject_db.get(&hash) {
881 Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)),
882 Ok(_) => Ok(TransactionWithStatus::with_unknown()),
883 Err(err) => Err(err),
884 }
885 } else {
886 Ok(TransactionWithStatus::with_unknown())
887 };
888
889 if let Err(e) = responder.send(ret) {
890 error!("Responder sending get_tx_status failed {:?}", e)
891 };
892 }
893 Message::FetchTxs(Request {
894 responder,
895 arguments: short_ids,
896 }) => {
897 let tx_pool = service.tx_pool.read().await;
898 let orphan = service.orphan.read().await;
899 let txs = short_ids
900 .into_iter()
901 .filter_map(|short_id| {
902 tx_pool
903 .get_tx_from_pool_or_store(&short_id)
904 .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
905 .map(|tx| (short_id, tx))
906 })
907 .collect();
908 if let Err(e) = responder.send(txs) {
909 error!("Responder sending fetch_txs failed {:?}", e);
910 };
911 }
912 Message::FetchTxsWithCycles(Request {
913 responder,
914 arguments: short_ids,
915 }) => {
916 let tx_pool = service.tx_pool.read().await;
917 let txs = short_ids
918 .into_iter()
919 .filter_map(|short_id| {
920 tx_pool
921 .get_tx_with_cycles(&short_id)
922 .map(|(tx, cycles)| (short_id, (tx, cycles)))
923 })
924 .collect();
925 if let Err(e) = responder.send(txs) {
926 error!("Responder sending fetch_txs_with_cycles failed {:?}", e);
927 };
928 }
929 Message::NewUncle(Notify { arguments: uncle }) => {
930 service.receive_candidate_uncle(uncle).await;
931 }
932 Message::ClearPool(Request {
933 responder,
934 arguments: new_snapshot,
935 }) => {
936 service.clear_pool(new_snapshot).await;
937 if let Err(e) = responder.send(()) {
938 error!("Responder sending clear_pool failed {:?}", e)
939 };
940 }
941 Message::ClearVerifyQueue(Request { responder, .. }) => {
942 service.verify_queue.write().await.clear();
943 if let Err(e) = responder.send(()) {
944 error!("Responder sending clear_verify_queue failed {:?}", e)
945 };
946 }
947 Message::GetPoolTxDetails(Request {
948 responder,
949 arguments: tx_hash,
950 }) => {
951 let tx_pool = service.tx_pool.read().await;
952 let id = ProposalShortId::from_tx_hash(&tx_hash);
953 let tx_details = tx_pool
954 .get_tx_detail(&id)
955 .unwrap_or(PoolTxDetailInfo::with_unknown());
956 if let Err(e) = responder.send(tx_details) {
957 error!("responder send get_pool_tx_details failed {:?}", e)
958 };
959 }
960 Message::GetAllEntryInfo(Request { responder, .. }) => {
961 let tx_pool = service.tx_pool.read().await;
962 let info = tx_pool.get_all_entry_info();
963 if let Err(e) = responder.send(info) {
964 error!("Responder sending get_all_entry_info failed {:?}", e)
965 };
966 }
967 Message::GetAllIds(Request { responder, .. }) => {
968 let tx_pool = service.tx_pool.read().await;
969 let ids = tx_pool.get_ids();
970 if let Err(e) = responder.send(ids) {
971 error!("Responder sending get_ids failed {:?}", e)
972 };
973 }
974 Message::SavePool(Request { responder, .. }) => {
975 service.save_pool().await;
976 if let Err(e) = responder.send(()) {
977 error!("Responder sending save_pool failed {:?}", e)
978 };
979 }
980 Message::UpdateIBDState(Request {
981 responder,
982 arguments: in_ibd,
983 }) => {
984 service.update_ibd_state(in_ibd).await;
985 if let Err(e) = responder.send(()) {
986 error!("Responder sending update_ibd_state failed {:?}", e)
987 };
988 }
989 Message::EstimateFeeRate(Request {
990 responder,
991 arguments: (estimate_mode, enable_fallback),
992 }) => {
993 let fee_estimates_result = service
994 .estimate_fee_rate(estimate_mode, enable_fallback)
995 .await;
996 if let Err(e) = responder.send(fee_estimates_result) {
997 error!("Responder sending fee_estimates_result failed {:?}", e)
998 };
999 }
1000 #[cfg(feature = "internal")]
1001 Message::PlugEntry(Request {
1002 responder,
1003 arguments: (entries, target),
1004 }) => {
1005 service.plug_entry(entries, target).await;
1006
1007 if let Err(e) = responder.send(()) {
1008 error!("Responder sending plug_entry failed {:?}", e);
1009 };
1010 }
1011 #[cfg(feature = "internal")]
1012 Message::PackageTxs(Request {
1013 responder,
1014 arguments: bytes_limit,
1015 }) => {
1016 let max_block_cycles = service.consensus.max_block_cycles();
1017 let max_block_bytes = service.consensus.max_block_bytes();
1018 let tx_pool = service.tx_pool.read().await;
1019 let (txs, _size, _cycles) = tx_pool.package_txs(
1020 max_block_cycles,
1021 bytes_limit.unwrap_or(max_block_bytes) as usize,
1022 );
1023 if let Err(e) = responder.send(txs) {
1024 error!("Responder sending plug_entry failed {:?}", e);
1025 };
1026 }
1027 }
1028}
1029
impl TxPoolService {
    /// Builds an aggregate snapshot of pool statistics. Takes read locks on
    /// the pool, the orphan pool, and the verify queue for the duration.
    async fn info(&self) -> TxPoolInfo {
        let tx_pool = self.tx_pool.read().await;
        let orphan = self.orphan.read().await;
        let verify_queue = self.verify_queue.read().await;
        let tip_header = tx_pool.snapshot.tip_header();
        TxPoolInfo {
            tip_hash: tip_header.hash(),
            tip_number: tip_header.number(),
            pending_size: tx_pool.pool_map.pending_size(),
            proposed_size: tx_pool.pool_map.proposed_size(),
            orphan_size: orphan.len(),
            total_tx_size: tx_pool.pool_map.total_tx_size,
            total_tx_cycles: tx_pool.pool_map.total_tx_cycles,
            min_fee_rate: self.tx_pool_config.min_fee_rate,
            min_rbf_rate: self.tx_pool_config.min_rbf_rate,
            last_txs_updated_at: tx_pool.pool_map.get_max_update_time(),
            tx_size_limit: TRANSACTION_SIZE_LIMIT,
            max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64,
            verify_queue_size: verify_queue.len(),
        }
    }

    /// Resolves `out_point` against the pool overlaid on the snapshot.
    /// When `eager_load` is set, the cell data is fetched from the snapshot
    /// and attached to the returned cell meta.
    async fn get_live_cell(&self, out_point: OutPoint, eager_load: bool) -> CellStatus {
        let tx_pool = self.tx_pool.read().await;
        let snapshot = tx_pool.snapshot();
        let pool_cell = PoolCell::new(&tx_pool.pool_map, false);
        let provider = OverlayCellProvider::new(&pool_cell, snapshot);

        match provider.cell(&out_point, false) {
            CellStatus::Live(mut cell_meta) => {
                if eager_load {
                    if let Some((data, data_hash)) = snapshot.get_cell_data(&out_point) {
                        cell_meta.mem_cell_data = Some(data);
                        cell_meta.mem_cell_data_hash = Some(data_hash);
                    }
                }
                CellStatus::live_cell(cell_meta)
            }
            // Dead/Unresolvable statuses are collapsed to Unknown here.
            _ => CellStatus::Unknown,
        }
    }

    /// True when a block assembler is configured for this node.
    pub fn should_notify_block_assembler(&self) -> bool {
        self.block_assembler.is_some()
    }

    /// Records `uncle` as a candidate and wakes the block-assembler loop.
    /// No-op when no assembler is configured.
    pub async fn receive_candidate_uncle(&self, uncle: UncleBlockView) {
        if let Some(ref block_assembler) = self.block_assembler {
            {
                block_assembler.candidate_uncles.lock().await.insert(uncle);
            }
            if self
                .block_assembler_sender
                .send(BlockAssemblerMessage::Uncle)
                .await
                .is_err()
            {
                error!("block_assembler receiver dropped");
            }
        }
    }

    /// Pre-reorg assembler update: detached blocks become candidate uncles
    /// and a blank template is prepared from the new snapshot so miners get
    /// something valid while the pool is rebuilt.
    pub async fn update_block_assembler_before_tx_pool_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        snapshot: Arc<Snapshot>,
    ) {
        if let Some(ref block_assembler) = self.block_assembler {
            {
                // Scoped so the candidate_uncles lock is released before
                // update_blank runs.
                let mut candidate_uncles = block_assembler.candidate_uncles.lock().await;
                for detached_block in detached_blocks {
                    candidate_uncles.insert(detached_block.as_uncle());
                }
            }

            if let Err(e) = block_assembler.update_blank(snapshot).await {
                error!("block_assembler update_blank error {}", e);
            }
            block_assembler.notify().await;
        }
    }

    /// Post-reorg assembler update: rebuilds the full template from the
    /// rebased pool and notifies listeners.
    pub async fn update_block_assembler_after_tx_pool_reorg(&self) {
        if let Some(ref block_assembler) = self.block_assembler {
            if let Err(e) = block_assembler.update_full(&self.tx_pool).await {
                error!("block_assembler update failed {:?}", e);
            }
            block_assembler.notify().await;
        }
    }

    /// Test hook: inserts prebuilt entries straight into the pending or
    /// proposed set (panics on insertion failure), then pokes the
    /// block-assembler loop if one is configured.
    #[cfg(feature = "internal")]
    pub async fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) {
        {
            let mut tx_pool = self.tx_pool.write().await;
            match target {
                PlugTarget::Pending => {
                    for entry in entries {
                        tx_pool
                            .add_pending(entry)
                            .expect("Plug entry add_pending error");
                    }
                }
                PlugTarget::Proposed => {
                    for entry in entries {
                        tx_pool
                            .add_proposed(entry)
                            .expect("Plug entry add_proposed error");
                    }
                }
            };
        }

        if self.should_notify_block_assembler() {
            let msg = match target {
                PlugTarget::Pending => BlockAssemblerMessage::Pending,
                PlugTarget::Proposed => BlockAssemblerMessage::Proposed,
            };
            if self.block_assembler_sender.send(msg).await.is_err() {
                error!("block_assembler receiver dropped");
            }
        }
    }
}