Skip to main content

sc_transaction_pool/single_state_txpool/
single_state_txpool.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate transaction pool implementation.
20
21use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
22pub use crate::{
23	api::FullChainApi,
24	graph::{ChainApi, ValidatedTransaction},
25};
26use crate::{
27	common::{
28		enactment_state::{EnactmentAction, EnactmentState},
29		error,
30		tracing_log_xt::log_xt_trace,
31	},
32	graph::{
33		self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator,
34		RawExtrinsicFor,
35	},
36	ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
37};
38use async_trait::async_trait;
39use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
40use parking_lot::Mutex;
41use prometheus_endpoint::Registry as PrometheusRegistry;
42use sc_transaction_pool_api::{
43	error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
44	PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor,
45	TxHash, TxInvalidityReportMap,
46};
47use sp_blockchain::{HashAndNumber, TreeRoute};
48use sp_core::traits::SpawnEssentialNamed;
49use sp_runtime::{
50	generic::BlockId,
51	traits::{
52		AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero,
53	},
54	transaction_validity::{TransactionTag as Tag, TransactionValidityError},
55};
56use std::{
57	collections::{HashMap, HashSet},
58	pin::Pin,
59	sync::Arc,
60	time::Instant,
61};
62use tokio::select;
63use tracing::{trace, warn};
64
65/// Basic implementation of transaction pool that can be customized by providing PoolApi.
66pub struct BasicPool<PoolApi, Block>
67where
68	Block: BlockT,
69	PoolApi: graph::ChainApi<Block = Block>,
70{
71	pool: Arc<graph::Pool<PoolApi, ()>>,
72	api: Arc<PoolApi>,
73	revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
74	revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
75	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
76	metrics: PrometheusMetrics,
77	enactment_state: Arc<Mutex<EnactmentState<Block>>>,
78}
79
80struct ReadyPoll<T, Block: BlockT> {
81	updated_at: NumberFor<Block>,
82	pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
83}
84
85impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
86	fn default() -> Self {
87		Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
88	}
89}
90
91impl<T, Block: BlockT> ReadyPoll<T, Block> {
92	fn new(best_block_number: NumberFor<Block>) -> Self {
93		Self { updated_at: best_block_number, pollers: Default::default() }
94	}
95
96	fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
97		self.updated_at = number;
98
99		let mut idx = 0;
100		while idx < self.pollers.len() {
101			if self.pollers[idx].0 <= number {
102				let poller_sender = self.pollers.swap_remove(idx);
103				trace!(
104					target: LOG_TARGET,
105					?number,
106					"Sending ready signal."
107				);
108				let _ = poller_sender.1.send(iterator_factory());
109			} else {
110				idx += 1;
111			}
112		}
113	}
114
115	fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
116		let (sender, receiver) = oneshot::channel();
117		self.pollers.push((number, sender));
118		receiver
119	}
120
121	fn updated_at(&self) -> NumberFor<Block> {
122		self.updated_at
123	}
124}
125
/// Type of revalidation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RevalidationType {
	/// Light revalidation type.
	///
	/// During maintenance, transaction pool makes periodic revalidation
	/// of all transactions depending on number of blocks or time passed.
	/// Also this kind of revalidation does not resubmit transactions from
	/// retracted blocks, since it is too expensive.
	Light,

	/// Full revalidation type.
	///
	/// During maintenance, transaction pool revalidates some fixed amount of
	/// transactions from the pool of valid transactions.
	Full,
}
142
143impl<PoolApi, Block> BasicPool<PoolApi, Block>
144where
145	Block: BlockT,
146	PoolApi: graph::ChainApi<Block = Block> + 'static,
147{
148	/// Create new basic transaction pool with provided api, for tests.
149	pub fn new_test(
150		pool_api: Arc<PoolApi>,
151		best_block_hash: Block::Hash,
152		finalized_hash: Block::Hash,
153		options: graph::Options,
154	) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
155		let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
156			options,
157			true.into(),
158			pool_api.clone(),
159		));
160		let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
161			pool_api.clone(),
162			pool.clone(),
163			finalized_hash,
164		);
165		(
166			Self {
167				api: pool_api,
168				pool,
169				revalidation_queue: Arc::new(revalidation_queue),
170				revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
171				ready_poll: Default::default(),
172				metrics: Default::default(),
173				enactment_state: Arc::new(Mutex::new(EnactmentState::new(
174					best_block_hash,
175					finalized_hash,
176				))),
177			},
178			background_task,
179		)
180	}
181
182	/// Create new basic transaction pool with provided api and custom
183	/// revalidation type.
184	pub fn with_revalidation_type(
185		options: graph::Options,
186		is_validator: IsValidator,
187		pool_api: Arc<PoolApi>,
188		prometheus: Option<&PrometheusRegistry>,
189		revalidation_type: RevalidationType,
190		spawner: impl SpawnEssentialNamed,
191		best_block_number: NumberFor<Block>,
192		best_block_hash: Block::Hash,
193		finalized_hash: Block::Hash,
194	) -> Self {
195		let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
196			options,
197			is_validator,
198			pool_api.clone(),
199		));
200		let (revalidation_queue, background_task) = match revalidation_type {
201			RevalidationType::Light => {
202				(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None)
203			},
204			RevalidationType::Full => {
205				let (queue, background) = revalidation::RevalidationQueue::new_background(
206					pool_api.clone(),
207					pool.clone(),
208					finalized_hash,
209				);
210				(queue, Some(background))
211			},
212		};
213
214		if let Some(background_task) = background_task {
215			spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
216		}
217
218		Self {
219			api: pool_api,
220			pool,
221			revalidation_queue: Arc::new(revalidation_queue),
222			revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
223				RevalidationType::Light => {
224					RevalidationStrategy::Light(RevalidationStatus::NotScheduled)
225				},
226				RevalidationType::Full => RevalidationStrategy::Always,
227			})),
228			ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
229			metrics: PrometheusMetrics::new(prometheus),
230			enactment_state: Arc::new(Mutex::new(EnactmentState::new(
231				best_block_hash,
232				finalized_hash,
233			))),
234		}
235	}
236
237	/// Gets shared reference to the underlying pool.
238	pub fn pool(&self) -> &Arc<graph::Pool<PoolApi, ()>> {
239		&self.pool
240	}
241
242	/// Get access to the underlying api
243	pub fn api(&self) -> &PoolApi {
244		&self.api
245	}
246
247	async fn ready_at_with_timeout_internal(
248		&self,
249		at: Block::Hash,
250		timeout: std::time::Duration,
251	) -> ReadyIteratorFor<PoolApi> {
252		select! {
253			ready = self.ready_at(at)=> ready,
254			_ = futures_timer::Delay::new(timeout)=> self.ready()
255		}
256	}
257}
258
259#[async_trait]
260impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
261where
262	Block: BlockT,
263	PoolApi: 'static + graph::ChainApi<Block = Block>,
264{
265	type Block = PoolApi::Block;
266	type Hash = graph::ExtrinsicHash<PoolApi>;
267	type InPoolTransaction =
268		graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
269	type Error = PoolApi::Error;
270
271	async fn submit_at(
272		&self,
273		at: <Self::Block as BlockT>::Hash,
274		source: TransactionSource,
275		xts: Vec<TransactionFor<Self>>,
276	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
277		let pool = self.pool.clone();
278		let xts = xts
279			.into_iter()
280			.map(|xt| {
281				(TimedTransactionSource::from_transaction_source(source, false), Arc::from(xt))
282			})
283			.collect::<Vec<_>>();
284
285		self.metrics
286			.report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
287
288		let number = self.api.resolve_block_number(at);
289		let at = HashAndNumber { hash: at, number: number? };
290		Ok(pool
291			.submit_at(&at, xts, ValidateTransactionPriority::Submitted)
292			.await
293			.into_iter()
294			.map(|result| result.map(|outcome| outcome.hash()))
295			.collect())
296	}
297
298	async fn submit_one(
299		&self,
300		at: <Self::Block as BlockT>::Hash,
301		source: TransactionSource,
302		xt: TransactionFor<Self>,
303	) -> Result<TxHash<Self>, Self::Error> {
304		let pool = self.pool.clone();
305		let xt = Arc::from(xt);
306
307		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
308
309		let number = self.api.resolve_block_number(at);
310		let at = HashAndNumber { hash: at, number: number? };
311		pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
312			.await
313			.map(|outcome| outcome.hash())
314	}
315
316	async fn submit_and_watch(
317		&self,
318		at: <Self::Block as BlockT>::Hash,
319		source: TransactionSource,
320		xt: TransactionFor<Self>,
321	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
322		let pool = self.pool.clone();
323		let xt = Arc::from(xt);
324
325		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
326
327		let number = self.api.resolve_block_number(at);
328
329		let at = HashAndNumber { hash: at, number: number? };
330		pool.submit_and_watch(
331			&at,
332			TimedTransactionSource::from_transaction_source(source, false),
333			xt,
334		)
335		.await
336		.map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
337	}
338
339	async fn report_invalid(
340		&self,
341		_at: Option<<Self::Block as BlockT>::Hash>,
342		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
343	) -> Vec<Arc<Self::InPoolTransaction>> {
344		let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>();
345		let removed = self.pool.validated_pool().remove_invalid(&hashes);
346		self.metrics
347			.report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
348		removed
349	}
350
351	fn status(&self) -> PoolStatus {
352		self.pool.validated_pool().status()
353	}
354
355	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
356		self.pool.validated_pool().import_notification_stream()
357	}
358
359	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
360		self.pool.hash_of(xt)
361	}
362
363	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
364		self.pool.validated_pool().on_broadcasted(propagations)
365	}
366
367	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
368		self.pool.validated_pool().ready_by_hash(hash)
369	}
370
371	async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<PoolApi> {
372		let Ok(at) = self.api.resolve_block_number(at) else {
373			return Box::new(std::iter::empty()) as Box<_>;
374		};
375
376		let status = self.status();
377		// If there are no transactions in the pool, it is fine to return early.
378		//
379		// There could be transaction being added because of some re-org happening at the relevant
380		// block, but this is relative unlikely.
381		if status.ready == 0 && status.future == 0 {
382			return Box::new(std::iter::empty()) as Box<_>;
383		}
384
385		if self.ready_poll.lock().updated_at() >= at {
386			trace!(
387				target: LOG_TARGET,
388				?at,
389				"Transaction pool already processed block."
390			);
391			let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
392			return iterator;
393		}
394
395		let result = self.ready_poll.lock().add(at).map(|received| {
396			received.unwrap_or_else(|error| {
397				warn!(target: LOG_TARGET,  ?error, "Error receiving pending set.");
398				Box::new(std::iter::empty())
399			})
400		});
401
402		result.await
403	}
404
405	fn ready(&self) -> ReadyIteratorFor<PoolApi> {
406		Box::new(self.pool.validated_pool().ready())
407	}
408
409	fn futures(&self) -> Vec<Self::InPoolTransaction> {
410		let pool = self.pool.validated_pool().pool.read();
411		pool.futures().cloned().collect::<Vec<_>>()
412	}
413
414	async fn ready_at_with_timeout(
415		&self,
416		at: <Self::Block as BlockT>::Hash,
417		timeout: std::time::Duration,
418	) -> ReadyIteratorFor<PoolApi> {
419		self.ready_at_with_timeout_internal(at, timeout).await
420	}
421}
422
423impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
424where
425	Block: BlockT,
426	Client: sp_api::ProvideRuntimeApi<Block>
427		+ sc_client_api::BlockBackend<Block>
428		+ sc_client_api::blockchain::HeaderBackend<Block>
429		+ sp_runtime::traits::BlockIdTo<Block>
430		+ sc_client_api::ExecutorProvider<Block>
431		+ sc_client_api::UsageProvider<Block>
432		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
433		+ Send
434		+ Sync
435		+ 'static,
436	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
437{
438	/// Create new basic transaction pool for a full node with the provided api.
439	pub fn new_full(
440		options: graph::Options,
441		is_validator: IsValidator,
442		prometheus: Option<&PrometheusRegistry>,
443		spawner: impl SpawnEssentialNamed,
444		client: Arc<Client>,
445	) -> Self {
446		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
447		let pool = Self::with_revalidation_type(
448			options,
449			is_validator,
450			pool_api,
451			prometheus,
452			RevalidationType::Full,
453			spawner,
454			client.usage_info().chain.best_number,
455			client.usage_info().chain.best_hash,
456			client.usage_info().chain.finalized_hash,
457		);
458
459		pool
460	}
461}
462
463impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
464	for BasicPool<FullChainApi<Client, Block>, Block>
465where
466	Block: BlockT,
467	Client: sp_api::ProvideRuntimeApi<Block>
468		+ sc_client_api::BlockBackend<Block>
469		+ sc_client_api::blockchain::HeaderBackend<Block>
470		+ sp_runtime::traits::BlockIdTo<Block>
471		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
472	Client: Send + Sync + 'static,
473	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
474{
475	type Block = Block;
476	type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
477	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
478
479	fn submit_local(
480		&self,
481		at: Block::Hash,
482		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
483	) -> Result<Self::Hash, Self::Error> {
484		let validity = self
485			.api
486			.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
487			.map_err(|e| {
488				Self::Error::Pool(match e {
489					TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
490					TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
491				})
492			})?;
493
494		let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
495		let block_number = self
496			.api
497			.block_id_to_number(&BlockId::hash(at))?
498			.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
499
500		let validated = ValidatedTransaction::valid_at(
501			block_number.saturated_into::<u64>(),
502			hash,
503			TimedTransactionSource::new_local(false),
504			Arc::from(xt),
505			bytes,
506			validity,
507		);
508
509		self.pool
510			.validated_pool()
511			.submit(vec![validated])
512			.remove(0)
513			.map(|outcome| outcome.hash())
514	}
515}
516
/// Progress of the periodic revalidation tracked by `RevalidationStrategy::Light`.
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
	/// No revalidation is scheduled. This is the initial state and the state `clear` resets to.
	NotScheduled,
	/// The revalidation is scheduled.
	///
	/// It becomes due once the given instant has passed or the given block number has been
	/// reached; a `None` entry means that criterion is not used.
	Scheduled(Option<Instant>, Option<N>),
	/// The revalidation is in progress.
	InProgress,
}
526
/// How the pool decides whether to revalidate and resubmit on an enactment.
enum RevalidationStrategy<N> {
	/// Revalidate and resubmit on every enactment.
	Always,
	/// Revalidate only when the tracked status says it is due; never resubmit.
	Light(RevalidationStatus<N>),
}
531
/// What `handle_enactment` should do for the current block, as decided by the strategy.
struct RevalidationAction {
	/// Whether the ready transactions should be queued for revalidation.
	revalidate: bool,
	/// Whether transactions from retracted blocks should be resubmitted to the pool.
	resubmit: bool,
}
536
537impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
538	pub fn clear(&mut self) {
539		if let Self::Light(status) = self {
540			status.clear()
541		}
542	}
543
544	pub fn next(
545		&mut self,
546		block: N,
547		revalidate_time_period: Option<std::time::Duration>,
548		revalidate_block_period: Option<N>,
549	) -> RevalidationAction {
550		match self {
551			Self::Light(status) => RevalidationAction {
552				revalidate: status.next_required(
553					block,
554					revalidate_time_period,
555					revalidate_block_period,
556				),
557				resubmit: false,
558			},
559			Self::Always => RevalidationAction { revalidate: true, resubmit: true },
560		}
561	}
562}
563
564impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
565	/// Called when revalidation is completed.
566	pub fn clear(&mut self) {
567		*self = Self::NotScheduled;
568	}
569
570	/// Returns true if revalidation is required.
571	pub fn next_required(
572		&mut self,
573		block: N,
574		revalidate_time_period: Option<std::time::Duration>,
575		revalidate_block_period: Option<N>,
576	) -> bool {
577		match *self {
578			Self::NotScheduled => {
579				*self = Self::Scheduled(
580					revalidate_time_period.map(|period| Instant::now() + period),
581					revalidate_block_period.map(|period| block + period),
582				);
583				false
584			},
585			Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
586				let is_required =
587					revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
588						revalidate_at_block.map(|at| block >= at).unwrap_or(false);
589				if is_required {
590					*self = Self::InProgress;
591				}
592				is_required
593			},
594			Self::InProgress => false,
595		}
596	}
597}
598
599/// Prune the known txs from the given pool for the given block.
600///
601/// Returns the hashes of all transactions included in given block.
602pub async fn prune_known_txs_for_block<
603	Block: BlockT,
604	Api: graph::ChainApi<Block = Block>,
605	L: EventHandler<Api>,
606>(
607	at: &HashAndNumber<Block>,
608	api: &Api,
609	pool: &graph::Pool<Api, L>,
610	extrinsics: Option<Vec<RawExtrinsicFor<Api>>>,
611	known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<Api>, Vec<Tag>>>>,
612) -> Vec<ExtrinsicHash<Api>> {
613	let extrinsics = match extrinsics {
614		Some(xts) => xts,
615		None => api
616			.block_body(at.hash)
617			.await
618			.unwrap_or_else(|error| {
619				warn!(target: LOG_TARGET, ?error, "Prune known transactions: error request.");
620				None
621			})
622			.unwrap_or_default(),
623	};
624
625	let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
626
627	let header = match api.block_header(at.hash) {
628		Ok(Some(h)) => h,
629		Ok(None) => {
630			trace!(target: LOG_TARGET, hash = ?at.hash, "Could not find header.");
631			return hashes;
632		},
633		Err(error) => {
634			trace!(target: LOG_TARGET, hash = ?at.hash,  ?error, "Error retrieving header.");
635			return hashes;
636		},
637	};
638
639	log_xt_trace!(target: LOG_TARGET, &hashes, "Pruning transaction.");
640
641	pool.prune(at, *header.parent_hash(), &extrinsics, known_provides_tags).await;
642	hashes
643}
644
645impl<PoolApi, Block> BasicPool<PoolApi, Block>
646where
647	Block: BlockT,
648	PoolApi: 'static + graph::ChainApi<Block = Block>,
649{
650	/// Handles enactment and retraction of blocks, prunes stale transactions
651	/// (that have already been enacted) and resubmits transactions that were
652	/// retracted.
653	async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
654		trace!(target: LOG_TARGET, ?tree_route, "handle_enactment tree_route.");
655		let pool = self.pool.clone();
656		let api = self.api.clone();
657
658		let hash_and_number = match tree_route.last() {
659			Some(hash_and_number) => hash_and_number,
660			None => {
661				warn!(target: LOG_TARGET, ?tree_route, "Skipping ChainEvent - no last block in tree route.");
662				return;
663			},
664		};
665
666		let next_action = self.revalidation_strategy.lock().next(
667			hash_and_number.number,
668			Some(std::time::Duration::from_secs(60)),
669			Some(20u32.into()),
670		);
671
672		// We keep track of everything we prune so that later we won't add
673		// transactions with those hashes from the retracted blocks.
674		let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
675
676		// If there is a tree route, we use this to prune known tx based on the enacted
677		// blocks. Before pruning enacted transactions, we inform the listeners about
678		// retracted blocks and their transactions. This order is important, because
679		// if we enact and retract the same transaction at the same time, we want to
680		// send first the retract and then the prune event.
681		for retracted in tree_route.retracted() {
682			// notify txs awaiting finality that it has been retracted
683			pool.validated_pool().on_block_retracted(retracted.hash);
684		}
685
686		future::join_all(
687			tree_route
688				.enacted()
689				.iter()
690				.map(|h| prune_known_txs_for_block(h, &*api, &*pool, None, None)),
691		)
692		.await
693		.into_iter()
694		.for_each(|enacted_log| {
695			pruned_log.extend(enacted_log);
696		});
697
698		self.metrics
699			.report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
700
701		if next_action.resubmit {
702			let mut resubmit_transactions = Vec::new();
703
704			for retracted in tree_route.retracted() {
705				let hash = retracted.hash;
706
707				let block_transactions = api
708					.block_body(hash)
709					.await
710					.unwrap_or_else(|error| {
711						warn!(target: LOG_TARGET, ?error, "Failed to fetch block body.");
712						None
713					})
714					.unwrap_or_default()
715					.into_iter();
716
717				let mut resubmitted_to_report = 0;
718
719				resubmit_transactions.extend(
720					// todo: arctx - we need to get ref from somewhere
721					block_transactions.into_iter().map(Arc::from).filter_map(|tx| {
722						let tx_hash = pool.hash_of(&tx);
723						let contains = pruned_log.contains(&tx_hash);
724
725						// need to count all transactions, not just filtered, here
726						resubmitted_to_report += 1;
727
728						if !contains {
729							trace!(target: LOG_TARGET, ?tx_hash, ?hash, "Resubmitting from retracted block.");
730							Some((
731								// These transactions are coming from retracted blocks, we should
732								// simply consider them external.
733								TimedTransactionSource::new_external(false),
734								tx,
735							))
736						} else {
737							None
738						}
739					}),
740				);
741
742				self.metrics.report(|metrics| {
743					metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
744				});
745			}
746
747			pool.resubmit_at(
748				&hash_and_number,
749				resubmit_transactions,
750				ValidateTransactionPriority::Submitted,
751			)
752			.await;
753		}
754
755		let extra_pool = pool.clone();
756		// After #5200 lands, this arguably might be moved to the
757		// handler of "all blocks notification".
758		self.ready_poll
759			.lock()
760			.trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
761
762		if next_action.revalidate {
763			let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
764			self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
765
766			self.revalidation_strategy.lock().clear();
767		}
768	}
769}
770
771#[async_trait]
772impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
773where
774	Block: BlockT,
775	PoolApi: 'static + graph::ChainApi<Block = Block>,
776{
777	async fn maintain(&self, event: ChainEvent<Self::Block>) {
778		let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
779		let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
780			match self.api.tree_route(from, to) {
781				Ok(tree_route) => Ok(tree_route),
782				Err(e) => {
783					return Err(format!(
784						"Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
785					))
786				},
787			}
788		};
789		let block_id_to_number =
790			|hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
791
792		let result =
793			self.enactment_state
794				.lock()
795				.update(&event, &compute_tree_route, &block_id_to_number);
796
797		match result {
798			Err(error) => {
799				trace!(target: LOG_TARGET, %error, "enactment state update");
800				self.enactment_state.lock().force_update(&event);
801			},
802			Ok(EnactmentAction::Skip) => return,
803			Ok(EnactmentAction::HandleFinalization) => {},
804			Ok(EnactmentAction::HandleEnactment(tree_route)) => {
805				self.handle_enactment(tree_route).await;
806			},
807		};
808
809		if let ChainEvent::Finalized { hash, tree_route } = event {
810			trace!(
811				target: LOG_TARGET,
812				?tree_route,
813				?prev_finalized_block,
814				"on-finalized enacted"
815			);
816
817			for hash in tree_route.iter().chain(std::iter::once(&hash)) {
818				if let Err(error) = self.pool.validated_pool().on_block_finalized(*hash).await {
819					warn!(
820						target: LOG_TARGET,
821						?hash,
822						?error,
823						"Error occurred while attempting to notify watchers about finalization"
824					);
825				}
826			}
827		}
828	}
829}