1use crate::callback::Callbacks;
2use crate::component::entry::TxEntry;
3use crate::component::orphan::Entry as OrphanEntry;
4use crate::component::pool_map::Status;
5use crate::error::Reject;
6use crate::pool::TxPool;
7use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
8use crate::try_or_return_with_snapshot;
9use crate::util::{
10 check_tx_fee, check_txid_collision, is_missing_input, non_contextual_verify,
11 time_relative_verify, verify_rtx,
12};
13use ckb_error::{AnyError, InternalErrorKind};
14use ckb_fee_estimator::FeeEstimator;
15use ckb_jsonrpc_types::BlockTemplate;
16use ckb_logger::Level::Trace;
17use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
18use ckb_network::PeerIndex;
19use ckb_script::ChunkCommand;
20use ckb_snapshot::Snapshot;
21use ckb_types::core::error::OutPointError;
22use ckb_types::{
23 core::{
24 BlockView, Capacity, Cycle, EstimateMode, FeeRate, HeaderView, TransactionView,
25 cell::ResolvedTransaction,
26 },
27 packed::{Byte32, ProposalShortId},
28};
29use ckb_util::LinkedHashSet;
30use ckb_verification::{
31 TxVerifyEnv,
32 cache::{CacheEntry, Completed},
33};
34use std::collections::HashSet;
35use std::collections::{HashMap, VecDeque};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use tokio::sync::watch;
39
/// Names one of two targets, `Pending` or `Proposed`.
// NOTE(review): no consumer of this enum is visible in this part of the file;
// presumably it selects the pool section an entry is plugged into — confirm at call sites.
pub enum PlugTarget {
    /// The pending target.
    Pending,
    /// The proposed target.
    Proposed,
}
47
/// Proposal state of a transaction relative to a snapshot, as computed by
/// `get_tx_status`.
///
/// The status selects which `TxPool::add_*` call `_submit_entry` uses and which
/// `TxVerifyEnv` constructor `TxStatus::with_env` picks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    /// The short id is in neither the proposed set nor the gap set of the snapshot.
    Fresh,
    /// The short id is in the snapshot's gap set (and not in its proposed set).
    Gap,
    /// The short id is in the snapshot's proposed set.
    Proposed,
}
54
55impl TxStatus {
56 fn with_env(self, header: &HeaderView) -> TxVerifyEnv {
57 match self {
58 TxStatus::Fresh => TxVerifyEnv::new_submit(header),
59 TxStatus::Gap => TxVerifyEnv::new_proposed(header, 0),
60 TxStatus::Proposed => TxVerifyEnv::new_proposed(header, 1),
61 }
62 }
63}
64
65impl TxPoolService {
66 pub(crate) async fn get_block_template(&self) -> Result<BlockTemplate, AnyError> {
67 if let Some(ref block_assembler) = self.block_assembler {
68 Ok(block_assembler.get_current().await)
69 } else {
70 Err(InternalErrorKind::Config
71 .other("BlockAssembler disabled")
72 .into())
73 }
74 }
75
76 pub(crate) async fn fetch_tx_verify_cache(&self, tx: &TransactionView) -> Option<CacheEntry> {
77 let guard = self.txs_verify_cache.read().await;
78 guard.peek(&tx.witness_hash()).cloned()
79 }
80
81 async fn fetch_txs_verify_cache(
82 &self,
83 txs: impl Iterator<Item = &TransactionView>,
84 ) -> HashMap<Byte32, CacheEntry> {
85 let guard = self.txs_verify_cache.read().await;
86 txs.filter_map(|tx| {
87 let wtx_hash = tx.witness_hash();
88 guard
89 .peek(&wtx_hash)
90 .cloned()
91 .map(|value| (wtx_hash, value))
92 })
93 .collect()
94 }
95
96 pub(crate) async fn submit_entry(
97 &self,
98 pre_resolve_tip: Byte32,
99 entry: TxEntry,
100 mut status: TxStatus,
101 ) -> (Result<(), Reject>, Arc<Snapshot>) {
102 let (ret, snapshot) = self
103 .with_tx_pool_write_lock(move |tx_pool, snapshot| {
104 let conflicts = if tx_pool.enable_rbf() {
106 tx_pool.check_rbf(&snapshot, &entry)?
107 } else {
108 let conflicted_outpoint =
111 tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
112 if let Some(outpoint) = conflicted_outpoint {
113 return Err(Reject::Resolve(OutPointError::Dead(outpoint)));
114 }
115 HashSet::new()
116 };
117
118 let tip_hash = snapshot.tip_hash();
120 if pre_resolve_tip != tip_hash {
121 debug!(
122 "submit_entry {} context changed. previous:{} now:{}",
123 entry.proposal_short_id(),
124 pre_resolve_tip,
125 tip_hash
126 );
127
128 status = check_rtx(tx_pool, &snapshot, &entry.rtx)?;
130
131 let tip_header = snapshot.tip_header();
132 let tx_env = status.with_env(tip_header);
133 time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
134 }
135
136 let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
137 let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;
138
139 for evict in evicted {
142 let reject = Reject::Invalidated(format!(
143 "invalidated by tx {}",
144 evict.transaction().hash()
145 ));
146 self.callbacks.call_reject(tx_pool, &evict, reject);
147 }
148
149 tx_pool.remove_conflict(&entry.proposal_short_id());
150 tx_pool
151 .limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
152 .map_or(Ok(()), Err)?;
153
154 if !may_recovered_txs.is_empty() {
155 let self_clone = self.clone();
156 tokio::spawn(async move {
157 let mut queue = self_clone.verify_queue.write().await;
159 for tx in may_recovered_txs {
160 debug!("recover back: {:?}", tx.proposal_short_id());
161 let _ = queue.add_tx(tx, false, None);
162 }
163 });
164 }
165 Ok(())
166 })
167 .await;
168
169 (ret, snapshot)
170 }
171
172 pub(crate) async fn notify_block_assembler(&self, status: TxStatus) {
173 if self.should_notify_block_assembler() {
174 match status {
175 TxStatus::Fresh => {
176 if self
177 .block_assembler_sender
178 .send(BlockAssemblerMessage::Pending)
179 .await
180 .is_err()
181 {
182 error!("block_assembler receiver dropped");
183 }
184 }
185 TxStatus::Proposed => {
186 if self
187 .block_assembler_sender
188 .send(BlockAssemblerMessage::Proposed)
189 .await
190 .is_err()
191 {
192 error!("block_assembler receiver dropped");
193 }
194 }
195 _ => {}
196 }
197 }
198 }
199
200 fn process_rbf(
203 &self,
204 tx_pool: &mut TxPool,
205 entry: &TxEntry,
206 conflicts: &HashSet<ProposalShortId>,
207 ) -> Vec<TransactionView> {
208 let mut may_recovered_txs = vec![];
209 let mut available_inputs = HashSet::new();
210
211 if conflicts.is_empty() {
212 return may_recovered_txs;
213 }
214
215 let all_removed: Vec<_> = conflicts
216 .iter()
217 .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
218 .collect();
219
220 available_inputs.extend(
221 all_removed
222 .iter()
223 .flat_map(|removed| removed.transaction().input_pts_iter()),
224 );
225
226 for input in entry.transaction().input_pts_iter() {
227 available_inputs.remove(&input);
228 }
229
230 may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
231 for old in all_removed {
232 debug!(
233 "remove conflict tx {} for RBF by new tx {}",
234 old.transaction().hash(),
235 entry.transaction().hash()
236 );
237 let reject =
238 Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));
239
240 tx_pool.record_conflict(old.transaction().clone());
242 self.callbacks.call_reject(tx_pool, &old, reject);
244 }
245 assert!(!may_recovered_txs.contains(entry.transaction()));
246 may_recovered_txs
247 }
248
249 pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
250 let queue = self.verify_queue.read().await;
251 queue.contains_key(&tx.proposal_short_id())
252 }
253
254 pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
255 let orphan = self.orphan.read().await;
256 orphan.contains_key(&tx.proposal_short_id())
257 }
258
259 pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
260 &self,
261 mut f: F,
262 ) -> (U, Arc<Snapshot>) {
263 let tx_pool = self.tx_pool.read().await;
264 let snapshot = tx_pool.cloned_snapshot();
265
266 let ret = f(&tx_pool, Arc::clone(&snapshot));
267 (ret, snapshot)
268 }
269
270 pub(crate) async fn with_tx_pool_write_lock<U, F: FnMut(&mut TxPool, Arc<Snapshot>) -> U>(
271 &self,
272 mut f: F,
273 ) -> (U, Arc<Snapshot>) {
274 let mut tx_pool = self.tx_pool.write().await;
275 let snapshot = tx_pool.cloned_snapshot();
276
277 let ret = f(&mut tx_pool, Arc::clone(&snapshot));
278 (ret, snapshot)
279 }
280
    /// Resolves `tx` against the pool and checks its fee under the pool read lock.
    ///
    /// On success the result carries `(tip_hash, rtx, status, fee, tx_size)`.
    /// The snapshot used for the check is returned alongside the result in
    /// either case.
    ///
    /// # Errors
    ///
    /// Returns the `Reject` produced by the txid collision check, by resolution,
    /// or by the fee check.
    pub(crate) async fn pre_check(
        &self,
        tx: &TransactionView,
    ) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
        // Computed before the lock is taken; it does not depend on pool state.
        let tx_size = tx.data().serialized_size_in_block();

        let (ret, snapshot) = self
            .with_tx_pool_read_lock(|tx_pool, snapshot| {
                let tip_hash = snapshot.tip_hash();

                check_txid_collision(tx_pool, tx)?;

                // First attempt: resolve with the `rbf` flag off.
                let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false);
                match res {
                    Ok((rtx, status)) => {
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(Reject::Resolve(OutPointError::Dead(out))) => {
                        // A dead outpoint may be caused by a conflicting pool
                        // transaction: resolve again with the `rbf` flag on.
                        let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
                        let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
                        let conflicts = tx_pool.pool_map.find_conflict_outpoint(tx);
                        if conflicts.is_none() {
                            // No conflicting pool entry was found, so the original
                            // `Dead` rejection stands.
                            error!(
                                "{} is resolved as Dead, but there is no conflicted tx",
                                rtx.transaction.proposal_short_id()
                            );
                            return Err(Reject::Resolve(OutPointError::Dead(out)));
                        }
                        Ok((tip_hash, rtx, status, fee, tx_size))
                    }
                    Err(err) => Err(err),
                }
            })
            .await;
        (ret, snapshot)
    }
329
330 pub(crate) async fn non_contextual_verify(
331 &self,
332 tx: &TransactionView,
333 remote: Option<(Cycle, PeerIndex)>,
334 ) -> Result<(), Reject> {
335 if let Err(reject) = non_contextual_verify(&self.consensus, tx) {
336 if reject.is_malformed_tx()
337 && let Some(remote) = remote
338 {
339 self.ban_malformed(remote.1, format!("reject {reject}"))
340 .await;
341 }
342 return Err(reject);
343 }
344 Ok(())
345 }
346
347 pub(crate) async fn resumeble_process_tx(
348 &self,
349 tx: TransactionView,
350 is_proposal_tx: bool,
351 remote: Option<(Cycle, PeerIndex)>,
352 ) -> Result<bool, Reject> {
353 self.non_contextual_verify(&tx, remote).await?;
355
356 if self.orphan_contains(&tx).await {
357 debug!("reject tx {} already in orphan pool", tx.hash());
358 return Err(Reject::Duplicated(tx.hash()));
359 }
360
361 if self.verify_queue_contains(&tx).await {
362 return Err(Reject::Duplicated(tx.hash()));
363 }
364 self.enqueue_verify_queue(tx, is_proposal_tx, remote).await
365 }
366
367 pub(crate) async fn test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
368 self.non_contextual_verify(&tx, None).await?;
370
371 if self.verify_queue_contains(&tx).await {
372 return Err(Reject::Duplicated(tx.hash()));
373 }
374
375 if self.orphan_contains(&tx).await {
376 debug!("reject tx {} already in orphan pool", tx.hash());
377 return Err(Reject::Duplicated(tx.hash()));
378 }
379 self._test_accept_tx(tx.clone()).await
380 }
381
382 pub(crate) async fn process_tx(
383 &self,
384 tx: TransactionView,
385 remote: Option<(Cycle, PeerIndex)>,
386 ) -> Result<Completed, Reject> {
387 self.non_contextual_verify(&tx, remote).await?;
389
390 if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
391 return Err(Reject::Duplicated(tx.hash()));
392 }
393
394 if let Some((ret, snapshot)) = self
395 ._process_tx(tx.clone(), remote.map(|r| r.0), None)
396 .await
397 {
398 self.after_process(tx, remote, &snapshot, &ret).await;
399 ret
400 } else {
401 Ok(Completed {
403 cycles: 0,
404 fee: Capacity::zero(),
405 })
406 }
407 }
408
409 pub(crate) async fn put_recent_reject(&self, tx_hash: &Byte32, reject: &Reject) {
410 let mut tx_pool = self.tx_pool.write().await;
411 if let Some(ref mut recent_reject) = tx_pool.recent_reject
412 && let Err(e) = recent_reject.put(tx_hash, reject.clone())
413 {
414 error!(
415 "Failed to record recent_reject {} {} {}",
416 tx_hash, reject, e
417 );
418 }
419 }
420
421 pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
422 let id = ProposalShortId::from_tx_hash(&tx_hash);
423 {
424 let mut queue = self.verify_queue.write().await;
425 if queue.remove_tx(&id).is_some() {
426 return true;
427 }
428 }
429 {
430 let mut orphan = self.orphan.write().await;
431 if orphan.remove_orphan_tx(&id).is_some() {
432 return true;
433 }
434 }
435 let mut tx_pool = self.tx_pool.write().await;
436 tx_pool.remove_tx(&id)
437 }
438
    /// Handles the follow-up work once a transaction has been processed.
    ///
    /// Depending on `ret` and on whether the transaction came from a remote peer
    /// (`remote` is `Some((declared_cycle, peer))`), this reports the outcome to
    /// the relayer, processes orphans that depend on `tx`, parks `tx` in the
    /// orphan pool, bans the peer, and/or records the rejection.
    ///
    /// `_snapshot` is currently unused.
    pub(crate) async fn after_process(
        &self,
        tx: TransactionView,
        remote: Option<(Cycle, PeerIndex)>,
        _snapshot: &Snapshot,
        ret: &Result<Completed, Reject>,
    ) {
        let tx_hash = tx.hash();

        // Emit a JSON trace line with the consumed cycles when the
        // `ckb_tx_monitor` target is enabled at trace level.
        if log_enabled_target!("ckb_tx_monitor", Trace)
            && let Ok(c) = ret
        {
            trace_target!(
                "ckb_tx_monitor",
                r#"{{"tx_hash":"{:#x}","cycles":{}}}"#,
                tx_hash,
                c.cycles
            );
        }

        // For RBF rejections and dead outpoints, record the tx as a conflict if
        // the pool still holds a transaction that conflicts with it.
        if matches!(
            ret,
            Err(Reject::RBFRejected(..) | Reject::Resolve(OutPointError::Dead(_)))
        ) {
            let mut tx_pool = self.tx_pool.write().await;
            if tx_pool.pool_map.find_conflict_outpoint(&tx).is_some() {
                tx_pool.record_conflict(tx.clone());
            }
        }

        match remote {
            // Transaction received from a peer.
            Some((declared_cycle, peer)) => match ret {
                Ok(_) => {
                    debug!(
                        "after_process remote send_result_to_relayer {} {}",
                        tx_hash, peer
                    );
                    self.send_result_to_relayer(TxVerificationResult::Ok {
                        original_peer: Some(peer),
                        tx_hash,
                    });
                    self.process_orphan_tx(&tx).await;
                }
                Err(reject) => {
                    info!(
                        "after_process {} {} remote reject: {} ",
                        tx_hash, peer, reject
                    );
                    if is_missing_input(reject) {
                        // Inputs are unknown: report the parents to the relayer and
                        // keep the tx as an orphan.
                        self.send_result_to_relayer(TxVerificationResult::UnknownParents {
                            peer,
                            parents: tx.unique_parents(),
                        });
                        self.add_orphan(tx, peer, declared_cycle).await;
                    } else {
                        if reject.is_malformed_tx() {
                            self.ban_malformed(peer, format!("reject {reject}")).await;
                        }
                        if reject.is_allowed_relay() {
                            self.send_result_to_relayer(TxVerificationResult::Reject {
                                tx_hash: tx_hash.clone(),
                            });
                        }
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            },
            // Transaction without an originating peer.
            None => {
                match ret {
                    Ok(_) => {
                        debug!("after_process local send_result_to_relayer {}", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            tx_hash,
                        });
                        self.process_orphan_tx(&tx).await;
                    }
                    Err(Reject::Duplicated(_)) => {
                        // A duplicated tx without a peer is still reported as `Ok`
                        // to the relayer.
                        debug!("after_process {} duplicated", tx_hash);
                        self.send_result_to_relayer(TxVerificationResult::Ok {
                            original_peer: None,
                            tx_hash,
                        });
                    }
                    Err(reject) => {
                        debug!("after_process {} reject: {} ", tx_hash, reject);
                        if reject.should_recorded() {
                            self.put_recent_reject(&tx_hash, reject).await;
                        }
                    }
                }
            }
        }
    }
537
538 pub(crate) async fn add_orphan(
539 &self,
540 tx: TransactionView,
541 peer: PeerIndex,
542 declared_cycle: Cycle,
543 ) {
544 let evicted_txs = self
545 .orphan
546 .write()
547 .await
548 .add_orphan_tx(tx, peer, declared_cycle);
549 for tx_hash in evicted_txs {
552 self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash });
553 }
554 }
555
556 pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
557 let orphan = self.orphan.read().await;
558 orphan
559 .find_by_previous(tx)
560 .iter()
561 .filter_map(|id| orphan.get(id).cloned())
562 .collect::<Vec<_>>()
563 }
564
565 pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
566 self.orphan.write().await.remove_orphan_tx(id);
567 }
568
569 pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) {
573 let mut orphan_queue: VecDeque<TransactionView> = VecDeque::new();
574 orphan_queue.push_back(tx.clone());
575
576 while let Some(previous) = orphan_queue.pop_front() {
577 let orphans = self.find_orphan_by_previous(&previous).await;
578 for orphan in orphans.into_iter() {
579 if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
580 debug!(
581 "process_orphan {} added to verify queue; find previous from {}",
582 orphan.tx.hash(),
583 tx.hash(),
584 );
585 self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
586 self.enqueue_verify_queue(orphan.tx, false, Some((orphan.cycle, orphan.peer)))
587 .await
588 .expect("enqueue suspended tx");
589 } else if let Some((ret, _snapshot)) = self
590 ._process_tx(orphan.tx.clone(), Some(orphan.cycle), None)
591 .await
592 {
593 match ret {
594 Ok(_) => {
595 self.send_result_to_relayer(TxVerificationResult::Ok {
596 original_peer: Some(orphan.peer),
597 tx_hash: orphan.tx.hash(),
598 });
599 debug!(
600 "process_orphan {} success, find previous from {}",
601 orphan.tx.hash(),
602 tx.hash()
603 );
604 self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
605 orphan_queue.push_back(orphan.tx);
606 }
607 Err(reject) => {
608 debug!(
609 "process_orphan {} reject {}, find previous from {}",
610 orphan.tx.hash(),
611 reject,
612 tx.hash(),
613 );
614
615 if !is_missing_input(&reject) {
616 self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
617 if reject.is_malformed_tx() {
618 self.ban_malformed(orphan.peer, format!("reject {reject}"))
619 .await;
620 }
621 if reject.is_allowed_relay() {
622 self.send_result_to_relayer(TxVerificationResult::Reject {
623 tx_hash: orphan.tx.hash(),
624 });
625 }
626 if reject.should_recorded() {
627 self.put_recent_reject(&orphan.tx.hash(), &reject).await;
628 }
629 }
630 }
631 }
632 }
633 }
634 }
635 }
636
637 pub(crate) fn send_result_to_relayer(&self, result: TxVerificationResult) {
638 if let Err(e) = self.tx_relay_sender.send(result) {
639 error!("tx-pool tx_relay_sender internal error {}", e);
640 }
641 }
642
643 async fn ban_malformed(&self, peer: PeerIndex, reason: String) {
644 const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
645
646 #[cfg(feature = "with_sentry")]
647 use sentry::{Level, capture_message, with_scope};
648
649 #[cfg(feature = "with_sentry")]
650 with_scope(
651 |scope| scope.set_fingerprint(Some(&["ckb-tx-pool", "receive-invalid-remote-tx"])),
652 || {
653 capture_message(
654 &format!(
655 "Ban peer {} for {} seconds, reason: \
656 {}",
657 peer,
658 DEFAULT_BAN_TIME.as_secs(),
659 reason
660 ),
661 Level::Info,
662 )
663 },
664 );
665 self.network.ban_peer(peer, DEFAULT_BAN_TIME, reason);
666 self.verify_queue.write().await.remove_txs_by_peer(&peer);
667 }
668
    /// Runs the full pipeline for one transaction: pre-check, script
    /// verification, declared-cycle comparison, and submission to the pool.
    ///
    /// * `declared_cycles` — cycles announced for the transaction, if any. They
    ///   cap verification and must equal the verified cycles. Without them the
    ///   consensus maximum block cycles is used as the cap.
    /// * `command_rx` — optional command channel forwarded to `verify_rtx`.
    ///   Elapsed time is recorded under the sync metric when it is `None` and
    ///   under the async metric otherwise.
    ///
    /// Returns the outcome paired with a snapshot.
    // NOTE(review): `try_or_return_with_snapshot!` is defined elsewhere; presumably
    // it returns early with `Some((Err(..), snapshot))` on `Err` — confirm in the macro.
    pub(crate) async fn _process_tx(
        &self,
        tx: TransactionView,
        declared_cycles: Option<Cycle>,
        command_rx: Option<&mut watch::Receiver<ChunkCommand>>,
    ) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
        let wtx_hash = tx.witness_hash();
        let instant = Instant::now();
        let is_sync_process = command_rx.is_none();

        let (ret, snapshot) = self.pre_check(&tx).await;

        let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

        let verify_cache = self.fetch_tx_verify_cache(&tx).await;
        let max_cycles = declared_cycles.unwrap_or_else(|| self.consensus.max_block_cycles());
        let tip_header = snapshot.tip_header();
        let tx_env = Arc::new(status.with_env(tip_header));

        let verified_ret = verify_rtx(
            Arc::clone(&snapshot),
            Arc::clone(&rtx),
            tx_env,
            &verify_cache,
            max_cycles,
            command_rx,
        )
        .await;

        let verified = try_or_return_with_snapshot!(verified_ret, snapshot);

        // A declared cycle count that differs from the verified one is rejected.
        if let Some(declared) = declared_cycles
            && declared != verified.cycles
        {
            info!(
                "process_tx declared cycles not match verified cycles, declared: {:?} verified: {:?}, tx: {:?}",
                declared, verified.cycles, tx
            );
            return Some((
                Err(Reject::DeclaredWrongCycles(declared, verified.cycles)),
                snapshot,
            ));
        }

        let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);

        // `tip_hash` lets `submit_entry` detect a tip change since `pre_check`.
        let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await;
        try_or_return_with_snapshot!(ret, submit_snapshot);

        self.notify_block_assembler(status).await;

        // Store the result in the verify cache from a detached task, but only
        // when it was not already cached.
        if verify_cache.is_none() {
            let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
            tokio::spawn(async move {
                let mut guard = txs_verify_cache.write().await;
                guard.put(wtx_hash, verified);
            });
        }

        if let Some(metrics) = ckb_metrics::handle() {
            let elapsed = instant.elapsed().as_secs_f64();
            if is_sync_process {
                metrics.ckb_tx_pool_sync_process.observe(elapsed);
            } else {
                metrics.ckb_tx_pool_async_process.observe(elapsed);
            }
        }

        Some((Ok(verified), submit_snapshot))
    }
740
741 pub(crate) async fn _test_accept_tx(&self, tx: TransactionView) -> Result<Completed, Reject> {
742 let (pre_check_ret, snapshot) = self.pre_check(&tx).await;
743
744 let (_tip_hash, rtx, status, _fee, _tx_size) = pre_check_ret?;
745
746 let verify_cache = self.fetch_tx_verify_cache(&tx).await;
749 let max_cycles = self.consensus.max_block_cycles();
750 let tip_header = snapshot.tip_header();
751 let tx_env = Arc::new(status.with_env(tip_header));
752
753 verify_rtx(
754 Arc::clone(&snapshot),
755 Arc::clone(&rtx),
756 tx_env,
757 &verify_cache,
758 max_cycles,
759 None,
760 )
761 .await
762 }
763
    /// Updates the pool, the orphan pool and the verify queue after the chain
    /// changed.
    ///
    /// * `detached_blocks` — blocks removed from the chain.
    /// * `attached_blocks` — blocks added to the chain; each is also committed
    ///   to the fee estimator.
    /// * `detached_proposal_id` — proposal ids to drop from the pool.
    /// * `snapshot` — the snapshot installed into the pool.
    pub(crate) async fn update_tx_pool_for_reorg(
        &self,
        detached_blocks: VecDeque<BlockView>,
        attached_blocks: VecDeque<BlockView>,
        detached_proposal_id: HashSet<ProposalShortId>,
        snapshot: Arc<Snapshot>,
    ) {
        let mine_mode = self.block_assembler.is_some();
        let mut detached = LinkedHashSet::default();
        let mut attached = LinkedHashSet::default();

        let detached_headers: HashSet<Byte32> = detached_blocks
            .iter()
            .map(|blk| blk.header().hash())
            .collect();

        // The first transaction of every block is skipped (presumably the
        // cellbase — confirm).
        for blk in detached_blocks {
            detached.extend(blk.transactions().into_iter().skip(1))
        }

        for blk in attached_blocks {
            self.fee_estimator.commit_block(&blk);
            attached.extend(blk.transactions().into_iter().skip(1));
        }
        // Detached transactions that were not re-attached are candidates for
        // going back into the pool.
        let retain: Vec<TransactionView> = detached.difference(&attached).cloned().collect();

        // Fetch cache entries before the pool write lock is taken.
        let fetched_cache = self.fetch_txs_verify_cache(retain.iter()).await;

        {
            // The write lock is held across both the pool update and the
            // re-adding of detached transactions.
            let mut tx_pool = self.tx_pool.write().await;

            _update_tx_pool_for_reorg(
                &mut tx_pool,
                &attached,
                &detached_headers,
                detached_proposal_id,
                snapshot,
                &self.callbacks,
                mine_mode,
            );

            self.readd_detached_tx(&mut tx_pool, retain, fetched_cache)
                .await;
        }

        // With the pool lock released, handle orphans that depended on attached
        // transactions and drop attached transactions from the verify queue.
        self.remove_orphan_txs_by_attach(&attached).await;
        {
            let mut queue = self.verify_queue.write().await;
            queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
        }
    }
821
822 async fn enqueue_verify_queue(
823 &self,
824 tx: TransactionView,
825 is_proposal_tx: bool,
826 remote: Option<(Cycle, PeerIndex)>,
827 ) -> Result<bool, Reject> {
828 let mut queue = self.verify_queue.write().await;
829 queue.add_tx(tx, is_proposal_tx, remote)
830 }
831
832 async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet<TransactionView>) {
833 for tx in txs.iter() {
834 self.process_orphan_tx(tx).await;
835 }
836 let mut orphan = self.orphan.write().await;
837 orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id()));
838 }
839
840 async fn readd_detached_tx(
841 &self,
842 tx_pool: &mut TxPool,
843 txs: Vec<TransactionView>,
844 fetched_cache: HashMap<Byte32, CacheEntry>,
845 ) {
846 let max_cycles = self.tx_pool_config.max_tx_verify_cycles;
847 for tx in txs {
848 let tx_size = tx.data().serialized_size_in_block();
849 let tx_hash = tx.hash();
850 if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false)
851 && let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size)
852 {
853 let verify_cache = fetched_cache.get(&tx_hash).cloned();
854 let snapshot = tx_pool.cloned_snapshot();
855 let tip_header = snapshot.tip_header();
856 let tx_env = Arc::new(status.with_env(tip_header));
857 if let Ok(verified) = verify_rtx(
858 snapshot,
859 Arc::clone(&rtx),
860 tx_env,
861 &verify_cache,
862 max_cycles,
863 None,
864 )
865 .await
866 {
867 let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size);
868 if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) {
869 error!("readd_detached_tx submit_entry {} error {}", tx_hash, e);
870 } else {
871 debug!("readd_detached_tx submit_entry {}", tx_hash);
872 }
873 }
874 }
875 }
876 }
877
878 pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc<Snapshot>) {
879 {
880 let mut tx_pool = self.tx_pool.write().await;
881 tx_pool.clear(Arc::clone(&new_snapshot));
882 }
883 if self
885 .block_assembler_sender
886 .send(BlockAssemblerMessage::Reset(new_snapshot))
887 .await
888 .is_err()
889 {
890 error!("block_assembler receiver dropped");
891 }
892 }
893
894 pub(crate) async fn save_pool(&self) {
895 let mut tx_pool = self.tx_pool.write().await;
896 if let Err(err) = tx_pool.save_into_file() {
897 error!("failed to save pool, error: {:?}", err)
898 } else {
899 info!("TxPool saved successfully")
900 }
901 }
902
903 pub(crate) async fn update_ibd_state(&self, in_ibd: bool) {
904 self.fee_estimator.update_ibd_state(in_ibd);
905 }
906
907 pub(crate) async fn estimate_fee_rate(
908 &self,
909 estimate_mode: EstimateMode,
910 enable_fallback: bool,
911 ) -> Result<FeeRate, AnyError> {
912 let all_entry_info = self.tx_pool.read().await.get_all_entry_info();
913 match self
914 .fee_estimator
915 .estimate_fee_rate(estimate_mode, all_entry_info)
916 {
917 Ok(fee_rate) => Ok(fee_rate),
918 Err(err) => {
919 if enable_fallback {
920 let target_blocks =
921 FeeEstimator::target_blocks_for_estimate_mode(estimate_mode);
922 self.tx_pool
923 .read()
924 .await
925 .estimate_fee_rate(target_blocks)
926 .map_err(Into::into)
927 } else {
928 Err(err.into())
929 }
930 }
931 }
932 }
933}
934
/// Successful output of `TxPoolService::pre_check`.
type PreCheckedTx = (
    Byte32,                   // tip hash of the snapshot used for the check
    Arc<ResolvedTransaction>, // resolved transaction
    TxStatus,                 // status computed from the snapshot's proposals
    Capacity,                 // fee returned by `check_tx_fee`
    usize,                    // `serialized_size_in_block()` of the transaction
);
942
/// Result of `resolve_tx`: the resolved transaction with its status, or the rejection.
type ResolveResult = Result<(Arc<ResolvedTransaction>, TxStatus), Reject>;
944
945fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus {
946 if snapshot.proposals().contains_proposed(short_id) {
947 TxStatus::Proposed
948 } else if snapshot.proposals().contains_gap(short_id) {
949 TxStatus::Gap
950 } else {
951 TxStatus::Fresh
952 }
953}
954
955fn check_rtx(
956 tx_pool: &TxPool,
957 snapshot: &Snapshot,
958 rtx: &ResolvedTransaction,
959) -> Result<TxStatus, Reject> {
960 let short_id = rtx.transaction.proposal_short_id();
961 let tx_status = get_tx_status(snapshot, &short_id);
962 tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status)
963}
964
965fn resolve_tx(
966 tx_pool: &TxPool,
967 snapshot: &Snapshot,
968 tx: TransactionView,
969 rbf: bool,
970) -> ResolveResult {
971 let short_id = tx.proposal_short_id();
972 let tx_status = get_tx_status(snapshot, &short_id);
973 tx_pool
974 .resolve_tx_from_pool(tx, rbf)
975 .map(|rtx| (rtx, tx_status))
976}
977
978fn _submit_entry(
979 tx_pool: &mut TxPool,
980 status: TxStatus,
981 entry: TxEntry,
982 callbacks: &Callbacks,
983) -> Result<HashSet<TxEntry>, Reject> {
984 let tx_hash = entry.transaction().hash();
985 debug!("submit_entry {:?} {}", status, tx_hash);
986 let (succ, evicts) = match status {
987 TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
988 TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
989 TxStatus::Proposed => tx_pool.add_proposed(entry.clone())?,
990 };
991 if succ {
992 match status {
993 TxStatus::Fresh => callbacks.call_pending(&entry),
994 TxStatus::Gap => callbacks.call_pending(&entry),
995 TxStatus::Proposed => callbacks.call_proposed(&entry),
996 }
997 }
998 Ok(evicts)
999}
1000
1001fn _update_tx_pool_for_reorg(
1002 tx_pool: &mut TxPool,
1003 attached: &LinkedHashSet<TransactionView>,
1004 detached_headers: &HashSet<Byte32>,
1005 detached_proposal_id: HashSet<ProposalShortId>,
1006 snapshot: Arc<Snapshot>,
1007 callbacks: &Callbacks,
1008 mine_mode: bool,
1009) {
1010 tx_pool.snapshot = Arc::clone(&snapshot);
1011
1012 tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
1018 tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());
1019
1020 if mine_mode {
1024 let mut proposals = Vec::new();
1025 let mut gaps = Vec::new();
1026
1027 for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
1028 let short_id = entry.inner.proposal_short_id();
1029 if snapshot.proposals().contains_proposed(&short_id) {
1030 proposals.push((short_id, entry.inner.clone()));
1031 }
1032 }
1033
1034 for entry in tx_pool.pool_map.entries.get_by_status(&Status::Pending) {
1035 let short_id = entry.inner.proposal_short_id();
1036 let elem = (short_id.clone(), entry.inner.clone());
1037 if snapshot.proposals().contains_proposed(&short_id) {
1038 proposals.push(elem);
1039 } else if snapshot.proposals().contains_gap(&short_id) {
1040 gaps.push(elem);
1041 }
1042 }
1043
1044 for (id, entry) in proposals {
1045 debug!("begin to proposed: {:x}", id);
1046 if let Err(e) = tx_pool.proposed_rtx(&id) {
1047 debug!(
1048 "Failed to add proposed tx {}, reason: {}",
1049 entry.transaction().hash(),
1050 e
1051 );
1052 callbacks.call_reject(tx_pool, &entry, e);
1053 } else {
1054 callbacks.call_proposed(&entry)
1055 }
1056 }
1057
1058 for (id, entry) in gaps {
1059 debug!("begin to gap: {:x}", id);
1060 if let Err(e) = tx_pool.gap_rtx(&id) {
1061 debug!(
1062 "Failed to add tx to gap {}, reason: {}",
1063 entry.transaction().hash(),
1064 e
1065 );
1066 callbacks.call_reject(tx_pool, &entry, e.clone());
1067 }
1068 }
1069 }
1070
1071 tx_pool.remove_expired(callbacks);
1073
1074 let _ = tx_pool.limit_size(callbacks, None);
1076}