// soil_txpool/fork_aware_txpool/fork_aware_txpool.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Substrate fork-aware transaction pool implementation.
8
9use super::{
10	dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped},
11	import_notification_sink::MultiViewImportNotificationSink,
12	metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics},
13	multi_view_listener::MultiViewListener,
14	tx_mem_pool::{InsertionInfo, TxMemPool},
15	view::View,
16	view_store::ViewStore,
17};
18use crate::{
19	api::FullChainApi,
20	common::{
21		sliding_stat::DurationSlidingStats,
22		tracing_log_xt::{log_xt_debug, log_xt_trace},
23		STAT_SLIDING_WINDOW,
24	},
25	enactment_state::{EnactmentAction, EnactmentState},
26	fork_aware_txpool::{
27		dropped_watcher::{DroppedReason, DroppedTransaction},
28		revalidation_worker,
29	},
30	graph::{
31		self,
32		base_pool::{TimedTransactionSource, Transaction},
33		BlockHash, ExtrinsicFor, ExtrinsicHash, IsValidator, Options, RawExtrinsicFor,
34	},
35	insert_and_log_throttled, ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
36	LOG_TARGET_STAT,
37};
38use async_trait::async_trait;
39use futures::{
40	channel::oneshot,
41	future::{self},
42	prelude::*,
43	FutureExt,
44};
45use parking_lot::Mutex;
46use soil_prometheus::Registry as PrometheusRegistry;
47use soil_client::blockchain::{HashAndNumber, TreeRoute};
48use soil_client::transaction_pool::{
49	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
50	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
51	TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
52};
53use std::{
54	collections::{BTreeMap, HashMap, HashSet},
55	pin::Pin,
56	sync::Arc,
57	time::{Duration, Instant},
58};
59use subsoil::core::traits::SpawnEssentialNamed;
60use subsoil::runtime::{
61	generic::BlockId,
62	traits::{Block as BlockT, NumberFor},
63	transaction_validity::{TransactionTag as Tag, TransactionValidityError, ValidTransaction},
64	Saturating,
65};
66use tokio::select;
67use tracing::{debug, instrument, trace, warn, Level};
68
/// The maximum block height difference before considering a view or transaction as timed-out
/// due to a finality stall. When the difference exceeds this threshold, elements are treated
/// as stale and are subject to cleanup.
const FINALITY_TIMEOUT_THRESHOLD: usize = 128;

/// The number of transactions that will be sent from the mempool to the newly created view during
/// the maintain process.
// todo [#8835]: better approach is needed - maybe time-budget approach?
// note: yap parachain block size.
// NOTE(review): "yap" reads garbled — presumably this relates the batch size to a parachain
// block capacity; confirm the intended wording.
const MEMPOOL_TO_VIEW_BATCH_SIZE: usize = 7_000;

/// Fork aware transaction pool task, that needs to be polled.
pub type ForkAwareTxPoolTask = Pin<Box<dyn Future<Output = ()> + Send>>;
/// A structure that maintains a collection of pollers associated with specific block hashes
/// (views).
struct ReadyPoll<T, Block>
where
	Block: BlockT,
{
	/// Oneshot senders awaiting the ready-transactions result for the view at a given block
	/// hash. Several pollers may wait on the same block.
	pollers: HashMap<Block::Hash, Vec<oneshot::Sender<T>>>,
}
91
92impl<T, Block> ReadyPoll<T, Block>
93where
94	Block: BlockT,
95{
96	/// Creates a new `ReadyPoll` instance with an empty collection of pollers.
97	fn new() -> Self {
98		Self { pollers: Default::default() }
99	}
100
101	/// Adds a new poller for a specific block hash and returns the `Receiver` end of the created
102	/// oneshot channel which will be used to deliver polled result.
103	fn add(&mut self, at: <Block as BlockT>::Hash) -> oneshot::Receiver<T> {
104		let (s, r) = oneshot::channel();
105		self.pollers.entry(at).or_default().push(s);
106		r
107	}
108
109	/// Triggers all pollers associated with a specific block by sending the polled result through
110	/// each oneshot channel.
111	///
112	/// `ready_iterator` is a closure that generates the result data to be sent to the pollers.
113	fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) {
114		debug!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger");
115		let Some(pollers) = self.pollers.remove(&at) else { return };
116		pollers.into_iter().for_each(|p| {
117			debug!(target: LOG_TARGET, "fatp::trigger trigger ready signal at block {}", at);
118			let _ = p.send(ready_iterator());
119		});
120	}
121
122	/// Removes pollers that have their oneshot channels cancelled.
123	fn remove_cancelled(&mut self) {
124		self.pollers.retain(|_, v| v.iter().any(|sender| !sender.is_canceled()));
125	}
126}
127
/// The fork-aware transaction pool.
///
/// It keeps track of every fork and provides the set of transactions that is valid for every fork.
pub struct ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: graph::ChainApi<Block = Block> + 'static,
{
	/// The reference to the `ChainApi` provided by client/backend.
	api: Arc<ChainApi>,

	/// Intermediate buffer for incoming transactions.
	mempool: Arc<TxMemPool<ChainApi, Block>>,

	/// The store for all the views.
	view_store: Arc<ViewStore<ChainApi, Block>>,

	/// Utility for managing pollers of `ready_at` future.
	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<ChainApi>, Block>>>,

	/// Prometheus metrics endpoint.
	metrics: PrometheusMetrics,

	/// Collector of transaction statuses updates, reports transaction events metrics.
	events_metrics_collector: EventsMetricsCollector<ChainApi>,

	/// Util tracking best and finalized block.
	enactment_state: Arc<Mutex<EnactmentState<Block>>>,

	/// The channel allowing to send revalidation jobs to the background thread.
	revalidation_queue: Arc<revalidation_worker::RevalidationQueue<ChainApi, Block>>,

	/// Util providing an aggregated stream of transactions that were imported to ready queue in
	/// any view.
	import_notification_sink: MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,

	/// Externally provided pool options.
	options: Options,

	/// Whether this node is a validator.
	is_validator: IsValidator,

	/// Finality timeout threshold.
	///
	/// Sets the maximum permissible block height difference between the latest block
	/// and the oldest transactions or views in the pool. Beyond this difference,
	/// transactions/views are considered timed out and eligible for cleanup.
	finality_timeout_threshold: usize,

	/// Transactions included in blocks since the most recently finalized block (including this
	/// block).
	///
	/// Holds a mapping of block hash and number to their corresponding transaction hashes.
	///
	/// Intended to be used in the finality stall cleanups and also as a cache for all in-block
	/// transactions.
	included_transactions: Mutex<BTreeMap<HashAndNumber<Block>, Vec<ExtrinsicHash<ChainApi>>>>,

	/// Sliding-window stats for `submit` call durations.
	submit_stats: DurationSlidingStats,

	/// Sliding-window stats for `submit_and_watch` call durations.
	submit_and_watch_stats: DurationSlidingStats,
}
192
193impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
194where
195	Block: BlockT,
196	ChainApi: graph::ChainApi<Block = Block> + 'static,
197	<Block as BlockT>::Hash: Unpin,
198{
199	// Injects a view for the given block to self.
200	//
201	// Helper for the pool new methods.
202	fn inject_initial_view(self, initial_view_hash: Block::Hash) -> Self {
203		if let Some(block_number) =
204			self.api.block_id_to_number(&BlockId::Hash(initial_view_hash)).ok().flatten()
205		{
206			let at_best = HashAndNumber { number: block_number, hash: initial_view_hash };
207			let tree_route =
208				&TreeRoute::new(vec![at_best.clone()], 0).expect("tree route is correct; qed");
209			let view = self.build_and_plug_view(None, &at_best, &tree_route);
210			self.view_store.insert_new_view_sync(view.into(), &tree_route);
211			trace!(target: LOG_TARGET, ?block_number, ?initial_view_hash, "fatp::injected initial view");
212		};
213		self
214	}
215
216	/// Create new fork aware transaction pool with provided shared instance of `ChainApi` intended
217	/// for tests.
218	pub fn new_test(
219		pool_api: Arc<ChainApi>,
220		best_block_hash: Block::Hash,
221		finalized_hash: Block::Hash,
222		finality_timeout_threshold: Option<usize>,
223	) -> (Self, [ForkAwareTxPoolTask; 2]) {
224		Self::new_test_with_limits(
225			pool_api,
226			best_block_hash,
227			finalized_hash,
228			Options::default().ready,
229			Options::default().future,
230			usize::MAX,
231			finality_timeout_threshold,
232		)
233	}
234
	/// Create new fork aware transaction pool with given limits and with provided shared instance
	/// of `ChainApi` intended for tests.
	pub fn new_test_with_limits(
		pool_api: Arc<ChainApi>,
		best_block_hash: Block::Hash,
		finalized_hash: Block::Hash,
		ready_limits: crate::PoolLimit,
		future_limits: crate::PoolLimit,
		mempool_max_transactions_count: usize,
		finality_timeout_threshold: Option<usize>,
	) -> (Self, [ForkAwareTxPoolTask; 2]) {
		// Transaction status listener plus the background worker that drives it.
		let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default());
		let listener = Arc::new(listener);

		// Aggregated stream of "imported to ready queue" notifications from all views.
		let (import_notification_sink, import_notification_sink_task) =
			MultiViewImportNotificationSink::new_with_worker();

		// The mempool byte limit mirrors the combined ready+future view byte limits.
		let (mempool, mempool_task) = TxMemPool::new(
			pool_api.clone(),
			listener.clone(),
			Default::default(),
			mempool_max_transactions_count,
			ready_limits.total_bytes + future_limits.total_bytes,
		);
		let mempool = Arc::from(mempool);

		let (dropped_stream_controller, dropped_stream) =
			MultiViewDroppedWatcherController::<ChainApi>::new();

		let view_store = Arc::new(ViewStore::new(
			pool_api.clone(),
			listener,
			dropped_stream_controller,
			import_notification_sink.clone(),
		));

		// Task keeping mempool/view_store consistent with dropped-transaction notifications.
		let dropped_monitor_task = Self::dropped_monitor_task(
			dropped_stream,
			mempool.clone(),
			view_store.clone(),
			import_notification_sink.clone(),
		);

		// All background workers are folded into one future; termination of any worker
		// completes the combined task.
		let combined_tasks = async move {
			tokio::select! {
				_ = listener_task => {},
				_ = import_notification_sink_task => {},
				_ = dropped_monitor_task => {}
			}
		}
		.boxed();

		let options = Options { ready: ready_limits, future: future_limits, ..Default::default() };

		(
			Self {
				mempool,
				api: pool_api,
				view_store,
				ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
					best_block_hash,
					finalized_hash,
				))),
				revalidation_queue: Arc::from(revalidation_worker::RevalidationQueue::new()),
				import_notification_sink,
				options,
				is_validator: false.into(),
				metrics: Default::default(),
				events_metrics_collector: EventsMetricsCollector::default(),
				finality_timeout_threshold: finality_timeout_threshold
					.unwrap_or(FINALITY_TIMEOUT_THRESHOLD),
				included_transactions: Default::default(),
				submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
				submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
					STAT_SLIDING_WINDOW,
				)),
			}
			.inject_initial_view(best_block_hash),
			[combined_tasks, mempool_task],
		)
	}
317
	/// Monitors the stream of dropped transactions and removes them from the mempool and
	/// view_store.
	///
	/// This asynchronous task continuously listens for dropped transaction notifications provided
	/// within `dropped_stream` and ensures that these transactions are removed from the `mempool`
	/// and `import_notification_sink` instances. For Usurped events, the transaction is also
	/// removed from the view_store.
	async fn dropped_monitor_task(
		mut dropped_stream: StreamOfDropped<ChainApi>,
		mempool: Arc<TxMemPool<ChainApi, Block>>,
		view_store: Arc<ViewStore<ChainApi, Block>>,
		import_notification_sink: MultiViewImportNotificationSink<
			Block::Hash,
			ExtrinsicHash<ChainApi>,
		>,
	) {
		// Sliding-window stats of how long a single dropped notification takes to process.
		let dropped_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
		loop {
			// Stream end means the producer side is gone; terminate the task.
			let Some(dropped) = dropped_stream.next().await else {
				debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
				break;
			};
			let start = Instant::now();
			let tx_hash = dropped.tx_hash;
			trace!(
				target: LOG_TARGET,
				?tx_hash,
				reason = ?dropped.reason,
				"fatp::dropped notification, removing"
			);
			match dropped.reason {
				DroppedReason::Usurped(new_tx_hash) => {
					// Usurped: swap the dropped transaction in the view_store for its
					// replacement, provided the replacement is still in the mempool.
					if let Some(new_tx) = mempool.get_by_hash(new_tx_hash).await {
						view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
					} else {
						trace!(
							target: LOG_TARGET,
							tx_hash = ?new_tx_hash,
							"error: dropped_monitor_task: no entry in mempool for new transaction"
						);
					};
				},
				DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
					// Remove the transaction together with its dependent subtree from the views.
					view_store.remove_transaction_subtree(tx_hash, |_, _| {});
				},
			};

			// Common cleanup: mempool entry, import-notification bookkeeping, listeners.
			mempool.remove_transactions(&[tx_hash]).await;
			import_notification_sink.clean_notified_items(&[tx_hash]);
			view_store.listener.transaction_dropped(dropped);
			insert_and_log_throttled!(
				Level::DEBUG,
				target:LOG_TARGET_STAT,
				prefix:"dropped_stats",
				dropped_stats,
				start.elapsed().into()
			);
		}
	}
377
	/// Creates new fork aware transaction pool with the background revalidation worker.
	///
	/// The txpool essential tasks (including a revalidation worker) are spawned using provided
	/// spawner.
	pub fn new_with_background_worker(
		options: Options,
		is_validator: IsValidator,
		pool_api: Arc<ChainApi>,
		prometheus: Option<&PrometheusRegistry>,
		spawner: impl SpawnEssentialNamed,
		best_block_hash: Block::Hash,
		finalized_hash: Block::Hash,
	) -> Self {
		let metrics = PrometheusMetrics::new(prometheus);
		let (events_metrics_collector, event_metrics_task) =
			EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone());

		// Transaction status listener plus the background worker that drives it.
		let (listener, listener_task) =
			MultiViewListener::new_with_worker(events_metrics_collector.clone());
		let listener = Arc::new(listener);

		let (revalidation_queue, revalidation_task) =
			revalidation_worker::RevalidationQueue::new_with_worker();

		let (import_notification_sink, import_notification_sink_task) =
			MultiViewImportNotificationSink::new_with_worker();

		// The mempool byte limit mirrors the combined ready+future view byte limits.
		let (mempool, blocking_mempool_task) = TxMemPool::new(
			pool_api.clone(),
			listener.clone(),
			metrics.clone(),
			options.total_count(),
			options.ready.total_bytes + options.future.total_bytes,
		);
		let mempool = Arc::from(mempool);

		let (dropped_stream_controller, dropped_stream) =
			MultiViewDroppedWatcherController::<ChainApi>::new();

		let view_store = Arc::new(ViewStore::new(
			pool_api.clone(),
			listener,
			dropped_stream_controller,
			import_notification_sink.clone(),
		));

		// Task keeping mempool/view_store consistent with dropped-transaction notifications.
		let dropped_monitor_task = Self::dropped_monitor_task(
			dropped_stream,
			mempool.clone(),
			view_store.clone(),
			import_notification_sink.clone(),
		);

		// All async workers share one essential task: termination of any worker completes it,
		// which the spawner treats as an essential-task failure.
		let combined_tasks = async move {
			tokio::select! {
				_ = listener_task => {}
				_ = revalidation_task => {},
				_ = import_notification_sink_task => {},
				_ = dropped_monitor_task => {}
				_ = event_metrics_task => {},
			}
		}
		.boxed();
		spawner.spawn_essential("txpool-background", Some("transaction-pool"), combined_tasks);
		// The mempool task is spawned on the blocking executor.
		spawner.spawn_essential_blocking(
			"txpool-background",
			Some("transaction-pool"),
			blocking_mempool_task,
		);

		Self {
			mempool,
			api: pool_api,
			view_store,
			ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
				best_block_hash,
				finalized_hash,
			))),
			revalidation_queue: Arc::from(revalidation_queue),
			import_notification_sink,
			options,
			metrics,
			events_metrics_collector,
			is_validator,
			finality_timeout_threshold: FINALITY_TIMEOUT_THRESHOLD,
			included_transactions: Default::default(),
			submit_stats: DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW)),
			submit_and_watch_stats: DurationSlidingStats::new(Duration::from_secs(
				STAT_SLIDING_WINDOW,
			)),
		}
		.inject_initial_view(best_block_hash)
	}
472
	/// Get access to the underlying api.
	pub fn api(&self) -> &ChainApi {
		&self.api
	}

	/// Provides a status for all views at the tips of the forks.
	pub fn status_all(&self) -> HashMap<Block::Hash, PoolStatus> {
		self.view_store.status()
	}

	/// Provides a number of active views at the tips of the forks.
	pub fn active_views_count(&self) -> usize {
		self.view_store.active_views.read().len()
	}

	/// Provides a number of inactive views (views no longer at the tips of the forks).
	pub fn inactive_views_count(&self) -> usize {
		self.view_store.inactive_views.read().len()
	}
492
493	/// Provides internal views statistics.
494	///
495	/// Provides block number, count of ready, count of future transactions for every view. It is
496	/// suitable for printing log information.
497	fn views_stats(&self) -> Vec<(NumberFor<Block>, usize, usize)> {
498		self.view_store
499			.active_views
500			.read()
501			.iter()
502			.map(|v| (v.1.at.number, v.1.status().ready, v.1.status().future))
503			.collect()
504	}
505
	/// Checks if there is a view at the tip of the fork with given hash.
	///
	/// Only active views are considered.
	pub fn has_view(&self, hash: &Block::Hash) -> bool {
		self.view_store.active_views.read().contains_key(hash)
	}

	/// Returns a number of unwatched and watched transactions in internal mempool.
	///
	/// Intended for use in unit tests.
	pub async fn mempool_len(&self) -> (usize, usize) {
		self.mempool.unwatched_and_watched_count().await
	}

	/// Returns a set of future transactions for given block hash.
	///
	/// Intended for logging / tests.
	pub fn futures_at(
		&self,
		at: Block::Hash,
	) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
		self.view_store.futures_at(at)
	}
527
	/// Returns a best-effort set of ready transactions for a given block, without executing full
	/// maintain process.
	///
	/// The method attempts to build a temporary view and create an iterator of ready transactions
	/// for a specific `at` hash. If a valid view is found, it collects and prunes
	/// transactions already included in the blocks and returns the valid set. Not finding a view
	/// returns with the ready transaction set found in the most recent view processed by the
	/// fork-aware txpool. Not being able to query for block number for the provided `at` block hash
	/// results in returning an empty transaction set.
	///
	/// Pruning is just rebuilding the underlying transactions graph, no validations are executed,
	/// so this process shall be fast.
	pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
		let start = Instant::now();
		let api = self.api.clone();
		debug!(
			target: LOG_TARGET,
			?at,
			"fatp::ready_at_light"
		);

		let at_number = self.api.resolve_block_number(at).ok();
		let finalized_number = self
			.api
			.resolve_block_number(self.enactment_state.lock().recent_finalized_block())
			.ok();

		// Prune all txs from the best view found, considering the extrinsics part of the blocks
		// that are more recent than the view itself.
		if let Some((view, enacted_blocks, at_hn)) = at_number.and_then(|at_number| {
			let at_hn = HashAndNumber { hash: at, number: at_number };
			finalized_number.and_then(|finalized_number| {
				self.view_store
					.find_view_descendent_up_to_number(&at_hn, finalized_number)
					.map(|(view, enacted_blocks)| (view, enacted_blocks, at_hn))
			})
		}) {
			// Work on a temporary copy of the view so pruning does not disturb the original.
			let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&view, &at_hn);
			// Collect hashes of all extrinsics contained in the enacted blocks; block-body
			// fetch failures are logged and treated as empty bodies (best-effort).
			let mut all_extrinsics = vec![];
			for h in enacted_blocks {
				let extrinsics = api
					.block_body(h)
					.await
					.unwrap_or_else(|error| {
						warn!(
							target: LOG_TARGET,
							%error,
							"Compute ready light transactions: error request"
						);
						None
					})
					.unwrap_or_default()
					.into_iter()
					.map(|t| api.hash_and_length(&t).0);
				all_extrinsics.extend(extrinsics);
			}

			let before_count = tmp_view.pool.validated_pool().status().ready;
			// Gather tags provided by the in-block extrinsics and prune by them — this only
			// rebuilds the graph, no re-validation happens.
			let tags = tmp_view
				.pool
				.validated_pool()
				.extrinsics_tags(&all_extrinsics)
				.into_iter()
				.flatten()
				.flatten()
				.collect::<Vec<_>>();
			let _ = tmp_view.pool.validated_pool().prune_tags(tags);

			let after_count = tmp_view.pool.validated_pool().status().ready;
			debug!(
				target: LOG_TARGET,
				?at,
				best_view_hash = ?view.at.hash,
				before_count,
				to_be_removed = all_extrinsics.len(),
				after_count,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> light"
			);
			Box::new(tmp_view.pool.validated_pool().ready())
		} else if let Some(most_recent_view) = self.view_store.most_recent_view.read().clone() {
			// Fallback for the case when `at` is not on the already known fork.
			// Falls back to the most recent view, which may include txs which
			// are invalid or already included in the blocks but can still yield a
			// partially valid ready set, which is still better than including nothing.
			debug!(
				target: LOG_TARGET,
				?at,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> most_recent_view"
			);
			Box::new(most_recent_view.pool.validated_pool().ready())
		} else {
			// No views at all (e.g. pool freshly created): nothing to provide.
			let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
			debug!(
				target: LOG_TARGET,
				?at,
				duration = ?start.elapsed(),
				"fatp::ready_at_light -> empty"
			);
			empty
		}
	}
631
	/// Waits for the set of ready transactions for a given block up to a specified timeout.
	///
	/// This method combines two futures:
	/// - The `ready_at` future, which waits for the ready transactions resulting from the full
	/// maintenance process to be available.
	/// - The `ready_at_light` future, used as a fallback if the timeout expires before `ready_at`
	/// completes. This provides a best-effort, ready set of transactions as a result light
	/// maintain.
	///
	/// Returns a future resolving to a ready iterator of transactions.
	async fn ready_at_with_timeout_internal(
		&self,
		at: Block::Hash,
		timeout: std::time::Duration,
	) -> ReadyIteratorFor<ChainApi> {
		debug!(
			target: LOG_TARGET,
			?at,
			?timeout,
			"fatp::ready_at_with_timeout"
		);
		let timeout = futures_timer::Delay::new(timeout);
		let (view_already_exists, ready_at) = self.ready_at_internal(at);

		// A view for `at` already exists — the result resolves immediately.
		if view_already_exists {
			return ready_at.await;
		}

		// Race the full-maintenance result against the timeout.
		let maybe_ready = async move {
			select! {
				ready = ready_at => Some(ready),
				_ = timeout => {
					debug!(
						target: LOG_TARGET,
						?at,
						"Timeout fired waiting for transaction pool at block. Proceeding with production."
					);
					None
				}
			}
		};

		// The light fallback is computed concurrently so it is at hand if the timeout fires.
		let fall_back_ready = self.ready_at_light(at);
		let (maybe_ready, fall_back_ready) =
			futures::future::join(maybe_ready, fall_back_ready).await;
		maybe_ready.unwrap_or(fall_back_ready)
	}
679
	/// Returns a future of the ready-transactions iterator for the view at `at`.
	///
	/// The returned flag is `true` when a view for `at` already exists — the future then
	/// resolves immediately. Otherwise a poller is registered in `ready_poll` and the future
	/// resolves once `ReadyPoll::trigger` delivers the result (or yields an empty iterator if
	/// the channel is dropped).
	fn ready_at_internal(
		&self,
		at: Block::Hash,
	) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
		let mut ready_poll = self.ready_poll.lock();

		// Inactive views are also accepted here (second argument `true`).
		if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
			debug!(
				target: LOG_TARGET,
				?at,
				?inactive,
				"fatp::ready_at_internal"
			);
			let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
			return (true, async move { iterator }.boxed());
		}

		// No view yet: register a poller to be triggered when the view gets created.
		let pending = ready_poll
			.add(at)
			.map(|received| {
				received.unwrap_or_else(|error| {
					warn!(
						target: LOG_TARGET,
						%error,
						"Error receiving ready-set iterator"
					);
					Box::new(std::iter::empty())
				})
			})
			.boxed();
		debug!(
			target: LOG_TARGET,
			?at,
			pending_keys = ?ready_poll.pollers.keys(),
			"fatp::ready_at_internal"
		);
		(false, pending)
	}
718
	/// Refer to [`Self::submit_and_watch`]
	async fn submit_and_watch_inner(
		&self,
		at: Block::Hash,
		source: TransactionSource,
		xt: TransactionFor<Self>,
	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, ChainApi::Error> {
		let xt = Arc::from(xt);

		// Resolve the block number of `at`; falls back to the default number (typically zero)
		// when the hash cannot be resolved.
		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();

		// Insert into the mempool; when the mempool rejects with ImmediatelyDropped (limits),
		// try to replace an existing transaction instead.
		let insertion = match self.mempool.push_watched(source, at_number, xt.clone()).await {
			Ok(result) => result,
			Err(TxPoolApiError::ImmediatelyDropped) => {
				self.attempt_transaction_replacement(source, at_number, true, xt.clone())
					.await?
			},
			Err(e) => return Err(e.into()),
		};

		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
		self.events_metrics_collector.report_submitted(&insertion);

		match self.view_store.submit_and_watch(at, insertion.source, xt).await {
			Err(e) => {
				// Keep the mempool consistent with the view_store on submission failure.
				self.mempool.remove_transactions(&[insertion.hash]).await;
				Err(e.into())
			},
			Ok(mut outcome) => {
				// Successful submission: propagate the effective priority back to the mempool.
				self.mempool
					.update_transaction_priority(outcome.hash(), outcome.priority())
					.await;
				Ok(outcome.expect_watcher())
			},
		}
	}
762
	/// Refer to [`Self::submit_at`]
	async fn submit_at_inner(
		&self,
		at: Block::Hash,
		source: TransactionSource,
		xts: Vec<TransactionFor<Self>>,
	) -> Result<Vec<Result<TxHash<Self>, ChainApi::Error>>, ChainApi::Error> {
		// Resolve the block number of `at`; falls back to the default number (typically zero)
		// when the hash cannot be resolved.
		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();
		let view_store = self.view_store.clone();
		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
		let mempool_results = self.mempool.extend_unwatched(source, at_number, &xts).await;

		// With no views yet, the mempool verdicts are the final results.
		if view_store.is_empty() {
			return Ok(mempool_results
				.into_iter()
				.map(|r| r.map(|r| r.hash).map_err(Into::into))
				.collect::<Vec<_>>());
		}

		// Submit all the transactions to the mempool
		// (retrying ImmediatelyDropped rejections via transaction replacement).
		let retries = mempool_results
			.into_iter()
			.zip(xts.clone())
			.map(|(result, xt)| async move {
				match result {
					Err(TxPoolApiError::ImmediatelyDropped) => {
						self.attempt_transaction_replacement(source, at_number, false, xt).await
					},
					_ => result,
				}
			})
			.collect::<Vec<_>>();

		let mempool_results = futures::future::join_all(retries).await;

		// Collect transactions that were successfully submitted to the mempool...
		let to_be_submitted = mempool_results
			.iter()
			.zip(xts)
			.filter_map(|(result, xt)| {
				result.as_ref().ok().map(|insertion| {
					self.events_metrics_collector.report_submitted(&insertion);
					(insertion.source.clone(), xt)
				})
			})
			.collect::<Vec<_>>();

		self.metrics
			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

		// ... and submit them to the view_store. Please note that transactions rejected by mempool
		// are not sent here.
		let mempool = self.mempool.clone();
		let results_map = view_store.submit(to_be_submitted.into_iter()).await;
		let mut submission_results = reduce_multiview_result(results_map).into_iter();

		// Note for composing final result:
		//
		// For each failed insertion into the mempool, the mempool result should be placed into
		// the returned vector.
		//
		// For each successful insertion into the mempool, the corresponding
		// view_store submission result needs to be examined (merged_results):
		// - If there is an error during view_store submission, the transaction is removed from
		// the mempool, and the final result recorded in the vector for this transaction is the
		// view_store submission error.
		//
		// - If the view_store submission is successful, the transaction priority is updated in the
		// mempool.
		//
		// Finally, it collects the hashes of updated transactions or submission errors (either
		// from the mempool or view_store) into a returned vector (final_results).
		const RESULTS_ASSUMPTION : &str =
			"The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
		let merged_results = mempool_results.into_iter().map(|result| {
			result.map_err(Into::into).and_then(|insertion| {
				Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION)))
			})
		});

		let mut final_results = vec![];
		for r in merged_results {
			match r {
				Ok((hash, submission_result)) => match submission_result {
					Ok(r) => {
						mempool.update_transaction_priority(r.hash(), r.priority()).await;
						final_results.push(Ok(r.hash()));
					},
					Err(e) => {
						mempool.remove_transactions(&[hash]).await;
						final_results.push(Err(e));
					},
				},
				Err(e) => final_results.push(Err(e)),
			}
		}

		Ok(final_results)
	}
869
	/// Number of notified items in import_notification_sink.
	///
	/// Counts the items currently tracked by the aggregated import-notification sink.
	/// Internal detail, exposed only for testing.
	pub fn import_notification_sink_len(&self) -> usize {
		self.import_notification_sink.notified_items_len()
	}
876}
877
/// Converts the input view-to-statuses map into the output vector of statuses.
///
/// The result of importing a bunch of transactions into a single view is the vector of statuses.
/// Every item represents a status for single transaction. The input is the map that associates
/// hash-views with vectors indicating the statuses of transactions imports.
///
/// Import to multiple views result in two-dimensional array of statuses, which is provided as
/// input map.
///
/// This function converts the map into the vec of results, according to the following rules:
/// - for given transaction if at least one status is success, then output vector contains success,
/// - if given transaction status is error for every view, then output vector contains error.
///
/// The results for transactions are in the same order for every view. An output vector preserves
/// this order.
///
/// ```skip
/// in:
/// view  |   xt0 status | xt1 status | xt2 status
/// h1   -> [ Ok(xth0),    Ok(xth1),    Err       ]
/// h2   -> [ Ok(xth0),    Err,         Err       ]
/// h3   -> [ Ok(xth0),    Ok(xth1),    Err       ]
///
/// out:
/// [ Ok(xth0), Ok(xth1), Err ]
/// ```
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
	let mut per_view = input.into_values();

	// An empty map reduces to an empty vector.
	let Some(mut aggregated) = per_view.next() else { return Vec::new() };

	for view_results in per_view {
		// Every view is expected to report a status for the same set of transactions.
		debug_assert_eq!(aggregated.len(), view_results.len());
		for (agg, candidate) in aggregated.iter_mut().zip(view_results) {
			// A success, once recorded, is kept; an error is superseded by any later result
			// (success or error) coming from another view.
			if agg.is_err() {
				*agg = candidate;
			}
		}
	}
	aggregated
}
924
#[async_trait]
impl<ChainApi, Block> TransactionPool for ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: 'static + graph::ChainApi<Block = Block>,
	<Block as BlockT>::Hash: Unpin,
{
	type Block = ChainApi::Block;
	type Hash = ExtrinsicHash<ChainApi>;
	type InPoolTransaction = Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>;
	type Error = ChainApi::Error;

	/// Submits multiple transactions and returns a future resolving to the submission results.
	///
	/// Actual transactions submission process is delegated to the `ViewStore` internal instance.
	///
	/// The internal limits of the pool are checked. The results of submissions to individual views
	/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
	async fn submit_at(
		&self,
		at: <Self::Block as BlockT>::Hash,
		source: TransactionSource,
		xts: Vec<TransactionFor<Self>>,
	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
		let start = Instant::now();
		trace!(
			target: LOG_TARGET,
			count = xts.len(),
			active_views_count = self.active_views_count(),
			"fatp::submit_at"
		);
		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at");
		let result = self.submit_at_inner(at, source, xts).await;
		// Record the call duration into the sliding submission stats (throttled logging).
		insert_and_log_throttled!(
			Level::DEBUG,
			target:LOG_TARGET_STAT,
			prefix:"submit_stats",
			self.submit_stats,
			start.elapsed().into()
		);
		result
	}

	/// Submits a single transaction and returns a future resolving to the submission results.
	///
	/// Actual transaction submission process is delegated to the `submit_at` function.
	async fn submit_one(
		&self,
		_at: <Self::Block as BlockT>::Hash,
		source: TransactionSource,
		xt: TransactionFor<Self>,
	) -> Result<TxHash<Self>, Self::Error> {
		trace!(
			target: LOG_TARGET,
			tx_hash = ?self.tx_hash(&xt),
			active_views_count = self.active_views_count(),
			"fatp::submit_one"
		);
		// Reuse the batch submission path with a single-element vector and unwrap its
		// only result.
		match self.submit_at(_at, source, vec![xt]).await {
			Ok(mut v) => {
				v.pop().expect("There is exactly one element in result of submit_at. qed.")
			},
			Err(e) => Err(e),
		}
	}

	/// Submits a transaction and starts to watch its progress in the pool, returning a stream of
	/// status updates.
	///
	/// Actual transaction submission process is delegated to the `ViewStore` internal instance.
	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::submit_and_watch")]
	async fn submit_and_watch(
		&self,
		at: <Self::Block as BlockT>::Hash,
		source: TransactionSource,
		xt: TransactionFor<Self>,
	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
		let start = Instant::now();
		trace!(
			target: LOG_TARGET,
			tx_hash = ?self.tx_hash(&xt),
			views = self.active_views_count(),
			"fatp::submit_and_watch"
		);
		let result = self.submit_and_watch_inner(at, source, xt).await;
		// Record the call duration into the sliding submission stats (throttled logging).
		insert_and_log_throttled!(
			Level::DEBUG,
			target:LOG_TARGET_STAT,
			prefix:"submit_and_watch_stats",
			self.submit_and_watch_stats,
			start.elapsed().into()
		);
		result
	}

	/// Reports invalid transactions to the transaction pool.
	///
	/// This function takes an array of tuples, each consisting of a transaction hash and the
	/// corresponding error that occurred during transaction execution at given block.
	///
	/// The transaction pool implementation will determine which transactions should be
	/// removed from the pool. Transactions that depend on invalid transactions will also
	/// be removed.
	async fn report_invalid(
		&self,
		at: Option<<Self::Block as BlockT>::Hash>,
		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
	) -> Vec<Arc<Self::InPoolTransaction>> {
		debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
		log_xt_debug!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
		self.metrics
			.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));

		// The view store decides which transactions are actually removed.
		let removed = self.view_store.report_invalid(at, invalid_tx_errors);

		// Keep the mempool and the pending import notifications in sync with the removal.
		let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
		self.mempool.remove_transactions(&removed_hashes).await;
		self.import_notification_sink.clean_notified_items(&removed_hashes);

		self.metrics
			.report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));

		removed
	}

	// todo [#5491]: api change?
	// status(Hash) -> Option<PoolStatus>
	/// Returns the pool status which includes information like the number of ready and future
	/// transactions.
	///
	/// Currently the status for the most recently notified best block is returned (for which
	/// maintain process was accomplished).
	fn status(&self) -> PoolStatus {
		// If no view was created yet, report an all-zero (empty) status.
		self.view_store
			.most_recent_view
			.read()
			.as_ref()
			.map(|v| v.status())
			.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
	}

	/// Return an event stream of notifications when transactions are imported to the pool.
	///
	/// Consumers of this stream should use the `ready` method to actually get the
	/// pending transactions in the right order.
	fn import_notification_stream(&self) -> ImportNotificationStream<ExtrinsicHash<ChainApi>> {
		self.import_notification_sink.event_stream()
	}

	/// Returns the hash of a given transaction.
	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
		self.api().hash_and_length(xt).0
	}

	/// Notifies the pool about the broadcasting status of transactions.
	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
		self.view_store.listener.transactions_broadcasted(propagations);
	}

	/// Return specific ready transaction by hash, if there is one.
	///
	/// Currently the ready transaction is returned if it exists for the most recently notified best
	/// block (for which maintain process was accomplished).
	// todo [#5491]: api change: we probably should have at here?
	fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
		// Lookup is performed only against the view of the most recently notified best block.
		let most_recent_view_hash =
			self.view_store.most_recent_view.read().as_ref().map(|v| v.at.hash);
		let result = most_recent_view_hash
			.and_then(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash));
		trace!(
			target: LOG_TARGET,
			?tx_hash,
			is_ready = result.is_some(),
			most_recent_view = ?most_recent_view_hash,
			"ready_transaction"
		);
		result
	}

	/// Returns an iterator for ready transactions at a specific block, ordered by priority.
	async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<ChainApi> {
		let (_, result) = self.ready_at_internal(at);
		result.await
	}

	/// Returns an iterator for ready transactions, ordered by priority.
	///
	/// Currently the set of ready transactions is returned if it exists for the most recently
	/// notified best block (for which maintain process was accomplished).
	fn ready(&self) -> ReadyIteratorFor<ChainApi> {
		self.view_store.ready()
	}

	/// Returns a list of future transactions in the pool.
	///
	/// Currently the set of future transactions is returned if it exists for the most recently
	/// notified best block (for which maintain process was accomplished).
	fn futures(&self) -> Vec<Self::InPoolTransaction> {
		self.view_store.futures()
	}

	/// Returns a set of ready transactions at a given block within the specified timeout.
	///
	/// If the timeout expires before the maintain process is accomplished, a best-effort
	/// set of transactions is returned (refer to `ready_at_light`).
	async fn ready_at_with_timeout(
		&self,
		at: <Self::Block as BlockT>::Hash,
		timeout: std::time::Duration,
	) -> ReadyIteratorFor<ChainApi> {
		self.ready_at_with_timeout_internal(at, timeout).await
	}
}
1138
impl<ChainApi, Block> soil_client::transaction_pool::LocalTransactionPool
	for ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: 'static + graph::ChainApi<Block = Block>,
	<Block as BlockT>::Hash: Unpin,
{
	type Block = Block;
	type Hash = ExtrinsicHash<ChainApi>;
	type Error = ChainApi::Error;

	/// Synchronously submits a transaction from a local source.
	///
	/// The transaction is first inserted into the internal mempool. If the mempool reports it
	/// as immediately dropped, a synchronous replacement of a lower-priority transaction is
	/// attempted. Finally the transaction is submitted to the view store.
	fn submit_local(
		&self,
		at: Block::Hash,
		xt: soil_client::transaction_pool::LocalTransactionFor<Self>,
	) -> Result<Self::Hash, Self::Error> {
		trace!(
			target: LOG_TARGET,
			active_views_count = self.active_views_count(),
			"fatp::submit_local"
		);
		let xt = Arc::from(xt);
		// Resolve the block number for `at`; a failed lookup falls back to the default
		// (zero) number.
		let at_number = self
			.api
			.block_id_to_number(&BlockId::Hash(at))
			.ok()
			.flatten()
			.unwrap_or_default()
			.into()
			.as_u64();

		// note: would be nice to get rid of sync methods one day. See: #8912
		let result = self
			.mempool
			.clone()
			.extend_unwatched_sync(TransactionSource::Local, at_number, vec![xt.clone()])
			.remove(0);

		// If the mempool dropped the transaction immediately, try to replace an existing
		// lower-priority transaction instead; any other outcome is passed through.
		let insertion = match result {
			Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
				TransactionSource::Local,
				false,
				xt.clone(),
			),
			_ => result,
		}?;

		// On view-store submission error the transaction is removed from the mempool, yet
		// the hash of the mempool insertion is still returned (see `or_else` below).
		self.view_store
			.submit_local(xt)
			.inspect_err(|_| {
				self.mempool.clone().remove_transactions_sync(vec![insertion.hash]);
			})
			.map(|outcome| {
				self.mempool
					.clone()
					.update_transaction_priority_sync(outcome.hash(), outcome.priority());
				outcome.hash()
			})
			.or_else(|_| Ok(insertion.hash))
	}
}
1200
1201impl<ChainApi, Block> ForkAwareTxPool<ChainApi, Block>
1202where
1203	Block: BlockT,
1204	ChainApi: graph::ChainApi<Block = Block> + 'static,
1205	<Block as BlockT>::Hash: Unpin,
1206{
	/// Handles a new block notification.
	///
	/// It is responsible for handling a newly notified block. It executes some sanity checks,
	/// finds the best view to clone from and executes the new view build procedure for the
	/// notified block.
	///
	/// If the view is correctly created, `ready_at` pollers for this block will be triggered.
	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::handle_new_block")]
	async fn handle_new_block(&self, tree_route: &TreeRoute<Block>) {
		// The last block of the tree route is the newly notified block.
		let hash_and_number = match tree_route.last() {
			Some(hash_and_number) => hash_and_number,
			None => {
				warn!(
					target: LOG_TARGET,
					?tree_route,
					"Skipping ChainEvent - no last block in tree route"
				);
				return;
			},
		};

		// Nothing to do if a view for this block already exists.
		if self.has_view(&hash_and_number.hash) {
			debug!(
				target: LOG_TARGET,
				?hash_and_number,
				"view already exists for block"
			);
			return;
		}

		let best_view = self.view_store.find_best_view(tree_route);
		let new_view = self.build_and_update_view(best_view, hash_and_number, tree_route).await;

		if let Some(view) = new_view {
			{
				// Resolve pending `ready_at` pollers for this block with the new view's
				// ready set.
				let view = view.clone();
				self.ready_poll.lock().trigger(hash_and_number.hash, move || {
					Box::from(view.pool.validated_pool().ready())
				});
			}

			View::start_background_revalidation(view, self.revalidation_queue.clone()).await;
		}

		self.finality_stall_cleanup(hash_and_number).await;
	}
1253
	/// Cleans up transactions and views outdated by potential finality stalls.
	///
	/// This function removes transactions from the pool that were included in blocks but not
	/// finalized within a pre-defined block height threshold. Transactions not meeting finality
	/// within this threshold are notified with finality timed out event. The threshold is based on
	/// the current block number, 'at'.
	///
	/// Additionally, this method triggers the view store to handle and remove stale views caused by
	/// the finality stall.
	async fn finality_stall_cleanup(&self, at: &HashAndNumber<Block>) {
		// Collect (and remove from `included_transactions`) all blocks whose distance to
		// `at` exceeds the finality timeout threshold. The lock is held only for this scope.
		let (oldest_block_number, finality_timedout_blocks) = {
			let mut included_transactions = self.included_transactions.lock();

			let Some(oldest_block_number) =
				included_transactions.first_key_value().map(|(k, _)| k.number)
			else {
				return;
			};

			// Fast path: nothing can be stalled if even the oldest tracked block is
			// within the threshold.
			if at.number.saturating_sub(oldest_block_number).into()
				<= self.finality_timeout_threshold.into()
			{
				return;
			}

			let mut finality_timedout_blocks =
				indexmap::IndexMap::<BlockHash<ChainApi>, Vec<ExtrinsicHash<ChainApi>>>::default();

			included_transactions.retain(
				|HashAndNumber { number: view_number, hash: view_hash }, tx_hashes| {
					let diff = at.number.saturating_sub(*view_number);
					if diff.into() > self.finality_timeout_threshold.into() {
						finality_timedout_blocks.insert(*view_hash, std::mem::take(tx_hashes));
						false
					} else {
						true
					}
				},
			);

			(oldest_block_number, finality_timedout_blocks)
		};

		if !finality_timedout_blocks.is_empty() {
			self.ready_poll.lock().remove_cancelled();
			self.view_store.listener.remove_stale_controllers();
		}

		let finality_timedout_blocks_len = finality_timedout_blocks.len();

		// Notify listeners and purge the timed-out transactions from all internal structures.
		for (block_hash, tx_hashes) in finality_timedout_blocks {
			self.view_store.listener.transactions_finality_timeout(&tx_hashes, block_hash);

			self.mempool.remove_transactions(&tx_hashes).await;
			self.import_notification_sink.clean_notified_items(&tx_hashes);
			self.view_store.dropped_stream_controller.remove_transactions(tx_hashes.clone());
		}

		self.view_store.finality_stall_view_cleanup(at, self.finality_timeout_threshold);

		debug!(
			target: LOG_TARGET,
			?at,
			included_transactions_len = ?self.included_transactions.lock().len(),
			finality_timedout_blocks_len,
			?oldest_block_number,
			"finality_stall_cleanup"
		);
	}
1323
1324	/// Builds a new view.
1325	///
1326	/// If `origin_view` is provided, the new view will be cloned from it. Otherwise an empty view
1327	/// will be created.
1328	///
1329	/// This method will also update multi-view listeners with newly created view.
1330	///
1331	/// The new view will not be inserted into the view store.
1332	fn build_and_plug_view(
1333		&self,
1334		origin_view: Option<Arc<View<ChainApi>>>,
1335		at: &HashAndNumber<Block>,
1336		tree_route: &TreeRoute<Block>,
1337	) -> View<ChainApi> {
1338		let enter = Instant::now();
1339		let (view, view_dropped_stream, view_aggregated_stream) =
1340			if let Some(origin_view) = origin_view {
1341				let (mut view, view_dropped_stream, view_aggragated_stream) =
1342					View::new_from_other(&origin_view, at);
1343				if !tree_route.retracted().is_empty() {
1344					view.pool.clear_recently_pruned();
1345				}
1346				(view, view_dropped_stream, view_aggragated_stream)
1347			} else {
1348				debug!(
1349					target: LOG_TARGET,
1350					?at,
1351					"creating non-cloned view"
1352				);
1353				View::new(
1354					self.api.clone(),
1355					at.clone(),
1356					self.options.clone(),
1357					self.metrics.clone(),
1358					self.is_validator.clone(),
1359				)
1360			};
1361		debug!(
1362			target: LOG_TARGET,
1363			?at,
1364			duration = ?enter.elapsed(),
1365			"build_new_view::clone_view"
1366		);
1367
1368		// 1. Capture all import notification from the very beginning, so first register all
1369		// the listeners.
1370		self.import_notification_sink.add_view(
1371			view.at.hash,
1372			view.pool.validated_pool().import_notification_stream().boxed(),
1373		);
1374
1375		self.view_store
1376			.dropped_stream_controller
1377			.add_view(view.at.hash, view_dropped_stream.boxed());
1378
1379		self.view_store
1380			.listener
1381			.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
1382
1383		view
1384	}
1385
	/// Builds and updates a new view.
	///
	/// This function uses [`Self::build_and_plug_view`] to create or clone new view.
	///
	/// The new view will be updated with transactions from the tree_route and the mempool, all
	/// required events will be triggered, it will be inserted to the view store (respecting all
	/// pre-insertion actions).
	async fn build_and_update_view(
		&self,
		origin_view: Option<Arc<View<ChainApi>>>,
		at: &HashAndNumber<Block>,
		tree_route: &TreeRoute<Block>,
	) -> Option<Arc<View<ChainApi>>> {
		let start = Instant::now();
		debug!(
			target: LOG_TARGET,
			?at,
			origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()),
			?tree_route,
			"build_new_view"
		);

		let mut view = self.build_and_plug_view(origin_view, at, tree_route);

		// sync the transactions statuses and referencing views in all the listeners with newly
		// cloned view.
		view.pool.validated_pool().retrigger_notifications();
		debug!(
			target: LOG_TARGET,
			?at,
			duration = ?start.elapsed(),
			"register_listeners"
		);

		// 2. Handle transactions from the tree route. Pruning transactions from the view first
		// will make some space for mempool transactions in case we are at the view's limits.
		let start = Instant::now();
		self.update_view_with_fork(&view, tree_route, at.clone()).await;
		debug!(
			target: LOG_TARGET,
			?at,
			duration = ?start.elapsed(),
			"update_view_with_fork"
		);

		// 3. Finally, submit transactions from the mempool.
		let start = Instant::now();
		self.update_view_with_mempool(&mut view).await;
		debug!(
			target: LOG_TARGET,
			?at,
			duration= ?start.elapsed(),
			"update_view_with_mempool"
		);
		let view = Arc::from(view);
		self.view_store.insert_new_view(view.clone(), tree_route).await;

		debug!(
			target: LOG_TARGET,
			duration = ?start.elapsed(),
			?at,
			"build_new_view"
		);
		Some(view)
	}
1451
	/// Retrieves transactions hashes from the `included_transactions` cache or, if not present,
	/// fetches them from the blockchain API using the block's hash `at`.
	///
	/// Returns a `Vec` of transactions hashes.
	async fn fetch_block_transactions(&self, at: &HashAndNumber<Block>) -> Vec<TxHash<Self>> {
		if let Some(txs) = self.included_transactions.lock().get(at) {
			return txs.clone();
		};

		debug!(
			target: LOG_TARGET,
			?at,
			"fetch_block_transactions from api"
		);

		// Cache miss: fetch the block body from the API. A failed request is logged and
		// treated as an empty body.
		self.api
			.block_body(at.hash)
			.await
			.unwrap_or_else(|error| {
				warn!(
					target: LOG_TARGET,
					%error,
					"fetch_block_transactions: error request"
				);
				None
			})
			.unwrap_or_default()
			.into_iter()
			.map(|t| self.hash_of(&t))
			.collect::<Vec<_>>()
	}
1483
	/// Returns the list of xts included in all block's ancestors up to recently finalized block
	/// (or up to the finality timeout threshold), including the block itself.
	///
	/// Example: for the following chain `F<-B1<-B2<-B3` xts from `B1,B2,B3` will be returned.
	async fn txs_included_since_finalized(
		&self,
		at: &HashAndNumber<Block>,
	) -> HashSet<TxHash<Self>> {
		let start = Instant::now();
		let recent_finalized_block = self.enactment_state.lock().recent_finalized_block();

		let Ok(tree_route) = self.api.tree_route(recent_finalized_block, at.hash) else {
			return Default::default();
		};

		let mut all_txs = HashSet::new();

		for block in tree_route.enacted().iter() {
			// note: There is no point to fetch the transactions from blocks older than threshold.
			// All transactions included in these blocks, were already removed from pool
			// with FinalityTimeout event.
			if at.number.saturating_sub(block.number).into()
				<= self.finality_timeout_threshold.into()
			{
				all_txs.extend(self.fetch_block_transactions(block).await);
			}
		}

		debug!(
			target: LOG_TARGET,
			?at,
			?recent_finalized_block,
			extrinsics_count = all_txs.len(),
			duration = ?start.elapsed(),
			"fatp::txs_included_since_finalized"
		);
		all_txs
	}
1522
	/// Updates the given view with the transactions from the internal mempool.
	///
	/// All transactions from the mempool (excluding those which are either already imported or
	/// already included in blocks since recently finalized block) are submitted to the
	/// view.
	///
	/// If there are no views, and mempool transaction is reported as invalid for the given view,
	/// the transaction is notified as invalid and removed from the mempool.
	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
		let xts_count = self.mempool.unwatched_and_watched_count().await;
		debug!(
			target: LOG_TARGET,
			view_at = ?view.at,
			?xts_count,
			active_views_count = self.active_views_count(),
			"update_view_with_mempool"
		);
		let included_xts = self.txs_included_since_finalized(&view.at).await;

		// Pick a batch of mempool transactions that are neither already imported into the
		// view nor already included in blocks since the recently finalized block.
		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
			.mempool
			.with_transactions(|iter| {
				iter.filter(|(hash, _)| !view.is_imported(&hash) && !included_xts.contains(&hash))
					.map(|(k, v)| (*k, v.clone()))
					// todo [#8835]: better approach is needed - maybe time-budget approach?
					.take(MEMPOOL_TO_VIEW_BATCH_SIZE)
					.collect::<HashMap<_, _>>()
			})
			.await
			.into_iter()
			.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
			.unzip();

		// Submit the batch to the view; on success propagate the resulting priority back to
		// the mempool, otherwise keep the failing hash for the error handling below.
		let results = view
			.submit_many(xts_filtered, ValidateTransactionPriority::Maintained)
			.await
			.into_iter()
			.zip(hashes)
			.map(|(result, tx_hash)| async move {
				if let Ok(outcome) = result {
					Ok(self
						.mempool
						.update_transaction_priority(outcome.hash(), outcome.priority())
						.await)
				} else {
					Err(tx_hash)
				}
			})
			.collect::<Vec<_>>();

		let results = futures::future::join_all(results).await;

		let submitted_count = results.len();

		debug!(
			target: LOG_TARGET,
			view_at_hash = ?view.at.hash,
			submitted_count,
			mempool_len = self.mempool.len(),
			"update_view_with_mempool"
		);

		self.metrics
			.report(|metrics| metrics.submitted_from_mempool_txs.inc_by(submitted_count as _));

		// if there are no views yet, and a single newly created view is reporting error, just send
		// out the invalid event, and remove transaction.
		if self.view_store.is_empty() {
			for result in results {
				if let Err(tx_hash) = result {
					self.view_store.listener.transactions_invalidated(&[tx_hash]);
					self.mempool.remove_transactions(&[tx_hash]).await;
				}
			}
		}
	}
1599
	/// Attempts to search the view store for the `provides` tags of enacted
	/// transactions associated with the specified `tree_route`.
	///
	/// The 'provides' tags of transactions from enacted blocks are searched
	/// in inactive views. Found `provides` tags are intended to serve as cache,
	/// helping to avoid unnecessary revalidations during pruning.
	async fn collect_provides_tags_from_view_store(
		&self,
		tree_route: &TreeRoute<Block>,
		xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
	) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
		// Only blocks with inactive views are queried: the tips of both forks carry active
		// views and are skipped; the common block is included.
		let blocks_hashes = tree_route
			.retracted()
			.iter()
			// Skip the tip of the retracted fork, since it has an active view.
			.skip(1)
			// Skip also the tip of the enacted fork, since it has an active view too.
			.chain(
				std::iter::once(tree_route.common_block())
					.chain(tree_route.enacted().iter().rev().skip(1)),
			)
			.collect::<Vec<&HashAndNumber<Block>>>();

		self.view_store.provides_tags_from_inactive_views(blocks_hashes, xts_hashes)
	}
1625
	/// Build a map from blocks to their extrinsics.
	///
	/// Block bodies are fetched concurrently; a failed or missing body is logged and yields
	/// an empty list of extrinsics for that block.
	pub async fn collect_extrinsics(
		&self,
		blocks: &[HashAndNumber<Block>],
	) -> HashMap<Block::Hash, Vec<RawExtrinsicFor<ChainApi>>> {
		future::join_all(blocks.iter().map(|hn| async move {
			(
				hn.hash,
				self.api
					.block_body(hn.hash)
					.await
					.unwrap_or_else(|e| {
						warn!(target: LOG_TARGET, %e, ": block_body error request");
						None
					})
					.unwrap_or_default(),
			)
		}))
		.await
		.into_iter()
		.collect()
	}
1648
	/// Updates the view with the transactions from the given tree route.
	///
	/// Transactions from the retracted blocks are resubmitted to the given view. Tags for
	/// transactions included in blocks on enacted fork are pruned from the provided view.
	async fn update_view_with_fork(
		&self,
		view: &View<ChainApi>,
		tree_route: &TreeRoute<Block>,
		hash_and_number: HashAndNumber<Block>,
	) {
		debug!(
			target: LOG_TARGET,
			?tree_route,
			at = ?view.at,
			"update_view_with_fork"
		);
		let api = self.api.clone();

		// Collect extrinsics on the enacted path in a map from block hn -> extrinsics.
		let mut extrinsics = self.collect_extrinsics(tree_route.enacted()).await;

		// Create a map from enacted blocks' extrinsics to their `provides`
		// tags based on inactive views.
		let known_provides_tags = Arc::new(
			self.collect_provides_tags_from_view_store(
				tree_route,
				extrinsics.values().flatten().map(|tx| view.pool.hash_of(tx)).collect(),
			)
			.await,
		);

		debug!(target: LOG_TARGET, "update_view_with_fork: txs to tags map length: {}", known_provides_tags.len());

		// We keep track of everything we prune so that later we won't add
		// transactions with those hashes from the retracted blocks.
		let mut pruned_log = HashSet::<ExtrinsicHash<ChainApi>>::new();
		// Prune all enacted blocks concurrently, then record the pruned hashes per block in
		// the `included_transactions` cache.
		future::join_all(tree_route.enacted().iter().map(|hn| {
			let api = api.clone();
			let xts = extrinsics.remove(&hn.hash).unwrap_or_default();
			let known_provides_tags = known_provides_tags.clone();
			async move {
				(
					hn,
					crate::prune_known_txs_for_block(
						hn,
						&*api,
						&view.pool,
						Some(xts),
						Some(known_provides_tags),
					)
					.await,
				)
			}
		}))
		.await
		.into_iter()
		.for_each(|(key, enacted_log)| {
			pruned_log.extend(enacted_log.clone());
			self.included_transactions.lock().insert(key.clone(), enacted_log);
		});

		let unknown_count = self.mempool.count_unknown_transactions(pruned_log.iter()).await;
		self.metrics
			.report(|metrics| metrics.unknown_from_block_import_txs.inc_by(unknown_count as _));

		// resubmit
		{
			let mut resubmit_transactions = Vec::new();

			for retracted in tree_route.retracted() {
				let hash = retracted.hash;

				// A failed block-body request is logged and treated as an empty body.
				let block_transactions = api
					.block_body(hash)
					.await
					.unwrap_or_else(|error| {
						warn!(
							target: LOG_TARGET,
							%error,
							"Failed to fetch block body"
						);
						None
					})
					.unwrap_or_default()
					.into_iter();

				let mut resubmitted_to_report = 0;

				// Skip transactions that were just pruned on the enacted path - they are
				// already included there and must not be resubmitted.
				let txs = block_transactions.into_iter().map(|tx| (self.hash_of(&tx), tx)).filter(
					|(tx_hash, _)| {
						let contains = pruned_log.contains(&tx_hash);

						// need to count all transactions, not just filtered, here
						resubmitted_to_report += 1;

						if !contains {
							trace!(
								target: LOG_TARGET,
								?tx_hash,
								?hash,
								"Resubmitting from retracted block"
							);
						}
						!contains
					},
				);
				let mut result = vec![];
				for (tx_hash, tx) in txs {
					result.push(
						// find arc if tx is known
						self.mempool
							.get_by_hash(tx_hash)
							.await
							.map(|tx| (tx.source(), tx.tx()))
							.unwrap_or_else(|| {
								// These transactions are coming from retracted blocks, we
								// should simply consider them external.
								(TimedTransactionSource::new_external(true), Arc::from(tx))
							}),
					);
				}
				resubmit_transactions.extend(result);

				self.metrics.report(|metrics| {
					metrics.resubmitted_retracted_txs.inc_by(resubmitted_to_report)
				});
			}

			let _ = view
				.pool
				.resubmit_at(
					&hash_and_number,
					resubmit_transactions,
					ValidateTransactionPriority::Maintained,
				)
				.await;
		}
	}
1787
	/// Executes the maintenance for the finalized event.
	///
	/// Performs a house-keeping required for finalized event. This includes:
	/// - executing the on finalized procedure for the view store,
	/// - purging finalized transactions from the mempool and triggering mempool revalidation,
	async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) {
		let start = Instant::now();
		let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
		debug!(
			target: LOG_TARGET,
			?finalized_number,
			?tree_route,
			active_views_count = self.active_views_count(),
			"handle_finalized"
		);
		let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;

		self.mempool.purge_finalized_transactions(&finalized_xts).await;
		self.import_notification_sink.clean_notified_items(&finalized_xts);

		self.metrics
			.report(|metrics| metrics.finalized_txs.inc_by(finalized_xts.len() as _));

		if let Ok(Some(finalized_number)) = finalized_number {
			// Drop cached included-transactions entries at or below the finalized height,
			// then trigger mempool revalidation against the finalized block.
			self.included_transactions
				.lock()
				.retain(|cached_block, _| finalized_number < cached_block.number);
			self.revalidation_queue
				.revalidate_mempool(
					self.mempool.clone(),
					self.view_store.clone(),
					HashAndNumber { hash: finalized_hash, number: finalized_number },
				)
				.await;
		} else {
			debug!(
				target: LOG_TARGET,
				?finalized_number,
				"handle_finalized: revalidation/cleanup skipped: could not resolve finalized block number"
			);
		}

		self.ready_poll.lock().remove_cancelled();

		debug!(
			target: LOG_TARGET,
			active_views_count = self.active_views_count(),
			included_transactions_len = ?self.included_transactions.lock().len(),
			duration = ?start.elapsed(),
			"handle_finalized after"
		);
	}
1840
1841	/// Computes a hash of the provided transaction
1842	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
1843		self.api.hash_and_length(xt).0
1844	}
1845
1846	/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
1847	/// one.
1848	///
1849	/// This asynchronous function verifies the new transaction against the most recent view. If a
1850	/// transaction with a lower priority exists in the transaction pool, it is replaced with the
1851	/// new transaction.
1852	///
1853	/// If no lower-priority transaction is found, the function returns an error indicating the
1854	/// transaction was dropped immediately.
1855	#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "fatp::attempt_transaction_replacement")]
1856	async fn attempt_transaction_replacement(
1857		&self,
1858		source: TransactionSource,
1859		at_number: u64,
1860		watched: bool,
1861		xt: ExtrinsicFor<ChainApi>,
1862	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1863		let best_view = self
1864			.view_store
1865			.most_recent_view
1866			.read()
1867			.as_ref()
1868			.ok_or(TxPoolApiError::ImmediatelyDropped)?
1869			.clone();
1870
1871		let (xt_hash, validated_tx) = best_view
1872			.pool
1873			.verify_one(
1874				best_view.at.hash,
1875				best_view.at.number,
1876				TimedTransactionSource::from_transaction_source(source, false),
1877				xt.clone(),
1878				crate::graph::CheckBannedBeforeVerify::Yes,
1879				ValidateTransactionPriority::Submitted,
1880			)
1881			.await;
1882
1883		let Some(priority) = validated_tx.priority() else {
1884			return Err(TxPoolApiError::ImmediatelyDropped);
1885		};
1886
1887		let insertion_info = self
1888			.mempool
1889			.try_insert_with_replacement(xt, priority, source, at_number, watched)
1890			.await?;
1891		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
1892	}
1893
	/// Sync version of [`Self::attempt_transaction_replacement`].
	///
	/// Validates the transaction with a blocking runtime call at the most recent view's
	/// block, then inserts it into the mempool, potentially replacing a lower-priority
	/// transaction.
	///
	/// Returns [`TxPoolApiError::ImmediatelyDropped`] when no view exists yet or the
	/// blocking validation call fails; invalid/unknown validity is mapped to the
	/// corresponding pool API errors.
	fn attempt_transaction_replacement_sync(
		&self,
		source: TransactionSource,
		watched: bool,
		xt: ExtrinsicFor<ChainApi>,
	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
		// Pin validation to the block of the most recent view; bail out if none exists.
		let HashAndNumber { number: at_number, hash: at_hash } = self
			.view_store
			.most_recent_view
			.read()
			.as_ref()
			.ok_or(TxPoolApiError::ImmediatelyDropped)?
			.at;

		// NOTE(review): the validation source is hard-coded to `Local` here while the
		// mempool insertion below uses the caller-provided `source` — presumably
		// intentional for this synchronous (local submission) path; confirm.
		let ValidTransaction { priority, .. } = self
			.api
			.validate_transaction_blocking(at_hash, TransactionSource::Local, Arc::from(xt.clone()))
			.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
			.map_err(|e| match e {
				TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
				TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
			})?;
		let xt_hash = self.hash_of(&xt);

		// Convert the block number to `u64` as expected by the mempool API.
		let insertion_info = self.mempool.clone().try_insert_with_replacement_sync(
			xt,
			priority,
			source,
			at_number.into().as_u64(),
			watched,
		)?;
		self.post_attempt_transaction_replacement(xt_hash, insertion_info)
	}
1928
1929	fn post_attempt_transaction_replacement(
1930		&self,
1931		tx_hash: ExtrinsicHash<ChainApi>,
1932		insertion_info: InsertionInfo<ExtrinsicHash<ChainApi>>,
1933	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
1934		for worst_hash in &insertion_info.removed {
1935			trace!(
1936				target: LOG_TARGET,
1937				tx_hash = ?worst_hash,
1938				new_tx_hash = ?tx_hash,
1939				"removed: replaced by"
1940			);
1941			self.view_store
1942				.listener
1943				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
1944
1945			self.view_store
1946				.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
1947					listener.limits_enforced(&removed_tx_hash);
1948				});
1949		}
1950
1951		return Ok(insertion_info);
1952	}
1953}
1954
#[async_trait]
impl<ChainApi, Block> MaintainedTransactionPool for ForkAwareTxPool<ChainApi, Block>
where
	Block: BlockT,
	ChainApi: 'static + graph::ChainApi<Block = Block>,
	<Block as BlockT>::Hash: Unpin,
{
	/// Executes the maintenance for the given chain event.
	///
	/// Translates the chain event into an enactment action via the enactment state,
	/// handles newly enacted blocks and finalization, and reports metrics at the end.
	async fn maintain(&self, event: ChainEvent<Self::Block>) {
		let start = Instant::now();
		debug!(
			target: LOG_TARGET,
			?event,
			"processing event"
		);

		// Wait for any in-flight background revalidations before touching the views.
		self.view_store.finish_background_revalidations().await;

		// Captured before `update` below may advance the enactment state.
		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();

		// Helper: compute the route between two blocks via the chain API,
		// stringifying errors for the enactment-state machinery.
		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
			match self.api.tree_route(from, to) {
				Ok(tree_route) => Ok(tree_route),
				Err(e) => {
					return Err(format!(
						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
					))
				},
			}
		};
		// Helper: resolve a block hash to its number.
		let block_id_to_number =
			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));

		// Decide what kind of maintenance this event requires.
		let result =
			self.enactment_state
				.lock()
				.update(&event, &compute_tree_route, &block_id_to_number);

		match result {
			Err(error) => {
				debug!(
					target: LOG_TARGET,
					%error,
					"enactment_state::update error"
				);
				// On error, force the enactment state to reflect the event so that
				// subsequent events are not processed against stale state.
				self.enactment_state.lock().force_update(&event);
			},
			Ok(EnactmentAction::Skip) => return,
			Ok(EnactmentAction::HandleFinalization) => {
				// todo [#5492]: in some cases handle_new_block is actually needed (new_num >
				// tips_of_forks) let hash = event.hash();
				// if !self.has_view(hash) {
				// 	if let Ok(tree_route) = compute_tree_route(prev_finalized_block, hash) {
				// 		self.handle_new_block(&tree_route).await;
				// 	}
				// }
			},
			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
				self.handle_new_block(&tree_route).await;
			},
		};

		// Finalization additionally purges finalized transactions and triggers
		// mempool revalidation (see `handle_finalized`).
		match event {
			ChainEvent::NewBestBlock { .. } => {},
			ChainEvent::Finalized { hash, ref tree_route } => {
				self.handle_finalized(hash, tree_route).await;

				debug!(
					target: LOG_TARGET,
					?tree_route,
					?prev_finalized_block,
					"on-finalized enacted"
				);
			},
		}

		let duration = start.elapsed();
		let mempool_len = self.mempool_len().await;
		debug!(
			target: LOG_TARGET,
			txs = ?mempool_len,
			a = self.active_views_count(),
			i = self.inactive_views_count(),
			views = ?self.views_stats(),
			?event,
			?duration,
			"maintain"
		);

		// Gauge updates can fail on integer conversion; those failures are
		// deliberately ignored (`let _ = …`).
		self.metrics.report(|metrics| {
			let (unwatched, watched) = mempool_len;
			let _ = (
				self.active_views_count().try_into().map(|v| metrics.active_views.set(v)),
				self.inactive_views_count().try_into().map(|v| metrics.inactive_views.set(v)),
				watched.try_into().map(|v| metrics.watched_txs.set(v)),
				unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)),
			);
			metrics.maintain_duration.observe(duration.as_secs_f64());
		});
	}
}
2056
2057impl<Block, Client> ForkAwareTxPool<FullChainApi<Client, Block>, Block>
2058where
2059	Block: BlockT,
2060	Client: subsoil::api::ProvideRuntimeApi<Block>
2061		+ soil_client::client_api::BlockBackend<Block>
2062		+ soil_client::client_api::blockchain::HeaderBackend<Block>
2063		+ subsoil::runtime::traits::BlockIdTo<Block>
2064		+ soil_client::client_api::ExecutorProvider<Block>
2065		+ soil_client::client_api::UsageProvider<Block>
2066		+ soil_client::blockchain::HeaderMetadata<Block, Error = soil_client::blockchain::Error>
2067		+ Send
2068		+ Sync
2069		+ 'static,
2070	Client::Api: subsoil::txpool::runtime_api::TaggedTransactionQueue<Block>,
2071	<Block as BlockT>::Hash: std::marker::Unpin,
2072{
2073	/// Create new fork aware transaction pool for a full node with the provided api.
2074	pub fn new_full(
2075		options: Options,
2076		is_validator: IsValidator,
2077		prometheus: Option<&PrometheusRegistry>,
2078		spawner: impl SpawnEssentialNamed,
2079		client: Arc<Client>,
2080	) -> Self {
2081		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
2082		let pool = Self::new_with_background_worker(
2083			options,
2084			is_validator,
2085			pool_api,
2086			prometheus,
2087			spawner,
2088			client.usage_info().chain.best_hash,
2089			client.usage_info().chain.finalized_hash,
2090		);
2091
2092		pool
2093	}
2094}
2095
#[cfg(test)]
mod reduce_multiview_result_tests {
	use super::*;
	use subsoil::core::H256;

	#[derive(Debug, PartialEq, Clone)]
	enum Error {
		Custom(u8),
	}

	/// A single view's vector of per-transaction results.
	type Row = Vec<Result<H256, Error>>;

	#[test]
	fn empty() {
		subsoil::tracing::try_init_simple();
		let r = reduce_multiview_result::<H256, H256, Error>(HashMap::default());
		assert!(r.is_empty());
	}

	#[test]
	fn errors_only() {
		subsoil::tracing::try_init_simple();
		// Three views (0x13, 0x14, 0x15), each reporting four errors (10..=13, 20..=23, 30..=33).
		let rows: Vec<(H256, Row)> = (1u8..=3)
			.map(|i| {
				(
					H256::repeat_byte(0x12 + i),
					(0u8..4).map(|j| Err(Error::Custom(10 * i + j))).collect(),
				)
			})
			.collect();
		let input = rows.clone().into_iter().collect::<HashMap<_, _>>();
		let r = reduce_multiview_result(input);

		// order in HashMap is random, the result shall be one of the rows:
		assert!(rows.iter().any(|(_, row)| r == *row));
	}

	#[test]
	#[should_panic]
	#[cfg(debug_assertions)]
	fn invalid_lengths() {
		subsoil::tracing::try_init_simple();
		// Rows of differing lengths shall trip a debug assertion.
		let rows: Vec<(H256, Row)> = vec![
			(H256::repeat_byte(0x13), vec![Err(Error::Custom(12)), Err(Error::Custom(13))]),
			(H256::repeat_byte(0x14), vec![Err(Error::Custom(23))]),
		];
		let _ = reduce_multiview_result(rows.into_iter().collect::<HashMap<_, _>>());
	}

	#[test]
	fn only_hashes() {
		subsoil::tracing::try_init_simple();

		// Two identical all-Ok rows reduce to that very row.
		let row: Row = vec![Ok(H256::repeat_byte(0x13)), Ok(H256::repeat_byte(0x14))];
		let rows: Vec<(H256, Row)> = vec![
			(H256::repeat_byte(0x13), row.clone()),
			(H256::repeat_byte(0x14), row.clone()),
		];
		let r = reduce_multiview_result(rows.into_iter().collect::<HashMap<_, _>>());

		assert_eq!(r, row);
	}

	#[test]
	fn one_view() {
		subsoil::tracing::try_init_simple();
		// A single view's row is returned unchanged.
		let row: Row = vec![Ok(H256::repeat_byte(0x10)), Err(Error::Custom(11))];
		let input: HashMap<_, _> =
			vec![(H256::repeat_byte(0x13), row.clone())].into_iter().collect();
		let r = reduce_multiview_result(input);

		assert_eq!(r, row);
	}

	#[test]
	fn mix() {
		subsoil::tracing::try_init_simple();
		// At each position, an Ok from any view wins over errors at that position.
		let rows: Vec<(H256, Row)> = vec![
			(
				H256::repeat_byte(0x13),
				vec![
					Ok(H256::repeat_byte(0x10)),
					Err(Error::Custom(11)),
					Err(Error::Custom(12)),
					Err(Error::Custom(33)),
				],
			),
			(
				H256::repeat_byte(0x14),
				vec![
					Err(Error::Custom(20)),
					Ok(H256::repeat_byte(0x21)),
					Err(Error::Custom(22)),
					Err(Error::Custom(33)),
				],
			),
			(
				H256::repeat_byte(0x15),
				vec![
					Err(Error::Custom(30)),
					Err(Error::Custom(31)),
					Ok(H256::repeat_byte(0x32)),
					Err(Error::Custom(33)),
				],
			),
		];
		let r = reduce_multiview_result(rows.into_iter().collect::<HashMap<_, _>>());

		assert_eq!(
			r,
			vec![
				Ok(H256::repeat_byte(0x10)),
				Ok(H256::repeat_byte(0x21)),
				Ok(H256::repeat_byte(0x32)),
				Err(Error::Custom(33))
			]
		);
	}
}