1use super::{
10 dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
11 import_notification_sink::MultiViewImportNotificationSink,
12 metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
13 multi_view_listener::MultiViewListener,
14 tx_mem_pool::{InsertionInfo, TxMemPool},
15 view::View,
16 view_store::ViewStore,
17};
18use crate::{
19 api::FullChainApi,
20 common::{
21 sliding_stat::DurationSlidingStats,
22 tracing_log_xt::{log_xt_debug, log_xt_trace},
23 STAT_SLIDING_WINDOW,
24 },
25 enactment_state::{EnactmentAction, EnactmentState},
26 fork_aware_txpool::{
27 dropped_watcher::{DroppedReason, DroppedTransaction},
28 revalidation_worker,
29 },
30 graph::{
31 self,
32 base_pool::{TimedTransactionSource, Transaction},
33 BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
34 },
35 insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
36 LOG_TARGET_STAT,
37};
38use async_trait::async_trait;
39use futures::{
40 channel::oneshot,
41 future::{self},
42 prelude::*,
43 FutureExt,
44};
45use parking_lot::Mutex;
46use soil_prometheus::Registry as PrometheusRegistry;
47use soil_client::blockchain::{HashAndNumber, TreeRoute};
48use soil_client::transaction_pool::{
49 error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
50 MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
51 TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
52};
53use std::{
54 collections::{BTreeMap, HashMap, HashSet},
55 pin::Pin,
56 sync::Arc,
57 time::{Duration, Instant},
58};
59use subsoil::core::traits::SpawnEssentialNamed;
60use subsoil::runtime::{
61 generic::BlockId,
62 traits::{Block as BlockT, NumberFor},
63 transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
64 Saturating,
65};
66use tokio::select;
67use tracing::{debug, instrument, trace, warn, Level};
68
69const FINALITY_TIMEOUT_THRESHOLD: usize = 128;
73
74const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;
79
80pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
82
83struct ReadyPoll<T, Block>
86where
87 Block: BlockT,
88{
89 pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
90}
91
92impl<T, Block> ReadyPoll<T, Block>
93where
94 Block: BlockT,
95{
96 fn new() -> Self {
98 Self { pollers: Default::default() }
99 }
100
101 fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
104 let (s, r) = oneshot::channel();
105 self.pollers.entry(at).or_default().push(s);
106 r
107 }
108
109 fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
114 debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
115 let Some(pollers) = self.pollers.remove(&at) else { return };
116 pollers.into_iter().for_each(|p| {
117 debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
118 let _ = p.send(ready_iterator());
119 });
120 }
121
122 fn remove_cancelled(&mut self) {
124 self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
125 }
126}
127
128pub struct ForkAwareTxPool<ChainApi, Block>
132where
133 Block: BlockT,
134 ChainApi: graph::ChainApi<Block = Block> + 'static,
135{
136 api: Arc<ChainApi>,
138
139 mempool: Arc<TxMemPool<ChainApi, Block>>,
141
142 view_store: Arc<ViewStore<ChainApi, Block>>,
144
145 ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,
147
148 metrics: PrometheusMetrics,
150
151 events_metrics_collector: EventsMetricsCollector<ChainApi>,
153
154 enactment_state: Arc<Mutex<EnactmentState<Block>>>,
156
157 revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,
159
160 import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
163
164 options: Options,
166
167 is_validator: IsValidator,
169
170 finality_timeout_threshold: usize,
176
177 included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,
185
186 submit_stats: DurationSlidingStats,
188
189 submit_and_watch_stats: DurationSlidingStats,
191}
192
193impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
194where
195 Block: BlockT,
196 ChainApi: graph::ChainApi<Block = Block> + 'static,
197 <Block as BlockT>::Hash: Unpin,
198{
199 fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
203 if let Some(block_number) =
204 self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
205 {
206 let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
207 let tree_route =
208 &TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
209 let view = self.build_and_plug_view(None, &at_best, &tree_route);
210 self.view_store.insert_new_view_sync(view.into(), &tree_route);
211 trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
212 };
213 self
214 }
215
216 pub fn new_test(
219 pool_api: Arc<ChainApi>,
220 best_block_hash: Block::Hash,
221 finalized_hash: Block::Hash,
222 finality_timeout_threshold: Option<usize>,
223 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
224 Self::new_test_with_limits(
225 pool_api,
226 best_block_hash,
227 finalized_hash,
228 Options::default().ready,
229 Options::default().future,
230 usize::MAX,
231 finality_timeout_threshold,
232 )
233 }
234
235 pub fn new_test_with_limits(
238 pool_api: Arc<ChainApi>,
239 best_block_hash: Block::Hash,
240 finalized_hash: Block::Hash,
241 ready_limits: crate::PoolLimit,
242 future_limits: crate::PoolLimit,
243 mempool_max_transactions_count: usize,
244 finality_timeout_threshold: Option<usize>,
245 ) -> (Self, [ForkAwareTxPoolTask; 2]) {
246 let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
247 let listener = Arc::new(listener);
248
249 let (import_notification_sink, import_notification_sink_task) =
250 MultiViewImportNotificationSink::new_with_worker();
251
252 let (mempool, mempool_task) = TxMemPool::new(
253 pool_api.clone(),
254 listener.clone(),
255 Default::default(),
256 mempool_max_transactions_count,
257 ready_limits.total_bytes + future_limits.total_bytes,
258 );
259 let mempool = Arc::from(mempool);
260
261 let (dropped_stream_controller, dropped_stream) =
262 MultiViewDroppedWatcherController::<ChainApi>::new();
263
264 let view_store = Arc::new(ViewStore::new(
265 pool_api.clone(),
266 listener,
267 dropped_stream_controller,
268 import_notification_sink.clone(),
269 ));
270
271 let dropped_monitor_task = Self::dropped_monitor_task(
272 dropped_stream,
273 mempool.clone(),
274 view_store.clone(),
275 import_notification_sink.clone(),
276 );
277
278 let combined_tasks = async move {
279 tokio::select! {
280 _ = listener_task => {},
281 _ = import_notification_sink_task => {},
282 _ = dropped_monitor_task => {}
283 }
284 }
285 .boxed();
286
287 let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };
288
289 (
290 Self {
291 mempool,
292 api: pool_api,
293 view_store,
294 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
295 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
296 best_block_hash,
297 finalized_hash,
298 ))),
299 revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
300 import_notification_sink,
301 options,
302 is_validator: false.into(),
303 metrics: Default::default(),
304 events_metrics_collector: EventsMetricsCollector::default(),
305 finality_timeout_threshold: finality_timeout_threshold
306 .unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
307 included_transactions: Default::default(),
308 submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
309 submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
310 STAT_SLIDING_WINDOW,
311 )),
312 }
313 .inject_initial_view(best_block_hash),
314 [combined_tasks, mempool_task],
315 )
316 }
317
318 async fn dropped_monitor_task(
326 mut dropped_stream: StreamOfDropped<ChainApi>,
327 mempool: Arc<TxMemPool<ChainApi, Block>>,
328 view_store: Arc<ViewStore<ChainApi, Block>>,
329 import_notification_sink: MultiViewImportNotificationSink<
330 Block::Hash,
331 ExtrinsicHash<ChainApi>,
332 >,
333 ) {
334 let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
335 loop {
336 let Some(dropped) = dropped_stream.next().await else {
337 debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
338 break;
339 };
340 let start = Instant::now();
341 let tx_hash = dropped.tx_hash;
342 trace!(
343 target: LOG_TARGET,
344 ?tx_hash,
345 reason = ?dropped.reason,
346 "fatp::dropped notification, removing"
347 );
348 match dropped.reason {
349 DroppedReason::Usurped(new_tx_hash) => {
350 if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
351 view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
352 } else {
353 trace!(
354 target: LOG_TARGET,
355 tx_hash = ?new_tx_hash,
356 "error: dropped_monitor_task: no entry in mempool for new transaction"
357 );
358 };
359 },
360 DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
361 view_store.remove_transaction_subtree(tx_hash, |_, _| {});
362 },
363 };
364
365 mempool.remove_transactions(&[tx_hash]).await;
366 import_notification_sink.clean_notified_items(&[tx_hash]);
367 view_store.listener.transaction_dropped(dropped);
368 insert_and_log_throttled!(
369 Level::DEBUG,
370 target:LOG_TARGET_STAT,
371 prefix:"dropped_stats",
372 dropped_stats,
373 start.elapsed().into()
374 );
375 }
376 }
377
378 pub fn new_with_background_worker(
383 options: Options,
384 is_validator: IsValidator,
385 pool_api: Arc<ChainApi>,
386 prometheus: Option<&PrometheusRegistry>,
387 spawner: impl SpawnEssentialNamed,
388 best_block_hash: Block::Hash,
389 finalized_hash: Block::Hash,
390 ) -> Self {
391 let metrics = PrometheusMetrics::new(prometheus);
392 let (events_metrics_collector, event_metrics_task) =
393 EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());
394
395 let (listener, listener_task) =
396 MultiViewListener::new_with_worker(events_metrics_collector.clone());
397 let listener = Arc::new(listener);
398
399 let (revalidation_queue, revalidation_task) =
400 revalidation_worker::RevalidationQueue::new_with_worker();
401
402 let (import_notification_sink, import_notification_sink_task) =
403 MultiViewImportNotificationSink::new_with_worker();
404
405 let (mempool, blocking_mempool_task) = TxMemPool::new(
406 pool_api.clone(),
407 listener.clone(),
408 metrics.clone(),
409 options.total_count(),
410 options.ready.total_bytes + options.future.total_bytes,
411 );
412 let mempool = Arc::from(mempool);
413
414 let (dropped_stream_controller, dropped_stream) =
415 MultiViewDroppedWatcherController::<ChainApi>::new();
416
417 let view_store = Arc::new(ViewStore::new(
418 pool_api.clone(),
419 listener,
420 dropped_stream_controller,
421 import_notification_sink.clone(),
422 ));
423
424 let dropped_monitor_task = Self::dropped_monitor_task(
425 dropped_stream,
426 mempool.clone(),
427 view_store.clone(),
428 import_notification_sink.clone(),
429 );
430
431 let combined_tasks = async move {
432 tokio::select! {
433 _ = listener_task => {}
434 _ = revalidation_task => {},
435 _ = import_notification_sink_task => {},
436 _ = dropped_monitor_task => {}
437 _ = event_metrics_task => {},
438 }
439 }
440 .boxed();
441 spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
442 spawner.spawn_essential_blocking(
443 "txpool-background",
444 Some("transaction-pool"),
445 blocking_mempool_task,
446 );
447
448 Self {
449 mempool,
450 api: pool_api,
451 view_store,
452 ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
453 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
454 best_block_hash,
455 finalized_hash,
456 ))),
457 revalidation_queue: Arc::from(revalidation_queue),
458 import_notification_sink,
459 options,
460 metrics,
461 events_metrics_collector,
462 is_validator,
463 finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
464 included_transactions: Default::default(),
465 submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
466 submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
467 STAT_SLIDING_WINDOW,
468 )),
469 }
470 .inject_initial_view(best_block_hash)
471 }
472
473 pub fn api(&self) -> &ChainApi {
475 &self.api
476 }
477
478 pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
480 self.view_store.status()
481 }
482
483 pub fn active_views_count(&self) -> usize {
485 self.view_store.active_views.read().len()
486 }
487
488 pub fn inactive_views_count(&self) -> usize {
490 self.view_store.inactive_views.read().len()
491 }
492
493 fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
498 self.view_store
499 .active_views
500 .read()
501 .iter()
502 .map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
503 .collect()
504 }
505
506 pub fn has_view(&self, hash: &Block::Hash) -> bool {
508 self.view_store.active_views.read().contains_key(hash)
509 }
510
511 pub async fn mempool_len(&self) -> (usize, usize) {
515 self.mempool.unwatched_and_watched_count().await
516 }
517
518 pub fn futures_at(
522 &self,
523 at: Block::Hash,
524 ) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
525 self.view_store.futures_at(at)
526 }
527
528 pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
541 let start = Instant::now();
542 let api = self.api.clone();
543 debug!(
544 target: LOG_TARGET,
545 ?at,
546 "fatp::ready_at_light"
547 );
548
549 let at_number = self.api.resolve_block_number(at).ok();
550 let finalized_number = self
551 .api
552 .resolve_block_number(self.enactment_state.lock().recent_finalized_block())
553 .ok();
554
555 if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
558 let at_hn = HashAndNumber { hash: at, number: at_number };
559 finalized_number.and_then(|finalized_number| {
560 self.view_store
561 .find_view_descendent_up_to_number(&at_hn, finalized_number)
562 .map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
563 })
564 }) {
565 let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
566 let mut all_extrinsics = vec![];
567 for h in enacted_blocks {
568 let extrinsics = api
569 .block_body(h)
570 .await
571 .unwrap_or_else(|error| {
572 warn!(
573 target: LOG_TARGET,
574 %error,
575 "Compute ready light transactions: error request"
576 );
577 None
578 })
579 .unwrap_or_default()
580 .into_iter()
581 .map(|t| api.hash_and_length(&t).0);
582 all_extrinsics.extend(extrinsics);
583 }
584
585 let before_count = tmp_view.pool.validated_pool().status().ready;
586 let tags = tmp_view
587 .pool
588 .validated_pool()
589 .extrinsics_tags(&all_extrinsics)
590 .into_iter()
591 .flatten()
592 .flatten()
593 .collect::<Vec<_>>();
594 let _ = tmp_view.pool.validated_pool().prune_tags(tags);
595
596 let after_count = tmp_view.pool.validated_pool().status().ready;
597 debug!(
598 target: LOG_TARGET,
599 ?at,
600 best_view_hash = ?view.at.hash,
601 before_count,
602 to_be_removed = all_extrinsics.len(),
603 after_count,
604 duration = ?start.elapsed(),
605 "fatp::ready_at_light -> light"
606 );
607 Box::new(tmp_view.pool.validated_pool().ready())
608 } else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
609 debug!(
614 target: LOG_TARGET,
615 ?at,
616 duration = ?start.elapsed(),
617 "fatp::ready_at_light -> most_recent_view"
618 );
619 Box::new(most_recent_view.pool.validated_pool().ready())
620 } else {
621 let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
622 debug!(
623 target: LOG_TARGET,
624 ?at,
625 duration = ?start.elapsed(),
626 "fatp::ready_at_light -> empty"
627 );
628 empty
629 }
630 }
631
632 async fn ready_at_with_timeout_internal(
643 &self,
644 at: Block::Hash,
645 timeout: std::time::Duration,
646 ) -> ReadyIteratorFor<ChainApi> {
647 debug!(
648 target: LOG_TARGET,
649 ?at,
650 ?timeout,
651 "fatp::ready_at_with_timeout"
652 );
653 let timeout = futures_timer::Delay::new(timeout);
654 let (view_already_exists, ready_at) = self.ready_at_internal(at);
655
656 if view_already_exists {
657 return ready_at.await;
658 }
659
660 let maybe_ready = async move {
661 select! {
662 ready = ready_at => Some(ready),
663 _ = timeout => {
664 debug!(
665 target: LOG_TARGET,
666 ?at,
667 "Timeout fired waiting for transaction pool at block. Proceeding with production."
668 );
669 None
670 }
671 }
672 };
673
674 let fall_back_ready = self.ready_at_light(at);
675 let (maybe_ready, fall_back_ready) =
676 futures::future::join(maybe_ready, fall_back_ready).await;
677 maybe_ready.unwrap_or(fall_back_ready)
678 }
679
680 fn ready_at_internal(
681 &self,
682 at: Block::Hash,
683 ) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
684 let mut ready_poll = self.ready_poll.lock();
685
686 if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
687 debug!(
688 target: LOG_TARGET,
689 ?at,
690 ?inactive,
691 "fatp::ready_at_internal"
692 );
693 let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
694 return (true, async move { iterator }.boxed());
695 }
696
697 let pending = ready_poll
698 .add(at)
699 .map(|received| {
700 received.unwrap_or_else(|error| {
701 warn!(
702 target: LOG_TARGET,
703 %error,
704 "Error receiving ready-set iterator"
705 );
706 Box::new(std::iter::empty())
707 })
708 })
709 .boxed();
710 debug!(
711 target: LOG_TARGET,
712 ?at,
713 pending_keys = ?ready_poll.pollers.keys(),
714 "fatp::ready_at_internal"
715 );
716 (false, pending)
717 }
718
719 async fn submit_and_watch_inner(
721 &self,
722 at: Block::Hash,
723 source: TransactionSource,
724 xt: TransactionFor<Self>,
725 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
726 let xt = Arc::from(xt);
727
728 let at_number = self
729 .api
730 .block_id_to_number(&BlockId::Hash(at))
731 .ok()
732 .flatten()
733 .unwrap_or_default()
734 .into()
735 .as_u64();
736
737 let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
738 Ok(result) => result,
739 Err(TxPoolApiError::ImmediatelyDropped) => {
740 self.attempt_transaction_replacement(source, at_number, true, xt.clone())
741 .await?
742 },
743 Err(e) => return Err(e.into()),
744 };
745
746 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
747 self.events_metrics_collector.report_submitted(&insertion);
748
749 match self.view_store.submit_and_watch(at, insertion.source, xt).await {
750 Err(e) => {
751 self.mempool.remove_transactions(&[insertion.hash]).await;
752 Err(e.into())
753 },
754 Ok(mut outcome) => {
755 self.mempool
756 .update_transaction_priority(outcome.hash(), outcome.priority())
757 .await;
758 Ok(outcome.expect_watcher())
759 },
760 }
761 }
762
763 async fn submit_at_inner(
765 &self,
766 at: Block::Hash,
767 source: TransactionSource,
768 xts: Vec<TransactionFor<Self>>,
769 ) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
770 let at_number = self
771 .api
772 .block_id_to_number(&BlockId::Hash(at))
773 .ok()
774 .flatten()
775 .unwrap_or_default()
776 .into()
777 .as_u64();
778 let view_store = self.view_store.clone();
779 let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
780 let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;
781
782 if view_store.is_empty() {
783 return Ok(mempool_results
784 .into_iter()
785 .map(|r| r.map(|r| r.hash).map_err(Into::into))
786 .collect::<Vec<_>>());
787 }
788
789 let retries = mempool_results
791 .into_iter()
792 .zip(xts.clone())
793 .map(|(result, xt)| async move {
794 match result {
795 Err(TxPoolApiError::ImmediatelyDropped) => {
796 self.attempt_transaction_replacement(source, at_number, false, xt).await
797 },
798 _ => result,
799 }
800 })
801 .collect::<Vec<_>>();
802
803 let mempool_results = futures::future::join_all(retries).await;
804
805 let to_be_submitted = mempool_results
807 .iter()
808 .zip(xts)
809 .filter_map(|(result, xt)| {
810 result.as_ref().ok().map(|insertion| {
811 self.events_metrics_collector.report_submitted(&insertion);
812 (insertion.source.clone(), xt)
813 })
814 })
815 .collect::<Vec<_>>();
816
817 self.metrics
818 .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
819
820 let mempool = self.mempool.clone();
823 let results_map = view_store.submit(to_be_submitted.into_iter()).await;
824 let mut submission_results = reduce_multiview_result(results_map).into_iter();
825
826 const RESULTS_ASSUMPTION : &str =
843 "The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
844 let merged_results = mempool_results.into_iter().map(|result| {
845 result.map_err(Into::into).and_then(|insertion| {
846 Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
847 })
848 });
849
850 let mut final_results = vec![];
851 for r in merged_results {
852 match r {
853 Ok((hash, submission_result)) => match submission_result {
854 Ok(r) => {
855 mempool.update_transaction_priority(r.hash(), r.priority()).await;
856 final_results.push(Ok(r.hash()));
857 },
858 Err(e) => {
859 mempool.remove_transactions(&[hash]).await;
860 final_results.push(Err(e));
861 },
862 },
863 Err(e) => final_results.push(Err(e)),
864 }
865 }
866
867 Ok(final_results)
868 }
869
870 pub fn import_notification_sink_len(&self) -> usize {
874 self.import_notification_sink.notified_items_len()
875 }
876}
877
878fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
905 let mut values = input.values();
906 let Some(first) = values.next() else {
907 return Default::default();
908 };
909 let length = first.len();
910 debug_assert!(values.all(|x| length == x.len()));
911
912 input
913 .into_values()
914 .reduce(|mut agg_results, results| {
915 agg_results.iter_mut().zip(results.into_iter()).for_each(|(agg_r, r)| {
916 if agg_r.is_err() {
917 *agg_r = r;
918 }
919 });
920 agg_results
921 })
922 .unwrap_or_default()
923}
924
925#[async_trait]
926impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
927where
928 Block: BlockT,
929 ChainApi: 'static + graph::ChainApi<Block = Block>,
930 <Block as BlockT>::Hash: Unpin,
931{
932 type Block = ChainApi::Block;
933 type Hash = ExtrinsicHash<ChainApi>;
934 type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
935 type Error = ChainApi::Error;
936
937 async fn submit_at(
944 &self,
945 at: <Self::Block as BlockT>::Hash,
946 source: TransactionSource,
947 xts: Vec<TransactionFor<Self>>,
948 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
949 let start = Instant::now();
950 trace!(
951 target: LOG_TARGET,
952 count = xts.len(),
953 active_views_count = self.active_views_count(),
954 "fatp::submit_at"
955 );
956 log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
957 let result = self.submit_at_inner(at, source, xts).await;
958 insert_and_log_throttled!(
959 Level::DEBUG,
960 target:LOG_TARGET_STAT,
961 prefix:"submit_stats",
962 self.submit_stats,
963 start.elapsed().into()
964 );
965 result
966 }
967
968 async fn submit_one(
972 &self,
973 _at: <Self::Block as BlockT>::Hash,
974 source: TransactionSource,
975 xt: TransactionFor<Self>,
976 ) -> Result<TxHash<Self>, Self::Error> {
977 trace!(
978 target: LOG_TARGET,
979 tx_hash = ?self.tx_hash(&xt),
980 active_views_count = self.active_views_count(),
981 "fatp::submit_one"
982 );
983 match self.submit_at(_at, source, vec![xt]).await {
984 Ok(mut v) => {
985 v.pop().expect("There is exactly one element in result of submit_at. qed.")
986 },
987 Err(e) => Err(e),
988 }
989 }
990
991 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
996 async fn submit_and_watch(
997 &self,
998 at: <Self::Block as BlockT>::Hash,
999 source: TransactionSource,
1000 xt: TransactionFor<Self>,
1001 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
1002 let start = Instant::now();
1003 trace!(
1004 target: LOG_TARGET,
1005 tx_hash = ?self.tx_hash(&xt),
1006 views = self.active_views_count(),
1007 "fatp::submit_and_watch"
1008 );
1009 let result = self.submit_and_watch_inner(at, source, xt).await;
1010 insert_and_log_throttled!(
1011 Level::DEBUG,
1012 target:LOG_TARGET_STAT,
1013 prefix:"submit_and_watch_stats",
1014 self.submit_and_watch_stats,
1015 start.elapsed().into()
1016 );
1017 result
1018 }
1019
1020 async fn report_invalid(
1029 &self,
1030 at: Option<<Self::Block as BlockT>::Hash>,
1031 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
1032 ) -> Vec<Arc<Self::InPoolTransaction>> {
1033 debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
1034 log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
1035 self.metrics
1036 .report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
1037
1038 let removed = self.view_store.report_invalid(at, invalid_tx_errors);
1039
1040 let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
1041 self.mempool.remove_transactions(&removed_hashes).await;
1042 self.import_notification_sink.clean_notified_items(&removed_hashes);
1043
1044 self.metrics
1045 .report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
1046
1047 removed
1048 }
1049
1050 fn status(&self) -> PoolStatus {
1058 self.view_store
1059 .most_recent_view
1060 .read()
1061 .as_ref()
1062 .map(|v| v.status())
1063 .unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
1064 }
1065
1066 fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
1071 self.import_notification_sink.event_stream()
1072 }
1073
1074 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1076 self.api().hash_and_length(xt).0
1077 }
1078
1079 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
1081 self.view_store.listener.transactions_broadcasted(propagations);
1082 }
1083
1084 fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
1090 let most_recent_view_hash =
1091 self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
1092 let result = most_recent_view_hash
1093 .and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
1094 trace!(
1095 target: LOG_TARGET,
1096 ?tx_hash,
1097 is_ready = result.is_some(),
1098 most_recent_view = ?most_recent_view_hash,
1099 "ready_transaction"
1100 );
1101 result
1102 }
1103
1104 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
1106 let (_, result) = self.ready_at_internal(at);
1107 result.await
1108 }
1109
1110 fn ready(&self) -> ReadyIteratorFor<ChainApi> {
1115 self.view_store.ready()
1116 }
1117
1118 fn futures(&self) -> Vec<Self::InPoolTransaction> {
1123 self.view_store.futures()
1124 }
1125
1126 async fn ready_at_with_timeout(
1131 &self,
1132 at: <Self::Block as BlockT>::Hash,
1133 timeout: std::time::Duration,
1134 ) -> ReadyIteratorFor<ChainApi> {
1135 self.ready_at_with_timeout_internal(at, timeout).await
1136 }
1137}
1138
1139impl<ChainApi, Block> soil_client::transaction_pool::LocalTransactionPool
1140 for ForkAwareTxPool<ChainApi, Block>
1141where
1142 Block: BlockT,
1143 ChainApi: 'static + graph::ChainApi<Block = Block>,
1144 <Block as BlockT>::Hash: Unpin,
1145{
1146 type Block = Block;
1147 type Hash = ExtrinsicHash<ChainApi>;
1148 type Error = ChainApi::Error;
1149
1150 fn submit_local(
1151 &self,
1152 at: Block::Hash,
1153 xt: soil_client::transaction_pool::LocalTransactionFor<Self>,
1154 ) -> Result<Self::Hash, Self::Error> {
1155 trace!(
1156 target: LOG_TARGET,
1157 active_views_count = self.active_views_count(),
1158 "fatp::submit_local"
1159 );
1160 let xt = Arc::from(xt);
1161 let at_number = self
1162 .api
1163 .block_id_to_number(&BlockId::Hash(at))
1164 .ok()
1165 .flatten()
1166 .unwrap_or_default()
1167 .into()
1168 .as_u64();
1169
1170 let result = self
1172 .mempool
1173 .clone()
1174 .extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
1175 .remove(0);
1176
1177 let insertion = match result {
1178 Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
1179 TransactionSource::Local,
1180 false,
1181 xt.clone(),
1182 ),
1183 _ => result,
1184 }?;
1185
1186 self.view_store
1187 .submit_local(xt)
1188 .inspect_err(|_| {
1189 self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
1190 })
1191 .map(|outcome| {
1192 self.mempool
1193 .clone()
1194 .update_transaction_priority_sync(outcome.hash(), outcome.priority());
1195 outcome.hash()
1196 })
1197 .or_else(|_| Ok(insertion.hash))
1198 }
1199}
1200
1201impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1202where
1203 Block: BlockT,
1204 ChainApi: graph::ChainApi<Block = Block> + 'static,
1205 <Block as BlockT>::Hash: Unpin,
1206{
1207 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
1215 async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
1216 let hash_and_number = match tree_route.last() {
1217 Some(hash_and_number) => hash_and_number,
1218 None => {
1219 warn!(
1220 target: LOG_TARGET,
1221 ?tree_route,
1222 "Skipping ChainEvent - no last block in tree route"
1223 );
1224 return;
1225 },
1226 };
1227
1228 if self.has_view(&hash_and_number.hash) {
1229 debug!(
1230 target: LOG_TARGET,
1231 ?hash_and_number,
1232 "view already exists for block"
1233 );
1234 return;
1235 }
1236
1237 let best_view = self.view_store.find_best_view(tree_route);
1238 let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;
1239
1240 if let Some(view) = new_view {
1241 {
1242 let view = view.clone();
1243 self.ready_poll.lock().trigger(hash_and_number.hash, move || {
1244 Box::from(view.pool.validated_pool().ready())
1245 });
1246 }
1247
1248 View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
1249 }
1250
1251 self.finality_stall_cleanup(hash_and_number).await;
1252 }
1253
1254 async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
1264 let (oldest_block_number, finality_timedout_blocks) = {
1265 let mut included_transactions = self.included_transactions.lock();
1266
1267 let Some(oldest_block_number) =
1268 included_transactions.first_key_value().map(|(k, _)| k.number)
1269 else {
1270 return;
1271 };
1272
1273 if at.number.saturating_sub(oldest_block_number).into()
1274 <= self.finality_timeout_threshold.into()
1275 {
1276 return;
1277 }
1278
1279 let mut finality_timedout_blocks =
1280 indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();
1281
1282 included_transactions.retain(
1283 |HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
1284 let diff = at.number.saturating_sub(*view_number);
1285 if diff.into() > self.finality_timeout_threshold.into() {
1286 finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
1287 false
1288 } else {
1289 true
1290 }
1291 },
1292 );
1293
1294 (oldest_block_number, finality_timedout_blocks)
1295 };
1296
1297 if !finality_timedout_blocks.is_empty() {
1298 self.ready_poll.lock().remove_cancelled();
1299 self.view_store.listener.remove_stale_controllers();
1300 }
1301
1302 let finality_timedout_blocks_len = finality_timedout_blocks.len();
1303
1304 for (block_hash, tx_hashes) in finality_timedout_blocks {
1305 self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);
1306
1307 self.mempool.remove_transactions(&tx_hashes).await;
1308 self.import_notification_sink.clean_notified_items(&tx_hashes);
1309 self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
1310 }
1311
1312 self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);
1313
1314 debug!(
1315 target: LOG_TARGET,
1316 ?at,
1317 included_transactions_len = ?self.included_transactions.lock().len(),
1318 finality_timedout_blocks_len,
1319 ?oldest_block_number,
1320 "finality_stall_cleanup"
1321 );
1322 }
1323
1324 fn build_and_plug_view(
1333 &self,
1334 origin_view: Option<Arc<View<ChainApi>>>,
1335 at: &HashAndNumber<Block>,
1336 tree_route: &TreeRoute<Block>,
1337 ) -> View<ChainApi> {
1338 let enter = Instant::now();
1339 let (view, view_dropped_stream, view_aggregated_stream) =
1340 if let Some(origin_view) = origin_view {
1341 let (mut view, view_dropped_stream, view_aggragated_stream) =
1342 View::new_from_other(&origin_view, at);
1343 if !tree_route.retracted().is_empty() {
1344 view.pool.clear_recently_pruned();
1345 }
1346 (view, view_dropped_stream, view_aggragated_stream)
1347 } else {
1348 debug!(
1349 target: LOG_TARGET,
1350 ?at,
1351 "creating non-cloned view"
1352 );
1353 View::new(
1354 self.api.clone(),
1355 at.clone(),
1356 self.options.clone(),
1357 self.metrics.clone(),
1358 self.is_validator.clone(),
1359 )
1360 };
1361 debug!(
1362 target: LOG_TARGET,
1363 ?at,
1364 duration = ?enter.elapsed(),
1365 "build_new_view::clone_view"
1366 );
1367
1368 self.import_notification_sink.add_view(
1371 view.at.hash,
1372 view.pool.validated_pool().import_notification_stream().boxed(),
1373 );
1374
1375 self.view_store
1376 .dropped_stream_controller
1377 .add_view(view.at.hash, view_dropped_stream.boxed());
1378
1379 self.view_store
1380 .listener
1381 .add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1382
1383 view
1384 }
1385
1386 async fn build_and_update_view(
1394 &self,
1395 origin_view: Option<Arc<View<ChainApi>>>,
1396 at: &HashAndNumber<Block>,
1397 tree_route: &TreeRoute<Block>,
1398 ) -> Option<Arc<View<ChainApi>>> {
1399 let start = Instant::now();
1400 debug!(
1401 target: LOG_TARGET,
1402 ?at,
1403 origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
1404 ?tree_route,
1405 "build_new_view"
1406 );
1407
1408 let mut view = self.build_and_plug_view(origin_view, at, tree_route);
1409
1410 view.pool.validated_pool().retrigger_notifications();
1413 debug!(
1414 target: LOG_TARGET,
1415 ?at,
1416 duration = ?start.elapsed(),
1417 "register_listeners"
1418 );
1419
1420 let start = Instant::now();
1423 self.update_view_with_fork(&view, tree_route, at.clone()).await;
1424 debug!(
1425 target: LOG_TARGET,
1426 ?at,
1427 duration = ?start.elapsed(),
1428 "update_view_with_fork"
1429 );
1430
1431 let start = Instant::now();
1433 self.update_view_with_mempool(&mut view).await;
1434 debug!(
1435 target: LOG_TARGET,
1436 ?at,
1437 duration= ?start.elapsed(),
1438 "update_view_with_mempool"
1439 );
1440 let view = Arc::from(view);
1441 self.view_store.insert_new_view(view.clone(), tree_route).await;
1442
1443 debug!(
1444 target: LOG_TARGET,
1445 duration = ?start.elapsed(),
1446 ?at,
1447 "build_new_view"
1448 );
1449 Some(view)
1450 }
1451
1452 async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
1457 if let Some(txs) = self.included_transactions.lock().get(at) {
1458 return txs.clone();
1459 };
1460
1461 debug!(
1462 target: LOG_TARGET,
1463 ?at,
1464 "fetch_block_transactions from api"
1465 );
1466
1467 self.api
1468 .block_body(at.hash)
1469 .await
1470 .unwrap_or_else(|error| {
1471 warn!(
1472 target: LOG_TARGET,
1473 %error,
1474 "fetch_block_transactions: error request"
1475 );
1476 None
1477 })
1478 .unwrap_or_default()
1479 .into_iter()
1480 .map(|t| self.hash_of(&t))
1481 .collect::<Vec<_>>()
1482 }
1483
1484 async fn txs_included_since_finalized(
1489 &self,
1490 at: &HashAndNumber<Block>,
1491 ) -> HashSet<TxHash<Self>> {
1492 let start = Instant::now();
1493 let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();
1494
1495 let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
1496 return Default::default();
1497 };
1498
1499 let mut all_txs = HashSet::new();
1500
1501 for block in tree_route.enacted().iter() {
1502 if at.number.saturating_sub(block.number).into()
1506 <= self.finality_timeout_threshold.into()
1507 {
1508 all_txs.extend(self.fetch_block_transactions(block).await);
1509 }
1510 }
1511
1512 debug!(
1513 target: LOG_TARGET,
1514 ?at,
1515 ?recent_finalized_block,
1516 extrinsics_count = all_txs.len(),
1517 duration = ?start.elapsed(),
1518 "fatp::txs_included_since_finalized"
1519 );
1520 all_txs
1521 }
1522
1523 async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
1532 let xts_count = self.mempool.unwatched_and_watched_count().await;
1533 debug!(
1534 target: LOG_TARGET,
1535 view_at = ?view.at,
1536 ?xts_count,
1537 active_views_count = self.active_views_count(),
1538 "update_view_with_mempool"
1539 );
1540 let included_xts = self.txs_included_since_finalized(&view.at).await;
1541
1542 let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
1543 .mempool
1544 .with_transactions(|iter| {
1545 iter.filter(|(hash, _)| !view.is_imported(&hash) && !included_xts.contains(&hash))
1546 .map(|(k, v)| (*k, v.clone()))
1547 .take(MEMPOOL_TO_VIEW_BATCH_SIZE)
1549 .collect::<HashMap<_, _>>()
1550 })
1551 .await
1552 .into_iter()
1553 .map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
1554 .unzip();
1555
1556 let results = view
1557 .submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
1558 .await
1559 .into_iter()
1560 .zip(hashes)
1561 .map(|(result, tx_hash)| async move {
1562 if let Ok(outcome) = result {
1563 Ok(self
1564 .mempool
1565 .update_transaction_priority(outcome.hash(), outcome.priority())
1566 .await)
1567 } else {
1568 Err(tx_hash)
1569 }
1570 })
1571 .collect::<Vec<_>>();
1572
1573 let results = futures::future::join_all(results).await;
1574
1575 let submitted_count = results.len();
1576
1577 debug!(
1578 target: LOG_TARGET,
1579 view_at_hash = ?view.at.hash,
1580 submitted_count,
1581 mempool_len = self.mempool.len(),
1582 "update_view_with_mempool"
1583 );
1584
1585 self.metrics
1586 .report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));
1587
1588 if self.view_store.is_empty() {
1591 for result in results {
1592 if let Err(tx_hash) = result {
1593 self.view_store.listener.transactions_invalidated(&[tx_hash]);
1594 self.mempool.remove_transactions(&[tx_hash]).await;
1595 }
1596 }
1597 }
1598 }
1599
1600 async fn collect_provides_tags_from_view_store(
1607 &self,
1608 tree_route: &TreeRoute<Block>,
1609 xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
1610 ) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
1611 let blocks_hashes = tree_route
1612 .retracted()
1613 .iter()
1614 .skip(1)
1616 .chain(
1618 std::iter::once(tree_route.common_block())
1619 .chain(tree_route.enacted().iter().rev().skip(1)),
1620 )
1621 .collect::<Vec<&HashAndNumber<Block>>>();
1622
1623 self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
1624 }
1625
1626 pub async fn collect_extrinsics(
1628 &self,
1629 blocks: &[HashAndNumber<Block>],
1630 ) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
1631 future::join_all(blocks.iter().map(|hn| async move {
1632 (
1633 hn.hash,
1634 self.api
1635 .block_body(hn.hash)
1636 .await
1637 .unwrap_or_else(|e| {
1638 warn!(target: LOG_TARGET, %e, ": block_body error request");
1639 None
1640 })
1641 .unwrap_or_default(),
1642 )
1643 }))
1644 .await
1645 .into_iter()
1646 .collect()
1647 }
1648
1649 async fn update_view_with_fork(
1654 &self,
1655 view: &View<ChainApi>,
1656 tree_route: &TreeRoute<Block>,
1657 hash_and_number: HashAndNumber<Block>,
1658 ) {
1659 debug!(
1660 target: LOG_TARGET,
1661 ?tree_route,
1662 at = ?view.at,
1663 "update_view_with_fork"
1664 );
1665 let api = self.api.clone();
1666
1667 let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;
1669
1670 let known_provides_tags = Arc::new(
1673 self.collect_provides_tags_from_view_store(
1674 tree_route,
1675 extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
1676 )
1677 .await,
1678 );
1679
1680 debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());
1681
1682 let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
1685 future::join_all(tree_route.enacted().iter().map(|hn| {
1686 let api = api.clone();
1687 let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
1688 let known_provides_tags = known_provides_tags.clone();
1689 async move {
1690 (
1691 hn,
1692 crate::prune_known_txs_for_block(
1693 hn,
1694 &*api,
1695 &view.pool,
1696 Some(xts),
1697 Some(known_provides_tags),
1698 )
1699 .await,
1700 )
1701 }
1702 }))
1703 .await
1704 .into_iter()
1705 .for_each(|(key, enacted_log)| {
1706 pruned_log.extend(enacted_log.clone());
1707 self.included_transactions.lock().insert(key.clone(), enacted_log);
1708 });
1709
1710 let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
1711 self.metrics
1712 .report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));
1713
1714 {
1716 let mut resubmit_transactions = Vec::new();
1717
1718 for retracted in tree_route.retracted() {
1719 let hash = retracted.hash;
1720
1721 let block_transactions = api
1722 .block_body(hash)
1723 .await
1724 .unwrap_or_else(|error| {
1725 warn!(
1726 target: LOG_TARGET,
1727 %error,
1728 "Failed to fetch block body"
1729 );
1730 None
1731 })
1732 .unwrap_or_default()
1733 .into_iter();
1734
1735 let mut resubmitted_to_report = 0;
1736
1737 let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
1738 |(tx_hash, _)| {
1739 let contains = pruned_log.contains(&tx_hash);
1740
1741 resubmitted_to_report += 1;
1743
1744 if !contains {
1745 trace!(
1746 target: LOG_TARGET,
1747 ?tx_hash,
1748 ?hash,
1749 "Resubmitting from retracted block"
1750 );
1751 }
1752 !contains
1753 },
1754 );
1755 let mut result = vec![];
1756 for (tx_hash, tx) in txs {
1757 result.push(
1758 self.mempool
1760 .get_by_hash(tx_hash)
1761 .await
1762 .map(|tx| (tx.source(), tx.tx()))
1763 .unwrap_or_else(|| {
1764 (TimedTransactionSource::new_external(true), Arc::from(tx))
1767 }),
1768 );
1769 }
1770 resubmit_transactions.extend(result);
1771
1772 self.metrics.report(|metrics| {
1773 metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
1774 });
1775 }
1776
1777 let _ = view
1778 .pool
1779 .resubmit_at(
1780 &hash_and_number,
1781 resubmit_transactions,
1782 ValidateTransactionPriority::Maintained,
1783 )
1784 .await;
1785 }
1786 }
1787
1788 async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
1794 let start = Instant::now();
1795 let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
1796 debug!(
1797 target: LOG_TARGET,
1798 ?finalized_number,
1799 ?tree_route,
1800 active_views_count = self.active_views_count(),
1801 "handle_finalized"
1802 );
1803 let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
1804
1805 self.mempool.purge_finalized_transactions(&finalized_xts).await;
1806 self.import_notification_sink.clean_notified_items(&finalized_xts);
1807
1808 self.metrics
1809 .report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));
1810
1811 if let Ok(Some(finalized_number)) = finalized_number {
1812 self.included_transactions
1813 .lock()
1814 .retain(|cached_block, _| finalized_number < cached_block.number);
1815 self.revalidation_queue
1816 .revalidate_mempool(
1817 self.mempool.clone(),
1818 self.view_store.clone(),
1819 HashAndNumber { hash: finalized_hash, number: finalized_number },
1820 )
1821 .await;
1822 } else {
1823 debug!(
1824 target: LOG_TARGET,
1825 ?finalized_number,
1826 "handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
1827 );
1828 }
1829
1830 self.ready_poll.lock().remove_cancelled();
1831
1832 debug!(
1833 target: LOG_TARGET,
1834 active_views_count = self.active_views_count(),
1835 included_transactions_len = ?self.included_transactions.lock().len(),
1836 duration = ?start.elapsed(),
1837 "handle_finalized after"
1838 );
1839 }
1840
1841 fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1843 self.api.hash_and_length(xt).0
1844 }
1845
1846 #[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1856 async fn attempt_transaction_replacement(
1857 &self,
1858 source: TransactionSource,
1859 at_number: u64,
1860 watched: bool,
1861 xt: ExtrinsicFor<ChainApi>,
1862 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1863 let best_view = self
1864 .view_store
1865 .most_recent_view
1866 .read()
1867 .as_ref()
1868 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1869 .clone();
1870
1871 let (xt_hash, validated_tx) = best_view
1872 .pool
1873 .verify_one(
1874 best_view.at.hash,
1875 best_view.at.number,
1876 TimedTransactionSource::from_transaction_source(source, false),
1877 xt.clone(),
1878 crate::graph::CheckBannedBeforeVerify::Yes,
1879 ValidateTransactionPriority::Submitted,
1880 )
1881 .await;
1882
1883 let Some(priority) = validated_tx.priority() else {
1884 return Err(TxPoolApiError::ImmediatelyDropped);
1885 };
1886
1887 let insertion_info = self
1888 .mempool
1889 .try_insert_with_replacement(xt, priority, source, at_number, watched)
1890 .await?;
1891 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1892 }
1893
1894 fn attempt_transaction_replacement_sync(
1896 &self,
1897 source: TransactionSource,
1898 watched: bool,
1899 xt: ExtrinsicFor<ChainApi>,
1900 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1901 let HashAndNumber { number: at_number, hash: at_hash } = self
1902 .view_store
1903 .most_recent_view
1904 .read()
1905 .as_ref()
1906 .ok_or(TxPoolApiError::ImmediatelyDropped)?
1907 .at;
1908
1909 let ValidTransaction { priority, .. } = self
1910 .api
1911 .validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
1912 .map_err(|_| TxPoolApiError::ImmediatelyDropped)?
1913 .map_err(|e| match e {
1914 TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
1915 TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
1916 })?;
1917 let xt_hash = self.hash_of(&xt);
1918
1919 let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
1920 xt,
1921 priority,
1922 source,
1923 at_number.into().as_u64(),
1924 watched,
1925 )?;
1926 self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1927 }
1928
1929 fn post_attempt_transaction_replacement(
1930 &self,
1931 tx_hash: ExtrinsicHash<ChainApi>,
1932 insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1933 ) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1934 for worst_hash in &insertion_info.removed {
1935 trace!(
1936 target: LOG_TARGET,
1937 tx_hash = ?worst_hash,
1938 new_tx_hash = ?tx_hash,
1939 "removed: replaced by"
1940 );
1941 self.view_store
1942 .listener
1943 .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1944
1945 self.view_store
1946 .remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1947 listener.limits_enforced(&removed_tx_hash);
1948 });
1949 }
1950
1951 return Ok(insertion_info);
1952 }
1953}
1954
1955#[async_trait]
1956impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
1957where
1958 Block: BlockT,
1959 ChainApi: 'static + graph::ChainApi<Block = Block>,
1960 <Block as BlockT>::Hash: Unpin,
1961{
1962 async fn maintain(&self, event: ChainEvent<Self::Block>) {
1964 let start = Instant::now();
1965 debug!(
1966 target: LOG_TARGET,
1967 ?event,
1968 "processing event"
1969 );
1970
1971 self.view_store.finish_background_revalidations().await;
1972
1973 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
1974
1975 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
1976 match self.api.tree_route(from, to) {
1977 Ok(tree_route) => Ok(tree_route),
1978 Err(e) => {
1979 return Err(format!(
1980 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
1981 ))
1982 },
1983 }
1984 };
1985 let block_id_to_number =
1986 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
1987
1988 let result =
1989 self.enactment_state
1990 .lock()
1991 .update(&event, &compute_tree_route, &block_id_to_number);
1992
1993 match result {
1994 Err(error) => {
1995 debug!(
1996 target: LOG_TARGET,
1997 %error,
1998 "enactment_state::update error"
1999 );
2000 self.enactment_state.lock().force_update(&event);
2001 },
2002 Ok(EnactmentAction::Skip) => return,
2003 Ok(EnactmentAction::HandleFinalization) => {
2004 },
2012 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
2013 self.handle_new_block(&tree_route).await;
2014 },
2015 };
2016
2017 match event {
2018 ChainEvent::NewBestBlock { .. } => {},
2019 ChainEvent::Finalized { hash, ref tree_route } => {
2020 self.handle_finalized(hash, tree_route).await;
2021
2022 debug!(
2023 target: LOG_TARGET,
2024 ?tree_route,
2025 ?prev_finalized_block,
2026 "on-finalized enacted"
2027 );
2028 },
2029 }
2030
2031 let duration = start.elapsed();
2032 let mempool_len = self.mempool_len().await;
2033 debug!(
2034 target: LOG_TARGET,
2035 txs = ?mempool_len,
2036 a = self.active_views_count(),
2037 i = self.inactive_views_count(),
2038 views = ?self.views_stats(),
2039 ?event,
2040 ?duration,
2041 "maintain"
2042 );
2043
2044 self.metrics.report(|metrics| {
2045 let (unwatched, watched) = mempool_len;
2046 let _ = (
2047 self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
2048 self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
2049 watched.try_into().map(|v| metrics.watched_txs.set(v)),
2050 unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
2051 );
2052 metrics.maintain_duration.observe(duration.as_secs_f64());
2053 });
2054 }
2055}
2056
2057impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2058where
2059 Block: BlockT,
2060 Client: subsoil::api::ProvideRuntimeApi<Block>
2061 + soil_client::client_api::BlockBackend<Block>
2062 + soil_client::client_api::blockchain::HeaderBackend<Block>
2063 + subsoil::runtime::traits::BlockIdTo<Block>
2064 + soil_client::client_api::ExecutorProvider<Block>
2065 + soil_client::client_api::UsageProvider<Block>
2066 + soil_client::blockchain::HeaderMetadata<Block, Error = soil_client::blockchain::Error>
2067 + Send
2068 + Sync
2069 + 'static,
2070 Client::Api: subsoil::txpool::runtime_api::TaggedTransactionQueue<Block>,
2071 <Block as BlockT>::Hash: std::marker::Unpin,
2072{
2073 pub fn new_full(
2075 options: Options,
2076 is_validator: IsValidator,
2077 prometheus: Option<&PrometheusRegistry>,
2078 spawner: impl SpawnEssentialNamed,
2079 client: Arc<Client>,
2080 ) -> Self {
2081 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2082 let pool = Self::new_with_background_worker(
2083 options,
2084 is_validator,
2085 pool_api,
2086 prometheus,
2087 spawner,
2088 client.usage_info().chain.best_hash,
2089 client.usage_info().chain.finalized_hash,
2090 );
2091
2092 pool
2093 }
2094}
2095
2096#[cfg(test)]
2097mod reduce_multiview_result_tests {
2098 use super::*;
2099 use subsoil::core::H256;
2100 #[derive(Debug, PartialEq, Clone)]
2101 enum Error {
2102 Custom(u8),
2103 }
2104
2105 #[test]
2106 fn empty() {
2107 subsoil::tracing::try_init_simple();
2108 let input = HashMap::default();
2109 let r = reduce_multiview_result::<H256, H256, Error>(input);
2110 assert!(r.is_empty());
2111 }
2112
2113 #[test]
2114 fn errors_only() {
2115 subsoil::tracing::try_init_simple();
2116 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2117 (
2118 H256::repeat_byte(0x13),
2119 vec![
2120 Err(Error::Custom(10)),
2121 Err(Error::Custom(11)),
2122 Err(Error::Custom(12)),
2123 Err(Error::Custom(13)),
2124 ],
2125 ),
2126 (
2127 H256::repeat_byte(0x14),
2128 vec![
2129 Err(Error::Custom(20)),
2130 Err(Error::Custom(21)),
2131 Err(Error::Custom(22)),
2132 Err(Error::Custom(23)),
2133 ],
2134 ),
2135 (
2136 H256::repeat_byte(0x15),
2137 vec![
2138 Err(Error::Custom(30)),
2139 Err(Error::Custom(31)),
2140 Err(Error::Custom(32)),
2141 Err(Error::Custom(33)),
2142 ],
2143 ),
2144 ];
2145 let input = HashMap::from_iter(v.clone());
2146 let r = reduce_multiview_result(input);
2147
2148 assert!(r == v[0].1 || r == v[1].1 || r == v[2].1);
2150 }
2151
2152 #[test]
2153 #[should_panic]
2154 #[cfg(debug_assertions)]
2155 fn invalid_lengths() {
2156 subsoil::tracing::try_init_simple();
2157 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2158 (H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
2159 (H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
2160 ];
2161 let input = HashMap::from_iter(v);
2162 let _ = reduce_multiview_result(input);
2163 }
2164
2165 #[test]
2166 fn only_hashes() {
2167 subsoil::tracing::try_init_simple();
2168
2169 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2170 (
2171 H256::repeat_byte(0x13),
2172 vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
2173 ),
2174 (
2175 H256::repeat_byte(0x14),
2176 vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))],
2177 ),
2178 ];
2179 let input = HashMap::from_iter(v);
2180 let r = reduce_multiview_result(input);
2181
2182 assert_eq!(r, vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))]);
2183 }
2184
2185 #[test]
2186 fn one_view() {
2187 subsoil::tracing::try_init_simple();
2188 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![(
2189 H256::repeat_byte(0x13),
2190 vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))],
2191 )];
2192 let input = HashMap::from_iter(v);
2193 let r = reduce_multiview_result(input);
2194
2195 assert_eq!(r, vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))]);
2196 }
2197
2198 #[test]
2199 fn mix() {
2200 subsoil::tracing::try_init_simple();
2201 let v: Vec<(H256, Vec<Result<H256, Error>>)> = vec![
2202 (
2203 H256::repeat_byte(0x13),
2204 vec![
2205 Ok(H256::repeat_byte(0x10)),
2206 Err(Error::Custom(11)),
2207 Err(Error::Custom(12)),
2208 Err(Error::Custom(33)),
2209 ],
2210 ),
2211 (
2212 H256::repeat_byte(0x14),
2213 vec![
2214 Err(Error::Custom(20)),
2215 Ok(H256::repeat_byte(0x21)),
2216 Err(Error::Custom(22)),
2217 Err(Error::Custom(33)),
2218 ],
2219 ),
2220 (
2221 H256::repeat_byte(0x15),
2222 vec![
2223 Err(Error::Custom(30)),
2224 Err(Error::Custom(31)),
2225 Ok(H256::repeat_byte(0x32)),
2226 Err(Error::Custom(33)),
2227 ],
2228 ),
2229 ];
2230 let input = HashMap::from_iter(v);
2231 let r = reduce_multiview_result(input);
2232
2233 assert_eq!(
2234 r,
2235 vec![
2236 Ok(H256::repeat_byte(0x10)),
2237 Ok(H256::repeat_byte(0x21)),
2238 Ok(H256::repeat_byte(0x32)),
2239 Err(Error::Custom(33))
2240 ]
2241 );
2242 }
2243}