1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10 after_delay_window, check_tx_fee, check_txid_collision, is_missing_input,
11 non_contextual_verify, time_relative_verify, verify_rtx,
12};
13use ckb_chain_spec::consensus::MAX_BLOCK_PROPOSALS_LIMIT;
14use ckb_error::{AnyError, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_jsonrpc_types::BlockTemplate;
17use ckb_logger::Level::Trace;
18use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
19use ckb_network::PeerIndex;
20use ckb_script::ChunkCommand;
21use ckb_snapshot::Snapshot;
22use ckb_types::core::error::OutPointError;
23use ckb_types::{
24 core::{
25 BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
26 cell::ResolvedTransaction,
27 },
28 packed::{Byte32, ProposalShortId},
29};
30use ckb_util::LinkedHashSet;
31use ckb_verification::{
32 TxVerifyEnv,
33 cache::{CacheEntry, Completed},
34};
35use std::collections::HashSet;
36use std::collections::{HashMap, VecDeque};
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tokio::sync::watch;
40
/// Upper bound on the number of transactions held back in the delay map
/// while the node is inside the hard-fork delay window
/// (see `TxPoolService::is_in_delay_window`).
const DELAY_LIMIT: usize = 1_500 * 21;

/// Target sub-pool when plugging entries into the pool.
pub enum PlugTarget {
    /// Add entries to the pending pool.
    Pending,
    /// Add entries to the proposed pool.
    Proposed,
}
50
/// Proposal-window status of a transaction, deciding which sub-pool an
/// entry is submitted to (see `_submit_entry`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    // Not yet proposed on-chain; submitted to the pending pool.
    Fresh,
    // Appears in the gap window (`contains_gap`); submitted via `add_gap`.
    Gap,
    // Proposed (`contains_proposed`); submitted to the proposed pool.
    Proposed,
}
57
58impl TxStatus {
59 fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
60 match self {
61 TxStatus::Fresh => TxVerifyEnv::new_submit(header),
62 TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
63 TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
64 }
65 }
66}
67
68impl TxPoolService {
69 pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
70 if let Some(ref block_assembler) = self.block_assembler {
71 Ok(block_assembler.get_current().await)
72 } else {
73 Err(InternalErrorKind::Config
74 .other("BlockAssembler disabled")
75 .into())
76 }
77 }
78
79 pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
80 let guard = self.txs_verify_cache.read().await;
81 guard.peek(&tx.witness_hash()).cloned()
82 }
83
84 async fn fetch_txs_verify_cache(
85 &self,
86 txs: impl Iterator<Item = &TransactionView>,
87 ) -> HashMap<Byte32, CacheEntry> {
88 let guard = self.txs_verify_cache.read().await;
89 txs.filter_map(|tx| {
90 let wtx_hash = tx.witness_hash();
91 guard
92 .peek(&wtx_hash)
93 .cloned()
94 .map(|value| (wtx_hash, value))
95 })
96 .collect()
97 }
98
    /// Insert a verified entry into the pool under the write lock.
    ///
    /// `pre_resolve_tip` is the tip hash observed when the tx was resolved;
    /// if the tip moved in the meantime the entry is re-checked against the
    /// current snapshot before submission. Returns the submission result
    /// together with the snapshot that was used.
    pub(crate) async fn submit_entry(
        &self,
        pre_resolve_tip: Byte32,
        entry: TxEntry,
        mut status: TxStatus,
    ) -> (Result<(), Reject>, Arc<Snapshot>) {
        let (ret, snapshot) = self
            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
                // With RBF enabled a conflicting input may be replaceable;
                // otherwise any conflict is a hard Dead-outpoint rejection.
                let conflicts = if tx_pool.enable_rbf() {
                    tx_pool.check_rbf(&snapshot, &entry)?
                } else {
                    let conflicted_outpoint =
                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                    if let Some(outpoint) = conflicted_outpoint {
                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                    }
                    HashSet::new()
                };

                let tip_hash = snapshot.tip_hash();
                // Tip changed since pre-check: re-derive the proposal status
                // and re-run time-relative verification on the new tip.
                if pre_resolve_tip != tip_hash {
                    debug!(
                        "submit_entry {} context changed. previous:{} now:{}",
                        entry.proposal_short_id(),
                        pre_resolve_tip,
                        tip_hash
                    );

                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;

                    let tip_header = snapshot.tip_header();
                    let tx_env = status.with_env(tip_header);
                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
                }

                // Evict RBF conflicts first, then submit the new entry.
                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

                // Entries evicted by the insertion itself are rejected as
                // invalidated by the new tx.
                for evict in evicted {
                    let reject = Reject::Invalidated(format!(
                        "invalidated by tx {}",
                        evict.transaction().hash()
                    ));
                    self.callbacks.call_reject(tx_pool, &evict, reject);
                }

                tx_pool.remove_conflict(&entry.proposal_short_id());
                tx_pool
                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                    .map_or(Ok(()), Err)?;

                // Transactions whose inputs were freed by the RBF eviction
                // are pushed back onto the verify queue asynchronously.
                if !may_recovered_txs.is_empty() {
                    let self_clone = self.clone();
                    tokio::spawn(async move {
                        let mut queue = self_clone.verify_queue.write().await;
                        for tx in may_recovered_txs {
                            debug!("recover back: {:?}", tx.proposal_short_id());
                            let _ = queue.add_tx(tx, None);
                        }
                    });
                }
                Ok(())
            })
            .await;

        (ret, snapshot)
    }
174
175 pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
176 if self.should_notify_block_assembler() {
177 match status {
178 TxStatus::Fresh => {
179 if self
180 .block_assembler_sender
181 .send(BlockAssemblerMessage::Pending)
182 .await
183 .is_err()
184 {
185 error!("block_assembler receiver dropped");
186 }
187 }
188 TxStatus::Proposed => {
189 if self
190 .block_assembler_sender
191 .send(BlockAssemblerMessage::Proposed)
192 .await
193 .is_err()
194 {
195 error!("block_assembler receiver dropped");
196 }
197 }
198 _ => {}
199 }
200 }
201 }
202
    /// Carry out replace-by-fee eviction for `entry`.
    ///
    /// Removes every conflicting entry (and its descendants) from the pool,
    /// records each removed tx as a conflict, fires reject callbacks, and
    /// returns previously-conflicted txs whose inputs were freed by the
    /// eviction so the caller can re-queue them.
    fn process_rbf(
        &self,
        tx_pool: &mut TxPool,
        entry: &TxEntry,
        conflicts: &HashSet<ProposalShortId>,
    ) -> Vec<TransactionView> {
        let mut may_recovered_txs = vec![];
        let mut available_inputs = HashSet::new();

        // Nothing to replace.
        if conflicts.is_empty() {
            return may_recovered_txs;
        }

        // Evict each conflicting entry together with its descendants.
        let all_removed: Vec<_> = conflicts
            .iter()
            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
            .collect();

        // Inputs released by the eviction ...
        available_inputs.extend(
            all_removed
                .iter()
                .flat_map(|removed| removed.transaction().input_pts_iter()),
        );

        // ... minus the ones the replacement itself spends.
        for input in entry.transaction().input_pts_iter() {
            available_inputs.remove(&input);
        }

        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
        for old in all_removed {
            debug!(
                "remove conflict tx {} for RBF by new tx {}",
                old.transaction().hash(),
                entry.transaction().hash()
            );
            let reject =
                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));

            // Track the replaced tx in the conflicts store before notifying.
            tx_pool.record_conflict(old.transaction().clone());
            self.callbacks.call_reject(tx_pool, &old, reject);
        }
        // Sanity: the replacement must never be a recovery candidate itself.
        assert!(!may_recovered_txs.contains(entry.transaction()));
        may_recovered_txs
    }
251
252 pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
253 let queue = self.verify_queue.read().await;
254 queue.contains_key(&tx.proposal_short_id())
255 }
256
257 pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
258 let orphan = self.orphan.read().await;
259 orphan.contains_key(&tx.proposal_short_id())
260 }
261
262 pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
263 &self,
264 mut f: F,
265 ) -> (U, Arc<Snapshot>) {
266 let tx_pool = self.tx_pool.read().await;
267 let snapshot = tx_pool.cloned_snapshot();
268
269 let ret = f(&tx_pool, Arc::clone(&snapshot));
270 (ret, snapshot)
271 }
272
273 pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
274 &self,
275 mut f: F,
276 ) -> (U, Arc<Snapshot>) {
277 let mut tx_pool = self.tx_pool.write().await;
278 let snapshot = tx_pool.cloned_snapshot();
279
280 let ret = f(&mut tx_pool, Arc::clone(&snapshot));
281 (ret, snapshot)
282 }
283
    /// Pre-admission checks under the read lock: txid collision, input
    /// resolution, and the minimum-fee check.
    ///
    /// A `Dead` outpoint gets a second resolution attempt in RBF mode; that
    /// retry is accepted only when a conflicting pool tx actually exists.
    pub(crate) async fn pre_check(
        &self,
        tx: &TransactionView,
    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
        let tx_size = tx.data().serialized_size_in_block();

        let (ret, snapshot) = self
            .with_tx_pool_read_lock(|tx_pool, snapshot| {
                let tip_hash = snapshot.tip_hash();

                check_txid_collision(tx_pool, tx)?;

                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
                match res {
                    Ok((rtx, status)) => {
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
                        // Retry in RBF mode: the dead outpoint may merely be
                        // spent by a replaceable pool tx.
                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
                        if conflicts.is_none() {
                            // No conflicting pool tx: the outpoint is genuinely
                            // dead, so keep the original rejection.
                            error!(
                                "{} is resolved as Dead, but there is no conflicted tx",
                                rtx.transaction.proposal_short_id()
                            );
                            return Err(Reject::Resolve(OutPointError::Dead(out)));
                        }
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(err) => Err(err),
                }
            })
            .await;
        (ret, snapshot)
    }
332
333 pub(crate) async fn non_contextual_verify(
334 &self,
335 tx: &TransactionView,
336 remote: Option<(Cycle, PeerIndex)>,
337 ) -> Result<(), Reject> {
338 if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
339 if reject.is_malformed_tx() {
340 if let Some(remote) = remote {
341 self.ban_malformed(remote.1, format!("reject {reject}"))
342 .await;
343 }
344 }
345 return Err(reject);
346 }
347 Ok(())
348 }
349
350 pub(crate) async fn resumeble_process_tx(
351 &self,
352 tx: TransactionView,
353 remote: Option<(Cycle, PeerIndex)>,
354 ) -> Result<bool, Reject> {
355 self.non_contextual_verify(&tx, remote).await?;
357
358 if self.orphan_contains(&tx).await {
359 debug!("reject tx {} already in orphan pool", tx.hash());
360 return Err(Reject::Duplicated(tx.hash()));
361 }
362
363 if self.verify_queue_contains(&tx).await {
364 return Err(Reject::Duplicated(tx.hash()));
365 }
366 self.enqueue_verify_queue(tx, remote).await
367 }
368
369 pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
370 self.non_contextual_verify(&tx, None).await?;
372
373 if self.verify_queue_contains(&tx).await {
374 return Err(Reject::Duplicated(tx.hash()));
375 }
376
377 if self.orphan_contains(&tx).await {
378 debug!("reject tx {} already in orphan pool", tx.hash());
379 return Err(Reject::Duplicated(tx.hash()));
380 }
381 self._test_accept_tx(tx.clone()).await
382 }
383
    /// Fully process a transaction synchronously: non-contextual checks,
    /// duplicate detection, `_process_tx`, then post-processing.
    ///
    /// When `_process_tx` returns `None` the tx was parked in the delay map
    /// instead of being verified, and a zero-cost `Completed` is reported.
    pub(crate) async fn process_tx(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<Completed, Reject> {
        self.non_contextual_verify(&tx, remote).await?;

        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
            return Err(Reject::Duplicated(tx.hash()));
        }

        if let Some((ret, snapshot)) = self
            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
            .await
        {
            self.after_process(tx, remote, &snapshot, &ret).await;
            ret
        } else {
            // Delayed rather than verified: report zero cycles and fee.
            Ok(Completed {
                cycles: 0,
                fee: Capacity::zero(),
            })
        }
    }
410
411 pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
412 let mut tx_pool = self.tx_pool.write().await;
413 if let Some(ref mut recent_reject) = tx_pool.recent_reject {
414 if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
415 error!(
416 "Failed to record recent_reject {} {} {}",
417 tx_hash, reject, e
418 );
419 }
420 }
421 }
422
423 pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
424 let id = ProposalShortId::from_tx_hash(&tx_hash);
425 {
426 let mut queue = self.verify_queue.write().await;
427 if queue.remove_tx(&id).is_some() {
428 return true;
429 }
430 }
431 {
432 let mut orphan = self.orphan.write().await;
433 if orphan.remove_orphan_tx(&id).is_some() {
434 return true;
435 }
436 }
437 let mut tx_pool = self.tx_pool.write().await;
438 tx_pool.remove_tx(&id)
439 }
440
    /// Post-submission bookkeeping: tx-monitor tracing, conflict recording,
    /// relayer notification, orphan resolution, peer banning, and
    /// recent-reject persistence — branching on whether the tx came from a
    /// remote peer and on the verification outcome.
    pub(crate) async fn after_process(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
        snapshot: &Snapshot,
        ret: &Result<Completed, Reject>,
    ) {
        let tx_hash = tx.hash();

        // Whether the ckb2023 hardfork (VM v2 + syscalls v3) is enabled for
        // the next block; included in the relayed verification result.
        let with_vm_2023 = {
            let epoch = snapshot
                .tip_header()
                .epoch()
                .minimum_epoch_number_after_n_blocks(1);

            self.consensus
                .hardfork_switch
                .ckb2023
                .is_vm_version_2_and_syscalls_3_enabled(epoch)
        };

        // Emit a JSON trace line for the tx monitor on success.
        if log_enabled_target!("ckb_tx_monitor", Trace) {
            if let Ok(c) = ret {
                trace_target!(
                    "ckb_tx_monitor",
                    r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
                    tx_hash,
                    c.cycles
                );
            }
        }

        // A tx rejected for RBF or a dead outpoint is remembered as a
        // conflict when it actually conflicts with something in the pool.
        if matches!(
            ret,
            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
        ) {
            let mut tx_pool = self.tx_pool.write().await;
            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
                tx_pool.record_conflict(tx.clone());
            }
        }

        match remote {
            Some((declared_cycle, peer)) => match ret {
                Ok(_) => {
                    debug!(
                        "after_process remote send_result_to_relayer {} {}",
                        tx_hash, peer
                    );
                    self.send_result_to_relayer(TxVerificationResult::Ok {
                        original_peer: Some(peer),
                        with_vm_2023,
                        tx_hash,
                    });
                    // This tx may be the missing parent of queued orphans.
                    self.process_orphan_tx(&tx).await;
                }
                Err(reject) => {
                    info!(
                        "after_process {} {} remote reject: {} ",
                        tx_hash, peer, reject
                    );
                    if is_missing_input(reject) {
                        // Ask the peer for the unknown parents and park the
                        // tx in the orphan pool meanwhile.
                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
                            peer,
                            parents: tx.unique_parents(),
                        });
                        self.add_orphan(tx, peer, declared_cycle).await;
                    } else {
                        if reject.is_malformed_tx() {
                            self.ban_malformed(peer, format!("reject {reject}")).await;
                        }
                        if reject.is_allowed_relay() {
                            self.send_result_to_relayer(TxVerificationResult::Reject {
                                tx_hash: tx_hash.clone(),
                            });
                        }
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            },
            None => {
                match ret {
                    Ok(_) => {
                        debug!("after_process local send_result_to_relayer {}", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            with_vm_2023,
                            tx_hash,
                        });
                        self.process_orphan_tx(&tx).await;
                    }
                    Err(Reject::Duplicated(_)) => {
                        // A local duplicate is still relayed as Ok so peers
                        // can learn about the tx.
                        debug!("after_process {} duplicated", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            with_vm_2023,
                            tx_hash,
                        });
                    }
                    Err(reject) => {
                        debug!("after_process {} reject: {} ", tx_hash, reject);
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            }
        }
    }
556
557 pub(crate) async fn add_orphan(
558 &self,
559 tx: TransactionView,
560 peer: PeerIndex,
561 declared_cycle: Cycle,
562 ) {
563 let evicted_txs = self
564 .orphan
565 .write()
566 .await
567 .add_orphan_tx(tx, peer, declared_cycle);
568 for tx_hash in evicted_txs {
571 self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
572 }
573 }
574
575 pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
576 let orphan = self.orphan.read().await;
577 orphan
578 .find_by_previous(tx)
579 .iter()
580 .filter_map(|id| orphan.get(id).cloned())
581 .collect::<Vec<_>>()
582 }
583
584 pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
585 self.orphan.write().await.remove_orphan_tx(id);
586 }
587
    /// Breadth-first resolution of orphans unblocked by `tx`.
    ///
    /// Every successfully accepted orphan is pushed back onto the work queue
    /// so its own dependents get a chance as well. Orphans whose declared
    /// cycles exceed `max_tx_verify_cycles` are moved to the verify queue
    /// instead of being verified inline here.
    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
        orphan_queue.push_back(tx.clone());

        while let Some(previous) = orphan_queue.pop_front() {
            let orphans = self.find_orphan_by_previous(&previous).await;
            for orphan in orphans.into_iter() {
                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
                    // Too expensive for inline verification: hand it to the
                    // (resumable) verify queue.
                    debug!(
                        "process_orphan {} added to verify queue; find previous from {}",
                        orphan.tx.hash(),
                        tx.hash(),
                    );
                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                    self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
                        .await
                        .expect("enqueue suspended tx");
                } else if let Some((ret, snapshot)) = self
                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
                    .await
                {
                    match ret {
                        Ok(_) => {
                            // Mirror after_process: tell peers about the
                            // accepted tx, including the ckb2023 VM flag for
                            // the next block.
                            let with_vm_2023 = {
                                let epoch = snapshot
                                    .tip_header()
                                    .epoch()
                                    .minimum_epoch_number_after_n_blocks(1);

                                self.consensus
                                    .hardfork_switch
                                    .ckb2023
                                    .is_vm_version_2_and_syscalls_3_enabled(epoch)
                            };
                            self.send_result_to_relayer(TxVerificationResult::Ok {
                                original_peer: Some(orphan.peer),
                                with_vm_2023,
                                tx_hash: orphan.tx.hash(),
                            });
                            debug!(
                                "process_orphan {} success, find previous from {}",
                                orphan.tx.hash(),
                                tx.hash()
                            );
                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                            // This orphan may in turn unblock further orphans.
                            orphan_queue.push_back(orphan.tx);
                        }
                        Err(reject) => {
                            debug!(
                                "process_orphan {} reject {}, find previous from {}",
                                orphan.tx.hash(),
                                reject,
                                tx.hash(),
                            );

                            // Still-missing inputs keep the orphan parked;
                            // any other rejection removes it for good.
                            if !is_missing_input(&reject) {
                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                                if reject.is_malformed_tx() {
                                    self.ban_malformed(orphan.peer, format!("reject {reject}"))
                                        .await;
                                }
                                if reject.is_allowed_relay() {
                                    self.send_result_to_relayer(TxVerificationResult::Reject {
                                        tx_hash: orphan.tx.hash(),
                                    });
                                }
                                if reject.should_recorded() {
                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
                                }
                            }
                        }
                    }
                }
            }
        }
    }
667
668 pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
669 if let Err(e) = self.tx_relay_sender.send(result) {
670 error!("tx-pool tx_relay_sender internal error {}", e);
671 }
672 }
673
    /// Ban `peer` for relaying a malformed transaction and drop that peer's
    /// remaining entries from the verify queue.
    ///
    /// With the `with_sentry` feature enabled, the ban is also reported as a
    /// sentry message before the network-level ban is applied.
    async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
        // Ban duration: 3 days.
        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);

        #[cfg(feature = "with_sentry")]
        use sentry::{Level, capture_message, with_scope};

        #[cfg(feature = "with_sentry")]
        with_scope(
            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
            || {
                capture_message(
                    &format!(
                        "Ban peer {} for {} seconds, reason: \
                         {}",
                        peer,
                        DEFAULT_BAN_TIME.as_secs(),
                        reason
                    ),
                    Level::Info,
                )
            },
        );
        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
        self.verify_queue.write().await.remove_txs_by_peer(&peer);
    }
699
    /// Core verification-and-submission pipeline for one transaction.
    ///
    /// Returns `None` when the node is inside the delay window and the tx
    /// was parked in the delay map instead; otherwise returns the
    /// verification result together with the snapshot it was produced
    /// against. `command_rx` is `Some` for the resumable (async) path.
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        // No command channel means we are on the synchronous path; used only
        // for metrics labelling below.
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        // Inside the delay window: defer the tx (bounded by DELAY_LIMIT)
        // instead of verifying it now.
        if self.is_in_delay_window(&snapshot) {
            let mut delay = self.delay.write().await;
            if delay.len() < DELAY_LIMIT {
                delay.insert(tx.proposal_short_id(), tx);
            }
            return None;
        }

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        // Remote txs are capped at their declared cycles; local ones at the
        // consensus max-block-cycles.
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        // A declared-cycle mismatch from a remote peer is itself a rejection.
        if let Some(declared) = declared_cycles {
            if declared != verified.cycles {
                info!(
                    "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                    declared, verified.cycles, tx
                );
                return Some((
                    Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                    snapshot,
                ));
            }
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        // Populate the verify cache off the hot path.
        if verify_cache.is_none() {
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }
779
780 pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
781 let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
782
783 let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
784
785 let verify_cache = self.fetch_tx_verify_cache(&tx).await;
788 let max_cycles = self.consensus.max_block_cycles();
789 let tip_header = snapshot.tip_header();
790 let tx_env = Arc::new(status.with_env(tip_header));
791
792 verify_rtx(
793 Arc::clone(&snapshot),
794 Arc::clone(&rtx),
795 tx_env,
796 &verify_cache,
797 max_cycles,
798 None,
799 )
800 .await
801 }
802
    /// Rebuild pool state after a chain reorg.
    ///
    /// Commits attached-block txs, prunes detached proposals, re-adds txs
    /// that were only in detached blocks, and — inside the delay window —
    /// drains the whole pool into the delay map. Once the delay window is
    /// over, a batch of delayed txs is re-processed.
    pub(crate) async fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) {
        let mine_mode = self.block_assembler.is_some();
        let mut detached = LinkedHashSet::default();
        let mut attached = LinkedHashSet::default();

        let epoch_of_next_block = snapshot
            .tip_header()
            .epoch()
            .minimum_epoch_number_after_n_blocks(1);

        let new_tip_after_delay = after_delay_window(&snapshot);
        let is_in_delay_window = self.is_in_delay_window(&snapshot);

        let detached_headers: HashSet<Byte32> = detached_blocks
            .iter()
            .map(|blk| blk.header().hash())
            .collect();

        // skip(1) drops each block's cellbase transaction.
        for blk in detached_blocks {
            detached.extend(blk.transactions().into_iter().skip(1))
        }

        for blk in attached_blocks {
            self.fee_estimator.commit_block(&blk);
            attached.extend(blk.transactions().into_iter().skip(1));
        }
        // Txs only present on the detached side need to go back to the pool.
        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();

        let fetched_cache = if is_in_delay_window {
            HashMap::new()
        } else {
            self.fetch_txs_verify_cache(retain.iter()).await
        };

        let txs_opt = {
            // The write lock is held across the whole update below.
            let mut tx_pool = self.tx_pool.write().await;

            // Inside the delay window, everything is drained out of the pool
            // (and the verify queue cleared) to be re-processed later.
            let txs_opt = if is_in_delay_window {
                {
                    self.verify_queue.write().await.clear();
                }
                Some(tx_pool.drain_all_transactions())
            } else {
                None
            };

            _update_tx_pool_for_reorg(
                &mut tx_pool,
                &attached,
                &detached_headers,
                detached_proposal_id,
                snapshot,
                &self.callbacks,
                mine_mode,
            );

            // Switch the network to ckb2023 once the hardfork activates for
            // the next block.
            if !self.network.load_ckb2023()
                && self
                    .consensus
                    .hardfork_switch
                    .ckb2023
                    .is_vm_version_2_and_syscalls_3_enabled(epoch_of_next_block)
            {
                self.network.init_ckb2023()
            }

            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
                .await;

            txs_opt
        };

        // Park the drained txs in the delay map (bounded by DELAY_LIMIT).
        if let Some(txs) = txs_opt {
            let mut delay = self.delay.write().await;
            if delay.len() < DELAY_LIMIT {
                for tx in txs {
                    delay.insert(tx.proposal_short_id(), tx);
                }
            }
        }

        {
            // After the delay window ends, re-process a bounded batch of
            // delayed txs; the after-delay flag flips once the map is empty.
            let delay_txs = if !self.after_delay() && new_tip_after_delay {
                let limit = MAX_BLOCK_PROPOSALS_LIMIT as usize;
                let mut txs = Vec::with_capacity(limit);
                let mut delay = self.delay.write().await;
                let keys: Vec<_> = { delay.keys().take(limit).cloned().collect() };
                for k in keys {
                    if let Some(v) = delay.remove(&k) {
                        txs.push(v);
                    }
                }
                if delay.is_empty() {
                    self.set_after_delay_true();
                }
                Some(txs)
            } else {
                None
            };
            if let Some(txs) = delay_txs {
                self.try_process_txs(txs).await;
            }
        }

        // Attached txs may unblock orphans, and must leave the verify queue.
        self.remove_orphan_txs_by_attach(&attached).await;
        {
            let mut queue = self.verify_queue.write().await;
            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
        }
    }
930
931 async fn enqueue_verify_queue(
932 &self,
933 tx: TransactionView,
934 remote: Option<(Cycle, PeerIndex)>,
935 ) -> Result<bool, Reject> {
936 let mut queue = self.verify_queue.write().await;
937 queue.add_tx(tx, remote)
938 }
939
940 async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
941 for tx in txs.iter() {
942 self.process_orphan_tx(tx).await;
943 }
944 let mut orphan = self.orphan.write().await;
945 orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
946 }
947
    /// Re-admit transactions that fell out of the chain during a reorg.
    ///
    /// Each tx is resolved, fee-checked, and verified (reusing
    /// `fetched_cache` where possible) before being re-submitted; any
    /// failure along the way silently skips that tx, and submit errors are
    /// only logged.
    async fn readd_detached_tx(
        &self,
        tx_pool: &mut TxPool,
        txs: Vec<TransactionView>,
        fetched_cache: HashMap<Byte32, CacheEntry>,
    ) {
        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
        for tx in txs {
            let tx_size = tx.data().serialized_size_in_block();
            let tx_hash = tx.hash();
            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
                if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
                    let verify_cache = fetched_cache.get(&tx_hash).cloned();
                    let snapshot = tx_pool.cloned_snapshot();
                    let tip_header = snapshot.tip_header();
                    let tx_env = Arc::new(status.with_env(tip_header));
                    if let Ok(verified) = verify_rtx(
                        snapshot,
                        Arc::clone(&rtx),
                        tx_env,
                        &verify_cache,
                        max_cycles,
                        None,
                    )
                    .await
                    {
                        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
                        if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
                            error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
                        } else {
                            debug!("readd_detached_tx submit_entry {}", tx_hash);
                        }
                    }
                }
            }
        }
    }
985
986 pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
987 {
988 let mut tx_pool = self.tx_pool.write().await;
989 tx_pool.clear(Arc::clone(&new_snapshot));
990 }
991 if self
993 .block_assembler_sender
994 .send(BlockAssemblerMessage::Reset(new_snapshot))
995 .await
996 .is_err()
997 {
998 error!("block_assembler receiver dropped");
999 }
1000 }
1001
1002 pub(crate) async fn save_pool(&self) {
1003 let mut tx_pool = self.tx_pool.write().await;
1004 if let Err(err) = tx_pool.save_into_file() {
1005 error!("failed to save pool, error: {:?}", err)
1006 } else {
1007 info!("TxPool saved successfully")
1008 }
1009 }
1010
    /// Relay an IBD (initial block download) state change to the fee
    /// estimator.
    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
        self.fee_estimator.update_ibd_state(in_ibd);
    }
1014
1015 pub(crate) async fn estimate_fee_rate(
1016 &self,
1017 estimate_mode: EstimateMode,
1018 enable_fallback: bool,
1019 ) -> Result<FeeRate, AnyError> {
1020 let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
1021 match self
1022 .fee_estimator
1023 .estimate_fee_rate(estimate_mode, all_entry_info)
1024 {
1025 Ok(fee_rate) => Ok(fee_rate),
1026 Err(err) => {
1027 if enable_fallback {
1028 let target_blocks =
1029 FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
1030 self.tx_pool
1031 .read()
1032 .await
1033 .estimate_fee_rate(target_blocks)
1034 .map_err(Into::into)
1035 } else {
1036 Err(err.into())
1037 }
1038 }
1039 }
1040 }
1041
1042 async fn try_process_txs(&self, txs: Vec<TransactionView>) {
1046 if txs.is_empty() {
1047 return;
1048 }
1049 let total = txs.len();
1050 let mut count = 0usize;
1051 for tx in txs {
1052 let tx_hash = tx.hash();
1053 if let Err(err) = self.process_tx(tx, None).await {
1054 error!("failed to process {:#x}, error: {:?}", tx_hash, err);
1055 count += 1;
1056 }
1057 }
1058 if count != 0 {
1059 info!("{}/{} transaction process failed.", count, total);
1060 }
1061 }
1062
1063 pub(crate) fn is_in_delay_window(&self, snapshot: &Snapshot) -> bool {
1064 let epoch = snapshot.tip_header().epoch();
1065 self.consensus.is_in_delay_window(&epoch)
1066 }
1067}
1068
1069type PreCheckedTx = (
1070 Byte32, Arc<ResolvedTransaction>, TxStatus, Capacity, usize, );
1076
1077type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
1078
1079fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
1080 if snapshot.proposals().contains_proposed(short_id) {
1081 TxStatus::Proposed
1082 } else if snapshot.proposals().contains_gap(short_id) {
1083 TxStatus::Gap
1084 } else {
1085 TxStatus::Fresh
1086 }
1087}
1088
1089fn check_rtx(
1090 tx_pool: &TxPool,
1091 snapshot: &Snapshot,
1092 rtx: &ResolvedTransaction,
1093) -> Result<TxStatus, Reject> {
1094 let short_id = rtx.transaction.proposal_short_id();
1095 let tx_status = get_tx_status(snapshot, &short_id);
1096 tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
1097}
1098
1099fn resolve_tx(
1100 tx_pool: &TxPool,
1101 snapshot: &Snapshot,
1102 tx: TransactionView,
1103 rbf: bool,
1104) -> ResolveResult {
1105 let short_id = tx.proposal_short_id();
1106 let tx_status = get_tx_status(snapshot, &short_id);
1107 tx_pool
1108 .resolve_tx_from_pool(tx, rbf)
1109 .map(|rtx| (rtx, tx_status))
1110}
1111
1112fn _submit_entry(
1113 tx_pool: &mut TxPool,
1114 status: TxStatus,
1115 entry: TxEntry,
1116 callbacks: &Callbacks,
1117) -> Result<HashSet<TxEntry>, Reject> {
1118 let tx_hash = entry.transaction().hash();
1119 debug!("submit_entry {:?} {}", status, tx_hash);
1120 let (succ, evicts) = match status {
1121 TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
1122 TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
1123 TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
1124 };
1125 if succ {
1126 match status {
1127 TxStatus::Fresh => callbacks.call_pending(&entry),
1128 TxStatus::Gap => callbacks.call_pending(&entry),
1129 TxStatus::Proposed => callbacks.call_proposed(&entry),
1130 }
1131 }
1132 Ok(evicts)
1133}
1134
/// Apply a reorg to the pool state under an already-held write lock.
///
/// Rebases the pool snapshot, removes committed/detached-proposal txs, and
/// (in mine mode) promotes Pending/Gap entries whose short ids now appear
/// in the snapshot's proposal sets. Finishes with expiry and size-limit
/// enforcement.
fn _update_tx_pool_for_reorg(
    tx_pool: &mut TxPool,
    attached: &LinkedHashSet<TransactionView>,
    detached_headers: &HashSet<Byte32>,
    detached_proposal_id: HashSet<ProposalShortId>,
    snapshot: Arc<Snapshot>,
    callbacks: &Callbacks,
    mine_mode: bool,
) {
    tx_pool.snapshot = Arc::clone(&snapshot);

    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());

    // Promotion only matters when this node assembles blocks.
    if mine_mode {
        let mut proposals = Vec::new();
        let mut gaps = Vec::new();

        // Gap entries that are now proposed on-chain move to proposed.
        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
            let short_id = entry.inner.proposal_short_id();
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push((short_id, entry.inner.clone()));
            }
        }

        // Pending entries move to proposed or gap depending on where their
        // short id shows up in the snapshot.
        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
            let short_id = entry.inner.proposal_short_id();
            let elem = (short_id.clone(), entry.inner.clone());
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push(elem);
            } else if snapshot.proposals().contains_gap(&short_id) {
                gaps.push(elem);
            }
        }

        for (id, entry) in proposals {
            debug!("begin to proposed: {:x}", id);
            if let Err(e) = tx_pool.proposed_rtx(&id) {
                debug!(
                    "Failed to add proposed tx {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e);
            } else {
                callbacks.call_proposed(&entry)
            }
        }

        for (id, entry) in gaps {
            debug!("begin to gap: {:x}", id);
            if let Err(e) = tx_pool.gap_rtx(&id) {
                debug!(
                    "Failed to add tx to gap {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e.clone());
            }
        }
    }

    tx_pool.remove_expired(callbacks);

    let _ = tx_pool.limit_size(callbacks, None);
}