transaction_pool/
pool.rs

1// Copyright 2020 Parity Technologies
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use log::{trace, warn};
10use std::collections::{hash_map, BTreeSet, HashMap};
11use std::slice;
12use std::sync::Arc;
13
14use crate::{
15	error,
16	listener::{Listener, NoopListener},
17	options::Options,
18	ready::{Readiness, Ready},
19	replace::{ReplaceTransaction, ShouldReplace},
20	scoring::{self, ScoreWithRef, Scoring},
21	status::{LightStatus, Status},
22	transactions::{AddResult, Transactions},
23	VerifiedTransaction,
24};
25
/// Pool-internal wrapper around a shared transaction.
///
/// Carries a unique insertion id that can be used for scoring explicitly,
/// but internally is used to resolve conflicts in case of equal scoring
/// (newer transactions are preferred).
#[derive(Debug)]
pub struct Transaction<T> {
	/// Sequential id of the transaction
	pub insertion_id: u64,
	/// Shared transaction
	pub transaction: Arc<T>,
}

// Written by hand (instead of derived) so that cloning only bumps the `Arc`
// reference count and does not require `T: Clone`.
impl<T> Clone for Transaction<T> {
	fn clone(&self) -> Self {
		let insertion_id = self.insertion_id;
		let transaction = Arc::clone(&self.transaction);
		Transaction { insertion_id, transaction }
	}
}

// Dereferencing yields the wrapped `Arc<T>`, so methods of `T` are reachable
// directly on a `Transaction<T>`.
impl<T> ::std::ops::Deref for Transaction<T> {
	type Target = Arc<T>;

	fn deref(&self) -> &Arc<T> {
		&self.transaction
	}
}
52
/// A transaction pool.
///
/// Generic over the transaction type `T`, the `Scoring` implementation `S`
/// and a listener `L` (`NoopListener` unless another one is supplied).
#[derive(Debug)]
pub struct Pool<T: VerifiedTransaction, S: Scoring<T>, L = NoopListener> {
	/// Receives notifications about pool events (added, rejected, dropped, ...).
	listener: L,
	/// Scoring implementation handed to the per-sender transaction sets.
	scoring: S,
	/// Limits of the pool (`max_count`, `max_mem_usage`, `max_per_sender`).
	options: Options,
	/// Sum of `mem_usage()` of all transactions currently indexed in `by_hash`.
	mem_usage: usize,

	/// Transaction sets grouped by sender.
	transactions: HashMap<T::Sender, Transactions<T, S>>,
	/// Index of every pooled transaction by its hash.
	by_hash: HashMap<T::Hash, Transaction<T>>,

	/// The best transaction of every sender, kept in score order.
	best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
	/// The worst transaction of every sender, kept in score order.
	worst_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,

	/// Counter incremented on every import attempt that passes the duplicate
	/// check; its value becomes the `insertion_id` of the new transaction.
	insertion_id: u64,
}
69
70impl<T: VerifiedTransaction, S: Scoring<T> + Default> Default for Pool<T, S> {
71	fn default() -> Self {
72		Self::with_scoring(S::default(), Options::default())
73	}
74}
75
76impl<T: VerifiedTransaction, S: Scoring<T> + Default> Pool<T, S> {
77	/// Creates a new `Pool` with given options
78	/// and default `Scoring` and `Listener`.
79	pub fn with_options(options: Options) -> Self {
80		Self::with_scoring(S::default(), options)
81	}
82}
83
84impl<T: VerifiedTransaction, S: Scoring<T>> Pool<T, S> {
85	/// Creates a new `Pool` with given `Scoring` and options.
86	pub fn with_scoring(scoring: S, options: Options) -> Self {
87		Self::new(NoopListener, scoring, options)
88	}
89}
90
// Initial capacity of the per-sender `transactions` map created in `Pool::new`.
const INITIAL_NUMBER_OF_SENDERS: usize = 16;
92
93impl<T, S, L> Pool<T, S, L>
94where
95	T: VerifiedTransaction,
96	S: Scoring<T>,
97	L: Listener<T>,
98{
99	/// Creates new `Pool` with given `Scoring`, `Listener` and options.
100	pub fn new(listener: L, scoring: S, options: Options) -> Self {
101		let transactions = HashMap::with_capacity(INITIAL_NUMBER_OF_SENDERS);
102		let by_hash = HashMap::with_capacity(options.max_count / 16);
103
104		Pool {
105			listener,
106			scoring,
107			options,
108			mem_usage: 0,
109			transactions,
110			by_hash,
111			best_transactions: Default::default(),
112			worst_transactions: Default::default(),
113			insertion_id: 0,
114		}
115	}
116
117	/// Attempts to import new transaction to the pool, returns a `Arc<T>` or an `Error`.
118	///
119	/// NOTE: Since `Ready`ness is separate from the pool it's possible to import stalled transactions.
120	/// It's the caller responsibility to make sure that's not the case.
121	///
122	/// NOTE: The transaction may push out some other transactions from the pool
123	/// either because of limits (see `Options`) or because `Scoring` decides that the transaction
124	/// replaces an existing transaction from that sender.
125	///
126	/// If any limit is reached the transaction with the lowest `Score` will be compared with the
127	/// new transaction via the supplied `ShouldReplace` implementation and may be evicted.
128	///
129	/// The `Listener` will be informed on any drops or rejections.
130	pub fn import(&mut self, transaction: T, replace: &dyn ShouldReplace<T>) -> error::Result<Arc<T>, T::Hash> {
131		let mem_usage = transaction.mem_usage();
132
133		if self.by_hash.contains_key(transaction.hash()) {
134			return Err(error::Error::AlreadyImported(transaction.hash().clone()));
135		}
136
137		self.insertion_id += 1;
138		let transaction = Transaction { insertion_id: self.insertion_id, transaction: Arc::new(transaction) };
139
140		// TODO [ToDr] Most likely move this after the transaction is inserted.
141		// Avoid using should_replace, but rather use scoring for that.
142		{
143			let remove_worst = |s: &mut Self, transaction| match s.remove_worst(transaction, replace) {
144				Err(err) => {
145					s.listener.rejected(transaction, &err);
146					Err(err)
147				}
148				Ok(None) => Ok(false),
149				Ok(Some(removed)) => {
150					s.listener.dropped(&removed, Some(transaction));
151					s.finalize_remove(removed.hash());
152					Ok(true)
153				}
154			};
155
156			while self.by_hash.len() + 1 > self.options.max_count {
157				trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
158				if !remove_worst(self, &transaction)? {
159					break;
160				}
161			}
162
163			while self.mem_usage + mem_usage > self.options.max_mem_usage {
164				trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
165				if !remove_worst(self, &transaction)? {
166					break;
167				}
168			}
169		}
170
171		let (result, prev_state, current_state) = {
172			let transactions =
173				self.transactions.entry(transaction.sender().clone()).or_insert_with(Transactions::default);
174			// get worst and best transactions for comparison
175			let prev = transactions.worst_and_best();
176			let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender);
177			let current = transactions.worst_and_best();
178			(result, prev, current)
179		};
180
181		// update best and worst transactions from this sender (if required)
182		self.update_senders_worst_and_best(prev_state, current_state);
183
184		match result {
185			AddResult::Ok(tx) => {
186				self.listener.added(&tx, None);
187				self.finalize_insert(&tx, None);
188				Ok(tx.transaction)
189			}
190			AddResult::PushedOut { new, old } | AddResult::Replaced { new, old } => {
191				self.listener.added(&new, Some(&old));
192				self.finalize_insert(&new, Some(&old));
193				Ok(new.transaction)
194			}
195			AddResult::TooCheap { new, old } => {
196				let error = error::Error::TooCheapToReplace(old.hash().clone(), new.hash().clone());
197				self.listener.rejected(&new, &error);
198				return Err(error);
199			}
200			AddResult::TooCheapToEnter(new, score) => {
201				let error = error::Error::TooCheapToEnter(new.hash().clone(), format!("{:#x}", score));
202				self.listener.rejected(&new, &error);
203				return Err(error);
204			}
205		}
206	}
207
208	/// Updates state of the pool statistics if the transaction was added to a set.
209	fn finalize_insert(&mut self, new: &Transaction<T>, old: Option<&Transaction<T>>) {
210		self.mem_usage += new.mem_usage();
211		self.by_hash.insert(new.hash().clone(), new.clone());
212
213		if let Some(old) = old {
214			self.finalize_remove(old.hash());
215		}
216	}
217
218	/// Updates the pool statistics if transaction was removed.
219	fn finalize_remove(&mut self, hash: &T::Hash) -> Option<Arc<T>> {
220		self.by_hash.remove(hash).map(|old| {
221			self.mem_usage -= old.transaction.mem_usage();
222			old.transaction
223		})
224	}
225
226	/// Updates best and worst transactions from a sender.
227	fn update_senders_worst_and_best(
228		&mut self,
229		previous: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
230		current: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
231	) {
232		let worst_collection = &mut self.worst_transactions;
233		let best_collection = &mut self.best_transactions;
234
235		let is_same =
236			|a: &(S::Score, Transaction<T>), b: &(S::Score, Transaction<T>)| a.0 == b.0 && a.1.hash() == b.1.hash();
237
238		let update = |collection: &mut BTreeSet<_>, (score, tx), remove| {
239			if remove {
240				collection.remove(&ScoreWithRef::new(score, tx));
241			} else {
242				collection.insert(ScoreWithRef::new(score, tx));
243			}
244		};
245
246		match (previous, current) {
247			(None, Some((worst, best))) => {
248				update(worst_collection, worst, false);
249				update(best_collection, best, false);
250			}
251			(Some((worst, best)), None) => {
252				// all transactions from that sender has been removed.
253				// We can clear a hashmap entry.
254				self.transactions.remove(worst.1.sender());
255				update(worst_collection, worst, true);
256				update(best_collection, best, true);
257			}
258			(Some((w1, b1)), Some((w2, b2))) => {
259				if !is_same(&w1, &w2) {
260					update(worst_collection, w1, true);
261					update(worst_collection, w2, false);
262				}
263				if !is_same(&b1, &b2) {
264					update(best_collection, b1, true);
265					update(best_collection, b2, false);
266				}
267			}
268			(None, None) => {}
269		}
270	}
271
272	/// Attempts to remove the worst transaction from the pool if it's worse than the given one.
273	///
274	/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
275	/// In such case we will accept the transaction even though it is going to exceed the limit.
276	fn remove_worst(
277		&mut self,
278		transaction: &Transaction<T>,
279		replace: &dyn ShouldReplace<T>,
280	) -> error::Result<Option<Transaction<T>>, T::Hash> {
281		let to_remove = match self.worst_transactions.iter().next_back() {
282			// No elements to remove? and the pool is still full?
283			None => {
284				warn!("The pool is full but there are no transactions to remove.");
285				return Err(error::Error::TooCheapToEnter(transaction.hash().clone(), "unknown".into()));
286			}
287			Some(old) => {
288				let txs = &self.transactions;
289				let get_replace_tx = |tx| {
290					let sender_txs = txs.get(transaction.sender()).map(|txs| txs.iter().as_slice());
291					ReplaceTransaction::new(tx, sender_txs)
292				};
293				let old_replace = get_replace_tx(&old.transaction);
294				let new_replace = get_replace_tx(transaction);
295
296				match replace.should_replace(&old_replace, &new_replace) {
297					// We can't decide which of them should be removed, so accept both.
298					scoring::Choice::InsertNew => None,
299					// New transaction is better than the worst one so we can replace it.
300					scoring::Choice::ReplaceOld => Some(old.clone()),
301					// otherwise fail
302					scoring::Choice::RejectNew => {
303						return Err(error::Error::TooCheapToEnter(
304							transaction.hash().clone(),
305							format!("{:#x}", old.score),
306						))
307					}
308				}
309			}
310		};
311
312		if let Some(to_remove) = to_remove {
313			// Remove from transaction set
314			self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
315				set.remove(&to_remove.transaction, scoring)
316			});
317
318			Ok(Some(to_remove.transaction))
319		} else {
320			Ok(None)
321		}
322	}
323
324	/// Removes transaction from sender's transaction `HashMap`.
325	fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(
326		&mut self,
327		sender: &T::Sender,
328		f: F,
329	) -> Option<R> {
330		let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) {
331			let prev = set.worst_and_best();
332			let result = f(set, &self.scoring);
333			(prev, set.worst_and_best(), result)
334		} else {
335			return None;
336		};
337
338		self.update_senders_worst_and_best(prev, next);
339		Some(result)
340	}
341
342	/// Clears pool from all transactions.
343	/// This causes a listener notification that all transactions were dropped.
344	/// NOTE: the drop-notification order will be arbitrary.
345	pub fn clear(&mut self) {
346		self.mem_usage = 0;
347		self.transactions.clear();
348		self.best_transactions.clear();
349		self.worst_transactions.clear();
350
351		for (_hash, tx) in self.by_hash.drain() {
352			self.listener.dropped(&tx.transaction, None)
353		}
354	}
355
356	/// Removes single transaction from the pool.
357	/// Depending on the `is_invalid` flag the listener
358	/// will either get a `cancelled` or `invalid` notification.
359	pub fn remove(&mut self, hash: &T::Hash, is_invalid: bool) -> Option<Arc<T>> {
360		if let Some(tx) = self.finalize_remove(hash) {
361			self.remove_from_set(tx.sender(), |set, scoring| set.remove(&tx, scoring));
362			if is_invalid {
363				self.listener.invalid(&tx);
364			} else {
365				self.listener.canceled(&tx);
366			}
367			Some(tx)
368		} else {
369			None
370		}
371	}
372
373	/// Removes all stalled transactions from given sender.
374	fn remove_stalled<R: Ready<T>>(&mut self, sender: &T::Sender, ready: &mut R) -> usize {
375		let removed_from_set = self.remove_from_set(sender, |transactions, scoring| transactions.cull(ready, scoring));
376
377		match removed_from_set {
378			Some(removed) => {
379				let len = removed.len();
380				for tx in removed {
381					self.finalize_remove(tx.hash());
382					self.listener.culled(&tx);
383				}
384				len
385			}
386			None => 0,
387		}
388	}
389
390	/// Removes all stalled transactions from given sender list (or from all senders).
391	pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[T::Sender]>, mut ready: R) -> usize {
392		let mut removed = 0;
393		match senders {
394			Some(senders) => {
395				for sender in senders {
396					removed += self.remove_stalled(sender, &mut ready);
397				}
398			}
399			None => {
400				let senders = self.transactions.keys().cloned().collect::<Vec<_>>();
401				for sender in senders {
402					removed += self.remove_stalled(&sender, &mut ready);
403				}
404			}
405		}
406
407		removed
408	}
409
410	/// Returns a transaction if it's part of the pool or `None` otherwise.
411	pub fn find(&self, hash: &T::Hash) -> Option<Arc<T>> {
412		self.by_hash.get(hash).map(|t| t.transaction.clone())
413	}
414
415	/// Returns worst transaction in the queue (if any).
416	pub fn worst_transaction(&self) -> Option<Arc<T>> {
417		self.worst_transactions.iter().next_back().map(|x| x.transaction.transaction.clone())
418	}
419
420	/// Returns true if the pool is at it's capacity.
421	pub fn is_full(&self) -> bool {
422		self.by_hash.len() >= self.options.max_count || self.mem_usage >= self.options.max_mem_usage
423	}
424
425	/// Returns senders ordered by priority of their transactions.
426	pub fn senders(&self) -> impl Iterator<Item = &T::Sender> {
427		self.best_transactions.iter().map(|tx| tx.transaction.sender())
428	}
429
430	/// Returns an iterator of pending (ready) transactions.
431	pub fn pending<R: Ready<T>>(&self, ready: R) -> PendingIterator<'_, T, R, S, L> {
432		PendingIterator { ready, best_transactions: self.best_transactions.clone(), pool: self }
433	}
434
435	/// Returns pending (ready) transactions from given sender.
436	pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &T::Sender) -> PendingIterator<'_, T, R, S, L> {
437		let best_transactions = self
438			.transactions
439			.get(sender)
440			.and_then(|transactions| transactions.worst_and_best())
441			.map(|(_, best)| ScoreWithRef::new(best.0, best.1))
442			.map(|s| {
443				let mut set = BTreeSet::new();
444				set.insert(s);
445				set
446			})
447			.unwrap_or_default();
448
449		PendingIterator { ready, best_transactions, pool: self }
450	}
451
452	/// Returns unprioritized list of ready transactions.
453	pub fn unordered_pending<R: Ready<T>>(&self, ready: R) -> UnorderedIterator<'_, T, R, S> {
454		UnorderedIterator { ready, senders: self.transactions.iter(), transactions: None }
455	}
456
457	/// Update score of transactions of a particular sender.
458	pub fn update_scores(&mut self, sender: &T::Sender, event: S::Event) {
459		let res = if let Some(set) = self.transactions.get_mut(sender) {
460			let prev = set.worst_and_best();
461			set.update_scores(&self.scoring, event);
462			let current = set.worst_and_best();
463			Some((prev, current))
464		} else {
465			None
466		};
467
468		if let Some((prev, current)) = res {
469			self.update_senders_worst_and_best(prev, current);
470		}
471	}
472
473	/// Computes the full status of the pool (including readiness).
474	pub fn status<R: Ready<T>>(&self, mut ready: R) -> Status {
475		let mut status = Status::default();
476
477		for (_sender, transactions) in &self.transactions {
478			let len = transactions.len();
479			for (idx, tx) in transactions.iter().enumerate() {
480				match ready.is_ready(tx) {
481					Readiness::Stale => status.stalled += 1,
482					Readiness::Ready => status.pending += 1,
483					Readiness::Future => {
484						status.future += len - idx;
485						break;
486					}
487				}
488			}
489		}
490
491		status
492	}
493
494	/// Returns light status of the pool.
495	pub fn light_status(&self) -> LightStatus {
496		LightStatus {
497			mem_usage: self.mem_usage,
498			transaction_count: self.by_hash.len(),
499			senders: self.transactions.len(),
500		}
501	}
502
503	/// Returns current pool options.
504	pub fn options(&self) -> Options {
505		self.options.clone()
506	}
507
508	/// Borrows listener instance.
509	pub fn listener(&self) -> &L {
510		&self.listener
511	}
512
513	/// Borrows scoring instance.
514	pub fn scoring(&self) -> &S {
515		&self.scoring
516	}
517
518	/// Borrows listener mutably.
519	pub fn listener_mut(&mut self) -> &mut L {
520		&mut self.listener
521	}
522}
523
/// An iterator over all pending (ready) transactions in unordered fashion.
///
/// NOTE: Current implementation will iterate over all transactions from particular sender
/// ordered by nonce, but that might change in the future.
///
/// NOTE: the transactions are not removed from the queue.
/// You might remove them later by calling `cull`.
pub struct UnorderedIterator<'a, T, R, S>
where
	T: VerifiedTransaction + 'a,
	S: Scoring<T> + 'a,
{
	// Readiness checker consulted for every visited transaction.
	ready: R,
	// Remaining senders together with their transaction sets.
	senders: hash_map::Iter<'a, T::Sender, Transactions<T, S>>,
	// Cursor over the transactions of the sender currently being visited
	// (`None` until the first sender is fetched).
	transactions: Option<slice::Iter<'a, Transaction<T>>>,
}
540
541impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S>
542where
543	T: VerifiedTransaction,
544	R: Ready<T>,
545	S: Scoring<T>,
546{
547	type Item = Arc<T>;
548
549	fn next(&mut self) -> Option<Self::Item> {
550		loop {
551			if let Some(transactions) = self.transactions.as_mut() {
552				if let Some(tx) = transactions.next() {
553					match self.ready.is_ready(&tx) {
554						Readiness::Ready => {
555							return Some(tx.transaction.clone());
556						}
557						state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state),
558					}
559				}
560			}
561
562			// otherwise fallback and try next sender
563			let next_sender = self.senders.next()?;
564			self.transactions = Some(next_sender.1.iter());
565		}
566	}
567}
568
/// An iterator over all pending (ready) transactions.
/// NOTE: the transactions are not removed from the queue.
/// You might remove them later by calling `cull`.
pub struct PendingIterator<'a, T, R, S, L>
where
	T: VerifiedTransaction + 'a,
	S: Scoring<T> + 'a,
	L: 'a,
{
	// Readiness checker consulted for every candidate transaction.
	ready: R,
	// Working set of candidates owned by the iterator; entries are taken out
	// as they are visited and replaced by the same sender's next transaction.
	best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
	// The pool being iterated; used to look up follow-up transactions.
	pool: &'a Pool<T, S, L>,
}
582
583impl<'a, T, R, S, L> Iterator for PendingIterator<'a, T, R, S, L>
584where
585	T: VerifiedTransaction,
586	R: Ready<T>,
587	S: Scoring<T>,
588{
589	type Item = Arc<T>;
590
591	fn next(&mut self) -> Option<Self::Item> {
592		while !self.best_transactions.is_empty() {
593			let best = {
594				let best = self.best_transactions.iter().next().expect("current_best is not empty; qed").clone();
595				self.best_transactions.take(&best).expect("Just taken from iterator; qed")
596			};
597
598			let tx_state = self.ready.is_ready(&best.transaction);
599			// Add the next best sender's transaction when applicable
600			match tx_state {
601				Readiness::Ready | Readiness::Stale => {
602					// retrieve next one from the same sender.
603					let next = self
604						.pool
605						.transactions
606						.get(best.transaction.sender())
607						.and_then(|s| s.find_next(&best.transaction, &self.pool.scoring));
608					if let Some((score, tx)) = next {
609						self.best_transactions.insert(ScoreWithRef::new(score, tx));
610					}
611				}
612				_ => (),
613			}
614
615			if tx_state == Readiness::Ready {
616				return Some(best.transaction.transaction);
617			}
618
619			trace!("[{:?}] Ignoring {:?} transaction.", best.transaction.hash(), tx_state);
620		}
621
622		None
623	}
624}