1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10 after_delay_window, check_tx_fee, check_txid_collision, is_missing_input,
11 non_contextual_verify, time_relative_verify, verify_rtx,
12};
13use ckb_chain_spec::consensus::MAX_BLOCK_PROPOSALS_LIMIT;
14use ckb_error::{AnyError, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_jsonrpc_types::BlockTemplate;
17use ckb_logger::Level::Trace;
18use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
19use ckb_network::PeerIndex;
20use ckb_script::ChunkCommand;
21use ckb_snapshot::Snapshot;
22use ckb_types::core::error::OutPointError;
23use ckb_types::{
24 core::{
25 cell::ResolvedTransaction, BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView,
26 TransactionView,
27 },
28 packed::{Byte32, ProposalShortId},
29};
30use ckb_util::LinkedHashSet;
31use ckb_verification::{
32 cache::{CacheEntry, Completed},
33 TxVerifyEnv,
34};
35use std::collections::HashSet;
36use std::collections::{HashMap, VecDeque};
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tokio::sync::watch;
40
/// Upper bound on the number of transactions buffered in the delay map
/// while the pool is inside the hardfork delay window.
const DELAY_LIMIT: usize = 1_500 * 21;

/// Target sub-pool for externally plugged entries.
pub enum PlugTarget {
    /// Plug entries into the pending pool.
    Pending,
    /// Plug entries into the proposed pool.
    Proposed,
}
50
/// The proposal state of a transaction relative to the current chain tip,
/// which decides which sub-pool it enters and how it is verified.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    /// Not yet proposed; belongs in the pending pool.
    Fresh,
    /// In the proposal gap window (proposed, but not yet committable).
    Gap,
    /// Proposed and eligible for the proposed pool.
    Proposed,
}
57
impl TxStatus {
    /// Builds the verification environment matching this status:
    /// `Fresh` verifies as a freshly submitted tx, while `Gap`/`Proposed`
    /// verify as proposed with offsets 0 / 1 respectively (see
    /// `TxVerifyEnv::new_proposed` for the exact offset semantics).
    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
        match self {
            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
        }
    }
}
67
68impl TxPoolService {
69 pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
70 if let Some(ref block_assembler) = self.block_assembler {
71 Ok(block_assembler.get_current().await)
72 } else {
73 Err(InternalErrorKind::Config
74 .other("BlockAssembler disabled")
75 .into())
76 }
77 }
78
79 pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
80 let guard = self.txs_verify_cache.read().await;
81 guard.peek(&tx.witness_hash()).cloned()
82 }
83
84 async fn fetch_txs_verify_cache(
85 &self,
86 txs: impl Iterator<Item = &TransactionView>,
87 ) -> HashMap<Byte32, CacheEntry> {
88 let guard = self.txs_verify_cache.read().await;
89 txs.filter_map(|tx| {
90 let wtx_hash = tx.witness_hash();
91 guard
92 .peek(&wtx_hash)
93 .cloned()
94 .map(|value| (wtx_hash, value))
95 })
96 .collect()
97 }
98
    /// Inserts a fully verified entry into the pool under the write lock.
    ///
    /// Handles RBF conflict resolution, re-checks the entry when the tip
    /// moved since pre-resolve, enforces the pool size limit, and schedules
    /// replaced-but-recoverable txs back onto the verify queue.
    pub(crate) async fn submit_entry(
        &self,
        pre_resolve_tip: Byte32,
        entry: TxEntry,
        mut status: TxStatus,
    ) -> (Result<(), Reject>, Arc<Snapshot>) {
        let (ret, snapshot) = self
            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
                // With RBF enabled, conflicting pool txs may be replaced;
                // otherwise any conflicting outpoint is a hard reject.
                let conflicts = if tx_pool.enable_rbf() {
                    tx_pool.check_rbf(&snapshot, &entry)?
                } else {
                    let conflicted_outpoint =
                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                    if let Some(outpoint) = conflicted_outpoint {
                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                    }
                    HashSet::new()
                };

                // The tip changed between pre-check and now: re-derive the
                // proposal status and re-run time-relative verification.
                let tip_hash = snapshot.tip_hash();
                if pre_resolve_tip != tip_hash {
                    debug!(
                        "submit_entry {} context changed. previous:{} now:{}",
                        entry.proposal_short_id(),
                        pre_resolve_tip,
                        tip_hash
                    );

                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;

                    let tip_header = snapshot.tip_header();
                    let tx_env = status.with_env(tip_header);
                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
                }

                // Remove RBF-replaced txs first, then insert the new entry.
                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

                // Entries evicted to make room are rejected as invalidated.
                for evict in evicted {
                    let reject = Reject::Invalidated(format!(
                        "invalidated by tx {}",
                        evict.transaction().hash()
                    ));
                    self.callbacks.call_reject(tx_pool, &evict, reject);
                }

                tx_pool.remove_conflict(&entry.proposal_short_id());
                // Enforce the pool size limit, protecting the new entry.
                tx_pool
                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                    .map_or(Ok(()), Err)?;

                // Re-queue txs that became valid again after the RBF removal;
                // done on a spawned task to avoid re-entering pool locks here.
                if !may_recovered_txs.is_empty() {
                    let self_clone = self.clone();
                    tokio::spawn(async move {
                        let mut queue = self_clone.verify_queue.write().await;
                        for tx in may_recovered_txs {
                            debug!("recover back: {:?}", tx.proposal_short_id());
                            let _ = queue.add_tx(tx, None);
                        }
                    });
                }
                Ok(())
            })
            .await;

        (ret, snapshot)
    }
174
175 pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
176 if self.should_notify_block_assembler() {
177 match status {
178 TxStatus::Fresh => {
179 if self
180 .block_assembler_sender
181 .send(BlockAssemblerMessage::Pending)
182 .await
183 .is_err()
184 {
185 error!("block_assembler receiver dropped");
186 }
187 }
188 TxStatus::Proposed => {
189 if self
190 .block_assembler_sender
191 .send(BlockAssemblerMessage::Proposed)
192 .await
193 .is_err()
194 {
195 error!("block_assembler receiver dropped");
196 }
197 }
198 _ => {}
199 }
200 }
201 }
202
    /// Removes the RBF-conflicting txs (and their descendants) from the pool
    /// and returns previously-conflicted txs whose inputs became free again
    /// and may therefore be re-verified.
    fn process_rbf(
        &self,
        tx_pool: &mut TxPool,
        entry: &TxEntry,
        conflicts: &HashSet<ProposalShortId>,
    ) -> Vec<TransactionView> {
        let mut may_recovered_txs = vec![];
        let mut available_inputs = HashSet::new();

        // Nothing to replace: no recovery candidates either.
        if conflicts.is_empty() {
            return may_recovered_txs;
        }

        // Evict every conflicting tx together with all its descendants.
        let all_removed: Vec<_> = conflicts
            .iter()
            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
            .collect();

        // Inputs freed by the removals…
        available_inputs.extend(
            all_removed
                .iter()
                .flat_map(|removed| removed.transaction().input_pts_iter()),
        );

        // …minus the ones the replacing tx itself consumes.
        for input in entry.transaction().input_pts_iter() {
            available_inputs.remove(&input);
        }

        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
        for old in all_removed {
            debug!(
                "remove conflict tx {} for RBF by new tx {}",
                old.transaction().hash(),
                entry.transaction().hash()
            );
            let reject =
                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));

            // Keep the replaced tx around as a recorded conflict so it can be
            // recovered if the replacement is itself later removed.
            tx_pool.record_conflict(old.transaction().clone());
            self.callbacks.call_reject(tx_pool, &old, reject);
        }
        assert!(!may_recovered_txs.contains(entry.transaction()));
        may_recovered_txs
    }
251
252 pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
253 let queue = self.verify_queue.read().await;
254 queue.contains_key(&tx.proposal_short_id())
255 }
256
257 pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
258 let orphan = self.orphan.read().await;
259 orphan.contains_key(&tx.proposal_short_id())
260 }
261
    /// Runs `f` under the tx-pool read lock, handing it the snapshot the
    /// pool currently points at; returns `f`'s result plus that snapshot.
    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
        &self,
        mut f: F,
    ) -> (U, Arc<Snapshot>) {
        let tx_pool = self.tx_pool.read().await;
        // Cloned inside the lock so `f` and the caller see the same snapshot.
        let snapshot = tx_pool.cloned_snapshot();

        let ret = f(&tx_pool, Arc::clone(&snapshot));
        (ret, snapshot)
    }
272
    /// Runs `f` under the tx-pool write lock, handing it the snapshot the
    /// pool currently points at; returns `f`'s result plus that snapshot.
    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
        &self,
        mut f: F,
    ) -> (U, Arc<Snapshot>) {
        let mut tx_pool = self.tx_pool.write().await;
        // Cloned inside the lock so `f` and the caller see the same snapshot.
        let snapshot = tx_pool.cloned_snapshot();

        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
        (ret, snapshot)
    }
283
    /// Resolves `tx` against the pool and checks its fee, without verifying
    /// scripts. On success returns the tip hash (so `submit_entry` can
    /// detect tip movement), the resolved tx, its proposal status, the fee,
    /// and the serialized size.
    pub(crate) async fn pre_check(
        &self,
        tx: &TransactionView,
    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
        let tx_size = tx.data().serialized_size_in_block();

        let (ret, snapshot) = self
            .with_tx_pool_read_lock(|tx_pool, snapshot| {
                let tip_hash = snapshot.tip_hash();

                check_txid_collision(tx_pool, tx)?;

                // First try a normal resolve; a Dead outpoint may still be a
                // valid RBF attempt, so retry in RBF mode in that case.
                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
                match res {
                    Ok((rtx, status)) => {
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
                        if conflicts.is_none() {
                            // A Dead outpoint without a conflicting pool tx is
                            // unexpected; keep the original reject.
                            error!(
                                "{} is resolved as Dead, but there is no conflicted tx",
                                rtx.transaction.proposal_short_id()
                            );
                            return Err(Reject::Resolve(OutPointError::Dead(out)));
                        }
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(err) => Err(err),
                }
            })
            .await;
        (ret, snapshot)
    }
332
333 pub(crate) fn non_contextual_verify(
334 &self,
335 tx: &TransactionView,
336 remote: Option<(Cycle, PeerIndex)>,
337 ) -> Result<(), Reject> {
338 if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
339 if reject.is_malformed_tx() {
340 if let Some(remote) = remote {
341 self.ban_malformed(remote.1, format!("reject {reject}"));
342 }
343 }
344 return Err(reject);
345 }
346 Ok(())
347 }
348
    /// Queues `tx` for asynchronous (resumable) verification after cheap
    /// up-front checks; duplicates already in the orphan pool or the verify
    /// queue are rejected. (Name kept as-is — "resumeble" — since callers
    /// depend on it.)
    pub(crate) async fn resumeble_process_tx(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<bool, Reject> {
        self.non_contextual_verify(&tx, remote)?;

        if self.orphan_contains(&tx).await {
            debug!("reject tx {} already in orphan pool", tx.hash());
            return Err(Reject::Duplicated(tx.hash()));
        }

        if self.verify_queue_contains(&tx).await {
            return Err(Reject::Duplicated(tx.hash()));
        }
        self.enqueue_verify_queue(tx, remote).await
    }
367
368 pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
369 self.non_contextual_verify(&tx, None)?;
371
372 if self.verify_queue_contains(&tx).await {
373 return Err(Reject::Duplicated(tx.hash()));
374 }
375
376 if self.orphan_contains(&tx).await {
377 debug!("reject tx {} already in orphan pool", tx.hash());
378 return Err(Reject::Duplicated(tx.hash()));
379 }
380 self._test_accept_tx(tx.clone()).await
381 }
382
    /// Synchronously verifies and submits `tx`, then runs the after-process
    /// bookkeeping (relayer notification, orphan processing, reject
    /// recording). Returns a zero-cost `Completed` when `_process_tx`
    /// deferred the tx (delay window) and produced no result.
    pub(crate) async fn process_tx(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<Completed, Reject> {
        self.non_contextual_verify(&tx, remote)?;

        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
            return Err(Reject::Duplicated(tx.hash()));
        }

        if let Some((ret, snapshot)) = self
            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
            .await
        {
            self.after_process(tx, remote, &snapshot, &ret).await;
            ret
        } else {
            // Deferred into the delay map; report a placeholder result.
            Ok(Completed {
                cycles: 0,
                fee: Capacity::zero(),
            })
        }
    }
409
410 pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
411 let mut tx_pool = self.tx_pool.write().await;
412 if let Some(ref mut recent_reject) = tx_pool.recent_reject {
413 if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
414 error!(
415 "Failed to record recent_reject {} {} {}",
416 tx_hash, reject, e
417 );
418 }
419 }
420 }
421
422 pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
423 let id = ProposalShortId::from_tx_hash(&tx_hash);
424 {
425 let mut queue = self.verify_queue.write().await;
426 if queue.remove_tx(&id).is_some() {
427 return true;
428 }
429 }
430 {
431 let mut orphan = self.orphan.write().await;
432 if orphan.remove_orphan_tx(&id).is_some() {
433 return true;
434 }
435 }
436 let mut tx_pool = self.tx_pool.write().await;
437 tx_pool.remove_tx(&id)
438 }
439
    /// Post-submission bookkeeping for `process_tx`: relayer notification,
    /// conflict recording, orphan retry, peer banning, and recent-reject
    /// persistence, branching on whether the tx came from a remote peer.
    pub(crate) async fn after_process(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
        snapshot: &Snapshot,
        ret: &Result<Completed, Reject>,
    ) {
        let tx_hash = tx.hash();

        // Whether VM v2 / syscalls v3 are active for the next block's epoch;
        // forwarded to the relayer with every Ok result.
        let with_vm_2023 = {
            let epoch = snapshot
                .tip_header()
                .epoch()
                .minimum_epoch_number_after_n_blocks(1);

            self.consensus
                .hardfork_switch
                .ckb2023
                .is_vm_version_2_and_syscalls_3_enabled(epoch)
        };

        if log_enabled_target!("ckb_tx_monitor", Trace) {
            if let Ok(c) = ret {
                trace_target!(
                    "ckb_tx_monitor",
                    r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
                    tx_hash,
                    c.cycles
                );
            }
        }

        // RBF-rejected / dead-outpoint txs are remembered as conflicts so
        // they can be recovered if the winning tx is later removed.
        if matches!(
            ret,
            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
        ) {
            let mut tx_pool = self.tx_pool.write().await;
            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
                tx_pool.record_conflict(tx.clone());
            }
        }

        match remote {
            Some((declared_cycle, peer)) => match ret {
                Ok(_) => {
                    debug!(
                        "after_process remote send_result_to_relayer {} {}",
                        tx_hash, peer
                    );
                    self.send_result_to_relayer(TxVerificationResult::Ok {
                        original_peer: Some(peer),
                        with_vm_2023,
                        tx_hash,
                    });
                    // A newly accepted tx may unlock parked orphans.
                    self.process_orphan_tx(&tx).await;
                }
                Err(reject) => {
                    info!(
                        "after_process {} {} remote reject: {} ",
                        tx_hash, peer, reject
                    );
                    if is_missing_input(reject) {
                        // Missing inputs: ask the peer for the parents and
                        // park the tx as an orphan in the meantime.
                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
                            peer,
                            parents: tx.unique_parents(),
                        });
                        self.add_orphan(tx, peer, declared_cycle).await;
                    } else {
                        if reject.is_malformed_tx() {
                            self.ban_malformed(peer, format!("reject {reject}"));
                        }
                        if reject.is_allowed_relay() {
                            self.send_result_to_relayer(TxVerificationResult::Reject {
                                tx_hash: tx_hash.clone(),
                            });
                        }
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            },
            None => {
                match ret {
                    Ok(_) => {
                        debug!("after_process local send_result_to_relayer {}", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            with_vm_2023,
                            tx_hash,
                        });
                        self.process_orphan_tx(&tx).await;
                    }
                    Err(Reject::Duplicated(_)) => {
                        // A local duplicate still counts as accepted for
                        // relay purposes.
                        debug!("after_process {} duplicated", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            with_vm_2023,
                            tx_hash,
                        });
                    }
                    Err(reject) => {
                        debug!("after_process {} reject: {} ", tx_hash, reject);
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            }
        }
    }
555
556 pub(crate) async fn add_orphan(
557 &self,
558 tx: TransactionView,
559 peer: PeerIndex,
560 declared_cycle: Cycle,
561 ) {
562 let evicted_txs = self
563 .orphan
564 .write()
565 .await
566 .add_orphan_tx(tx, peer, declared_cycle);
567 for tx_hash in evicted_txs {
570 self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
571 }
572 }
573
574 pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
575 let orphan = self.orphan.read().await;
576 orphan
577 .find_by_previous(tx)
578 .iter()
579 .filter_map(|id| orphan.get(id).cloned())
580 .collect::<Vec<_>>()
581 }
582
583 pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
584 self.orphan.write().await.remove_orphan_tx(id);
585 }
586
    /// Retries orphans transitively unlocked by `tx`: a breadth-first walk
    /// where every successfully accepted orphan becomes a new "previous" tx
    /// whose own dependents are then retried.
    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
        orphan_queue.push_back(tx.clone());

        while let Some(previous) = orphan_queue.pop_front() {
            let orphans = self.find_orphan_by_previous(&previous).await;
            for orphan in orphans.into_iter() {
                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
                    // Too expensive to verify inline: hand it to the async
                    // verify queue instead.
                    debug!(
                        "process_orphan {} added to verify queue; find previous from {}",
                        orphan.tx.hash(),
                        tx.hash(),
                    );
                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                    self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
                        .await
                        .expect("enqueue suspended tx");
                } else if let Some((ret, snapshot)) = self
                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
                    .await
                {
                    match ret {
                        Ok(_) => {
                            let with_vm_2023 = {
                                let epoch = snapshot
                                    .tip_header()
                                    .epoch()
                                    .minimum_epoch_number_after_n_blocks(1);

                                self.consensus
                                    .hardfork_switch
                                    .ckb2023
                                    .is_vm_version_2_and_syscalls_3_enabled(epoch)
                            };
                            self.send_result_to_relayer(TxVerificationResult::Ok {
                                original_peer: Some(orphan.peer),
                                with_vm_2023,
                                tx_hash: orphan.tx.hash(),
                            });
                            debug!(
                                "process_orphan {} success, find previous from {}",
                                orphan.tx.hash(),
                                tx.hash()
                            );
                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                            // This orphan may in turn unlock its own children.
                            orphan_queue.push_back(orphan.tx);
                        }
                        Err(reject) => {
                            debug!(
                                "process_orphan {} reject {}, find previous from {}",
                                orphan.tx.hash(),
                                reject,
                                tx.hash(),
                            );

                            // A still-missing input means the orphan simply
                            // stays parked; anything else is a real reject.
                            if !is_missing_input(&reject) {
                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                                if reject.is_malformed_tx() {
                                    self.ban_malformed(orphan.peer, format!("reject {reject}"));
                                }
                                if reject.is_allowed_relay() {
                                    self.send_result_to_relayer(TxVerificationResult::Reject {
                                        tx_hash: orphan.tx.hash(),
                                    });
                                }
                                if reject.should_recorded() {
                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
                                }
                            }
                        }
                    }
                }
            }
        }
    }
665
666 pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
667 if let Err(e) = self.tx_relay_sender.send(result) {
668 error!("tx-pool tx_relay_sender internal error {}", e);
669 }
670 }
671
    /// Bans `peer` for sending a malformed transaction, optionally reporting
    /// the event to sentry when that feature is compiled in.
    fn ban_malformed(&self, peer: PeerIndex, reason: String) {
        // Three-day ban, matching the severity of relaying invalid txs.
        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);

        #[cfg(feature = "with_sentry")]
        use sentry::{capture_message, with_scope, Level};

        #[cfg(feature = "with_sentry")]
        with_scope(
            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
            || {
                capture_message(
                    &format!(
                        "Ban peer {} for {} seconds, reason: \
                        {}",
                        peer,
                        DEFAULT_BAN_TIME.as_secs(),
                        reason
                    ),
                    Level::Info,
                )
            },
        );
        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
    }
696
    /// Core verify-and-submit pipeline: pre-check, delay-window buffering,
    /// script verification (with cache), declared-cycle cross-check, pool
    /// insertion, and cache/metrics upkeep.
    ///
    /// Returns `None` when the tx was buffered for the hardfork delay
    /// window instead of being processed.
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        // No command channel means we are running synchronously.
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        // Inside the delay window: buffer the tx (bounded by DELAY_LIMIT)
        // instead of processing it now.
        if self.is_in_delay_window(&snapshot) {
            let mut delay = self.delay.write().await;
            if delay.len() < DELAY_LIMIT {
                delay.insert(tx.proposal_short_id(), tx);
            }
            return None;
        }

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        // A remote peer's declared cycle count must match what we measured.
        if let Some(declared) = declared_cycles {
            if declared != verified.cycles {
                info!(
                    "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                    declared, verified.cycles, tx
                );
                return Some((
                    Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                    snapshot,
                ));
            }
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        // Populate the verify cache off the hot path.
        if verify_cache.is_none() {
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }
776
    /// Verification-only counterpart of `_process_tx`: pre-checks and runs
    /// script verification, but never inserts into the pool, buffers for the
    /// delay window, or updates the verify cache.
    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;

        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;

        // Reuse a cached result when available; cache is not written back.
        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = self.consensus.max_block_cycles();
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            None,
        )
        .await
    }
799
    /// Reconciles the pool with a chain reorg: removes committed txs,
    /// re-adds detached ones, promotes pending/gap entries per the new
    /// proposal table, handles the hardfork delay window, and flushes
    /// attached txs out of the orphan pool and verify queue.
    pub(crate) async fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) {
        let mine_mode = self.block_assembler.is_some();
        let mut detached = LinkedHashSet::default();
        let mut attached = LinkedHashSet::default();

        let epoch_of_next_block = snapshot
            .tip_header()
            .epoch()
            .minimum_epoch_number_after_n_blocks(1);

        let new_tip_after_delay = after_delay_window(&snapshot);
        let is_in_delay_window = self.is_in_delay_window(&snapshot);

        let detached_headers: HashSet<Byte32> = detached_blocks
            .iter()
            .map(|blk| blk.header().hash())
            .collect();

        // skip(1) drops each block's cellbase tx.
        for blk in detached_blocks {
            detached.extend(blk.transactions().into_iter().skip(1))
        }

        for blk in attached_blocks {
            self.fee_estimator.commit_block(&blk);
            attached.extend(blk.transactions().into_iter().skip(1));
        }
        // Txs detached but not re-attached must go back into the pool.
        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();

        let fetched_cache = if is_in_delay_window {
            HashMap::new()
        } else {
            self.fetch_txs_verify_cache(retain.iter()).await
        };

        let txs_opt = {
            let mut tx_pool = self.tx_pool.write().await;

            // Entering the delay window: drain everything out of the pool
            // (and verify queue) into the delay map for later replay.
            let txs_opt = if is_in_delay_window {
                {
                    self.verify_queue.write().await.clear();
                }
                Some(tx_pool.drain_all_transactions())
            } else {
                None
            };

            _update_tx_pool_for_reorg(
                &mut tx_pool,
                &attached,
                &detached_headers,
                detached_proposal_id,
                snapshot,
                &self.callbacks,
                mine_mode,
            );

            // Flip the network to ckb2023 once the hardfork activates.
            if !self.network.load_ckb2023()
                && self
                    .consensus
                    .hardfork_switch
                    .ckb2023
                    .is_vm_version_2_and_syscalls_3_enabled(epoch_of_next_block)
            {
                self.network.init_ckb2023()
            }

            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
                .await;

            txs_opt
        };

        if let Some(txs) = txs_opt {
            let mut delay = self.delay.write().await;
            if delay.len() < DELAY_LIMIT {
                for tx in txs {
                    delay.insert(tx.proposal_short_id(), tx);
                }
            }
        }

        // Leaving the delay window: replay a bounded batch of delayed txs.
        {
            let delay_txs = if !self.after_delay() && new_tip_after_delay {
                let limit = MAX_BLOCK_PROPOSALS_LIMIT as usize;
                let mut txs = Vec::with_capacity(limit);
                let mut delay = self.delay.write().await;
                let keys: Vec<_> = { delay.keys().take(limit).cloned().collect() };
                for k in keys {
                    if let Some(v) = delay.remove(&k) {
                        txs.push(v);
                    }
                }
                if delay.is_empty() {
                    self.set_after_delay_true();
                }
                Some(txs)
            } else {
                None
            };
            if let Some(txs) = delay_txs {
                self.try_process_txs(txs).await;
            }
        }

        self.remove_orphan_txs_by_attach(&attached).await;
        {
            let mut queue = self.verify_queue.write().await;
            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
        }
    }
927
928 async fn enqueue_verify_queue(
929 &self,
930 tx: TransactionView,
931 remote: Option<(Cycle, PeerIndex)>,
932 ) -> Result<bool, Reject> {
933 let mut queue = self.verify_queue.write().await;
934 queue.add_tx(tx, remote)
935 }
936
937 async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
938 for tx in txs.iter() {
939 self.process_orphan_tx(tx).await;
940 }
941 let mut orphan = self.orphan.write().await;
942 orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
943 }
944
    /// Re-inserts txs that were detached by a reorg: each is re-resolved,
    /// fee-checked, re-verified (using the pre-fetched cache), and
    /// re-submitted. Failures are logged and the tx is silently dropped.
    async fn readd_detached_tx(
        &self,
        tx_pool: &mut TxPool,
        txs: Vec<TransactionView>,
        fetched_cache: HashMap<Byte32, CacheEntry>,
    ) {
        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
        for tx in txs {
            let tx_size = tx.data().serialized_size_in_block();
            let tx_hash = tx.hash();
            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
                if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
                    let verify_cache = fetched_cache.get(&tx_hash).cloned();
                    let snapshot = tx_pool.cloned_snapshot();
                    let tip_header = snapshot.tip_header();
                    let tx_env = Arc::new(status.with_env(tip_header));
                    if let Ok(verified) = verify_rtx(
                        snapshot,
                        Arc::clone(&rtx),
                        tx_env,
                        &verify_cache,
                        max_cycles,
                        None,
                    )
                    .await
                    {
                        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
                        if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
                            error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
                        } else {
                            debug!("readd_detached_tx submit_entry {}", tx_hash);
                        }
                    }
                }
            }
        }
    }
982
983 pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
984 {
985 let mut tx_pool = self.tx_pool.write().await;
986 tx_pool.clear(Arc::clone(&new_snapshot));
987 }
988 if self
990 .block_assembler_sender
991 .send(BlockAssemblerMessage::Reset(new_snapshot))
992 .await
993 .is_err()
994 {
995 error!("block_assembler receiver dropped");
996 }
997 }
998
999 pub(crate) async fn save_pool(&self) {
1000 let mut tx_pool = self.tx_pool.write().await;
1001 if let Err(err) = tx_pool.save_into_file() {
1002 error!("failed to save pool, error: {:?}", err)
1003 } else {
1004 info!("TxPool saved successfully")
1005 }
1006 }
1007
    /// Forwards an IBD (initial block download) state change to the fee
    /// estimator.
    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
        self.fee_estimator.update_ibd_state(in_ibd);
    }
1011
    /// Estimates a fee rate via the configured estimator; when that fails
    /// and `enable_fallback` is set, falls back to the pool's own
    /// target-blocks based estimate.
    pub(crate) async fn estimate_fee_rate(
        &self,
        estimate_mode: EstimateMode,
        enable_fallback: bool,
    ) -> Result<FeeRate, AnyError> {
        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
        match self
            .fee_estimator
            .estimate_fee_rate(estimate_mode, all_entry_info)
        {
            Ok(fee_rate) => Ok(fee_rate),
            Err(err) => {
                if enable_fallback {
                    let target_blocks =
                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
                    self.tx_pool
                        .read()
                        .await
                        .estimate_fee_rate(target_blocks)
                        .map_err(Into::into)
                } else {
                    Err(err.into())
                }
            }
        }
    }
1038
1039 async fn try_process_txs(&self, txs: Vec<TransactionView>) {
1043 if txs.is_empty() {
1044 return;
1045 }
1046 let total = txs.len();
1047 let mut count = 0usize;
1048 for tx in txs {
1049 let tx_hash = tx.hash();
1050 if let Err(err) = self.process_tx(tx, None).await {
1051 error!("failed to process {:#x}, error: {:?}", tx_hash, err);
1052 count += 1;
1053 }
1054 }
1055 if count != 0 {
1056 info!("{}/{} transaction process failed.", count, total);
1057 }
1058 }
1059
1060 pub(crate) fn is_in_delay_window(&self, snapshot: &Snapshot) -> bool {
1061 let epoch = snapshot.tip_header().epoch();
1062 self.consensus.is_in_delay_window(&epoch)
1063 }
1064}
1065
1066type PreCheckedTx = (
1067 Byte32, Arc<ResolvedTransaction>, TxStatus, Capacity, usize, );
1073
1074type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
1075
1076fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
1077 if snapshot.proposals().contains_proposed(short_id) {
1078 TxStatus::Proposed
1079 } else if snapshot.proposals().contains_gap(short_id) {
1080 TxStatus::Gap
1081 } else {
1082 TxStatus::Fresh
1083 }
1084}
1085
1086fn check_rtx(
1087 tx_pool: &TxPool,
1088 snapshot: &Snapshot,
1089 rtx: &ResolvedTransaction,
1090) -> Result<TxStatus, Reject> {
1091 let short_id = rtx.transaction.proposal_short_id();
1092 let tx_status = get_tx_status(snapshot, &short_id);
1093 tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
1094}
1095
1096fn resolve_tx(
1097 tx_pool: &TxPool,
1098 snapshot: &Snapshot,
1099 tx: TransactionView,
1100 rbf: bool,
1101) -> ResolveResult {
1102 let short_id = tx.proposal_short_id();
1103 let tx_status = get_tx_status(snapshot, &short_id);
1104 tx_pool
1105 .resolve_tx_from_pool(tx, rbf)
1106 .map(|rtx| (rtx, tx_status))
1107}
1108
1109fn _submit_entry(
1110 tx_pool: &mut TxPool,
1111 status: TxStatus,
1112 entry: TxEntry,
1113 callbacks: &Callbacks,
1114) -> Result<HashSet<TxEntry>, Reject> {
1115 let tx_hash = entry.transaction().hash();
1116 debug!("submit_entry {:?} {}", status, tx_hash);
1117 let (succ, evicts) = match status {
1118 TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
1119 TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
1120 TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
1121 };
1122 if succ {
1123 match status {
1124 TxStatus::Fresh => callbacks.call_pending(&entry),
1125 TxStatus::Gap => callbacks.call_pending(&entry),
1126 TxStatus::Proposed => callbacks.call_proposed(&entry),
1127 }
1128 }
1129 Ok(evicts)
1130}
1131
/// Lock-held core of the reorg update: swaps in the new snapshot, removes
/// committed/expired txs, and (in mine mode) promotes pending/gap entries
/// according to the new proposal table.
fn _update_tx_pool_for_reorg(
    tx_pool: &mut TxPool,
    attached: &LinkedHashSet<TransactionView>,
    detached_headers: &HashSet<Byte32>,
    detached_proposal_id: HashSet<ProposalShortId>,
    snapshot: Arc<Snapshot>,
    callbacks: &Callbacks,
    mine_mode: bool,
) {
    tx_pool.snapshot = Arc::clone(&snapshot);

    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());

    // Promotion only matters when this node assembles blocks.
    if mine_mode {
        let mut proposals = Vec::new();
        let mut gaps = Vec::new();

        // Gap entries whose proposal is now confirmed move to proposed.
        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
            let short_id = entry.inner.proposal_short_id();
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push((short_id, entry.inner.clone()));
            }
        }

        // Pending entries move to proposed or gap per the proposal table.
        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
            let short_id = entry.inner.proposal_short_id();
            let elem = (short_id.clone(), entry.inner.clone());
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push(elem);
            } else if snapshot.proposals().contains_gap(&short_id) {
                gaps.push(elem);
            }
        }

        for (id, entry) in proposals {
            debug!("begin to proposed: {:x}", id);
            if let Err(e) = tx_pool.proposed_rtx(&id) {
                debug!(
                    "Failed to add proposed tx {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e);
            } else {
                callbacks.call_proposed(&entry)
            }
        }

        for (id, entry) in gaps {
            debug!("begin to gap: {:x}", id);
            if let Err(e) = tx_pool.gap_rtx(&id) {
                debug!(
                    "Failed to add tx to gap {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e.clone());
            }
        }
    }

    // Drop entries whose proposal window expired, then re-apply size limits.
    tx_pool.remove_expired(callbacks);

    let _ = tx_pool.limit_size(callbacks, None);
}