// soil_txpool/graph/base_pool.rs
// This file is part of Soil.

// Copyright (C) Soil contributors.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

//! A basic version of the dependency graph.
//!
//! For a more full-featured pool, have a look at the `pool` module.

use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc, time::Instant};

use serde::Serialize;
use soil_client::transaction_pool::{error, InPoolTransaction, PoolStatus};
use subsoil::core::hexdisplay::HexDisplay;
use subsoil::runtime::{
	traits::Member,
	transaction_validity::{
		TransactionLongevity as Longevity, TransactionPriority as Priority, TransactionSource,
		TransactionTag as Tag,
	},
};
use tracing::{trace, warn};

use crate::LOG_TARGET;

use super::{
	future::{FutureTransactions, WaitingTransaction},
	ready::{BestIterator, ReadyTransactions, TransactionRef},
};
30
/// Successful import result.
#[derive(Debug, PartialEq, Eq)]
pub enum Imported<Hash, Ex> {
	/// Transaction was successfully imported to Ready queue.
	Ready {
		/// Hash of transaction that was successfully imported.
		hash: Hash,
		/// Transactions that got promoted from the Future queue.
		promoted: Vec<Hash>,
		/// Transactions that failed to be promoted from the Future queue and are now discarded.
		failed: Vec<Hash>,
		/// Transactions removed from the Ready pool (replaced by the imported one);
		/// callers may attempt to re-import them after revalidation.
		removed: Vec<Arc<Transaction<Hash, Ex>>>,
	},
	/// Transaction was successfully imported to Future queue.
	Future {
		/// Hash of transaction that was successfully imported.
		hash: Hash,
	},
}
51
52impl<Hash, Ex> Imported<Hash, Ex> {
53	/// Returns the hash of imported transaction.
54	pub fn hash(&self) -> &Hash {
55		use self::Imported::*;
56		match *self {
57			Ready { ref hash, .. } => hash,
58			Future { ref hash, .. } => hash,
59		}
60	}
61}
62
/// Status of pruning the queue.
#[derive(Debug)]
pub struct PruneStatus<Hash, Ex> {
	/// Import results for future transactions that got promoted because the
	/// pruned tags satisfied their requirements.
	pub promoted: Vec<Imported<Hash, Ex>>,
	/// A list of transactions that failed to be promoted and now are discarded.
	pub failed: Vec<Hash>,
	/// A list of transactions that got pruned from the ready queue.
	pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
}
73
/// A transaction source that includes a timestamp indicating when the transaction was submitted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimedTransactionSource {
	/// The original source of the transaction.
	pub source: TransactionSource,

	/// The time at which the transaction was submitted.
	///
	/// `None` when the timestamp was not requested at construction time.
	pub timestamp: Option<Instant>,
}
83
84impl From<TimedTransactionSource> for TransactionSource {
85	fn from(value: TimedTransactionSource) -> Self {
86		value.source
87	}
88}
89
90impl TimedTransactionSource {
91	/// Creates a new instance with an internal `TransactionSource::InBlock` source and an optional
92	/// timestamp.
93	pub fn new_in_block(with_timestamp: bool) -> Self {
94		Self { source: TransactionSource::InBlock, timestamp: with_timestamp.then(Instant::now) }
95	}
96	/// Creates a new instance with an internal `TransactionSource::External` source and an optional
97	/// timestamp.
98	pub fn new_external(with_timestamp: bool) -> Self {
99		Self { source: TransactionSource::External, timestamp: with_timestamp.then(Instant::now) }
100	}
101	/// Creates a new instance with an internal `TransactionSource::Local` source and an optional
102	/// timestamp.
103	pub fn new_local(with_timestamp: bool) -> Self {
104		Self { source: TransactionSource::Local, timestamp: with_timestamp.then(Instant::now) }
105	}
106	/// Creates a new instance with an given source and an optional timestamp.
107	pub fn from_transaction_source(source: TransactionSource, with_timestamp: bool) -> Self {
108		Self { source, timestamp: with_timestamp.then(Instant::now) }
109	}
110}
111
/// Immutable transaction
#[derive(PartialEq, Eq, Clone)]
pub struct Transaction<Hash, Extrinsic> {
	/// Raw extrinsic representing that transaction.
	pub data: Extrinsic,
	/// Number of bytes the encoding of the transaction requires.
	pub bytes: usize,
	/// Transaction hash (unique)
	pub hash: Hash,
	/// Transaction priority (higher = better)
	pub priority: Priority,
	/// At which block the transaction becomes invalid?
	pub valid_till: Longevity,
	/// Tags required by the transaction.
	pub requires: Vec<Tag>,
	/// Tags that this transaction provides.
	pub provides: Vec<Tag>,
	/// Should that transaction be propagated.
	pub propagate: bool,
	/// Timed source of that transaction.
	pub source: TimedTransactionSource,
}
134
impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
	/// Borrows the raw extrinsic wrapped by this pool transaction.
	fn as_ref(&self) -> &Extrinsic {
		&self.data
	}
}
140
// Trivial accessors exposing the corresponding `Transaction` fields to the
// pool-facing `InPoolTransaction` interface.
impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
	type Transaction = Extrinsic;
	type Hash = Hash;

	fn data(&self) -> &Extrinsic {
		&self.data
	}

	fn hash(&self) -> &Hash {
		&self.hash
	}

	fn priority(&self) -> &Priority {
		&self.priority
	}

	fn longevity(&self) -> &Longevity {
		&self.valid_till
	}

	fn requires(&self) -> &[Tag] {
		&self.requires
	}

	fn provides(&self) -> &[Tag] {
		&self.provides
	}

	fn is_propagable(&self) -> bool {
		self.propagate
	}
}
173
impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
	/// Explicit transaction clone.
	///
	/// Transaction should be cloned only if absolutely necessary && we want
	/// every reason to be commented. That's why we prefer this explicit
	/// `duplicate` method over the derived `Clone` implementation at call sites.
	pub fn duplicate(&self) -> Self {
		Self {
			data: self.data.clone(),
			bytes: self.bytes,
			hash: self.hash.clone(),
			priority: self.priority,
			source: self.source.clone(),
			valid_till: self.valid_till,
			requires: self.requires.clone(),
			provides: self.provides.clone(),
			propagate: self.propagate,
		}
	}
}
194
195impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
196where
197	Hash: fmt::Debug,
198	Extrinsic: fmt::Debug,
199{
200	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
201		let join_tags = |tags: &[Tag]| {
202			tags.iter()
203				.map(|tag| HexDisplay::from(tag).to_string())
204				.collect::<Vec<_>>()
205				.join(", ")
206		};
207
208		write!(fmt, "Transaction {{ ")?;
209		write!(fmt, "hash: {:?}, ", &self.hash)?;
210		write!(fmt, "priority: {:?}, ", &self.priority)?;
211		write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
212		write!(fmt, "bytes: {:?}, ", &self.bytes)?;
213		write!(fmt, "propagate: {:?}, ", &self.propagate)?;
214		write!(fmt, "source: {:?}, ", &self.source)?;
215		write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
216		write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
217		write!(fmt, "data: {:?}", &self.data)?;
218		write!(fmt, "}}")?;
219		Ok(())
220	}
221}
222
/// Store last pruned tags for given number of invocations.
///
/// `prune_tags` rotates through this many buckets of recently pruned tags.
const RECENTLY_PRUNED_TAGS: usize = 2;
225
/// Transaction pool.
///
/// Builds a dependency graph for all transactions in the pool and returns
/// the ones that are currently ready to be executed.
///
/// General note:
/// If function returns some transactions it usually means that importing them
/// as-is for the second time will fail or produce unwanted results.
/// Most likely it is required to revalidate them and recompute set of
/// required tags.
#[derive(Clone, Debug)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
	/// When `true`, transactions whose requirements are not yet satisfied are
	/// rejected instead of being parked in the future queue.
	reject_future_transactions: bool,
	/// Transactions waiting for their required tags to be provided.
	future: FutureTransactions<Hash, Ex>,
	/// Transactions with all requirements satisfied.
	ready: ReadyTransactions<Hash, Ex>,
	/// Store recently pruned tags (for last two invocations).
	///
	/// This is used to make sure we don't accidentally put
	/// transactions to future in case they were just stuck in verification.
	recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
	/// Index of the `recently_pruned` bucket that the next `prune_tags` call
	/// will overwrite.
	recently_pruned_index: usize,
}
248
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
	// By default future transactions are accepted (see `Self::new`).
	fn default() -> Self {
		Self::new(false)
	}
}
254
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
	/// Create new pool given reject_future_transactions flag.
	pub fn new(reject_future_transactions: bool) -> Self {
		Self {
			reject_future_transactions,
			future: Default::default(),
			ready: Default::default(),
			recently_pruned: Default::default(),
			recently_pruned_index: 0,
		}
	}

	/// Clears buffer keeping recently pruned transaction.
	pub fn clear_recently_pruned(&mut self) {
		self.recently_pruned = Default::default();
		self.recently_pruned_index = 0;
	}

	/// Temporarily enables future transactions, runs closure and then restores
	/// `reject_future_transactions` flag back to previous value.
	///
	/// The closure accepts the mutable reference to the pool and original value
	/// of the `reject_future_transactions` flag.
	pub(crate) fn with_futures_enabled<T>(
		&mut self,
		closure: impl FnOnce(&mut Self, bool) -> T,
	) -> T {
		let previous = self.reject_future_transactions;
		self.reject_future_transactions = false;
		let return_value = closure(self, previous);
		// Restore the flag regardless of what the closure did to the pool.
		self.reject_future_transactions = previous;
		return_value
	}

	/// Returns if the transaction for the given hash is already imported.
	pub fn is_imported(&self, tx_hash: &Hash) -> bool {
		self.future.contains(tx_hash) || self.ready.contains(tx_hash)
	}

	/// Imports transaction to the pool.
	///
	/// The pool consists of two parts: Future and Ready.
	/// The former contains transactions that require some tags that are not yet provided by
	/// other transactions in the pool.
	/// The latter contains transactions that have all the requirements satisfied and are
	/// ready to be included in the block.
	pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
		if self.is_imported(&tx.hash) {
			return Err(error::Error::AlreadyImported(Box::new(tx.hash)));
		}

		// Readiness is computed against tags currently provided by the ready queue
		// plus tags pruned during the last `RECENTLY_PRUNED_TAGS` prune invocations.
		let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
		trace!(
			target: LOG_TARGET,
			tx_hash = ?tx.transaction.hash,
			?tx,
			set = if tx.is_ready() { "ready" } else { "future" },
			"Importing transaction"
		);

		// If all tags are not satisfied import to future.
		if !tx.is_ready() {
			if self.reject_future_transactions {
				return Err(error::Error::RejectedFutureTransaction);
			}

			let hash = tx.transaction.hash.clone();
			self.future.import(tx);
			return Ok(Imported::Future { hash });
		}

		self.import_to_ready(tx)
	}

	/// Imports transaction to ready queue.
	///
	/// NOTE the transaction has to have all requirements satisfied.
	fn import_to_ready(
		&mut self,
		tx: WaitingTransaction<Hash, Ex>,
	) -> error::Result<Imported<Hash, Ex>> {
		let tx_hash = tx.transaction.hash.clone();
		let mut promoted = vec![];
		let mut failed = vec![];
		let mut removed = vec![];

		// `first` distinguishes the originally submitted transaction (errors are
		// returned to the caller) from transactions promoted from the future queue
		// (errors are recorded in `failed`/`removed` instead).
		let mut first = true;
		let mut to_import = vec![tx];

		// take first transaction from the list
		while let Some(tx) = to_import.pop() {
			// find transactions in Future that it unlocks
			to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));

			// import this transaction
			let current_hash = tx.transaction.hash.clone();
			let current_tx = tx.transaction.clone();
			match self.ready.import(tx) {
				Ok(mut replaced) => {
					if !first {
						promoted.push(current_hash.clone());
					}
					// If there were conflicting future transactions promoted, remove them from
					// promoted set.
					promoted.retain(|hash| replaced.iter().all(|tx| *hash != tx.hash));
					// The transactions were removed from the ready pool. We might attempt to
					// re-import them.
					removed.append(&mut replaced);
				},
				Err(error @ error::Error::TooLowPriority { .. }) => {
					trace!(
						target: LOG_TARGET,
						tx_hash = ?current_tx.hash,
						?first,
						%error,
						"Error importing transaction"
					);
					if first {
						return Err(error);
					} else {
						removed.push(current_tx);
						promoted.retain(|hash| *hash != current_hash);
					}
				},
				// transaction failed to be imported.
				Err(error) => {
					trace!(
						target: LOG_TARGET,
						tx_hash = ?current_tx.hash,
						?error,
						first,
						"Error importing transaction"
					);
					if first {
						return Err(error);
					} else {
						failed.push(current_tx.hash.clone());
					}
				},
			}
			first = false;
		}

		// An edge case when importing transaction caused
		// some future transactions to be imported and that
		// future transactions pushed out current transaction.
		// This means that there is a cycle and the transactions should
		// be moved back to future, since we can't resolve it.
		if removed.iter().any(|tx| tx.hash == tx_hash) {
			// We still need to remove all transactions that we promoted
			// since they depend on each other and will never get to the best iterator.
			self.ready.remove_subtree(&promoted);

			trace!(
				target: LOG_TARGET,
				?tx_hash,
				"Cycle detected, bailing."
			);
			return Err(error::Error::CycleDetected);
		}

		Ok(Imported::Ready { hash: tx_hash, promoted, failed, removed })
	}

	/// Returns an iterator over ready transactions in the pool.
	pub fn ready(&self) -> BestIterator<Hash, Ex> {
		self.ready.get()
	}

	/// Returns an iterator over future transactions in the pool.
	pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
		self.future.all()
	}

	/// Returns pool transactions given list of hashes.
	///
	/// Includes both ready and future pool. For every hash in the `hashes`
	/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
	pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
		let ready = self.ready.by_hashes(hashes);
		let future = self.future.by_hashes(hashes);

		// Per position, prefer the ready-pool match over the future-pool one.
		ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
	}

	/// Returns pool transaction by hash.
	pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
		self.ready.by_hash(hash)
	}

	/// Makes sure that the transactions in the queues stay within provided limits.
	///
	/// Removes and returns worst transactions from the queues and all transactions that depend on
	/// them. Technically the worst transaction should be evaluated by computing the entire pending
	/// set. We use a simplified approach to remove transactions with the lowest priority first or
	/// those that occupy the pool for the longest time in case priority is the same.
	pub fn enforce_limits(
		&mut self,
		ready: &Limit,
		future: &Limit,
	) -> Vec<Arc<Transaction<Hash, Ex>>> {
		let mut removed = vec![];

		while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
			// find the worst transaction
			let worst =
				self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
					let transaction = &current.transaction;
					worst
						.map(|worst| {
							// Here we don't use `TransactionRef`'s ordering implementation because
							// while it prefers higher priority, as we need here, it also prefers
							// older transactions for inclusion purposes and limit enforcement
							// needs to prefer newer transactions instead and drop the older ones.
							match worst.transaction.priority.cmp(&transaction.transaction.priority)
							{
								Ordering::Less => worst,
								Ordering::Equal => {
									if worst.insertion_id > transaction.insertion_id {
										transaction.clone()
									} else {
										worst
									}
								},
								Ordering::Greater => transaction.clone(),
							}
						})
						.or_else(|| Some(transaction.clone()))
				});

			if let Some(worst) = worst {
				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
			} else {
				break;
			}
		}

		while future.is_exceeded(self.future.len(), self.future.bytes()) {
			// find the worst transaction: evict the one submitted earliest, falling
			// back to the internal `imported_at` instant when timestamps are absent.
			let worst = self.future.fold(|worst, current| match worst {
				None => Some(current.clone()),
				Some(worst) => Some(
					match (worst.transaction.source.timestamp, current.transaction.source.timestamp)
					{
						(Some(worst_timestamp), Some(current_timestamp)) => {
							if worst_timestamp > current_timestamp {
								current.clone()
							} else {
								worst
							}
						},
						_ => {
							if worst.imported_at > current.imported_at {
								current.clone()
							} else {
								worst
							}
						},
					},
				),
			});

			if let Some(worst) = worst {
				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
			} else {
				break;
			}
		}

		removed
	}

	/// Removes all transactions represented by the hashes and all other transactions
	/// that depend on them.
	///
	/// Returns a list of actually removed transactions.
	/// NOTE some transactions might still be valid, but were just removed because
	/// they were part of a chain, you may attempt to re-import them later.
	/// NOTE If you want to remove ready transactions that were already used,
	/// and you don't want them to be stored in the pool use `prune_tags` method.
	pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
		let mut removed = self.ready.remove_subtree(hashes);
		removed.extend(self.future.remove(hashes));
		removed
	}

	/// Removes and returns all transactions from the future queue.
	pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
		self.future.clear()
	}

	/// Prunes transactions that provide given list of tags.
	///
	/// This will cause all transactions (both ready and future) that provide these tags to be
	/// removed from the pool, but unlike `remove_subtree`, dependent transactions are not touched.
	/// Additional transactions from future queue might be promoted to ready if you satisfy tags
	/// that the pool didn't previously know about.
	pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
		let mut to_import = vec![];
		let mut pruned = vec![];
		// Rotate to the next recently-pruned bucket and start filling it afresh.
		let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
		self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
		recently_pruned.clear();

		let tags = tags.into_iter().collect::<Vec<_>>();
		let futures_removed = self.future.prune_tags(&tags);

		for tag in tags {
			// make sure to promote any future transactions that could be unlocked
			to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
			// and actually prune transactions in ready queue
			pruned.append(&mut self.ready.prune_tags(tag.clone()));
			// store the tags for next submission
			recently_pruned.insert(tag);
		}

		let mut promoted = vec![];
		let mut failed = vec![];
		// Future transactions providing the pruned tags are reported as failed;
		// callers may revalidate and resubmit them.
		for tx in futures_removed {
			failed.push(tx.hash.clone());
		}

		for tx in to_import {
			let tx_hash = tx.transaction.hash.clone();
			match self.import_to_ready(tx) {
				Ok(res) => promoted.push(res),
				Err(error) => {
					warn!(
						target: LOG_TARGET,
						?tx_hash,
						?error,
						"Failed to promote during pruning."
					);
					failed.push(tx_hash)
				},
			}
		}

		PruneStatus { pruned, failed, promoted }
	}

	/// Get pool status.
	pub fn status(&self) -> PoolStatus {
		PoolStatus {
			ready: self.ready.len(),
			ready_bytes: self.ready.bytes(),
			future: self.future.len(),
			future_bytes: self.future.bytes(),
		}
	}
}
606
/// Queue limits
#[derive(Debug, Clone)]
pub struct Limit {
	/// Maximal number of transactions in the queue.
	pub count: usize,
	/// Maximal size of encodings of all transactions in the queue.
	pub total_bytes: usize,
}

impl Limit {
	/// Returns true if any of the provided values exceeds the limit.
	pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
		count > self.count || bytes > self.total_bytes
	}
}
622
623#[cfg(test)]
624mod tests {
625	use super::*;
626
	/// Hash type used by the test transactions.
	type Hash = u64;

	/// Creates an empty pool that accepts future transactions.
	fn pool() -> BasePool<Hash, Vec<u8>> {
		BasePool::default()
	}
632
	/// A baseline valid transaction; tests tweak individual fields via struct update.
	fn default_tx() -> Transaction<Hash, Vec<u8>> {
		Transaction {
			data: vec![],
			bytes: 1,
			hash: 1u64,
			priority: 5u64,
			valid_till: 64u64,
			requires: vec![],
			provides: vec![],
			propagate: true,
			source: TimedTransactionSource::new_external(false),
		}
	}
646
	// Pruning a tag provided by a ready transaction removes it and reports it as pruned.
	#[test]
	fn prune_for_ready_works() {
		// given
		let mut pool = pool();

		// when
		pool.import(Transaction {
			data: vec![1u8].into(),
			provides: vec![vec![2]],
			..default_tx().clone()
		})
		.unwrap();

		// then
		assert_eq!(pool.ready().count(), 1);
		assert_eq!(pool.ready.len(), 1);

		let result = pool.prune_tags(vec![vec![2]]);
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);
		assert_eq!(result.pruned.len(), 1);
		assert_eq!(result.failed.len(), 0);
		assert_eq!(result.promoted.len(), 0);
	}
671
	// Pruning a tag provided by a future transaction removes it and reports it as failed.
	#[test]
	fn prune_for_future_works() {
		// given
		let mut pool = pool();

		// when
		pool.import(Transaction {
			data: vec![1u8].into(),
			requires: vec![vec![1]],
			provides: vec![vec![2]],
			hash: 0xaa,
			..default_tx().clone()
		})
		.unwrap();

		// then
		assert_eq!(pool.futures().count(), 1);
		assert_eq!(pool.future.len(), 1);

		let result = pool.prune_tags(vec![vec![2]]);
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);
		assert_eq!(pool.futures().count(), 0);
		assert_eq!(pool.future.len(), 0);

		assert_eq!(result.pruned.len(), 0);
		assert_eq!(result.failed.len(), 1);
		assert_eq!(result.failed[0], 0xaa);
		assert_eq!(result.promoted.len(), 0);
	}
702
	// A transaction with no unmet requirements lands directly in the ready queue.
	#[test]
	fn should_import_transaction_to_ready() {
		// given
		let mut pool = pool();

		// when
		pool.import(Transaction {
			data: vec![1u8].into(),
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();

		// then
		assert_eq!(pool.ready().count(), 1);
		assert_eq!(pool.ready.len(), 1);
	}
720
	// Re-importing a hash already in the pool is rejected (`AlreadyImported`).
	#[test]
	fn should_not_import_same_transaction_twice() {
		// given
		let mut pool = pool();

		// when
		pool.import(Transaction {
			data: vec![1u8].into(),
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![1u8].into(),
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap_err();

		// then
		assert_eq!(pool.ready().count(), 1);
		assert_eq!(pool.ready.len(), 1);
	}
744
	// A future transaction is promoted once another import provides its required tag.
	#[test]
	fn should_import_transaction_to_future_and_promote_it_later() {
		// given
		let mut pool = pool();

		// when
		pool.import(Transaction {
			data: vec![1u8].into(),
			requires: vec![vec![0]],
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);
		pool.import(Transaction {
			data: vec![2u8].into(),
			hash: 2,
			provides: vec![vec![0]],
			..default_tx().clone()
		})
		.unwrap();

		// then
		assert_eq!(pool.ready().count(), 2);
		assert_eq!(pool.ready.len(), 2);
	}
772
	// One import can cascade: a whole dependency subgraph parked in future is promoted.
	#[test]
	fn should_promote_a_subgraph() {
		// given
		let mut pool = pool();

		// when
		pool.import(Transaction {
			data: vec![1u8].into(),
			requires: vec![vec![0]],
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![3u8].into(),
			hash: 3,
			requires: vec![vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![2u8].into(),
			hash: 2,
			requires: vec![vec![1]],
			provides: vec![vec![3], vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![4u8].into(),
			hash: 4,
			priority: 1_000u64,
			requires: vec![vec![3], vec![4]],
			..default_tx().clone()
		})
		.unwrap();
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);

		let res = pool
			.import(Transaction {
				data: vec![5u8].into(),
				hash: 5,
				provides: vec![vec![0], vec![4]],
				..default_tx().clone()
			})
			.unwrap();

		// then
		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);

		assert_eq!(it.next(), Some(5));
		assert_eq!(it.next(), Some(1));
		assert_eq!(it.next(), Some(2));
		assert_eq!(it.next(), Some(4));
		assert_eq!(it.next(), Some(3));
		assert_eq!(it.next(), None);
		assert_eq!(
			res,
			Imported::Ready {
				hash: 5,
				promoted: vec![1, 2, 3, 4],
				failed: vec![],
				removed: vec![],
			}
		);
	}
840
	// When two future transactions provide the same tag, promoting the higher-priority
	// one evicts the conflicting lower-priority one (reported in `removed`).
	#[test]
	fn should_remove_conflicting_future() {
		let mut pool = pool();
		pool.import(Transaction {
			data: vec![3u8].into(),
			hash: 3,
			requires: vec![vec![1]],
			priority: 50u64,
			provides: vec![vec![3]],
			..default_tx().clone()
		})
		.unwrap();
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);

		let tx2 = Transaction {
			data: vec![2u8].into(),
			hash: 2,
			requires: vec![vec![1]],
			provides: vec![vec![3]],
			..default_tx().clone()
		};
		pool.import(tx2.clone()).unwrap();
		assert_eq!(pool.future.len(), 2);

		let res = pool
			.import(Transaction {
				data: vec![1u8].into(),
				hash: 1,
				provides: vec![vec![1]],
				..default_tx().clone()
			})
			.unwrap();

		assert_eq!(
			res,
			Imported::Ready {
				hash: 1,
				promoted: vec![3],
				failed: vec![],
				removed: vec![tx2.into()]
			}
		);

		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
		assert_eq!(it.next(), Some(1));
		assert_eq!(it.next(), Some(3));
		assert_eq!(it.next(), None);

		assert_eq!(pool.future.len(), 0);
	}
892
	// A dependency cycle parks everything in future; a higher-priority provider of the
	// cycle's entry tag breaks it, evicting the cycle-closing transaction.
	#[test]
	fn should_handle_a_cycle() {
		// given
		let mut pool = pool();
		pool.import(Transaction {
			data: vec![1u8].into(),
			requires: vec![vec![0]],
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![3u8].into(),
			hash: 3,
			requires: vec![vec![1]],
			provides: vec![vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);

		// when
		let tx2 = Transaction {
			data: vec![2u8].into(),
			hash: 2,
			requires: vec![vec![2]],
			provides: vec![vec![0]],
			..default_tx().clone()
		};
		pool.import(tx2.clone()).unwrap();

		// then
		{
			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
			assert_eq!(it.next(), None);
		}
		// all transactions occupy the Future queue - it's fine
		assert_eq!(pool.future.len(), 3);

		// let's close the cycle with one additional transaction
		let res = pool
			.import(Transaction {
				data: vec![4u8].into(),
				hash: 4,
				priority: 50u64,
				provides: vec![vec![0]],
				..default_tx().clone()
			})
			.unwrap();
		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
		assert_eq!(it.next(), Some(4));
		assert_eq!(it.next(), Some(1));
		assert_eq!(it.next(), Some(3));
		assert_eq!(it.next(), None);
		assert_eq!(
			res,
			Imported::Ready {
				hash: 4,
				promoted: vec![1, 3],
				failed: vec![],
				removed: vec![tx2.into()]
			}
		);
		assert_eq!(pool.future.len(), 0);
	}
959
	// Closing a cycle with a transaction of lower priority than the cycle member
	// fails with `CycleDetected` and leaves both queues empty.
	#[test]
	fn should_handle_a_cycle_with_low_priority() {
		// given
		let mut pool = pool();
		pool.import(Transaction {
			data: vec![1u8].into(),
			requires: vec![vec![0]],
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![3u8].into(),
			hash: 3,
			requires: vec![vec![1]],
			provides: vec![vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		assert_eq!(pool.ready().count(), 0);
		assert_eq!(pool.ready.len(), 0);

		// when
		pool.import(Transaction {
			data: vec![2u8].into(),
			hash: 2,
			requires: vec![vec![2]],
			provides: vec![vec![0]],
			..default_tx().clone()
		})
		.unwrap();

		// then
		{
			let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
			assert_eq!(it.next(), None);
		}
		// all transactions occupy the Future queue - it's fine
		assert_eq!(pool.future.len(), 3);

		// let's close the cycle with one additional transaction
		let err = pool
			.import(Transaction {
				data: vec![4u8].into(),
				hash: 4,
				priority: 1u64, // lower priority than Tx(2)
				provides: vec![vec![0]],
				..default_tx().clone()
			})
			.unwrap_err();
		let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
		assert_eq!(it.next(), None);
		assert_eq!(pool.ready.len(), 0);
		assert_eq!(pool.future.len(), 0);
		if let error::Error::CycleDetected = err {
		} else {
			assert!(false, "Invalid error kind: {:?}", err);
		}
	}
1019
	// `remove_subtree` drops the given hashes and everything depending on them,
	// across both the ready and future queues.
	#[test]
	fn should_remove_invalid_transactions() {
		// given
		let mut pool = pool();
		pool.import(Transaction {
			data: vec![5u8].into(),
			hash: 5,
			provides: vec![vec![0], vec![4]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![1u8].into(),
			requires: vec![vec![0]],
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![3u8].into(),
			hash: 3,
			requires: vec![vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![2u8].into(),
			hash: 2,
			requires: vec![vec![1]],
			provides: vec![vec![3], vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		pool.import(Transaction {
			data: vec![4u8].into(),
			hash: 4,
			priority: 1_000u64,
			requires: vec![vec![3], vec![4]],
			..default_tx().clone()
		})
		.unwrap();
		// future
		pool.import(Transaction {
			data: vec![6u8].into(),
			hash: 6,
			priority: 1_000u64,
			requires: vec![vec![11]],
			..default_tx().clone()
		})
		.unwrap();
		assert_eq!(pool.ready().count(), 5);
		assert_eq!(pool.future.len(), 1);

		// when
		pool.remove_subtree(&[6, 1]);

		// then
		assert_eq!(pool.ready().count(), 1);
		assert_eq!(pool.future.len(), 0);
	}
1080
	#[test]
	fn should_prune_ready_transactions() {
		// given
		let mut pool = pool();
		// future: waiting for tag [0], which nothing in the pool provides.
		pool.import(Transaction {
			data: vec![5u8].into(),
			hash: 5,
			requires: vec![vec![0]],
			provides: vec![vec![100]],
			..default_tx().clone()
		})
		.unwrap();
		// ready chain (imported out of order, all tags eventually satisfied):
		// tx 1 provides [1].
		pool.import(Transaction {
			data: vec![1u8].into(),
			provides: vec![vec![1]],
			..default_tx().clone()
		})
		.unwrap();
		// tx 2 requires [2] (provided by tx 3 below) and provides [3].
		pool.import(Transaction {
			data: vec![2u8].into(),
			hash: 2,
			requires: vec![vec![2]],
			provides: vec![vec![3]],
			..default_tx().clone()
		})
		.unwrap();
		// tx 3 requires [1] (from tx 1) and provides [2].
		pool.import(Transaction {
			data: vec![3u8].into(),
			hash: 3,
			requires: vec![vec![1]],
			provides: vec![vec![2]],
			..default_tx().clone()
		})
		.unwrap();
		// tx 4 requires [3] and [2], provides [4].
		pool.import(Transaction {
			data: vec![4u8].into(),
			hash: 4,
			priority: 1_000u64,
			requires: vec![vec![3], vec![2]],
			provides: vec![vec![4]],
			..default_tx().clone()
		})
		.unwrap();

		assert_eq!(pool.ready().count(), 4);
		assert_eq!(pool.future.len(), 1);

		// when: prune tags [0] and [2]. Tag [2] is provided by a ready
		// transaction; tag [0] satisfies the future transaction's requirement.
		let result = pool.prune_tags(vec![vec![0], vec![2]]);

		// then: two transactions were pruned, the future one (hash 5) got
		// promoted to Ready, and three transactions remain in the Ready queue.
		assert_eq!(result.pruned.len(), 2);
		assert_eq!(result.failed.len(), 0);
		assert_eq!(
			result.promoted[0],
			Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
		);
		assert_eq!(result.promoted.len(), 1);
		assert_eq!(pool.future.len(), 0);
		assert_eq!(pool.ready.len(), 3);
		assert_eq!(pool.ready().count(), 3);
	}
1145
1146	#[test]
1147	fn transaction_debug() {
1148		assert_eq!(
1149			format!(
1150				"{:?}",
1151				Transaction {
1152					data: vec![4u8].into(),
1153					hash: 4,
1154					priority: 1_000u64,
1155					requires: vec![vec![3], vec![2]],
1156					provides: vec![vec![4]],
1157					..default_tx().clone()
1158				}
1159			),
1160			"Transaction { \
1161hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1162source: TimedTransactionSource { source: External, timestamp: None }, requires: [03, 02], provides: [04], data: [4]}"
1163				.to_owned()
1164		);
1165	}
1166
1167	#[test]
1168	fn transaction_propagation() {
1169		assert_eq!(
1170			Transaction {
1171				data: vec![4u8].into(),
1172				hash: 4,
1173				priority: 1_000u64,
1174				requires: vec![vec![3], vec![2]],
1175				provides: vec![vec![4]],
1176				..default_tx().clone()
1177			}
1178			.is_propagable(),
1179			true
1180		);
1181
1182		assert_eq!(
1183			Transaction {
1184				data: vec![4u8].into(),
1185				hash: 4,
1186				priority: 1_000u64,
1187				requires: vec![vec![3], vec![2]],
1188				provides: vec![vec![4]],
1189				propagate: false,
1190				..default_tx().clone()
1191			}
1192			.is_propagable(),
1193			false
1194		);
1195	}
1196
1197	#[test]
1198	fn should_reject_future_transactions() {
1199		// given
1200		let mut pool = pool();
1201
1202		// when
1203		pool.reject_future_transactions = true;
1204
1205		// then
1206		let err = pool.import(Transaction {
1207			data: vec![5u8].into(),
1208			hash: 5,
1209			requires: vec![vec![0]],
1210			..default_tx().clone()
1211		});
1212
1213		if let Err(error::Error::RejectedFutureTransaction) = err {
1214		} else {
1215			assert!(false, "Invalid error kind: {:?}", err);
1216		}
1217	}
1218
1219	#[test]
1220	fn should_clear_future_queue() {
1221		// given
1222		let mut pool = pool();
1223
1224		// when
1225		pool.import(Transaction {
1226			data: vec![5u8].into(),
1227			hash: 5,
1228			requires: vec![vec![0]],
1229			..default_tx().clone()
1230		})
1231		.unwrap();
1232
1233		// then
1234		assert_eq!(pool.future.len(), 1);
1235
1236		// and then when
1237		assert_eq!(pool.clear_future().len(), 1);
1238
1239		// then
1240		assert_eq!(pool.future.len(), 0);
1241	}
1242
1243	#[test]
1244	fn should_accept_future_transactions_when_explicitly_asked_to() {
1245		// given
1246		let mut pool = pool();
1247		pool.reject_future_transactions = true;
1248
1249		// when
1250		let flag_value = pool.with_futures_enabled(|pool, flag| {
1251			pool.import(Transaction {
1252				data: vec![5u8].into(),
1253				hash: 5,
1254				requires: vec![vec![0]],
1255				..default_tx().clone()
1256			})
1257			.unwrap();
1258
1259			flag
1260		});
1261
1262		// then
1263		assert_eq!(flag_value, true);
1264		assert_eq!(pool.reject_future_transactions, true);
1265		assert_eq!(pool.future.len(), 1);
1266	}
1267}