Skip to main content

soil_txpool/graph/
pool.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
8use async_trait::async_trait;
9use futures::channel::mpsc::Receiver;
10use indexmap::IndexMap;
11use soil_client::blockchain::{HashAndNumber, TreeRoute};
12use soil_client::transaction_pool::error;
13use std::{
14	collections::HashMap,
15	sync::Arc,
16	time::{Duration, Instant},
17};
18use subsoil::runtime::{
19	generic::BlockId,
20	traits::{self, Block as BlockT, SaturatedConversion},
21	transaction_validity::{
22		TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
23	},
24};
25use tracing::{debug, instrument, trace, Level};
26
27use super::{
28	base_pool as base,
29	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
30	EventHandler, ValidatedPoolSubmitOutcome,
31};
32
33/// Modification notification event stream type;
34pub type EventStream<H> = Receiver<H>;
35
36/// Block hash type for a pool.
37pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
38/// Extrinsic hash type for a pool.
39pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
40/// Extrinsic type for a pool (reference counted).
41pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
42/// Extrinsic type for a pool (raw data).
43pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
44/// Block number type for the ChainApi
45pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
46/// A type of transaction stored in the pool
47pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
48/// A type of validated transaction stored in the pool.
49pub type ValidatedTransactionFor<A> =
50	ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
51
/// The priority of a request to validate a transaction.
///
/// Passed along with every validation request so the validating side can tell
/// freshly submitted transactions apart from maintenance work.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ValidateTransactionPriority {
	/// Validate newly submitted transactions.
	///
	/// Validation will be done with lower priority.
	Submitted,
	/// Validate the transaction during the maintenance process.
	///
	/// Validation will be performed with higher priority.
	Maintained,
}
64
65/// Concrete extrinsic validation and query logic.
66#[async_trait]
67pub trait ChainApi: Send + Sync {
68	/// Block type.
69	type Block: BlockT;
70	/// Error type.
71	type Error: From<error::Error> + error::IntoPoolError + error::IntoMetricsLabel;
72
73	/// Asynchronously verify extrinsic at given block.
74	async fn validate_transaction(
75		&self,
76		at: <Self::Block as BlockT>::Hash,
77		source: TransactionSource,
78		uxt: ExtrinsicFor<Self>,
79		validation_priority: ValidateTransactionPriority,
80	) -> Result<TransactionValidity, Self::Error>;
81
82	/// Synchronously verify given extrinsic at given block.
83	///
84	/// Validates a transaction by calling into the runtime. Same as `validate_transaction` but
85	/// blocks the current thread when performing validation.
86	fn validate_transaction_blocking(
87		&self,
88		at: <Self::Block as BlockT>::Hash,
89		source: TransactionSource,
90		uxt: ExtrinsicFor<Self>,
91	) -> Result<TransactionValidity, Self::Error>;
92
93	/// Returns a block number given the block id.
94	fn block_id_to_number(
95		&self,
96		at: &BlockId<Self::Block>,
97	) -> Result<Option<NumberFor<Self>>, Self::Error>;
98
99	/// Returns a block hash given the block id.
100	fn block_id_to_hash(
101		&self,
102		at: &BlockId<Self::Block>,
103	) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
104
105	/// Returns hash and encoding length of the extrinsic.
106	fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
107
108	/// Returns a block body given the block.
109	async fn block_body(
110		&self,
111		at: <Self::Block as BlockT>::Hash,
112	) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;
113
114	/// Returns a block header given the block id.
115	fn block_header(
116		&self,
117		at: <Self::Block as BlockT>::Hash,
118	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
119
120	/// Compute a tree-route between two blocks. See [`TreeRoute`] for more details.
121	fn tree_route(
122		&self,
123		from: <Self::Block as BlockT>::Hash,
124		to: <Self::Block as BlockT>::Hash,
125	) -> Result<TreeRoute<Self::Block>, Self::Error>;
126
127	/// Resolves block number by id.
128	fn resolve_block_number(
129		&self,
130		at: <Self::Block as BlockT>::Hash,
131	) -> Result<NumberFor<Self>, Self::Error> {
132		self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
133			number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
134		})
135	}
136}
137
138/// Pool configuration options.
139#[derive(Debug, Clone)]
140pub struct Options {
141	/// Ready queue limits.
142	pub ready: base::Limit,
143	/// Future queue limits.
144	pub future: base::Limit,
145	/// Reject future transactions.
146	pub reject_future_transactions: bool,
147	/// How long the extrinsic is banned for.
148	pub ban_time: Duration,
149}
150
151impl Default for Options {
152	fn default() -> Self {
153		Self {
154			ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
155			future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
156			reject_future_transactions: false,
157			ban_time: Duration::from_secs(60 * 30),
158		}
159	}
160}
161
162impl Options {
163	/// Total (ready+future) maximal number of transactions in the pool.
164	pub fn total_count(&self) -> usize {
165		self.ready.count + self.future.count
166	}
167}
168
/// Should we check that the transaction is banned
/// in the pool, before we verify it?
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub(crate) enum CheckBannedBeforeVerify {
	/// Banned transactions are rejected before verification.
	Yes,
	/// The ban list is ignored; the transaction is verified even if it is banned.
	No,
}
176
177/// Extrinsics pool that performs validation.
178pub struct Pool<B: ChainApi, L: EventHandler<B>> {
179	validated_pool: Arc<ValidatedPool<B, L>>,
180}
181
182impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
183	/// Create a new transaction pool with statically sized rotator.
184	pub fn new_with_staticly_sized_rotator(
185		options: Options,
186		is_validator: IsValidator,
187		api: Arc<B>,
188	) -> Self {
189		Self {
190			validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
191				options,
192				is_validator,
193				api,
194			)),
195		}
196	}
197
198	/// Create a new transaction pool.
199	pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
200		Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
201	}
202
203	/// Create a new transaction pool.
204	pub fn new_with_event_handler(
205		options: Options,
206		is_validator: IsValidator,
207		api: Arc<B>,
208		event_handler: L,
209	) -> Self {
210		Self {
211			validated_pool: Arc::new(ValidatedPool::new_with_event_handler(
212				options,
213				is_validator,
214				api,
215				event_handler,
216			)),
217		}
218	}
219
220	/// Imports a bunch of unverified extrinsics to the pool
221	#[instrument(level = Level::TRACE, skip_all, target="txpool", name = "pool::submit_at")]
222	pub async fn submit_at(
223		&self,
224		at: &HashAndNumber<B::Block>,
225		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
226		validation_priority: ValidateTransactionPriority,
227	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
228		let validated_transactions =
229			self.verify(at, xts, CheckBannedBeforeVerify::Yes, validation_priority).await;
230		self.validated_pool.submit(validated_transactions.into_values())
231	}
232
233	/// Resubmit the given extrinsics to the pool.
234	///
235	/// This does not check if a transaction is banned, before we verify it again.
236	pub async fn resubmit_at(
237		&self,
238		at: &HashAndNumber<B::Block>,
239		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
240		validation_priority: ValidateTransactionPriority,
241	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
242		let validated_transactions =
243			self.verify(at, xts, CheckBannedBeforeVerify::No, validation_priority).await;
244		self.validated_pool.submit(validated_transactions.into_values())
245	}
246
247	/// Imports one unverified extrinsic to the pool
248	pub async fn submit_one(
249		&self,
250		at: &HashAndNumber<B::Block>,
251		source: base::TimedTransactionSource,
252		xt: ExtrinsicFor<B>,
253	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
254		let res = self
255			.submit_at(at, std::iter::once((source, xt)), ValidateTransactionPriority::Submitted)
256			.await
257			.pop();
258		res.expect("One extrinsic passed; one result returned; qed")
259	}
260
261	/// Import a single extrinsic and starts to watch its progress in the pool.
262	pub async fn submit_and_watch(
263		&self,
264		at: &HashAndNumber<B::Block>,
265		source: base::TimedTransactionSource,
266		xt: ExtrinsicFor<B>,
267	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
268		let (_, tx) = self
269			.verify_one(
270				at.hash,
271				at.number,
272				source,
273				xt,
274				CheckBannedBeforeVerify::Yes,
275				ValidateTransactionPriority::Submitted,
276			)
277			.await;
278		self.validated_pool.submit_and_watch(tx)
279	}
280
281	/// Resubmit some transaction that were validated elsewhere.
282	pub fn resubmit(
283		&self,
284		revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
285	) {
286		let now = Instant::now();
287		self.validated_pool.resubmit(revalidated_transactions);
288		trace!(
289			target: LOG_TARGET,
290			duration = ?now.elapsed(),
291			status = ?self.validated_pool.status(),
292			"Resubmitted transaction."
293		);
294	}
295
296	/// Prunes known ready transactions.
297	///
298	/// Used to clear the pool from transactions that were part of recently imported block.
299	/// The main difference from the `prune` is that we do not revalidate any transactions
300	/// and ignore unknown passed hashes.
301	pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
302		// Get details of all extrinsics that are already in the pool
303		let in_pool_tags =
304			self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
305
306		// Prune all transactions that provide given tags
307		let prune_status = self.validated_pool.prune_tags(in_pool_tags);
308		let pruned_transactions =
309			hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
310		self.validated_pool.fire_pruned(at, pruned_transactions);
311	}
312
313	/// Prunes ready transactions.
314	///
315	/// Used to clear the pool from transactions that were part of recently imported block.
316	/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
317	/// into runtime too often we first look up all extrinsics that are in the pool and get
318	/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
319	pub async fn prune(
320		&self,
321		at: &HashAndNumber<B::Block>,
322		parent: <B::Block as BlockT>::Hash,
323		extrinsics: &[RawExtrinsicFor<B>],
324		known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<B>, Vec<Tag>>>>,
325	) {
326		debug!(
327			target: LOG_TARGET,
328			?at,
329			extrinsics_count = extrinsics.len(),
330			"Starting pruning of block."
331		);
332		// Get details of all extrinsics that are already in the pool
333		let in_pool_hashes =
334			extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
335		let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
336		// Fill unknown tags based on the known tags given in `known_provides_tags`.
337		let mut unknown_txs_count = 0usize;
338		let mut reused_txs_count = 0usize;
339		let tags = in_pool_hashes.iter().zip(in_pool_tags).map(|(tx_hash, tags)| {
340			tags.or_else(|| {
341				unknown_txs_count += 1;
342				known_provides_tags.as_ref().and_then(|inner| {
343					inner.get(&tx_hash).map(|found_tags| {
344						reused_txs_count += 1;
345						found_tags.clone()
346					})
347				})
348			})
349		});
350
351		// Zip the ones from the pool with the full list (we get pairs `(Extrinsic,
352		// Option<Vec<Tag>>)`)
353		let all = extrinsics.iter().zip(tags);
354		let mut validated_counter: usize = 0;
355		let mut future_tags = Vec::new();
356		let now = Instant::now();
357		for (extrinsic, in_pool_tags) in all {
358			match in_pool_tags {
359				// reuse the tags for extrinsics that were found in the pool or given in
360				// `known_provides_tags` cache.
361				Some(tags) => future_tags.extend(tags),
362				// if it's not found in the pool query the runtime at parent block
363				// to get validity info and tags that the extrinsic provides.
364				None => {
365					// Avoid validating block txs if the pool is empty
366					if !self.validated_pool.status().is_empty() {
367						validated_counter = validated_counter + 1;
368						let validity = self
369							.validated_pool
370							.api()
371							.validate_transaction(
372								parent,
373								TransactionSource::InBlock,
374								Arc::from(extrinsic.clone()),
375								ValidateTransactionPriority::Maintained,
376							)
377							.await;
378
379						trace!(
380							target: LOG_TARGET,
381							tx_hash = ?self.validated_pool.api().hash_and_length(&extrinsic.clone()).0,
382							?validity,
383							"prune::revalidated"
384						);
385						if let Ok(Ok(validity)) = validity {
386							future_tags.extend(validity.provides);
387						}
388					} else {
389						trace!(
390							target: LOG_TARGET,
391							?at,
392							"txpool is empty, skipping validation for block",
393						);
394					}
395				},
396			}
397		}
398
399		let known_provides_tags_len = known_provides_tags.map(|inner| inner.len()).unwrap_or(0);
400		debug!(
401			target: LOG_TARGET,
402			validated_counter,
403			known_provides_tags_len,
404			unknown_txs_count,
405			reused_txs_count,
406			duration = ?now.elapsed(),
407			"prune"
408		);
409		self.prune_tags(at, future_tags, in_pool_hashes).await
410	}
411
412	/// Prunes ready transactions that provide given list of tags.
413	///
414	/// Given tags are assumed to be always provided now, so all transactions
415	/// in the Future Queue that require that particular tag (and have other
416	/// requirements satisfied) are promoted to Ready Queue.
417	///
418	/// Moreover for each provided tag we remove transactions in the pool that:
419	/// 1. Provide that tag directly
420	/// 2. Are a dependency of pruned transaction.
421	///
422	/// Returns transactions that have been removed from the pool and must be reverified
423	/// before reinserting to the pool.
424	///
425	/// By removing predecessor transactions as well we might actually end up
426	/// pruning too much, so all removed transactions are reverified against
427	/// the runtime (`validate_transaction`) to make sure they are invalid.
428	///
429	/// However we avoid revalidating transactions that are contained within
430	/// the second parameter of `known_imported_hashes`. These transactions
431	/// (if pruned) are not revalidated and become temporarily banned to
432	/// prevent importing them in the (near) future.
433	pub async fn prune_tags(
434		&self,
435		at: &HashAndNumber<B::Block>,
436		tags: impl IntoIterator<Item = Tag>,
437		known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
438	) {
439		let now = Instant::now();
440		trace!(target: LOG_TARGET, ?at, "Pruning tags.");
441		// Prune all transactions that provide given tags
442		let prune_status = self.validated_pool.prune_tags(tags);
443
444		// Make sure that we don't revalidate extrinsics that were part of the recently
445		// imported block. This is especially important for UTXO-like chains cause the
446		// inputs are pruned so such transaction would go to future again.
447		self.validated_pool
448			.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
449
450		// Try to re-validate pruned transactions since some of them might be still valid.
451		// note that `known_imported_hashes` will be rejected here due to temporary ban.
452		let pruned_transactions =
453			prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
454
455		let reverified_transactions = self
456			.verify(
457				at,
458				pruned_transactions,
459				CheckBannedBeforeVerify::Yes,
460				ValidateTransactionPriority::Maintained,
461			)
462			.await;
463
464		let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
465		debug!(
466			target: LOG_TARGET,
467			?at,
468			reverified_transactions = reverified_transactions.len(),
469			duration = ?now.elapsed(),
470			"Pruned. Resubmitting transactions."
471		);
472		log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}");
473
474		// And finally - submit reverified transactions back to the pool
475		self.validated_pool.resubmit_pruned(
476			&at,
477			known_imported_hashes,
478			pruned_hashes,
479			reverified_transactions.into_values().collect(),
480		)
481	}
482
483	/// Returns transaction hash
484	pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
485		self.validated_pool.api().hash_and_length(xt).0
486	}
487
488	/// Returns future that validates a bunch of transactions at given block.
489	#[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify")]
490	async fn verify(
491		&self,
492		at: &HashAndNumber<B::Block>,
493		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
494		check: CheckBannedBeforeVerify,
495		validation_priority: ValidateTransactionPriority,
496	) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
497		let HashAndNumber { number, hash } = *at;
498
499		let res = futures::future::join_all(xts.into_iter().map(|(source, xt)| {
500			self.verify_one(hash, number, source, xt, check, validation_priority)
501		}))
502		.await
503		.into_iter()
504		.collect::<IndexMap<_, _>>();
505
506		res
507	}
508
509	/// Returns future that validates single transaction at given block.
510	#[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify_one")]
511	pub(crate) async fn verify_one(
512		&self,
513		block_hash: <B::Block as BlockT>::Hash,
514		block_number: NumberFor<B>,
515		source: base::TimedTransactionSource,
516		xt: ExtrinsicFor<B>,
517		check: CheckBannedBeforeVerify,
518		validation_priority: ValidateTransactionPriority,
519	) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
520		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
521
522		let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
523		if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
524			return (hash, ValidatedTransaction::Invalid(hash, err));
525		}
526
527		let validation_result = self
528			.validated_pool
529			.api()
530			.validate_transaction(
531				block_hash,
532				source.clone().into(),
533				xt.clone(),
534				validation_priority,
535			)
536			.await;
537
538		let status = match validation_result {
539			Ok(status) => status,
540			Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
541		};
542
543		let validity = match status {
544			Ok(validity) => {
545				if validity.provides.is_empty() {
546					ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
547				} else {
548					ValidatedTransaction::valid_at(
549						block_number.saturated_into::<u64>(),
550						hash,
551						source,
552						xt,
553						bytes,
554						validity,
555					)
556				}
557			},
558			Err(TransactionValidityError::Invalid(e)) => {
559				ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into())
560			},
561			Err(TransactionValidityError::Unknown(e)) => {
562				ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into())
563			},
564		};
565
566		(hash, validity)
567	}
568
569	/// Get a reference to the underlying validated pool.
570	pub fn validated_pool(&self) -> &ValidatedPool<B, L> {
571		&self.validated_pool
572	}
573
574	/// Clears the recently pruned transactions in validated pool.
575	pub fn clear_recently_pruned(&mut self) {
576		self.validated_pool.pool.write().clear_recently_pruned();
577	}
578}
579
580impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
581	/// Deep clones the pool.
582	///
583	/// Must be called on purpose: it duplicates all the internal structures.
584	pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
585		let other: ValidatedPool<B, L> =
586			self.validated_pool().deep_clone_with_event_handler(event_handler);
587		Self { validated_pool: Arc::from(other) }
588	}
589}
590
591#[cfg(test)]
592mod tests {
593	use super::{super::base_pool::Limit, *};
594	use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
595	use assert_matches::assert_matches;
596	use base::TimedTransactionSource;
597	use codec::Encode;
598	use futures::executor::block_on;
599	use parking_lot::Mutex;
600	use soil_client::transaction_pool::TransactionStatus;
601	use std::{collections::HashMap, time::Instant};
602	use subsoil::runtime::transaction_validity::TransactionSource;
603	use soil_test_node_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
604	use soil_test_node_runtime_client::Sr25519Keyring::{Alice, Bob};
605
606	const SOURCE: TimedTransactionSource =
607		TimedTransactionSource { source: TransactionSource::External, timestamp: None };
608
609	type Pool<Api> = super::Pool<Api, ()>;
610
611	#[test]
612	fn should_validate_and_import_transaction() {
613		// given
614		let (pool, api) = pool();
615
616		// when
617		let hash = block_on(
618			pool.submit_one(
619				&api.expect_hash_and_number(0),
620				SOURCE,
621				uxt(Transfer {
622					from: Alice.into(),
623					to: AccountId::from_h256(H256::from_low_u64_be(2)),
624					amount: 5,
625					nonce: 0,
626				})
627				.into(),
628			),
629		)
630		.map(|outcome| outcome.hash())
631		.unwrap();
632
633		// then
634		assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
635	}
636
637	#[test]
638	fn submit_at_preserves_order() {
639		subsoil::tracing::try_init_simple();
640		// given
641		let (pool, api) = pool();
642
643		let txs = (0..10)
644			.map(|i| {
645				uxt(Transfer {
646					from: Alice.into(),
647					to: AccountId::from_h256(H256::from_low_u64_be(i)),
648					amount: 5,
649					nonce: i,
650				})
651				.into()
652			})
653			.collect::<Vec<_>>();
654
655		let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
656
657		// when
658		let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
659		let hashes = block_on(pool.submit_at(
660			&api.expect_hash_and_number(0),
661			txs,
662			ValidateTransactionPriority::Submitted,
663		))
664		.into_iter()
665		.map(|r| r.map(|o| o.hash()))
666		.collect::<Vec<_>>();
667		debug!(hashes = ?hashes, "-->");
668
669		// then
670		hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
671			|(result_hash, initial_hash)| {
672				assert_eq!(result_hash.unwrap(), initial_hash);
673			},
674		);
675	}
676
677	#[test]
678	fn should_reject_if_temporarily_banned() {
679		// given
680		let (pool, api) = pool();
681		let uxt = uxt(Transfer {
682			from: Alice.into(),
683			to: AccountId::from_h256(H256::from_low_u64_be(2)),
684			amount: 5,
685			nonce: 0,
686		});
687
688		// when
689		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
690		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
691			.map(|o| o.hash());
692		assert_eq!(pool.validated_pool().status().ready, 0);
693		assert_eq!(pool.validated_pool().status().future, 0);
694
695		// then
696		assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
697	}
698
699	#[test]
700	fn should_reject_unactionable_transactions() {
701		// given
702		let api = Arc::new(TestApi::default());
703		let pool = Pool::new_with_staticly_sized_rotator(
704			Default::default(),
705			// the node does not author blocks
706			false.into(),
707			api.clone(),
708		);
709
710		// after validation `IncludeData` will be set to non-propagable (validate_transaction mock)
711		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
712
713		// when
714		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
715			.map(|o| o.hash());
716
717		// then
718		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
719	}
720
721	#[test]
722	fn should_notify_about_pool_events() {
723		let (stream, hash0, hash1) = {
724			// given
725			let (pool, api) = pool();
726			let han_of_block0 = api.expect_hash_and_number(0);
727			let stream = pool.validated_pool().import_notification_stream();
728
729			// when
730			let hash0 = block_on(
731				pool.submit_one(
732					&han_of_block0,
733					SOURCE,
734					uxt(Transfer {
735						from: Alice.into(),
736						to: AccountId::from_h256(H256::from_low_u64_be(2)),
737						amount: 5,
738						nonce: 0,
739					})
740					.into(),
741				),
742			)
743			.unwrap()
744			.hash();
745			let hash1 = block_on(
746				pool.submit_one(
747					&han_of_block0,
748					SOURCE,
749					uxt(Transfer {
750						from: Alice.into(),
751						to: AccountId::from_h256(H256::from_low_u64_be(2)),
752						amount: 5,
753						nonce: 1,
754					})
755					.into(),
756				),
757			)
758			.unwrap()
759			.hash();
760			// future doesn't count
761			let _hash = block_on(
762				pool.submit_one(
763					&han_of_block0,
764					SOURCE,
765					uxt(Transfer {
766						from: Alice.into(),
767						to: AccountId::from_h256(H256::from_low_u64_be(2)),
768						amount: 5,
769						nonce: 3,
770					})
771					.into(),
772				),
773			)
774			.unwrap()
775			.hash();
776
777			assert_eq!(pool.validated_pool().status().ready, 2);
778			assert_eq!(pool.validated_pool().status().future, 1);
779
780			(stream, hash0, hash1)
781		};
782
783		// then
784		let mut it = futures::executor::block_on_stream(stream);
785		assert_eq!(it.next(), Some(hash0));
786		assert_eq!(it.next(), Some(hash1));
787		assert_eq!(it.next(), None);
788	}
789
790	#[test]
791	fn should_clear_stale_transactions() {
792		// given
793		let (pool, api) = pool();
794		let han_of_block0 = api.expect_hash_and_number(0);
795		let hash1 = block_on(
796			pool.submit_one(
797				&han_of_block0,
798				SOURCE,
799				uxt(Transfer {
800					from: Alice.into(),
801					to: AccountId::from_h256(H256::from_low_u64_be(2)),
802					amount: 5,
803					nonce: 0,
804				})
805				.into(),
806			),
807		)
808		.unwrap()
809		.hash();
810		let hash2 = block_on(
811			pool.submit_one(
812				&han_of_block0,
813				SOURCE,
814				uxt(Transfer {
815					from: Alice.into(),
816					to: AccountId::from_h256(H256::from_low_u64_be(2)),
817					amount: 5,
818					nonce: 1,
819				})
820				.into(),
821			),
822		)
823		.unwrap()
824		.hash();
825		let hash3 = block_on(
826			pool.submit_one(
827				&han_of_block0,
828				SOURCE,
829				uxt(Transfer {
830					from: Alice.into(),
831					to: AccountId::from_h256(H256::from_low_u64_be(2)),
832					amount: 5,
833					nonce: 3,
834				})
835				.into(),
836			),
837		)
838		.unwrap()
839		.hash();
840
841		// when
842		pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
843
844		// then
845		assert_eq!(pool.validated_pool().ready().count(), 0);
846		assert_eq!(pool.validated_pool().status().future, 0);
847		assert_eq!(pool.validated_pool().status().ready, 0);
848		// make sure they are temporarily banned as well
849		assert!(pool.validated_pool.is_banned(&hash1));
850		assert!(pool.validated_pool.is_banned(&hash2));
851		assert!(pool.validated_pool.is_banned(&hash3));
852	}
853
854	#[test]
855	fn should_ban_mined_transactions() {
856		// given
857		let (pool, api) = pool();
858		let hash1 = block_on(
859			pool.submit_one(
860				&api.expect_hash_and_number(0),
861				SOURCE,
862				uxt(Transfer {
863					from: Alice.into(),
864					to: AccountId::from_h256(H256::from_low_u64_be(2)),
865					amount: 5,
866					nonce: 0,
867				})
868				.into(),
869			),
870		)
871		.unwrap()
872		.hash();
873
874		// when
875		block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
876
877		// then
878		assert!(pool.validated_pool.is_banned(&hash1));
879	}
880
881	#[test]
882	fn should_limit_futures() {
883		subsoil::tracing::try_init_simple();
884
885		let xt = uxt(Transfer {
886			from: Alice.into(),
887			to: AccountId::from_h256(H256::from_low_u64_be(2)),
888			amount: 5,
889			nonce: 1,
890		});
891
892		// given
893		let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
894
895		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
896
897		let api = Arc::new(TestApi::default());
898		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
899
900		let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
901			.unwrap()
902			.hash();
903		assert_eq!(pool.validated_pool().status().future, 1);
904
905		// when
906		let hash2 = block_on(
907			pool.submit_one(
908				&api.expect_hash_and_number(0),
909				SOURCE,
910				uxt(Transfer {
911					from: Bob.into(),
912					to: AccountId::from_h256(H256::from_low_u64_be(2)),
913					amount: 5,
914					nonce: 10,
915				})
916				.into(),
917			),
918		)
919		.unwrap()
920		.hash();
921
922		// then
923		assert_eq!(pool.validated_pool().status().future, 1);
924		assert!(pool.validated_pool.is_banned(&hash1));
925		assert!(!pool.validated_pool.is_banned(&hash2));
926	}
927
928	#[test]
929	fn should_error_if_reject_immediately() {
930		// given
931		let limit = Limit { count: 100, total_bytes: 10 };
932
933		let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
934
935		let api = Arc::new(TestApi::default());
936		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
937
938		// when
939		block_on(
940			pool.submit_one(
941				&api.expect_hash_and_number(0),
942				SOURCE,
943				uxt(Transfer {
944					from: Alice.into(),
945					to: AccountId::from_h256(H256::from_low_u64_be(2)),
946					amount: 5,
947					nonce: 1,
948				})
949				.into(),
950			),
951		)
952		.map(|o| o.hash())
953		.unwrap_err();
954
955		// then
956		assert_eq!(pool.validated_pool().status().ready, 0);
957		assert_eq!(pool.validated_pool().status().future, 0);
958	}
959
960	#[test]
961	fn should_reject_transactions_with_no_provides() {
962		// given
963		let (pool, api) = pool();
964
965		// when
966		let err = block_on(
967			pool.submit_one(
968				&api.expect_hash_and_number(0),
969				SOURCE,
970				uxt(Transfer {
971					from: Alice.into(),
972					to: AccountId::from_h256(H256::from_low_u64_be(2)),
973					amount: 5,
974					nonce: INVALID_NONCE,
975				})
976				.into(),
977			),
978		)
979		.map(|o| o.hash())
980		.unwrap_err();
981
982		// then
983		assert_eq!(pool.validated_pool().status().ready, 0);
984		assert_eq!(pool.validated_pool().status().future, 0);
985		assert_matches!(err, error::Error::NoTagsProvided);
986	}
987
988	mod listener {
989		use super::*;
990
991		#[test]
992		fn should_trigger_ready_and_finalized() {
993			// given
994			let (pool, api) = pool();
995			let watcher = block_on(
996				pool.submit_and_watch(
997					&api.expect_hash_and_number(0),
998					SOURCE,
999					uxt(Transfer {
1000						from: Alice.into(),
1001						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1002						amount: 5,
1003						nonce: 0,
1004					})
1005					.into(),
1006				),
1007			)
1008			.unwrap()
1009			.expect_watcher();
1010			assert_eq!(pool.validated_pool().status().ready, 1);
1011			assert_eq!(pool.validated_pool().status().future, 0);
1012
1013			let han_of_block2 = api.expect_hash_and_number(2);
1014
1015			// when
1016			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
1017			assert_eq!(pool.validated_pool().status().ready, 0);
1018			assert_eq!(pool.validated_pool().status().future, 0);
1019
1020			// then
1021			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1022			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1023			assert_eq!(
1024				stream.next(),
1025				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1026			);
1027		}
1028
1029		#[test]
1030		fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
1031			// given
1032			let (pool, api) = pool();
1033			let watcher = block_on(
1034				pool.submit_and_watch(
1035					&api.expect_hash_and_number(0),
1036					SOURCE,
1037					uxt(Transfer {
1038						from: Alice.into(),
1039						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1040						amount: 5,
1041						nonce: 0,
1042					})
1043					.into(),
1044				),
1045			)
1046			.unwrap()
1047			.expect_watcher();
1048			assert_eq!(pool.validated_pool().status().ready, 1);
1049			assert_eq!(pool.validated_pool().status().future, 0);
1050
1051			let han_of_block2 = api.expect_hash_and_number(2);
1052
1053			// when
1054			block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
1055			assert_eq!(pool.validated_pool().status().ready, 0);
1056			assert_eq!(pool.validated_pool().status().future, 0);
1057
1058			// then
1059			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1060			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1061			assert_eq!(
1062				stream.next(),
1063				Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1064			);
1065		}
1066
1067		#[test]
1068		fn should_trigger_future_and_ready_after_promoted() {
1069			// given
1070			let (pool, api) = pool();
1071			let han_of_block0 = api.expect_hash_and_number(0);
1072
1073			let watcher = block_on(
1074				pool.submit_and_watch(
1075					&han_of_block0,
1076					SOURCE,
1077					uxt(Transfer {
1078						from: Alice.into(),
1079						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1080						amount: 5,
1081						nonce: 1,
1082					})
1083					.into(),
1084				),
1085			)
1086			.unwrap()
1087			.expect_watcher();
1088			assert_eq!(pool.validated_pool().status().ready, 0);
1089			assert_eq!(pool.validated_pool().status().future, 1);
1090
1091			// when
1092			block_on(
1093				pool.submit_one(
1094					&han_of_block0,
1095					SOURCE,
1096					uxt(Transfer {
1097						from: Alice.into(),
1098						to: AccountId::from_h256(H256::from_low_u64_be(2)),
1099						amount: 5,
1100						nonce: 0,
1101					})
1102					.into(),
1103				),
1104			)
1105			.unwrap();
1106			assert_eq!(pool.validated_pool().status().ready, 2);
1107
1108			// then
1109			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1110			assert_eq!(stream.next(), Some(TransactionStatus::Future));
1111			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1112		}
1113
1114		#[test]
1115		fn should_trigger_invalid_and_ban() {
1116			// given
1117			let (pool, api) = pool();
1118			let uxt = uxt(Transfer {
1119				from: Alice.into(),
1120				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1121				amount: 5,
1122				nonce: 0,
1123			});
1124			let watcher =
1125				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1126					.unwrap()
1127					.expect_watcher();
1128			assert_eq!(pool.validated_pool().status().ready, 1);
1129
1130			// when
1131			pool.validated_pool.remove_invalid(&[*watcher.hash()]);
1132
1133			// then
1134			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1135			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1136			assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
1137			assert_eq!(stream.next(), None);
1138		}
1139
1140		#[test]
1141		fn should_trigger_broadcasted() {
1142			// given
1143			let (pool, api) = pool();
1144			let uxt = uxt(Transfer {
1145				from: Alice.into(),
1146				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1147				amount: 5,
1148				nonce: 0,
1149			});
1150			let watcher =
1151				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1152					.unwrap()
1153					.expect_watcher();
1154			assert_eq!(pool.validated_pool().status().ready, 1);
1155
1156			// when
1157			let mut map = HashMap::new();
1158			let peers = vec!["a".into(), "b".into(), "c".into()];
1159			map.insert(*watcher.hash(), peers.clone());
1160			pool.validated_pool().on_broadcasted(map);
1161
1162			// then
1163			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1164			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1165			assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
1166		}
1167
1168		#[test]
1169		fn should_trigger_dropped_older() {
1170			// given
1171			let limit = Limit { count: 1, total_bytes: 1000 };
1172			let options =
1173				Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1174
1175			let api = Arc::new(TestApi::default());
1176			let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1177
1178			let xt = uxt(Transfer {
1179				from: Alice.into(),
1180				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1181				amount: 5,
1182				nonce: 0,
1183			});
1184			let watcher =
1185				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1186					.unwrap()
1187					.expect_watcher();
1188			assert_eq!(pool.validated_pool().status().ready, 1);
1189
1190			// when
1191			let xt = uxt(Transfer {
1192				from: Bob.into(),
1193				to: AccountId::from_h256(H256::from_low_u64_be(1)),
1194				amount: 4,
1195				nonce: 1,
1196			});
1197			block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
1198			assert_eq!(pool.validated_pool().status().ready, 1);
1199
1200			// then
1201			let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1202			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1203			assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1204		}
1205
1206		#[test]
1207		fn should_trigger_dropped_lower_priority() {
1208			{
1209				// given
1210				let limit = Limit { count: 1, total_bytes: 1000 };
1211				let options =
1212					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1213
1214				let api = Arc::new(TestApi::default());
1215				let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1216
1217				// after validation `IncludeData` will have priority set to 9001
1218				// (validate_transaction mock)
1219				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1220				block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1221					.unwrap();
1222				assert_eq!(pool.validated_pool().status().ready, 1);
1223
1224				// then
1225				// after validation `Transfer` will have priority set to 4 (validate_transaction
1226				// mock)
1227				let xt = uxt(Transfer {
1228					from: Bob.into(),
1229					to: AccountId::from_h256(H256::from_low_u64_be(1)),
1230					amount: 4,
1231					nonce: 1,
1232				});
1233				let result =
1234					block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
1235				assert!(matches!(
1236					result,
1237					Err(soil_client::transaction_pool::error::Error::ImmediatelyDropped)
1238				));
1239			}
1240			{
1241				// given
1242				let limit = Limit { count: 2, total_bytes: 1000 };
1243				let options =
1244					Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1245
1246				let api = Arc::new(TestApi::default());
1247				let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1248
1249				let han_of_block0 = api.expect_hash_and_number(0);
1250
1251				// after validation `IncludeData` will have priority set to 9001
1252				// (validate_transaction mock)
1253				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1254				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1255					.unwrap()
1256					.expect_watcher();
1257				assert_eq!(pool.validated_pool().status().ready, 1);
1258
1259				// after validation `Transfer` will have priority set to 4 (validate_transaction
1260				// mock)
1261				let xt = uxt(Transfer {
1262					from: Alice.into(),
1263					to: AccountId::from_h256(H256::from_low_u64_be(2)),
1264					amount: 5,
1265					nonce: 0,
1266				});
1267				let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1268					.unwrap()
1269					.expect_watcher();
1270				assert_eq!(pool.validated_pool().status().ready, 2);
1271
1272				// when
1273				// after validation `Store` will have priority set to 9001 (validate_transaction
1274				// mock)
1275				let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1276				block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
1277					.unwrap();
1278				assert_eq!(pool.validated_pool().status().ready, 2);
1279
1280				// then
1281				let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1282				assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1283				assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1284			}
1285		}
1286
1287		#[test]
1288		fn should_handle_pruning_in_the_middle_of_import() {
1289			// given
1290			let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
1291			let (tx, rx) = std::sync::mpsc::sync_channel(1);
1292			let mut api = TestApi::default();
1293			api.delay = Arc::new(Mutex::new(rx.into()));
1294			let api = Arc::new(api);
1295			let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
1296				Default::default(),
1297				true.into(),
1298				api.clone(),
1299			));
1300
1301			let han_of_block0 = api.expect_hash_and_number(0);
1302
1303			// when
1304			let xt = uxt(Transfer {
1305				from: Alice.into(),
1306				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1307				amount: 5,
1308				nonce: 1,
1309			});
1310
1311			// This transaction should go to future, since we use `nonce: 1`
1312			let pool2 = pool.clone();
1313			std::thread::spawn({
1314				let hash_of_block0 = han_of_block0.clone();
1315				move || {
1316					block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
1317					ready.send(()).unwrap();
1318				}
1319			});
1320
1321			// But now before the previous one is imported we import
1322			// the one that it depends on.
1323			let xt = uxt(Transfer {
1324				from: Alice.into(),
1325				to: AccountId::from_h256(H256::from_low_u64_be(2)),
1326				amount: 4,
1327				nonce: 0,
1328			});
1329			// The tag the above transaction provides (TestApi is using just nonce as u8)
1330			let provides = vec![0_u8];
1331			block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
1332			assert_eq!(pool.validated_pool().status().ready, 1);
1333
1334			// Now block import happens before the second transaction is able to finish
1335			// verification.
1336			block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
1337			assert_eq!(pool.validated_pool().status().ready, 0);
1338
1339			// so when we release the verification of the previous one it will have
1340			// something in `requires`, but should go to ready directly, since the previous
1341			// transaction was imported correctly.
1342			tx.send(()).unwrap();
1343
1344			// then
1345			is_ready.recv().unwrap(); // wait for finish
1346			assert_eq!(pool.validated_pool().status().ready, 1);
1347			assert_eq!(pool.validated_pool().status().future, 0);
1348		}
1349	}
1350}