1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10 check_tx_fee, check_txid_collision, is_missing_input, non_contextual_verify,
11 time_relative_verify, verify_rtx,
12};
13use ckb_error::{AnyError, InternalErrorKind};
14use ckb_fee_estimator::FeeEstimator;
15use ckb_jsonrpc_types::BlockTemplate;
16use ckb_logger::Level::Trace;
17use ckb_logger::{debug, error, info, log_enabled_target, trace_target, warn};
18use ckb_network::PeerIndex;
19use ckb_script::ChunkCommand;
20use ckb_snapshot::Snapshot;
21use ckb_types::core::error::OutPointError;
22use ckb_types::{
23 core::{
24 BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
25 cell::ResolvedTransaction,
26 },
27 packed::{Byte32, ProposalShortId},
28};
29use ckb_util::LinkedHashSet;
30use ckb_verification::{
31 TxVerifyEnv,
32 cache::{CacheEntry, Completed},
33};
34use std::collections::HashSet;
35use std::collections::{HashMap, VecDeque};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use tokio::sync::watch;
39
/// Target selector with two variants, `Pending` and `Proposed`.
///
/// NOTE(review): nothing in this part of the file uses the type; the
/// per-variant meanings below are inferred from the names — confirm against
/// the callers before relying on them.
pub enum PlugTarget {
    /// Presumably the pending section of the pool.
    Pending,
    /// Presumably the proposed section of the pool.
    Proposed,
}
47
/// Proposal state of a transaction relative to a chain snapshot.
///
/// `get_tx_status` derives the value from the snapshot's proposal view:
/// `Proposed` when `contains_proposed` matches, otherwise `Gap` when
/// `contains_gap` matches, otherwise `Fresh`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    /// Neither proposed nor in the gap; submitted via `add_pending`.
    Fresh,
    /// Reported by `contains_gap`; submitted via `add_gap`.
    Gap,
    /// Reported by `contains_proposed`; submitted via `add_proposed`.
    Proposed,
}
54
55impl TxStatus {
56 fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
57 match self {
58 TxStatus::Fresh => TxVerifyEnv::new_submit(header),
59 TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
60 TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
61 }
62 }
63}
64
65impl TxPoolService {
66 pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
67 if let Some(ref block_assembler) = self.block_assembler {
68 Ok(block_assembler.get_current().await)
69 } else {
70 Err(InternalErrorKind::Config
71 .other("BlockAssembler disabled")
72 .into())
73 }
74 }
75
76 pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
77 let guard = self.txs_verify_cache.read().await;
78 guard.peek(&tx.witness_hash()).cloned()
79 }
80
81 async fn fetch_txs_verify_cache(
82 &self,
83 txs: impl Iterator<Item = &TransactionView>,
84 ) -> HashMap<Byte32, CacheEntry> {
85 let guard = self.txs_verify_cache.read().await;
86 txs.filter_map(|tx| {
87 let wtx_hash = tx.witness_hash();
88 guard
89 .peek(&wtx_hash)
90 .cloned()
91 .map(|value| (wtx_hash, value))
92 })
93 .collect()
94 }
95
96 pub(crate) async fn submit_entry(
97 &self,
98 pre_resolve_tip: Byte32,
99 entry: TxEntry,
100 mut status: TxStatus,
101 ) -> (Result<(), Reject>, Arc<Snapshot>) {
102 let (ret, snapshot) = self
103 .with_tx_pool_write_lock(move |tx_pool, snapshot| {
104 let conflicts = if tx_pool.enable_rbf() {
106 tx_pool.check_rbf(&snapshot, &entry)?
107 } else {
108 let conflicted_outpoint =
111 tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
112 if let Some(outpoint) = conflicted_outpoint {
113 return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
114 }
115 HashSet::new()
116 };
117
118 let tip_hash = snapshot.tip_hash();
120 if pre_resolve_tip != tip_hash {
121 debug!(
122 "submit_entry {} context changed. previous:{} now:{}",
123 entry.proposal_short_id(),
124 pre_resolve_tip,
125 tip_hash
126 );
127
128 status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;
130
131 let tip_header = snapshot.tip_header();
132 let tx_env = status.with_env(tip_header);
133 time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
134 }
135
136 let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
137 let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;
138
139 for evict in evicted {
142 let reject = Reject::Invalidated(format!(
143 "invalidated by tx {}",
144 evict.transaction().hash()
145 ));
146 self.callbacks.call_reject(tx_pool, &evict, reject);
147 }
148
149 tx_pool.remove_conflict(&entry.proposal_short_id());
150 tx_pool
151 .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
152 .map_or(Ok(()), Err)?;
153
154 if !may_recovered_txs.is_empty() {
155 let self_clone = self.clone();
156 tokio::spawn(async move {
157 let mut queue = self_clone.verify_queue.write().await;
159 for tx in may_recovered_txs {
160 debug!("recover back: {:?}", tx.proposal_short_id());
161 let _ = queue.add_tx(tx, false, None);
162 }
163 });
164 }
165 Ok(())
166 })
167 .await;
168
169 (ret, snapshot)
170 }
171
172 pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
173 if self.should_notify_block_assembler() {
174 let message = match status {
175 TxStatus::Fresh => Some(BlockAssemblerMessage::Pending),
176 TxStatus::Proposed => Some(BlockAssemblerMessage::Proposed),
177 _ => None,
178 };
179
180 if let Some(message) = message
181 && self.block_assembler_sender.send(message).await.is_err()
182 {
183 error!("block_assembler receiver dropped");
184 }
185 }
186 }
187
188 fn process_rbf(
191 &self,
192 tx_pool: &mut TxPool,
193 entry: &TxEntry,
194 conflicts: &HashSet<ProposalShortId>,
195 ) -> Vec<TransactionView> {
196 let mut may_recovered_txs = vec![];
197 let mut available_inputs = HashSet::new();
198
199 if conflicts.is_empty() {
200 return may_recovered_txs;
201 }
202
203 let all_removed: Vec<_> = conflicts
204 .iter()
205 .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
206 .collect();
207
208 available_inputs.extend(
209 all_removed
210 .iter()
211 .flat_map(|removed| removed.transaction().input_pts_iter()),
212 );
213
214 for input in entry.transaction().input_pts_iter() {
215 available_inputs.remove(&input);
216 }
217
218 may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
219 for old in all_removed {
220 debug!(
221 "remove conflict tx {} for RBF by new tx {}",
222 old.transaction().hash(),
223 entry.transaction().hash()
224 );
225 let reject =
226 Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
227
228 tx_pool.record_conflict(old.transaction().clone());
230 self.callbacks.call_reject(tx_pool, &old, reject);
232 }
233 assert!(!may_recovered_txs.contains(entry.transaction()));
234 may_recovered_txs
235 }
236
237 pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
238 let queue = self.verify_queue.read().await;
239 queue.contains_key(&tx.proposal_short_id())
240 }
241
242 pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
243 let orphan = self.orphan.read().await;
244 orphan.contains_key(&tx.proposal_short_id())
245 }
246
247 pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
248 &self,
249 mut f: F,
250 ) -> (U, Arc<Snapshot>) {
251 let tx_pool = self.tx_pool.read().await;
252 let snapshot = tx_pool.cloned_snapshot();
253
254 let ret = f(&tx_pool, Arc::clone(&snapshot));
255 (ret, snapshot)
256 }
257
258 pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
259 &self,
260 mut f: F,
261 ) -> (U, Arc<Snapshot>) {
262 let mut tx_pool = self.tx_pool.write().await;
263 let snapshot = tx_pool.cloned_snapshot();
264
265 let ret = f(&mut tx_pool, Arc::clone(&snapshot));
266 (ret, snapshot)
267 }
268
269 pub(crate) async fn pre_check(
270 &self,
271 tx: &TransactionView,
272 ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
273 let tx_size = tx.data().serialized_size_in_block();
275
276 let (ret, snapshot) = self
277 .with_tx_pool_read_lock(|tx_pool, snapshot| {
278 let tip_hash = snapshot.tip_hash();
279
280 check_txid_collision(tx_pool, tx)?;
283
284 let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
287 match res {
288 Ok((rtx, status)) => {
289 let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
290 Ok((tip_hash, rtx, status, fee, tx_size))
291 }
292 Err(Reject::Resolve(OutPointError::Dead(out))) => {
293 let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
294 let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
295 let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
296 if conflicts.is_none() {
297 error!(
300 "{} is resolved as Dead, but there is no conflicted tx",
301 rtx.transaction.proposal_short_id()
302 );
303 return Err(Reject::Resolve(OutPointError::Dead(out)));
304 }
305 Ok((tip_hash, rtx, status, fee, tx_size))
310 }
311 Err(err) => Err(err),
312 }
313 })
314 .await;
315 (ret, snapshot)
316 }
317
318 pub(crate) async fn non_contextual_verify(
319 &self,
320 tx: &TransactionView,
321 remote: Option<(Cycle, PeerIndex)>,
322 ) -> Result<(), Reject> {
323 if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
324 if reject.is_malformed_tx()
325 && let Some(remote) = remote
326 {
327 self.ban_malformed(remote.1, format!("reject {reject}"))
328 .await;
329 }
330 return Err(reject);
331 }
332 Ok(())
333 }
334
335 pub(crate) async fn resumeble_process_tx(
336 &self,
337 tx: TransactionView,
338 is_proposal_tx: bool,
339 remote: Option<(Cycle, PeerIndex)>,
340 ) -> Result<bool, Reject> {
341 self.non_contextual_verify(&tx, remote).await?;
343
344 if self.orphan_contains(&tx).await {
345 debug!("reject tx {} already in orphan pool", tx.hash());
346 return Err(Reject::Duplicated(tx.hash()));
347 }
348
349 if self.verify_queue_contains(&tx).await {
350 return Err(Reject::Duplicated(tx.hash()));
351 }
352 self.enqueue_verify_queue(tx, is_proposal_tx, remote).await
353 }
354
355 pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
356 self.non_contextual_verify(&tx, None).await?;
358
359 if self.verify_queue_contains(&tx).await {
360 return Err(Reject::Duplicated(tx.hash()));
361 }
362
363 if self.orphan_contains(&tx).await {
364 debug!("reject tx {} already in orphan pool", tx.hash());
365 return Err(Reject::Duplicated(tx.hash()));
366 }
367 self._test_accept_tx(tx.clone()).await
368 }
369
370 pub(crate) async fn process_tx(
371 &self,
372 tx: TransactionView,
373 remote: Option<(Cycle, PeerIndex)>,
374 ) -> Result<Completed, Reject> {
375 self.non_contextual_verify(&tx, remote).await?;
377
378 if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
379 return Err(Reject::Duplicated(tx.hash()));
380 }
381
382 if let Some((ret, snapshot)) = self
383 ._process_tx(tx.clone(), remote.map(|r| r.0), None)
384 .await
385 {
386 self.after_process(tx, remote, &snapshot, &ret).await;
387 ret
388 } else {
389 Ok(Completed {
391 cycles: 0,
392 fee: Capacity::zero(),
393 })
394 }
395 }
396
397 pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
398 let mut tx_pool = self.tx_pool.write().await;
399 if let Some(ref mut recent_reject) = tx_pool.recent_reject
400 && let Err(e) = recent_reject.put(tx_hash, reject.clone())
401 {
402 error!(
403 "Failed to record recent_reject {} {} {}",
404 tx_hash, reject, e
405 );
406 }
407 }
408
409 pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
410 let id = ProposalShortId::from_tx_hash(&tx_hash);
411 {
412 let mut queue = self.verify_queue.write().await;
413 if queue.remove_tx(&id).is_some() {
414 return true;
415 }
416 }
417 {
418 let mut orphan = self.orphan.write().await;
419 if orphan.remove_orphan_tx(&id).is_some() {
420 return true;
421 }
422 }
423 let mut tx_pool = self.tx_pool.write().await;
424 tx_pool.remove_tx(&id)
425 }
426
427 pub(crate) async fn after_process(
428 &self,
429 tx: TransactionView,
430 remote: Option<(Cycle, PeerIndex)>,
431 _snapshot: &Snapshot,
432 ret: &Result<Completed, Reject>,
433 ) {
434 let tx_hash = tx.hash();
435
436 if log_enabled_target!("ckb_tx_monitor", Trace)
438 && let Ok(c) = ret
439 {
440 trace_target!(
441 "ckb_tx_monitor",
442 r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
443 tx_hash,
444 c.cycles
445 );
446 }
447
448 if matches!(
449 ret,
450 Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
451 ) {
452 let mut tx_pool = self.tx_pool.write().await;
453 if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
454 tx_pool.record_conflict(tx.clone());
455 }
456 }
457
458 match remote {
459 Some((declared_cycle, peer)) => match ret {
460 Ok(_) => {
461 debug!(
462 "after_process remote send_result_to_relayer {} {}",
463 tx_hash, peer
464 );
465 self.send_result_to_relayer(TxVerificationResult::Ok {
466 original_peer: Some(peer),
467 tx_hash,
468 });
469 self.process_orphan_tx(&tx).await;
470 }
471 Err(reject) => {
472 info!(
473 "after_process {} {} remote reject: {} ",
474 tx_hash, peer, reject
475 );
476 if is_missing_input(reject) {
477 self.send_result_to_relayer(TxVerificationResult::UnknownParents {
478 peer,
479 parents: tx.unique_parents(),
480 });
481 self.add_orphan(tx, peer, declared_cycle).await;
482 } else {
483 if reject.is_malformed_tx() {
484 self.ban_malformed(peer, format!("reject {reject}")).await;
485 }
486 if reject.is_allowed_relay() {
487 self.send_result_to_relayer(TxVerificationResult::Reject {
488 tx_hash: tx_hash.clone(),
489 });
490 }
491 if reject.should_recorded() {
492 self.put_recent_reject(&tx_hash, reject).await;
493 }
494 }
495 }
496 },
497 None => {
498 match ret {
499 Ok(_) => {
500 debug!("after_process local send_result_to_relayer {}", tx_hash);
501 self.send_result_to_relayer(TxVerificationResult::Ok {
502 original_peer: None,
503 tx_hash,
504 });
505 self.process_orphan_tx(&tx).await;
506 }
507 Err(Reject::Duplicated(_)) => {
508 debug!("after_process {} duplicated", tx_hash);
509 self.send_result_to_relayer(TxVerificationResult::Ok {
511 original_peer: None,
512 tx_hash,
513 });
514 }
515 Err(reject) => {
516 debug!("after_process {} reject: {} ", tx_hash, reject);
517 if reject.should_recorded() {
518 self.put_recent_reject(&tx_hash, reject).await;
519 }
520 }
521 }
522 }
523 }
524 }
525
526 pub(crate) async fn add_orphan(
527 &self,
528 tx: TransactionView,
529 peer: PeerIndex,
530 declared_cycle: Cycle,
531 ) {
532 let evicted_txs = self
533 .orphan
534 .write()
535 .await
536 .add_orphan_tx(tx, peer, declared_cycle);
537 for tx_hash in evicted_txs {
540 self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
541 }
542 }
543
544 pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
545 let orphan = self.orphan.read().await;
546 orphan
547 .find_by_previous(tx)
548 .iter()
549 .filter_map(|id| orphan.get(id).cloned())
550 .collect::<Vec<_>>()
551 }
552
553 pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
554 self.orphan.write().await.remove_orphan_tx(id);
555 }
556
557 pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
561 let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
562 orphan_queue.push_back(tx.clone());
563
564 while let Some(previous) = orphan_queue.pop_front() {
565 let orphans = self.find_orphan_by_previous(&previous).await;
566 for orphan in orphans.into_iter() {
567 if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
568 debug!(
569 "process_orphan {} added to verify queue; find previous from {}",
570 orphan.tx.hash(),
571 tx.hash(),
572 );
573 let orphan_id = orphan.tx.proposal_short_id();
574 match self
575 .enqueue_verify_queue(
576 orphan.tx.clone(),
577 false,
578 Some((orphan.cycle, orphan.peer)),
579 )
580 .await
581 {
582 Ok(_) => {
583 self.remove_orphan_tx(&orphan_id).await;
584 }
585 Err(reject) => {
586 warn!(
587 "process_orphan {} failed to enqueue verify queue: {}; keep orphan from {}",
588 orphan.tx.hash(),
589 reject,
590 tx.hash(),
591 );
592 }
593 }
594 } else if let Some((ret, _snapshot)) = self
595 ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
596 .await
597 {
598 match ret {
599 Ok(_) => {
600 self.send_result_to_relayer(TxVerificationResult::Ok {
601 original_peer: Some(orphan.peer),
602 tx_hash: orphan.tx.hash(),
603 });
604 debug!(
605 "process_orphan {} success, find previous from {}",
606 orphan.tx.hash(),
607 tx.hash()
608 );
609 self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
610 orphan_queue.push_back(orphan.tx);
611 }
612 Err(reject) => {
613 debug!(
614 "process_orphan {} reject {}, find previous from {}",
615 orphan.tx.hash(),
616 reject,
617 tx.hash(),
618 );
619
620 if !is_missing_input(&reject) {
621 self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
622 if reject.is_malformed_tx() {
623 self.ban_malformed(orphan.peer, format!("reject {reject}"))
624 .await;
625 }
626 if reject.is_allowed_relay() {
627 self.send_result_to_relayer(TxVerificationResult::Reject {
628 tx_hash: orphan.tx.hash(),
629 });
630 }
631 if reject.should_recorded() {
632 self.put_recent_reject(&orphan.tx.hash(), &reject).await;
633 }
634 }
635 }
636 }
637 }
638 }
639 }
640 }
641
642 pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
643 if let Err(e) = self.tx_relay_sender.send(result) {
644 error!("tx-pool tx_relay_sender internal error {}", e);
645 }
646 }
647
648 async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
649 const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
650
651 #[cfg(feature = "with_sentry")]
652 use sentry::{Level, capture_message, with_scope};
653
654 #[cfg(feature = "with_sentry")]
655 with_scope(
656 |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
657 || {
658 capture_message(
659 &format!(
660 "Ban peer {} for {} seconds, reason: \
661 {}",
662 peer,
663 DEFAULT_BAN_TIME.as_secs(),
664 reason
665 ),
666 Level::Info,
667 )
668 },
669 );
670 self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
671 self.verify_queue.write().await.remove_txs_by_peer(&peer);
672 }
673
674 pub(crate) async fn _process_tx(
675 &self,
676 tx: TransactionView,
677 declared_cycles: Option<Cycle>,
678 command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
679 ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
680 let wtx_hash = tx.witness_hash();
681 let instant = Instant::now();
682 let is_sync_process = command_rx.is_none();
683
684 let (ret, snapshot) = self.pre_check(&tx).await;
685
686 let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);
687
688 let verify_cache = self.fetch_tx_verify_cache(&tx).await;
689 let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
690 let tip_header = snapshot.tip_header();
691 let tx_env = Arc::new(status.with_env(tip_header));
692
693 let verified_ret = verify_rtx(
694 Arc::clone(&snapshot),
695 Arc::clone(&rtx),
696 tx_env,
697 &verify_cache,
698 max_cycles,
699 command_rx,
700 )
701 .await;
702
703 let verified = try_or_return_with_snapshot!(verified_ret, snapshot);
704
705 if let Some(declared) = declared_cycles
706 && declared != verified.cycles
707 {
708 info!(
709 "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
710 declared, verified.cycles, tx
711 );
712 return Some((
713 Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
714 snapshot,
715 ));
716 }
717
718 let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
719
720 let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
721 try_or_return_with_snapshot!(ret, submit_snapshot);
722
723 self.notify_block_assembler(status).await;
724
725 if verify_cache.is_none() {
726 let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
728 tokio::spawn(async move {
729 let mut guard = txs_verify_cache.write().await;
730 guard.put(wtx_hash, verified);
731 });
732 }
733
734 if let Some(metrics) = ckb_metrics::handle() {
735 let elapsed = instant.elapsed().as_secs_f64();
736 if is_sync_process {
737 metrics.ckb_tx_pool_sync_process.observe(elapsed);
738 } else {
739 metrics.ckb_tx_pool_async_process.observe(elapsed);
740 }
741 }
742
743 Some((Ok(verified), submit_snapshot))
744 }
745
746 pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
747 let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
748
749 let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
750
751 let verify_cache = self.fetch_tx_verify_cache(&tx).await;
754 let max_cycles = self.consensus.max_block_cycles();
755 let tip_header = snapshot.tip_header();
756 let tx_env = Arc::new(status.with_env(tip_header));
757
758 verify_rtx(
759 Arc::clone(&snapshot),
760 Arc::clone(&rtx),
761 tx_env,
762 &verify_cache,
763 max_cycles,
764 None,
765 )
766 .await
767 }
768
769 pub(crate) async fn update_tx_pool_for_reorg(
770 &self,
771 detached_blocks: VecDeque<BlockView>,
772 attached_blocks: VecDeque<BlockView>,
773 detached_proposal_id: HashSet<ProposalShortId>,
774 snapshot: Arc<Snapshot>,
775 ) {
776 let mine_mode = self.block_assembler.is_some();
777 let mut detached = LinkedHashSet::default();
778 let mut attached = LinkedHashSet::default();
779
780 let detached_headers: HashSet<Byte32> = detached_blocks
781 .iter()
782 .map(|blk| blk.header().hash())
783 .collect();
784
785 for blk in detached_blocks {
786 detached.extend(blk.transactions().into_iter().skip(1))
787 }
788
789 for blk in attached_blocks {
790 self.fee_estimator.commit_block(&blk);
791 attached.extend(blk.transactions().into_iter().skip(1));
792 }
793 let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();
794
795 let fetched_cache = self.fetch_txs_verify_cache(retain.iter()).await;
796
797 {
802 let mut tx_pool = self.tx_pool.write().await;
804
805 _update_tx_pool_for_reorg(
806 &mut tx_pool,
807 &attached,
808 &detached_headers,
809 detached_proposal_id,
810 snapshot,
811 &self.callbacks,
812 mine_mode,
813 );
814
815 self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
817 .await;
818 }
819
820 self.remove_orphan_txs_by_attach(&attached).await;
821 {
822 let mut queue = self.verify_queue.write().await;
823 queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
824 }
825 }
826
827 async fn enqueue_verify_queue(
828 &self,
829 tx: TransactionView,
830 is_proposal_tx: bool,
831 remote: Option<(Cycle, PeerIndex)>,
832 ) -> Result<bool, Reject> {
833 let mut queue = self.verify_queue.write().await;
834 queue.add_tx(tx, is_proposal_tx, remote)
835 }
836
837 async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
838 for tx in txs.iter() {
839 self.process_orphan_tx(tx).await;
840 }
841 let mut orphan = self.orphan.write().await;
842 orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
843 }
844
845 async fn readd_detached_tx(
846 &self,
847 tx_pool: &mut TxPool,
848 txs: Vec<TransactionView>,
849 fetched_cache: HashMap<Byte32, CacheEntry>,
850 ) {
851 let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
852 for tx in txs {
853 let tx_size = tx.data().serialized_size_in_block();
854 let tx_hash = tx.hash();
855 if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false)
856 && let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size)
857 {
858 let verify_cache = fetched_cache.get(&tx_hash).cloned();
859 let snapshot = tx_pool.cloned_snapshot();
860 let tip_header = snapshot.tip_header();
861 let tx_env = Arc::new(status.with_env(tip_header));
862 if let Ok(verified) = verify_rtx(
863 snapshot,
864 Arc::clone(&rtx),
865 tx_env,
866 &verify_cache,
867 max_cycles,
868 None,
869 )
870 .await
871 {
872 let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
873 if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
874 error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
875 } else {
876 debug!("readd_detached_tx submit_entry {}", tx_hash);
877 }
878 }
879 }
880 }
881 }
882
883 pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
884 {
885 let mut tx_pool = self.tx_pool.write().await;
886 tx_pool.clear(Arc::clone(&new_snapshot));
887 }
888 if self
890 .block_assembler_sender
891 .send(BlockAssemblerMessage::Reset(new_snapshot))
892 .await
893 .is_err()
894 {
895 error!("block_assembler receiver dropped");
896 }
897 }
898
899 pub(crate) async fn save_pool(&self) {
900 let mut tx_pool = self.tx_pool.write().await;
901 if let Err(err) = tx_pool.save_into_file() {
902 error!("failed to save pool, error: {:?}", err)
903 } else {
904 info!("TxPool saved successfully")
905 }
906 }
907
908 pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
909 self.fee_estimator.update_ibd_state(in_ibd);
910 }
911
912 pub(crate) async fn estimate_fee_rate(
913 &self,
914 estimate_mode: EstimateMode,
915 enable_fallback: bool,
916 ) -> Result<FeeRate, AnyError> {
917 let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
918 match self
919 .fee_estimator
920 .estimate_fee_rate(estimate_mode, all_entry_info)
921 {
922 Ok(fee_rate) => Ok(fee_rate),
923 Err(err) => {
924 if enable_fallback {
925 let target_blocks =
926 FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
927 self.tx_pool
928 .read()
929 .await
930 .estimate_fee_rate(target_blocks)
931 .map_err(Into::into)
932 } else {
933 Err(err.into())
934 }
935 }
936 }
937 }
938}
939
940type PreCheckedTx = (
941 Byte32, Arc<ResolvedTransaction>, TxStatus, Capacity, usize, );
947
948type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
949
950fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
951 if snapshot.proposals().contains_proposed(short_id) {
952 TxStatus::Proposed
953 } else if snapshot.proposals().contains_gap(short_id) {
954 TxStatus::Gap
955 } else {
956 TxStatus::Fresh
957 }
958}
959
960fn check_rtx(
961 tx_pool: &TxPool,
962 snapshot: &Snapshot,
963 rtx: &ResolvedTransaction,
964) -> Result<TxStatus, Reject> {
965 let short_id = rtx.transaction.proposal_short_id();
966 let tx_status = get_tx_status(snapshot, &short_id);
967 tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
968}
969
970fn resolve_tx(
971 tx_pool: &TxPool,
972 snapshot: &Snapshot,
973 tx: TransactionView,
974 rbf: bool,
975) -> ResolveResult {
976 let short_id = tx.proposal_short_id();
977 let tx_status = get_tx_status(snapshot, &short_id);
978 tx_pool
979 .resolve_tx_from_pool(tx, rbf)
980 .map(|rtx| (rtx, tx_status))
981}
982
983fn _submit_entry(
984 tx_pool: &mut TxPool,
985 status: TxStatus,
986 entry: TxEntry,
987 callbacks: &Callbacks,
988) -> Result<HashSet<TxEntry>, Reject> {
989 let tx_hash = entry.transaction().hash();
990 debug!("submit_entry {:?} {}", status, tx_hash);
991 let (succ, evicts) = match status {
992 TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
993 TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
994 TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
995 };
996 if succ {
997 match status {
998 TxStatus::Fresh => callbacks.call_pending(&entry),
999 TxStatus::Gap => callbacks.call_pending(&entry),
1000 TxStatus::Proposed => callbacks.call_proposed(&entry),
1001 }
1002 }
1003 Ok(evicts)
1004}
1005
1006fn _update_tx_pool_for_reorg(
1007 tx_pool: &mut TxPool,
1008 attached: &LinkedHashSet<TransactionView>,
1009 detached_headers: &HashSet<Byte32>,
1010 detached_proposal_id: HashSet<ProposalShortId>,
1011 snapshot: Arc<Snapshot>,
1012 callbacks: &Callbacks,
1013 mine_mode: bool,
1014) {
1015 tx_pool.snapshot = Arc::clone(&snapshot);
1016
1017 tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1023 tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1024
1025 if mine_mode {
1029 let mut proposals = Vec::new();
1030 let mut gaps = Vec::new();
1031
1032 for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1033 let short_id = entry.inner.proposal_short_id();
1034 if snapshot.proposals().contains_proposed(&short_id) {
1035 proposals.push((short_id, entry.inner.clone()));
1036 }
1037 }
1038
1039 for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1040 let short_id = entry.inner.proposal_short_id();
1041 let elem = (short_id.clone(), entry.inner.clone());
1042 if snapshot.proposals().contains_proposed(&short_id) {
1043 proposals.push(elem);
1044 } else if snapshot.proposals().contains_gap(&short_id) {
1045 gaps.push(elem);
1046 }
1047 }
1048
1049 for (id, entry) in proposals {
1050 debug!("begin to proposed: {:x}", id);
1051 if let Err(e) = tx_pool.proposed_rtx(&id) {
1052 debug!(
1053 "Failed to add proposed tx {}, reason: {}",
1054 entry.transaction().hash(),
1055 e
1056 );
1057 callbacks.call_reject(tx_pool, &entry, e);
1058 } else {
1059 callbacks.call_proposed(&entry)
1060 }
1061 }
1062
1063 for (id, entry) in gaps {
1064 debug!("begin to gap: {:x}", id);
1065 if let Err(e) = tx_pool.gap_rtx(&id) {
1066 debug!(
1067 "Failed to add tx to gap {}, reason: {}",
1068 entry.transaction().hash(),
1069 e
1070 );
1071 callbacks.call_reject(tx_pool, &entry, e.clone());
1072 }
1073 }
1074 }
1075
1076 tx_pool.remove_expired(callbacks);
1078
1079 let _ = tx_pool.limit_size(callbacks, None);
1081}