use crate::callback::Callbacks;
use crate::component::entry::TxEntry;
use crate::component::orphan::Entry as OrphanEntry;
use crate::component::pool_map::Status;
use crate::error::Reject;
use crate::pool::TxPool;
use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
use crate::try_or_return_with_snapshot;
use crate::util::{
    check_tx_fee, check_txid_collision, is_missing_input, non_contextual_verify,
    time_relative_verify, verify_rtx,
};
use ckb_error::{AnyError, InternalErrorKind};
use ckb_fee_estimator::FeeEstimator;
use ckb_jsonrpc_types::BlockTemplate;
use ckb_logger::Level::Trace;
use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
use ckb_network::PeerIndex;
use ckb_script::ChunkCommand;
use ckb_snapshot::Snapshot;
use ckb_types::core::error::OutPointError;
use ckb_types::{
    core::{
        BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
        cell::ResolvedTransaction,
    },
    packed::{Byte32, ProposalShortId},
};
use ckb_util::LinkedHashSet;
use ckb_verification::{
    TxVerifyEnv,
    cache::{CacheEntry, Completed},
};
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;

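/// Target sub-pool for plugging entries directly into the pool.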
pub enum PlugTarget {
    Pending,
    Proposed,
}

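/// Proposal status of a transaction with respect to the current tip, used to
/// pick the right sub-pool and verification environment.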
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    Fresh,
    Gap,
    Proposed,
}

impl TxStatus {
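    /// Build the `TxVerifyEnv` used for contextual verification, according to
    /// whether the transaction is newly submitted, in the gap, or proposed.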
    fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
        match self {
            TxStatus::Fresh => TxVerifyEnv::new_submit(header),
            TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
            TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
        }
    }
}

impl TxPoolService {
    pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
        if let Some(ref block_assembler) = self.block_assembler {
            Ok(block_assembler.get_current().await)
        } else {
            Err(InternalErrorKind::Config
                .other("BlockAssembler disabled")
                .into())
        }
    }

    pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
        let guard = self.txs_verify_cache.read().await;
        guard.peek(&tx.witness_hash()).cloned()
    }

    async fn fetch_txs_verify_cache(
        &self,
        txs: impl Iterator<Item = &TransactionView>,
    ) -> HashMap<Byte32, CacheEntry> {
        let guard = self.txs_verify_cache.read().await;
        txs.filter_map(|tx| {
            let wtx_hash = tx.witness_hash();
            guard
                .peek(&wtx_hash)
                .cloned()
                .map(|value| (wtx_hash, value))
        })
        .collect()
    }

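    /// Write a fully verified entry into the pool under the write lock.
    /// If the tip changed since pre-check, the entry is re-checked against the
    /// new snapshot; RBF conflicts are evicted and transactions they had
    /// displaced may be queued for re-verification.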
    pub(crate) async fn submit_entry(
        &self,
        pre_resolve_tip: Byte32,
        entry: TxEntry,
        mut status: TxStatus,
    ) -> (Result<(), Reject>, Arc<Snapshot>) {
        let (ret, snapshot) = self
            .with_tx_pool_write_lock(move |tx_pool, snapshot| {
                let conflicts = if tx_pool.enable_rbf() {
                    tx_pool.check_rbf(&snapshot, &entry)?
                } else {
                    let conflicted_outpoint =
                        tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
                    if let Some(outpoint) = conflicted_outpoint {
                        return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
                    }
                    HashSet::new()
                };

                let tip_hash = snapshot.tip_hash();
                if pre_resolve_tip != tip_hash {
                    debug!(
                        "submit_entry {} context changed. previous:{} now:{}",
                        entry.proposal_short_id(),
                        pre_resolve_tip,
                        tip_hash
                    );

                    status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;

                    let tip_header = snapshot.tip_header();
                    let tx_env = status.with_env(tip_header);
                    time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
                }

                let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
                let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

                for evict in evicted {
                    let reject = Reject::Invalidated(format!(
                        "invalidated by tx {}",
                        evict.transaction().hash()
                    ));
                    self.callbacks.call_reject(tx_pool, &evict, reject);
                }

                tx_pool.remove_conflict(&entry.proposal_short_id());
                tx_pool
                    .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
                    .map_or(Ok(()), Err)?;

                if !may_recovered_txs.is_empty() {
                    let self_clone = self.clone();
                    tokio::spawn(async move {
                        let mut queue = self_clone.verify_queue.write().await;
                        for tx in may_recovered_txs {
                            debug!("recover back: {:?}", tx.proposal_short_id());
                            let _ = queue.add_tx(tx, None);
                        }
                    });
                }
                Ok(())
            })
            .await;

        (ret, snapshot)
    }

    pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
        if self.should_notify_block_assembler() {
            match status {
                TxStatus::Fresh => {
                    if self
                        .block_assembler_sender
                        .send(BlockAssemblerMessage::Pending)
                        .await
                        .is_err()
                    {
                        error!("block_assembler receiver dropped");
                    }
                }
                TxStatus::Proposed => {
                    if self
                        .block_assembler_sender
                        .send(BlockAssemblerMessage::Proposed)
                        .await
                        .is_err()
                    {
                        error!("block_assembler receiver dropped");
                    }
                }
                _ => {}
            }
        }
    }

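    /// Evict RBF conflicts (and their descendants) from the pool and return
    /// previously conflicted transactions whose inputs are no longer spent by
    /// the new entry, so they can be queued for re-verification.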
    fn process_rbf(
        &self,
        tx_pool: &mut TxPool,
        entry: &TxEntry,
        conflicts: &HashSet<ProposalShortId>,
    ) -> Vec<TransactionView> {
        let mut may_recovered_txs = vec![];
        let mut available_inputs = HashSet::new();

        if conflicts.is_empty() {
            return may_recovered_txs;
        }

        let all_removed: Vec<_> = conflicts
            .iter()
            .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
            .collect();

        available_inputs.extend(
            all_removed
                .iter()
                .flat_map(|removed| removed.transaction().input_pts_iter()),
        );

        for input in entry.transaction().input_pts_iter() {
            available_inputs.remove(&input);
        }

        may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
        for old in all_removed {
            debug!(
                "remove conflict tx {} for RBF by new tx {}",
                old.transaction().hash(),
                entry.transaction().hash()
            );
            let reject =
                Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));

            tx_pool.record_conflict(old.transaction().clone());
            self.callbacks.call_reject(tx_pool, &old, reject);
        }
        assert!(!may_recovered_txs.contains(entry.transaction()));
        may_recovered_txs
    }

    pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
        let queue = self.verify_queue.read().await;
        queue.contains_key(&tx.proposal_short_id())
    }

    pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
        let orphan = self.orphan.read().await;
        orphan.contains_key(&tx.proposal_short_id())
    }

    pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
        &self,
        mut f: F,
    ) -> (U, Arc<Snapshot>) {
        let tx_pool = self.tx_pool.read().await;
        let snapshot = tx_pool.cloned_snapshot();

        let ret = f(&tx_pool, Arc::clone(&snapshot));
        (ret, snapshot)
    }

    pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
        &self,
        mut f: F,
    ) -> (U, Arc<Snapshot>) {
        let mut tx_pool = self.tx_pool.write().await;
        let snapshot = tx_pool.cloned_snapshot();

        let ret = f(&mut tx_pool, Arc::clone(&snapshot));
        (ret, snapshot)
    }

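    /// Resolve the transaction against the current pool state and snapshot,
    /// check txid collisions and the minimum fee, and return everything the
    /// verification step needs (tip hash, resolved tx, status, fee, size).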
    pub(crate) async fn pre_check(
        &self,
        tx: &TransactionView,
    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
        let tx_size = tx.data().serialized_size_in_block();

        let (ret, snapshot) = self
            .with_tx_pool_read_lock(|tx_pool, snapshot| {
                let tip_hash = snapshot.tip_hash();

                check_txid_collision(tx_pool, tx)?;

                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
                match res {
                    Ok((rtx, status)) => {
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
                        if conflicts.is_none() {
                            error!(
                                "{} is resolved as Dead, but there is no conflicted tx",
                                rtx.transaction.proposal_short_id()
                            );
                            return Err(Reject::Resolve(OutPointError::Dead(out)));
                        }
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(err) => Err(err),
                }
            })
            .await;
        (ret, snapshot)
    }

    pub(crate) async fn non_contextual_verify(
        &self,
        tx: &TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<(), Reject> {
        if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
            if reject.is_malformed_tx() {
                if let Some(remote) = remote {
                    self.ban_malformed(remote.1, format!("reject {reject}"))
                        .await;
                }
            }
            return Err(reject);
        }
        Ok(())
    }

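    /// Entry point for transactions verified asynchronously: run the
    /// non-contextual checks, reject duplicates already known to the orphan
    /// pool or verify queue, then push the transaction onto the verify queue.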
    pub(crate) async fn resumeble_process_tx(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<bool, Reject> {
        self.non_contextual_verify(&tx, remote).await?;

        if self.orphan_contains(&tx).await {
            debug!("reject tx {} already in orphan pool", tx.hash());
            return Err(Reject::Duplicated(tx.hash()));
        }

        if self.verify_queue_contains(&tx).await {
            return Err(Reject::Duplicated(tx.hash()));
        }
        self.enqueue_verify_queue(tx, remote).await
    }

    pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
        self.non_contextual_verify(&tx, None).await?;

        if self.verify_queue_contains(&tx).await {
            return Err(Reject::Duplicated(tx.hash()));
        }

        if self.orphan_contains(&tx).await {
            debug!("reject tx {} already in orphan pool", tx.hash());
            return Err(Reject::Duplicated(tx.hash()));
        }
        self._test_accept_tx(tx.clone()).await
    }

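    /// Synchronous processing path: verify the transaction in place, submit it
    /// to the pool, then run post-processing (relay, orphan handling, reject
    /// bookkeeping) before returning the verification result.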
    pub(crate) async fn process_tx(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<Completed, Reject> {
        self.non_contextual_verify(&tx, remote).await?;

        if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
            return Err(Reject::Duplicated(tx.hash()));
        }

        if let Some((ret, snapshot)) = self
            ._process_tx(tx.clone(), remote.map(|r| r.0), None)
            .await
        {
            self.after_process(tx, remote, &snapshot, &ret).await;
            ret
        } else {
            Ok(Completed {
                cycles: 0,
                fee: Capacity::zero(),
            })
        }
    }

    pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
        let mut tx_pool = self.tx_pool.write().await;
        if let Some(ref mut recent_reject) = tx_pool.recent_reject {
            if let Err(e) = recent_reject.put(tx_hash, reject.clone()) {
                error!(
                    "Failed to record recent_reject {} {} {}",
                    tx_hash, reject, e
                );
            }
        }
    }

    pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
        let id = ProposalShortId::from_tx_hash(&tx_hash);
        {
            let mut queue = self.verify_queue.write().await;
            if queue.remove_tx(&id).is_some() {
                return true;
            }
        }
        {
            let mut orphan = self.orphan.write().await;
            if orphan.remove_orphan_tx(&id).is_some() {
                return true;
            }
        }
        let mut tx_pool = self.tx_pool.write().await;
        tx_pool.remove_tx(&id)
    }

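    /// Post-processing after a verification attempt: relay the outcome,
    /// record conflicts, move missing-input transactions into the orphan pool,
    /// ban peers that sent malformed transactions, and persist rejects.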
    pub(crate) async fn after_process(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
        _snapshot: &Snapshot,
        ret: &Result<Completed, Reject>,
    ) {
        let tx_hash = tx.hash();

        if log_enabled_target!("ckb_tx_monitor", Trace) {
            if let Ok(c) = ret {
                trace_target!(
                    "ckb_tx_monitor",
                    r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
                    tx_hash,
                    c.cycles
                );
            }
        }

        if matches!(
            ret,
            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
        ) {
            let mut tx_pool = self.tx_pool.write().await;
            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
                tx_pool.record_conflict(tx.clone());
            }
        }

        match remote {
            Some((declared_cycle, peer)) => match ret {
                Ok(_) => {
                    debug!(
                        "after_process remote send_result_to_relayer {} {}",
                        tx_hash, peer
                    );
                    self.send_result_to_relayer(TxVerificationResult::Ok {
                        original_peer: Some(peer),
                        tx_hash,
                    });
                    self.process_orphan_tx(&tx).await;
                }
                Err(reject) => {
                    info!(
                        "after_process {} {} remote reject: {} ",
                        tx_hash, peer, reject
                    );
                    if is_missing_input(reject) {
                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
                            peer,
                            parents: tx.unique_parents(),
                        });
                        self.add_orphan(tx, peer, declared_cycle).await;
                    } else {
                        if reject.is_malformed_tx() {
                            self.ban_malformed(peer, format!("reject {reject}")).await;
                        }
                        if reject.is_allowed_relay() {
                            self.send_result_to_relayer(TxVerificationResult::Reject {
                                tx_hash: tx_hash.clone(),
                            });
                        }
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            },
            None => {
                match ret {
                    Ok(_) => {
                        debug!("after_process local send_result_to_relayer {}", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            tx_hash,
                        });
                        self.process_orphan_tx(&tx).await;
                    }
                    Err(Reject::Duplicated(_)) => {
                        debug!("after_process {} duplicated", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            tx_hash,
                        });
                    }
                    Err(reject) => {
                        debug!("after_process {} reject: {} ", tx_hash, reject);
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            }
        }
    }

    pub(crate) async fn add_orphan(
        &self,
        tx: TransactionView,
        peer: PeerIndex,
        declared_cycle: Cycle,
    ) {
        let evicted_txs = self
            .orphan
            .write()
            .await
            .add_orphan_tx(tx, peer, declared_cycle);
        for tx_hash in evicted_txs {
            self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
        }
    }

    pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
        let orphan = self.orphan.read().await;
        orphan
            .find_by_previous(tx)
            .iter()
            .filter_map(|id| orphan.get(id).cloned())
            .collect::<Vec<_>>()
    }

    pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
        self.orphan.write().await.remove_orphan_tx(id);
    }

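    /// When `tx` is accepted, walk the orphan pool breadth-first and retry any
    /// orphans whose missing parent was just provided; orphans whose declared
    /// cycles exceed the verify limit are forwarded to the verify queue
    /// instead of being verified inline.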
    pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
        let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
        orphan_queue.push_back(tx.clone());

        while let Some(previous) = orphan_queue.pop_front() {
            let orphans = self.find_orphan_by_previous(&previous).await;
            for orphan in orphans.into_iter() {
                if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
                    debug!(
                        "process_orphan {} added to verify queue; find previous from {}",
                        orphan.tx.hash(),
                        tx.hash(),
                    );
                    self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                    self.enqueue_verify_queue(orphan.tx, Some((orphan.cycle, orphan.peer)))
                        .await
                        .expect("enqueue suspended tx");
                } else if let Some((ret, _snapshot)) = self
                    ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
                    .await
                {
                    match ret {
                        Ok(_) => {
                            self.send_result_to_relayer(TxVerificationResult::Ok {
                                original_peer: Some(orphan.peer),
                                tx_hash: orphan.tx.hash(),
                            });
                            debug!(
                                "process_orphan {} success, find previous from {}",
                                orphan.tx.hash(),
                                tx.hash()
                            );
                            self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                            orphan_queue.push_back(orphan.tx);
                        }
                        Err(reject) => {
                            debug!(
                                "process_orphan {} reject {}, find previous from {}",
                                orphan.tx.hash(),
                                reject,
                                tx.hash(),
                            );

                            if !is_missing_input(&reject) {
                                self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
                                if reject.is_malformed_tx() {
                                    self.ban_malformed(orphan.peer, format!("reject {reject}"))
                                        .await;
                                }
                                if reject.is_allowed_relay() {
                                    self.send_result_to_relayer(TxVerificationResult::Reject {
                                        tx_hash: orphan.tx.hash(),
                                    });
                                }
                                if reject.should_recorded() {
                                    self.put_recent_reject(&orphan.tx.hash(), &reject).await;
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
        if let Err(e) = self.tx_relay_sender.send(result) {
            error!("tx-pool tx_relay_sender internal error {}", e);
        }
    }

    async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
        const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);

        #[cfg(feature = "with_sentry")]
        use sentry::{Level, capture_message, with_scope};

        #[cfg(feature = "with_sentry")]
        with_scope(
            |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
            || {
                capture_message(
                    &format!(
                        "Ban peer {} for {} seconds, reason: \
                        {}",
                        peer,
                        DEFAULT_BAN_TIME.as_secs(),
                        reason
                    ),
                    Level::Info,
                )
            },
        );
        self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
        self.verify_queue.write().await.remove_txs_by_peer(&peer);
    }

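    /// Core verification pipeline shared by the sync and async paths:
    /// pre-check, contextual verification (with an optional `command_rx` for
    /// chunked verification control), declared-cycle check, pool submission,
    /// and verify-cache update.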
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        if let Some(declared) = declared_cycles {
            if declared != verified.cycles {
                info!(
                    "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                    declared, verified.cycles, tx
                );
                return Some((
                    Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                    snapshot,
                ));
            }
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        if verify_cache.is_none() {
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }

    pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
        let (pre_check_ret, snapshot) = self.pre_check(&tx).await;

        let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = self.consensus.max_block_cycles();
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            None,
        )
        .await
    }

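    /// Handle a chain reorg: commit attached blocks to the fee estimator,
    /// rebuild the pool against the new snapshot, re-add transactions that
    /// only existed on the detached chain, and clean the orphan pool and
    /// verify queue.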
    pub(crate) async fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) {
        let mine_mode = self.block_assembler.is_some();
        let mut detached = LinkedHashSet::default();
        let mut attached = LinkedHashSet::default();

        let detached_headers: HashSet<Byte32> = detached_blocks
            .iter()
            .map(|blk| blk.header().hash())
            .collect();

        for blk in detached_blocks {
            detached.extend(blk.transactions().into_iter().skip(1))
        }

        for blk in attached_blocks {
            self.fee_estimator.commit_block(&blk);
            attached.extend(blk.transactions().into_iter().skip(1));
        }
        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();

        let fetched_cache = self.fetch_txs_verify_cache(retain.iter()).await;

        {
            let mut tx_pool = self.tx_pool.write().await;

            _update_tx_pool_for_reorg(
                &mut tx_pool,
                &attached,
                &detached_headers,
                detached_proposal_id,
                snapshot,
                &self.callbacks,
                mine_mode,
            );

            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
                .await;
        }

        self.remove_orphan_txs_by_attach(&attached).await;
        {
            let mut queue = self.verify_queue.write().await;
            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
        }
    }

    async fn enqueue_verify_queue(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
    ) -> Result<bool, Reject> {
        let mut queue = self.verify_queue.write().await;
        queue.add_tx(tx, remote)
    }

    async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
        for tx in txs.iter() {
            self.process_orphan_tx(tx).await;
        }
        let mut orphan = self.orphan.write().await;
        orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
    }

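    /// Re-resolve, re-verify (reusing the prefetched cache where possible) and
    /// re-submit transactions that were committed only on the detached chain.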
    async fn readd_detached_tx(
        &self,
        tx_pool: &mut TxPool,
        txs: Vec<TransactionView>,
        fetched_cache: HashMap<Byte32, CacheEntry>,
    ) {
        let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
        for tx in txs {
            let tx_size = tx.data().serialized_size_in_block();
            let tx_hash = tx.hash();
            if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) {
                if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) {
                    let verify_cache = fetched_cache.get(&tx_hash).cloned();
                    let snapshot = tx_pool.cloned_snapshot();
                    let tip_header = snapshot.tip_header();
                    let tx_env = Arc::new(status.with_env(tip_header));
                    if let Ok(verified) = verify_rtx(
                        snapshot,
                        Arc::clone(&rtx),
                        tx_env,
                        &verify_cache,
                        max_cycles,
                        None,
                    )
                    .await
                    {
                        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
                        if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
                            error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
                        } else {
                            debug!("readd_detached_tx submit_entry {}", tx_hash);
                        }
                    }
                }
            }
        }
    }

    pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
        {
            let mut tx_pool = self.tx_pool.write().await;
            tx_pool.clear(Arc::clone(&new_snapshot));
        }
        if self
            .block_assembler_sender
            .send(BlockAssemblerMessage::Reset(new_snapshot))
            .await
            .is_err()
        {
            error!("block_assembler receiver dropped");
        }
    }

    pub(crate) async fn save_pool(&self) {
        let mut tx_pool = self.tx_pool.write().await;
        if let Err(err) = tx_pool.save_into_file() {
            error!("failed to save pool, error: {:?}", err)
        } else {
            info!("TxPool saved successfully")
        }
    }

    pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
        self.fee_estimator.update_ibd_state(in_ibd);
    }

    pub(crate) async fn estimate_fee_rate(
        &self,
        estimate_mode: EstimateMode,
        enable_fallback: bool,
    ) -> Result<FeeRate, AnyError> {
        let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
        match self
            .fee_estimator
            .estimate_fee_rate(estimate_mode, all_entry_info)
        {
            Ok(fee_rate) => Ok(fee_rate),
            Err(err) => {
                if enable_fallback {
                    let target_blocks =
                        FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
                    self.tx_pool
                        .read()
                        .await
                        .estimate_fee_rate(target_blocks)
                        .map_err(Into::into)
                } else {
                    Err(err.into())
                }
            }
        }
    }
}

type PreCheckedTx = (
    Byte32,
    Arc<ResolvedTransaction>,
    TxStatus,
    Capacity,
    usize,
);

type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;

fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
    if snapshot.proposals().contains_proposed(short_id) {
        TxStatus::Proposed
    } else if snapshot.proposals().contains_gap(short_id) {
        TxStatus::Gap
    } else {
        TxStatus::Fresh
    }
}

fn check_rtx(
    tx_pool: &TxPool,
    snapshot: &Snapshot,
    rtx: &ResolvedTransaction,
) -> Result<TxStatus, Reject> {
    let short_id = rtx.transaction.proposal_short_id();
    let tx_status = get_tx_status(snapshot, &short_id);
    tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
}

fn resolve_tx(
    tx_pool: &TxPool,
    snapshot: &Snapshot,
    tx: TransactionView,
    rbf: bool,
) -> ResolveResult {
    let short_id = tx.proposal_short_id();
    let tx_status = get_tx_status(snapshot, &short_id);
    tx_pool
        .resolve_tx_from_pool(tx, rbf)
        .map(|rtx| (rtx, tx_status))
}

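/// Insert the entry into the sub-pool matching `status` and fire the
/// corresponding pending/proposed callbacks; returns any entries evicted to
/// make room.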
fn _submit_entry(
    tx_pool: &mut TxPool,
    status: TxStatus,
    entry: TxEntry,
    callbacks: &Callbacks,
) -> Result<HashSet<TxEntry>, Reject> {
    let tx_hash = entry.transaction().hash();
    debug!("submit_entry {:?} {}", status, tx_hash);
    let (succ, evicts) = match status {
        TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
        TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
        TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
    };
    if succ {
        match status {
            TxStatus::Fresh => callbacks.call_pending(&entry),
            TxStatus::Gap => callbacks.call_pending(&entry),
            TxStatus::Proposed => callbacks.call_proposed(&entry),
        }
    }
    Ok(evicts)
}

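/// Apply a new snapshot to the pool: drop committed and detached-proposal
/// transactions, promote pending/gap entries that the new tip proposes (only
/// when a block assembler is running), and enforce expiry and size limits.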
fn _update_tx_pool_for_reorg(
    tx_pool: &mut TxPool,
    attached: &LinkedHashSet<TransactionView>,
    detached_headers: &HashSet<Byte32>,
    detached_proposal_id: HashSet<ProposalShortId>,
    snapshot: Arc<Snapshot>,
    callbacks: &Callbacks,
    mine_mode: bool,
) {
    tx_pool.snapshot = Arc::clone(&snapshot);

    tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
    tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());

    if mine_mode {
        let mut proposals = Vec::new();
        let mut gaps = Vec::new();

        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
            let short_id = entry.inner.proposal_short_id();
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push((short_id, entry.inner.clone()));
            }
        }

        for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
            let short_id = entry.inner.proposal_short_id();
            let elem = (short_id.clone(), entry.inner.clone());
            if snapshot.proposals().contains_proposed(&short_id) {
                proposals.push(elem);
            } else if snapshot.proposals().contains_gap(&short_id) {
                gaps.push(elem);
            }
        }

        for (id, entry) in proposals {
            debug!("begin to proposed: {:x}", id);
            if let Err(e) = tx_pool.proposed_rtx(&id) {
                debug!(
                    "Failed to add proposed tx {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e);
            } else {
                callbacks.call_proposed(&entry)
            }
        }

        for (id, entry) in gaps {
            debug!("begin to gap: {:x}", id);
            if let Err(e) = tx_pool.gap_rtx(&id) {
                debug!(
                    "Failed to add tx to gap {}, reason: {}",
                    entry.transaction().hash(),
                    e
                );
                callbacks.call_reject(tx_pool, &entry, e.clone());
            }
        }
    }

    tx_pool.remove_expired(callbacks);

    let _ = tx_pool.limit_size(callbacks, None);
}