Skip to main content

soil_txpool/single_state_txpool/
single_state_txpool.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Substrate transaction pool implementation.
8
9use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
10pub use crate::{
11	api::FullChainApi,
12	graph::{ChainApi, ValidatedTransaction},
13};
14use crate::{
15	common::{
16		enactment_state::{EnactmentAction, EnactmentState},
17		error,
18		tracing_log_xt::log_xt_trace,
19	},
20	graph::{
21		self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator,
22		RawExtrinsicFor,
23	},
24	ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
25};
26use async_trait::async_trait;
27use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
28use parking_lot::Mutex;
29use soil_prometheus::Registry as PrometheusRegistry;
30use soil_client::blockchain::{HashAndNumber, TreeRoute};
31use soil_client::transaction_pool::{
32	error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
33	PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor,
34	TxHash, TxInvalidityReportMap,
35};
36use std::{
37	collections::{HashMap, HashSet},
38	pin::Pin,
39	sync::Arc,
40	time::Instant,
41};
42use subsoil::core::traits::SpawnEssentialNamed;
43use subsoil::runtime::{
44	generic::BlockId,
45	traits::{
46		AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero,
47	},
48	transaction_validity::{TransactionTag as Tag, TransactionValidityError},
49};
50use tokio::select;
51use tracing::{trace, warn};
52
53/// Basic implementation of transaction pool that can be customized by providing PoolApi.
54pub struct BasicPool<PoolApi, Block>
55where
56	Block: BlockT,
57	PoolApi: graph::ChainApi<Block = Block>,
58{
59	pool: Arc<graph::Pool<PoolApi, ()>>,
60	api: Arc<PoolApi>,
61	revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
62	revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
63	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
64	metrics: PrometheusMetrics,
65	enactment_state: Arc<Mutex<EnactmentState<Block>>>,
66}
67
68struct ReadyPoll<T, Block: BlockT> {
69	updated_at: NumberFor<Block>,
70	pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
71}
72
73impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
74	fn default() -> Self {
75		Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
76	}
77}
78
79impl<T, Block: BlockT> ReadyPoll<T, Block> {
80	fn new(best_block_number: NumberFor<Block>) -> Self {
81		Self { updated_at: best_block_number, pollers: Default::default() }
82	}
83
84	fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
85		self.updated_at = number;
86
87		let mut idx = 0;
88		while idx < self.pollers.len() {
89			if self.pollers[idx].0 <= number {
90				let poller_sender = self.pollers.swap_remove(idx);
91				trace!(
92					target: LOG_TARGET,
93					?number,
94					"Sending ready signal."
95				);
96				let _ = poller_sender.1.send(iterator_factory());
97			} else {
98				idx += 1;
99			}
100		}
101	}
102
103	fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
104		let (sender, receiver) = oneshot::channel();
105		self.pollers.push((number, sender));
106		receiver
107	}
108
109	fn updated_at(&self) -> NumberFor<Block> {
110		self.updated_at
111	}
112}
113
114/// Type of revalidation.
115pub enum RevalidationType {
116	/// Light revalidation type.
117	///
118	/// During maintenance, transaction pool makes periodic revalidation
119	/// of all transactions depending on number of blocks or time passed.
120	/// Also this kind of revalidation does not resubmit transactions from
121	/// retracted blocks, since it is too expensive.
122	Light,
123
124	/// Full revalidation type.
125	///
126	/// During maintenance, transaction pool revalidates some fixed amount of
127	/// transactions from the pool of valid transactions.
128	Full,
129}
130
131impl<PoolApi, Block> BasicPool<PoolApi, Block>
132where
133	Block: BlockT,
134	PoolApi: graph::ChainApi<Block = Block> + 'static,
135{
136	/// Create new basic transaction pool with provided api, for tests.
137	pub fn new_test(
138		pool_api: Arc<PoolApi>,
139		best_block_hash: Block::Hash,
140		finalized_hash: Block::Hash,
141		options: graph::Options,
142	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
143		let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
144			options,
145			true.into(),
146			pool_api.clone(),
147		));
148		let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
149			pool_api.clone(),
150			pool.clone(),
151			finalized_hash,
152		);
153		(
154			Self {
155				api: pool_api,
156				pool,
157				revalidation_queue: Arc::new(revalidation_queue),
158				revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
159				ready_poll: Default::default(),
160				metrics: Default::default(),
161				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
162					best_block_hash,
163					finalized_hash,
164				))),
165			},
166			background_task,
167		)
168	}
169
170	/// Create new basic transaction pool with provided api and custom
171	/// revalidation type.
172	pub fn with_revalidation_type(
173		options: graph::Options,
174		is_validator: IsValidator,
175		pool_api: Arc<PoolApi>,
176		prometheus: Option<&PrometheusRegistry>,
177		revalidation_type: RevalidationType,
178		spawner: impl SpawnEssentialNamed,
179		best_block_number: NumberFor<Block>,
180		best_block_hash: Block::Hash,
181		finalized_hash: Block::Hash,
182	) -> Self {
183		let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
184			options,
185			is_validator,
186			pool_api.clone(),
187		));
188		let (revalidation_queue, background_task) = match revalidation_type {
189			RevalidationType::Light => {
190				(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None)
191			},
192			RevalidationType::Full => {
193				let (queue, background) = revalidation::RevalidationQueue::new_background(
194					pool_api.clone(),
195					pool.clone(),
196					finalized_hash,
197				);
198				(queue, Some(background))
199			},
200		};
201
202		if let Some(background_task) = background_task {
203			spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
204		}
205
206		Self {
207			api: pool_api,
208			pool,
209			revalidation_queue: Arc::new(revalidation_queue),
210			revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
211				RevalidationType::Light => {
212					RevalidationStrategy::Light(RevalidationStatus::NotScheduled)
213				},
214				RevalidationType::Full => RevalidationStrategy::Always,
215			})),
216			ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
217			metrics: PrometheusMetrics::new(prometheus),
218			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
219				best_block_hash,
220				finalized_hash,
221			))),
222		}
223	}
224
225	/// Gets shared reference to the underlying pool.
226	pub fn pool(&self) -> &Arc<graph::Pool<PoolApi, ()>> {
227		&self.pool
228	}
229
230	/// Get access to the underlying api
231	pub fn api(&self) -> &PoolApi {
232		&self.api
233	}
234
235	async fn ready_at_with_timeout_internal(
236		&self,
237		at: Block::Hash,
238		timeout: std::time::Duration,
239	) -> ReadyIteratorFor<PoolApi> {
240		select! {
241			ready = self.ready_at(at)=> ready,
242			_ = futures_timer::Delay::new(timeout)=> self.ready()
243		}
244	}
245}
246
247#[async_trait]
248impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
249where
250	Block: BlockT,
251	PoolApi: 'static + graph::ChainApi<Block = Block>,
252{
253	type Block = PoolApi::Block;
254	type Hash = graph::ExtrinsicHash<PoolApi>;
255	type InPoolTransaction =
256		graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
257	type Error = PoolApi::Error;
258
259	async fn submit_at(
260		&self,
261		at: <Self::Block as BlockT>::Hash,
262		source: TransactionSource,
263		xts: Vec<TransactionFor<Self>>,
264	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
265		let pool = self.pool.clone();
266		let xts = xts
267			.into_iter()
268			.map(|xt| {
269				(TimedTransactionSource::from_transaction_source(source, false), Arc::from(xt))
270			})
271			.collect::<Vec<_>>();
272
273		self.metrics
274			.report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
275
276		let number = self.api.resolve_block_number(at);
277		let at = HashAndNumber { hash: at, number: number? };
278		Ok(pool
279			.submit_at(&at, xts, ValidateTransactionPriority::Submitted)
280			.await
281			.into_iter()
282			.map(|result| result.map(|outcome| outcome.hash()))
283			.collect())
284	}
285
286	async fn submit_one(
287		&self,
288		at: <Self::Block as BlockT>::Hash,
289		source: TransactionSource,
290		xt: TransactionFor<Self>,
291	) -> Result<TxHash<Self>, Self::Error> {
292		let pool = self.pool.clone();
293		let xt = Arc::from(xt);
294
295		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
296
297		let number = self.api.resolve_block_number(at);
298		let at = HashAndNumber { hash: at, number: number? };
299		pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
300			.await
301			.map(|outcome| outcome.hash())
302	}
303
304	async fn submit_and_watch(
305		&self,
306		at: <Self::Block as BlockT>::Hash,
307		source: TransactionSource,
308		xt: TransactionFor<Self>,
309	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
310		let pool = self.pool.clone();
311		let xt = Arc::from(xt);
312
313		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
314
315		let number = self.api.resolve_block_number(at);
316
317		let at = HashAndNumber { hash: at, number: number? };
318		pool.submit_and_watch(
319			&at,
320			TimedTransactionSource::from_transaction_source(source, false),
321			xt,
322		)
323		.await
324		.map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
325	}
326
327	async fn report_invalid(
328		&self,
329		_at: Option<<Self::Block as BlockT>::Hash>,
330		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
331	) -> Vec<Arc<Self::InPoolTransaction>> {
332		let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>();
333		let removed = self.pool.validated_pool().remove_invalid(&hashes);
334		self.metrics
335			.report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
336		removed
337	}
338
339	fn status(&self) -> PoolStatus {
340		self.pool.validated_pool().status()
341	}
342
343	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
344		self.pool.validated_pool().import_notification_stream()
345	}
346
347	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
348		self.pool.hash_of(xt)
349	}
350
351	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
352		self.pool.validated_pool().on_broadcasted(propagations)
353	}
354
355	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
356		self.pool.validated_pool().ready_by_hash(hash)
357	}
358
359	async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<PoolApi> {
360		let Ok(at) = self.api.resolve_block_number(at) else {
361			return Box::new(std::iter::empty()) as Box<_>;
362		};
363
364		let status = self.status();
365		// If there are no transactions in the pool, it is fine to return early.
366		//
367		// There could be transaction being added because of some re-org happening at the relevant
368		// block, but this is relative unlikely.
369		if status.ready == 0 && status.future == 0 {
370			return Box::new(std::iter::empty()) as Box<_>;
371		}
372
373		if self.ready_poll.lock().updated_at() >= at {
374			trace!(
375				target: LOG_TARGET,
376				?at,
377				"Transaction pool already processed block."
378			);
379			let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
380			return iterator;
381		}
382
383		let result = self.ready_poll.lock().add(at).map(|received| {
384			received.unwrap_or_else(|error| {
385				warn!(target: LOG_TARGET,  ?error, "Error receiving pending set.");
386				Box::new(std::iter::empty())
387			})
388		});
389
390		result.await
391	}
392
393	fn ready(&self) -> ReadyIteratorFor<PoolApi> {
394		Box::new(self.pool.validated_pool().ready())
395	}
396
397	fn futures(&self) -> Vec<Self::InPoolTransaction> {
398		let pool = self.pool.validated_pool().pool.read();
399		pool.futures().cloned().collect::<Vec<_>>()
400	}
401
402	async fn ready_at_with_timeout(
403		&self,
404		at: <Self::Block as BlockT>::Hash,
405		timeout: std::time::Duration,
406	) -> ReadyIteratorFor<PoolApi> {
407		self.ready_at_with_timeout_internal(at, timeout).await
408	}
409}
410
411impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
412where
413	Block: BlockT,
414	Client: subsoil::api::ProvideRuntimeApi<Block>
415		+ soil_client::client_api::BlockBackend<Block>
416		+ soil_client::client_api::blockchain::HeaderBackend<Block>
417		+ subsoil::runtime::traits::BlockIdTo<Block>
418		+ soil_client::client_api::ExecutorProvider<Block>
419		+ soil_client::client_api::UsageProvider<Block>
420		+ soil_client::blockchain::HeaderMetadata<Block, Error = soil_client::blockchain::Error>
421		+ Send
422		+ Sync
423		+ 'static,
424	Client::Api: subsoil::txpool::runtime_api::TaggedTransactionQueue<Block>,
425{
426	/// Create new basic transaction pool for a full node with the provided api.
427	pub fn new_full(
428		options: graph::Options,
429		is_validator: IsValidator,
430		prometheus: Option<&PrometheusRegistry>,
431		spawner: impl SpawnEssentialNamed,
432		client: Arc<Client>,
433	) -> Self {
434		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
435		let pool = Self::with_revalidation_type(
436			options,
437			is_validator,
438			pool_api,
439			prometheus,
440			RevalidationType::Full,
441			spawner,
442			client.usage_info().chain.best_number,
443			client.usage_info().chain.best_hash,
444			client.usage_info().chain.finalized_hash,
445		);
446
447		pool
448	}
449}
450
451impl<Block, Client> soil_client::transaction_pool::LocalTransactionPool
452	for BasicPool<FullChainApi<Client, Block>, Block>
453where
454	Block: BlockT,
455	Client: subsoil::api::ProvideRuntimeApi<Block>
456		+ soil_client::client_api::BlockBackend<Block>
457		+ soil_client::client_api::blockchain::HeaderBackend<Block>
458		+ subsoil::runtime::traits::BlockIdTo<Block>
459		+ soil_client::blockchain::HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
460	Client: Send + Sync + 'static,
461	Client::Api: subsoil::txpool::runtime_api::TaggedTransactionQueue<Block>,
462{
463	type Block = Block;
464	type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
465	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
466
467	fn submit_local(
468		&self,
469		at: Block::Hash,
470		xt: soil_client::transaction_pool::LocalTransactionFor<Self>,
471	) -> Result<Self::Hash, Self::Error> {
472		let validity = self
473			.api
474			.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
475			.map_err(|e| {
476				Self::Error::Pool(match e {
477					TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
478					TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
479				})
480			})?;
481
482		let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
483		let block_number = self
484			.api
485			.block_id_to_number(&BlockId::hash(at))?
486			.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
487
488		let validated = ValidatedTransaction::valid_at(
489			block_number.saturated_into::<u64>(),
490			hash,
491			TimedTransactionSource::new_local(false),
492			Arc::from(xt),
493			bytes,
494			validity,
495		);
496
497		self.pool
498			.validated_pool()
499			.submit(vec![validated])
500			.remove(0)
501			.map(|outcome| outcome.hash())
502	}
503}
504
505#[cfg_attr(test, derive(Debug))]
506enum RevalidationStatus<N> {
507	/// The revalidation has never been completed.
508	NotScheduled,
509	/// The revalidation is scheduled.
510	Scheduled(Option<Instant>, Option<N>),
511	/// The revalidation is in progress.
512	InProgress,
513}
514
515enum RevalidationStrategy<N> {
516	Always,
517	Light(RevalidationStatus<N>),
518}
519
520struct RevalidationAction {
521	revalidate: bool,
522	resubmit: bool,
523}
524
525impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
526	pub fn clear(&mut self) {
527		if let Self::Light(status) = self {
528			status.clear()
529		}
530	}
531
532	pub fn next(
533		&mut self,
534		block: N,
535		revalidate_time_period: Option<std::time::Duration>,
536		revalidate_block_period: Option<N>,
537	) -> RevalidationAction {
538		match self {
539			Self::Light(status) => RevalidationAction {
540				revalidate: status.next_required(
541					block,
542					revalidate_time_period,
543					revalidate_block_period,
544				),
545				resubmit: false,
546			},
547			Self::Always => RevalidationAction { revalidate: true, resubmit: true },
548		}
549	}
550}
551
552impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
553	/// Called when revalidation is completed.
554	pub fn clear(&mut self) {
555		*self = Self::NotScheduled;
556	}
557
558	/// Returns true if revalidation is required.
559	pub fn next_required(
560		&mut self,
561		block: N,
562		revalidate_time_period: Option<std::time::Duration>,
563		revalidate_block_period: Option<N>,
564	) -> bool {
565		match *self {
566			Self::NotScheduled => {
567				*self = Self::Scheduled(
568					revalidate_time_period.map(|period| Instant::now() + period),
569					revalidate_block_period.map(|period| block + period),
570				);
571				false
572			},
573			Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
574				let is_required =
575					revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
576						|| revalidate_at_block.map(|at| block >= at).unwrap_or(false);
577				if is_required {
578					*self = Self::InProgress;
579				}
580				is_required
581			},
582			Self::InProgress => false,
583		}
584	}
585}
586
587/// Prune the known txs from the given pool for the given block.
588///
589/// Returns the hashes of all transactions included in given block.
590pub async fn prune_known_txs_for_block<
591	Block: BlockT,
592	Api: graph::ChainApi<Block = Block>,
593	L: EventHandler<Api>,
594>(
595	at: &HashAndNumber<Block>,
596	api: &Api,
597	pool: &graph::Pool<Api, L>,
598	extrinsics: Option<Vec<RawExtrinsicFor<Api>>>,
599	known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<Api>, Vec<Tag>>>>,
600) -> Vec<ExtrinsicHash<Api>> {
601	let extrinsics = match extrinsics {
602		Some(xts) => xts,
603		None => api
604			.block_body(at.hash)
605			.await
606			.unwrap_or_else(|error| {
607				warn!(target: LOG_TARGET, ?error, "Prune known transactions: error request.");
608				None
609			})
610			.unwrap_or_default(),
611	};
612
613	let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
614
615	let header = match api.block_header(at.hash) {
616		Ok(Some(h)) => h,
617		Ok(None) => {
618			trace!(target: LOG_TARGET, hash = ?at.hash, "Could not find header.");
619			return hashes;
620		},
621		Err(error) => {
622			trace!(target: LOG_TARGET, hash = ?at.hash,  ?error, "Error retrieving header.");
623			return hashes;
624		},
625	};
626
627	log_xt_trace!(target: LOG_TARGET, &hashes, "Pruning transaction.");
628
629	pool.prune(at, *header.parent_hash(), &extrinsics, known_provides_tags).await;
630	hashes
631}
632
633impl<PoolApi, Block> BasicPool<PoolApi, Block>
634where
635	Block: BlockT,
636	PoolApi: 'static + graph::ChainApi<Block = Block>,
637{
638	/// Handles enactment and retraction of blocks, prunes stale transactions
639	/// (that have already been enacted) and resubmits transactions that were
640	/// retracted.
641	async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
642		trace!(target: LOG_TARGET, ?tree_route, "handle_enactment tree_route.");
643		let pool = self.pool.clone();
644		let api = self.api.clone();
645
646		let hash_and_number = match tree_route.last() {
647			Some(hash_and_number) => hash_and_number,
648			None => {
649				warn!(target: LOG_TARGET, ?tree_route, "Skipping ChainEvent - no last block in tree route.");
650				return;
651			},
652		};
653
654		let next_action = self.revalidation_strategy.lock().next(
655			hash_and_number.number,
656			Some(std::time::Duration::from_secs(60)),
657			Some(20u32.into()),
658		);
659
660		// We keep track of everything we prune so that later we won't add
661		// transactions with those hashes from the retracted blocks.
662		let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
663
664		// If there is a tree route, we use this to prune known tx based on the enacted
665		// blocks. Before pruning enacted transactions, we inform the listeners about
666		// retracted blocks and their transactions. This order is important, because
667		// if we enact and retract the same transaction at the same time, we want to
668		// send first the retract and then the prune event.
669		for retracted in tree_route.retracted() {
670			// notify txs awaiting finality that it has been retracted
671			pool.validated_pool().on_block_retracted(retracted.hash);
672		}
673
674		future::join_all(
675			tree_route
676				.enacted()
677				.iter()
678				.map(|h| prune_known_txs_for_block(h, &*api, &*pool, None, None)),
679		)
680		.await
681		.into_iter()
682		.for_each(|enacted_log| {
683			pruned_log.extend(enacted_log);
684		});
685
686		self.metrics
687			.report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
688
689		if next_action.resubmit {
690			let mut resubmit_transactions = Vec::new();
691
692			for retracted in tree_route.retracted() {
693				let hash = retracted.hash;
694
695				let block_transactions = api
696					.block_body(hash)
697					.await
698					.unwrap_or_else(|error| {
699						warn!(target: LOG_TARGET, ?error, "Failed to fetch block body.");
700						None
701					})
702					.unwrap_or_default()
703					.into_iter();
704
705				let mut resubmitted_to_report = 0;
706
707				resubmit_transactions.extend(
708					// todo: arctx - we need to get ref from somewhere
709					block_transactions.into_iter().map(Arc::from).filter_map(|tx| {
710						let tx_hash = pool.hash_of(&tx);
711						let contains = pruned_log.contains(&tx_hash);
712
713						// need to count all transactions, not just filtered, here
714						resubmitted_to_report += 1;
715
716						if !contains {
717							trace!(target: LOG_TARGET, ?tx_hash, ?hash, "Resubmitting from retracted block.");
718							Some((
719								// These transactions are coming from retracted blocks, we should
720								// simply consider them external.
721								TimedTransactionSource::new_external(false),
722								tx,
723							))
724						} else {
725							None
726						}
727					}),
728				);
729
730				self.metrics.report(|metrics| {
731					metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
732				});
733			}
734
735			pool.resubmit_at(
736				&hash_and_number,
737				resubmit_transactions,
738				ValidateTransactionPriority::Submitted,
739			)
740			.await;
741		}
742
743		let extra_pool = pool.clone();
744		// After #5200 lands, this arguably might be moved to the
745		// handler of "all blocks notification".
746		self.ready_poll
747			.lock()
748			.trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
749
750		if next_action.revalidate {
751			let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
752			self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
753
754			self.revalidation_strategy.lock().clear();
755		}
756	}
757}
758
759#[async_trait]
760impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
761where
762	Block: BlockT,
763	PoolApi: 'static + graph::ChainApi<Block = Block>,
764{
765	async fn maintain(&self, event: ChainEvent<Self::Block>) {
766		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
767		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
768			match self.api.tree_route(from, to) {
769				Ok(tree_route) => Ok(tree_route),
770				Err(e) => {
771					return Err(format!(
772						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
773					))
774				},
775			}
776		};
777		let block_id_to_number =
778			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
779
780		let result =
781			self.enactment_state
782				.lock()
783				.update(&event, &compute_tree_route, &block_id_to_number);
784
785		match result {
786			Err(error) => {
787				trace!(target: LOG_TARGET, %error, "enactment state update");
788				self.enactment_state.lock().force_update(&event);
789			},
790			Ok(EnactmentAction::Skip) => return,
791			Ok(EnactmentAction::HandleFinalization) => {},
792			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
793				self.handle_enactment(tree_route).await;
794			},
795		};
796
797		if let ChainEvent::Finalized { hash, tree_route } = event {
798			trace!(
799				target: LOG_TARGET,
800				?tree_route,
801				?prev_finalized_block,
802				"on-finalized enacted"
803			);
804
805			for hash in tree_route.iter().chain(std::iter::once(&hash)) {
806				if let Err(error) = self.pool.validated_pool().on_block_finalized(*hash).await {
807					warn!(
808						target: LOG_TARGET,
809						?hash,
810						?error,
811						"Error occurred while attempting to notify watchers about finalization"
812					);
813				}
814			}
815		}
816	}
817}