Skip to main content

soil_client/transaction_pool/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Transaction pool client facing API.
8#![warn(missing_docs)]
9
10pub mod error;
11
12use async_trait::async_trait;
13use codec::Codec;
14use futures::Stream;
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
17use subsoil::core::offchain::TransactionPoolExt;
18use subsoil::runtime::traits::{Block as BlockT, Member};
19
20const LOG_TARGET: &str = "txpool::api";
21
22pub use subsoil::runtime::transaction_validity::{
23	TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
24	TransactionValidityError,
25};
26
27/// Transaction pool status.
28#[derive(Debug, Clone)]
29pub struct PoolStatus {
30	/// Number of transactions in the ready queue.
31	pub ready: usize,
32	/// Sum of bytes of ready transaction encodings.
33	pub ready_bytes: usize,
34	/// Number of transactions in the future queue.
35	pub future: usize,
36	/// Sum of bytes of ready transaction encodings.
37	pub future_bytes: usize,
38}
39
40impl PoolStatus {
41	/// Returns true if there are no transactions in the pool.
42	pub fn is_empty(&self) -> bool {
43		self.ready == 0 && self.future == 0
44	}
45}
46
47/// Possible transaction status events.
48///
49/// These events are being emitted by `TransactionPool` watchers,
50/// which are also exposed over RPC.
51///
52/// The status events can be grouped based on their kinds as:
53/// 1. Entering/Moving within the pool:
54/// 		- [Future](TransactionStatus::Future)
55/// 		- [Ready](TransactionStatus::Ready)
56/// 2. Inside `Ready` queue:
57/// 		- [Broadcast](TransactionStatus::Broadcast)
58/// 3. Leaving the pool:
59/// 		- [InBlock](TransactionStatus::InBlock)
60/// 		- [Invalid](TransactionStatus::Invalid)
61/// 		- [Usurped](TransactionStatus::Usurped)
62/// 		- [Dropped](TransactionStatus::Dropped)
63/// 	4. Re-entering the pool:
64/// 		- [Retracted](TransactionStatus::Retracted)
65/// 	5. Block finalized:
66/// 		- [Finalized](TransactionStatus::Finalized)
67/// 		- [FinalityTimeout](TransactionStatus::FinalityTimeout)
68///
69/// Transactions are first placed in either the `Ready` or `Future` queues of the transaction pool.
70/// Substrate validates the transaction before it enters the pool.
71///
72/// A transaction is placed in the `Future` queue if it will become valid at a future time.
73/// For example, submitting a transaction with a higher account nonce than the current
74/// expected nonce will place the transaction in the `Future` queue.
75///
76/// The events will always be received in the order described above, however
77/// there might be cases where transactions alternate between `Future` and `Ready`
78/// pool, and are `Broadcast` in the meantime.
79///
80/// There is also only single event causing the transaction to leave the pool.
81/// I.e. only one of the listed ones should be triggered.
82///
83/// Note that there are conditions that may cause transactions to reappear in the pool.
84/// 1. Due to possible forks, the transaction that ends up being in included
85/// in one block, may later re-enter the pool or be marked as invalid.
86/// 2. Transaction `Dropped` at one point, may later re-enter the pool if some other
87/// transactions are removed. A `Dropped` transaction may re-enter the pool only if it is
88/// resubmitted.
89/// 3. `Invalid` transaction may become valid at some point in the future.
90/// (Note that runtimes are encouraged to use `UnknownValidity` to inform the pool about
91/// such case). An `Invalid` transaction may re-enter the pool only if it is resubmitted.
92/// 4. `Retracted` transactions might be included in some next block.
93///
94/// The `FinalityTimeout` event will be emitted when the block did not reach finality
95/// within 512 blocks. This either indicates that finality is not available for your chain,
96/// or that finality gadget is lagging behind. If you choose to wait for finality longer, you can
97/// re-subscribe for a particular transaction hash manually again.
98///
99/// ### Last Event
100///
101/// The stream is considered finished when one of the following events happen:
102/// - [Finalized](TransactionStatus::Finalized)
103/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
104/// - [Usurped](TransactionStatus::Usurped)
105/// - [Invalid](TransactionStatus::Invalid)
106/// - [Dropped](TransactionStatus::Dropped)
107///
108/// See [`TransactionStatus::is_final`] for more details.
109///
110/// ### Resubmit Transactions
111///
112/// Users might resubmit the transaction at a later time for the following events:
113/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
114/// - [Invalid](TransactionStatus::Invalid)
115/// - [Dropped](TransactionStatus::Dropped)
116///
117/// See [`TransactionStatus::is_retriable`] for more details.
118#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
119#[serde(rename_all = "camelCase")]
120pub enum TransactionStatus<Hash, BlockHash> {
121	/// Transaction is part of the future queue.
122	Future,
123	/// Transaction is part of the ready queue.
124	Ready,
125	/// The transaction has been broadcast to the given peers.
126	Broadcast(Vec<String>),
127	/// Transaction has been included in block with given hash
128	/// at the given position.
129	#[serde(with = "v1_compatible")]
130	InBlock((BlockHash, TxIndex)),
131	/// The block this transaction was included in has been retracted.
132	Retracted(BlockHash),
133	/// Maximum number of finality watchers has been reached,
134	/// old watchers are being removed.
135	FinalityTimeout(BlockHash),
136	/// Transaction has been finalized by a finality-gadget, e.g. GRANDPA.
137	#[serde(with = "v1_compatible")]
138	Finalized((BlockHash, TxIndex)),
139	/// Transaction has been replaced in the pool, by another transaction
140	/// that provides the same tags. (e.g. same (sender, nonce)).
141	Usurped(Hash),
142	/// Transaction has been dropped from the pool because of the limit.
143	Dropped,
144	/// Transaction is no longer valid in the current state.
145	Invalid,
146}
147
148impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
149	/// Returns true if this is the last event emitted by [`TransactionStatusStream`].
150	pub fn is_final(&self) -> bool {
151		// The state must be kept in sync with `crate::graph::Sender`.
152		match self {
153			Self::Usurped(_)
154			| Self::Finalized(_)
155			| Self::FinalityTimeout(_)
156			| Self::Invalid
157			| Self::Dropped => true,
158			_ => false,
159		}
160	}
161
162	/// Returns true if the transaction could be re-submitted to the pool in the future.
163	///
164	/// For example, `TransactionStatus::Dropped` is retriable, because the transaction
165	/// may enter the pool if there is space for it in the future.
166	pub fn is_retriable(&self) -> bool {
167		match self {
168			// The number of finality watchers has been reached.
169			Self::FinalityTimeout(_) |
170			// An invalid transaction might be valid at a later time.
171			Self::Invalid |
172			// The transaction was dropped because of the limits of the pool.
173			// It can reenter the pool when other transactions are removed / finalized.
174			Self::Dropped => true,
175			_ => false,
176		}
177	}
178}
179
180/// The stream of transaction events.
181pub type TransactionStatusStream<Hash, BlockHash> =
182	dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;
183
184/// The import notification event stream.
185pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;
186
187/// Transaction hash type for a pool.
188pub type TxHash<P> = <P as TransactionPool>::Hash;
189/// Block hash type for a pool.
190pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
191/// Transaction type for a pool.
192pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
193/// Type of transactions event stream for a pool.
194pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
195/// Transaction type for a local pool.
196pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
197/// Transaction's index within the block in which it was included.
198pub type TxIndex = usize;
199/// Map containing validity errors associated with transaction hashes. Used to report invalid
200/// transactions to the pool.
201pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>;
202
203/// In-pool transaction interface.
204///
205/// The pool is container of transactions that are implementing this trait.
206/// See `subsoil::runtime::ValidTransaction` for details about every field.
207pub trait InPoolTransaction {
208	/// Transaction type.
209	type Transaction;
210	/// Transaction hash type.
211	type Hash;
212
213	/// Get the reference to the transaction data.
214	fn data(&self) -> &Self::Transaction;
215	/// Get hash of the transaction.
216	fn hash(&self) -> &Self::Hash;
217	/// Get priority of the transaction.
218	fn priority(&self) -> &TransactionPriority;
219	/// Get longevity of the transaction.
220	fn longevity(&self) -> &TransactionLongevity;
221	/// Get transaction dependencies.
222	fn requires(&self) -> &[TransactionTag];
223	/// Get tags that transaction provides.
224	fn provides(&self) -> &[TransactionTag];
225	/// Return a flag indicating if the transaction should be propagated to other peers.
226	fn is_propagable(&self) -> bool;
227}
228
229/// Transaction pool interface.
230#[async_trait]
231pub trait TransactionPool: Send + Sync {
232	/// Block type.
233	type Block: BlockT;
234	/// Transaction hash type.
235	type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
236	/// In-pool transaction type.
237	type InPoolTransaction: InPoolTransaction<
238		Transaction = Arc<TransactionFor<Self>>,
239		Hash = TxHash<Self>,
240	>;
241	/// Error type.
242	type Error: From<crate::transaction_pool::error::Error>
243		+ crate::transaction_pool::error::IntoPoolError;
244
245	// *** RPC
246
247	/// Asynchronously imports a bunch of unverified transactions to the pool.
248	async fn submit_at(
249		&self,
250		at: <Self::Block as BlockT>::Hash,
251		source: TransactionSource,
252		xts: Vec<TransactionFor<Self>>,
253	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
254
255	/// Asynchronously imports one unverified transaction to the pool.
256	async fn submit_one(
257		&self,
258		at: <Self::Block as BlockT>::Hash,
259		source: TransactionSource,
260		xt: TransactionFor<Self>,
261	) -> Result<TxHash<Self>, Self::Error>;
262
263	/// Asynchronously imports a single transaction and starts to watch their progress in the
264	/// pool.
265	async fn submit_and_watch(
266		&self,
267		at: <Self::Block as BlockT>::Hash,
268		source: TransactionSource,
269		xt: TransactionFor<Self>,
270	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;
271
272	// *** Block production / Networking
273	/// Get an iterator for ready transactions ordered by priority.
274	///
275	/// Guaranteed to resolve only when transaction pool got updated at `at` block.
276	/// Guaranteed to resolve immediately when `None` is passed.
277	async fn ready_at(
278		&self,
279		at: <Self::Block as BlockT>::Hash,
280	) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
281
282	/// Get an iterator for ready transactions ordered by priority.
283	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
284
285	// *** Block production
286	/// Reports invalid transactions to the transaction pool.
287	///
288	/// This function takes a map where the key is a transaction hash and the value is an
289	/// optional error encountered during the transaction execution, possibly within a specific
290	/// block.
291	///
292	/// The transaction pool implementation decides which transactions to remove. Transactions
293	/// removed from the pool will be notified with `TransactionStatus::Invalid` event (if
294	/// `submit_and_watch` was used for submission).
295	///
296	/// If the error associated to transaction is `None`, the transaction will be forcibly removed
297	/// from the pool.
298	///
299	/// The optional `at` parameter provides additional context regarding the block where the error
300	/// occurred.
301	///
302	/// Function returns the transactions actually removed from the pool.
303	async fn report_invalid(
304		&self,
305		at: Option<<Self::Block as BlockT>::Hash>,
306		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
307	) -> Vec<Arc<Self::InPoolTransaction>>;
308
309	// *** logging
310	/// Get futures transaction list.
311	fn futures(&self) -> Vec<Self::InPoolTransaction>;
312
313	/// Returns pool status.
314	fn status(&self) -> PoolStatus;
315
316	// *** logging / RPC / networking
317	/// Return an event stream of transactions imported to the pool.
318	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
319
320	// *** networking
321	/// Notify the pool about transactions broadcast.
322	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
323
324	/// Returns transaction hash
325	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
326
327	/// Return specific ready transaction by hash, if there is one.
328	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
329
330	/// Asynchronously returns a set of ready transaction at given block within given timeout.
331	///
332	/// If the timeout is hit during method execution, then the best effort (without executing full
333	/// maintain process) set of ready transactions for given block is returned.
334	async fn ready_at_with_timeout(
335		&self,
336		at: <Self::Block as BlockT>::Hash,
337		timeout: std::time::Duration,
338	) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
339}
340
341/// An iterator of ready transactions.
342///
343/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
344/// last-returned element as invalid.
345///
346/// The implementation is then allowed, for performance reasons, to change the elements
347/// returned next, by e.g.  skipping elements that are known to depend on the reported
348/// transaction, which yields them invalid as well.
349pub trait ReadyTransactions: Iterator {
350	/// Report given transaction as invalid.
351	///
352	/// This might affect subsequent elements returned by the iterator, so dependent transactions
353	/// are skipped for performance reasons.
354	fn report_invalid(&mut self, _tx: &Self::Item);
355}
356
357/// A no-op implementation for an empty iterator.
358impl<T> ReadyTransactions for std::iter::Empty<T> {
359	fn report_invalid(&mut self, _tx: &T) {}
360}
361
362/// Events that the transaction pool listens for.
363#[derive(Debug)]
364pub enum ChainEvent<B: BlockT> {
365	/// New best block have been added to the chain.
366	NewBestBlock {
367		/// Hash of the block.
368		hash: B::Hash,
369		/// Tree route from old best to new best parent that was calculated on import.
370		///
371		/// If `None`, no re-org happened on import.
372		tree_route: Option<Arc<crate::blockchain::TreeRoute<B>>>,
373	},
374	/// An existing block has been finalized.
375	Finalized {
376		/// Hash of just finalized block.
377		hash: B::Hash,
378		/// Path from old finalized to new finalized parent.
379		tree_route: Arc<[B::Hash]>,
380	},
381}
382
383impl<B: BlockT> ChainEvent<B> {
384	/// Returns the block hash associated to the event.
385	pub fn hash(&self) -> B::Hash {
386		match self {
387			Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
388		}
389	}
390
391	/// Is `self == Self::Finalized`?
392	pub fn is_finalized(&self) -> bool {
393		matches!(self, Self::Finalized { .. })
394	}
395}
396
397/// Trait for transaction pool maintenance.
398#[async_trait]
399pub trait MaintainedTransactionPool: TransactionPool {
400	/// Perform maintenance
401	async fn maintain(&self, event: ChainEvent<Self::Block>);
402}
403
404/// Transaction pool interface for submitting local transactions that exposes a
405/// blocking interface for submission.
406pub trait LocalTransactionPool: Send + Sync {
407	/// Block type.
408	type Block: BlockT;
409	/// Transaction hash type.
410	type Hash: Hash + Eq + Member + Serialize;
411	/// Error type.
412	type Error: From<crate::transaction_pool::error::Error>
413		+ crate::transaction_pool::error::IntoPoolError;
414
415	/// Submits the given local unverified transaction to the pool blocking the
416	/// current thread for any necessary pre-verification.
417	/// NOTE: It MUST NOT be used for transactions that originate from the
418	/// network or RPC, since the validation is performed with
419	/// `TransactionSource::Local`.
420	fn submit_local(
421		&self,
422		at: <Self::Block as BlockT>::Hash,
423		xt: LocalTransactionFor<Self>,
424	) -> Result<Self::Hash, Self::Error>;
425}
426
427impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
428	type Block = T::Block;
429
430	type Hash = T::Hash;
431
432	type Error = T::Error;
433
434	fn submit_local(
435		&self,
436		at: <Self::Block as BlockT>::Hash,
437		xt: LocalTransactionFor<Self>,
438	) -> Result<Self::Hash, Self::Error> {
439		(**self).submit_local(at, xt)
440	}
441}
442
443/// An abstraction for [`LocalTransactionPool`]
444///
445/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without
446/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
447/// the wrapping in a `Arc`.
448trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
449	/// Submit transaction.
450	///
451	/// The transaction will end up in the pool and be propagated to others.
452	fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
453}
454
455impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
456	fn submit_at(
457		&self,
458		at: <TPool::Block as BlockT>::Hash,
459		extrinsic: <TPool::Block as BlockT>::Extrinsic,
460	) -> Result<(), ()> {
461		log::trace!(
462			target: LOG_TARGET,
463			"(offchain call) Submitting a transaction to the pool: {:?}",
464			extrinsic
465		);
466
467		let result = self.submit_local(at, extrinsic);
468
469		result.map(|_| ()).map_err(|e| {
470			log::warn!(
471				target: LOG_TARGET,
472				"(offchain call) Error submitting a transaction to the pool: {}",
473				e
474			)
475		})
476	}
477}
478
479/// Factory for creating [`TransactionPoolExt`]s.
480///
481/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
482/// the wasm execution environment to send transactions from an offchain call to the  runtime.
483#[derive(Clone)]
484pub struct OffchainTransactionPoolFactory<Block: BlockT> {
485	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
486}
487
488impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
489	/// Creates a new instance using the given `tx_pool`.
490	pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
491		Self { pool: Arc::new(tx_pool) as Arc<_> }
492	}
493
494	/// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
495	///
496	/// Transactions that are being submitted by this instance will be submitted with `block_hash`
497	/// as context for validation.
498	pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
499		TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
500	}
501}
502
503/// Wraps a `pool` and `block_hash` to implement [`subsoil::core::offchain::TransactionPool`].
504struct OffchainTransactionPool<Block: BlockT> {
505	block_hash: Block::Hash,
506	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
507}
508
509impl<Block: BlockT> subsoil::core::offchain::TransactionPool for OffchainTransactionPool<Block> {
510	fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
511		let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
512			Ok(t) => t,
513			Err(e) => {
514				log::error!(
515					target: LOG_TARGET,
516					"Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
517				);
518
519				return Err(());
520			},
521		};
522
523		self.pool.submit_at(self.block_hash, extrinsic)
524	}
525}
526
527/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
528mod v1_compatible {
529	use serde::{Deserialize, Deserializer, Serialize, Serializer};
530
531	pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
532	where
533		S: Serializer,
534		H: Serialize,
535	{
536		let (hash, _) = data;
537		serde::Serialize::serialize(&hash, serializer)
538	}
539
540	pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
541	where
542		D: Deserializer<'de>,
543		H: Deserialize<'de>,
544	{
545		let hash: H = serde::Deserialize::deserialize(deserializer)?;
546		Ok((hash, 0))
547	}
548}
549
550/// Transaction pool that rejects all submitted transactions.
551///
552/// Could be used for example in tests.
553pub struct RejectAllTxPool<Block>(PhantomData<Block>);
554
555impl<Block> Default for RejectAllTxPool<Block> {
556	fn default() -> Self {
557		Self(PhantomData)
558	}
559}
560
561impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
562	type Block = Block;
563
564	type Hash = Block::Hash;
565
566	type Error = error::Error;
567
568	fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
569		Err(error::Error::ImmediatelyDropped)
570	}
571}
572
573#[cfg(test)]
574mod tests {
575	use super::*;
576
577	#[test]
578	fn tx_status_compatibility() {
579		let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
580		let ser = serde_json::to_string(&event).unwrap();
581
582		let exp = r#"{"inBlock":1}"#;
583		assert_eq!(ser, exp);
584
585		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
586		assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
587
588		let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
589		let ser = serde_json::to_string(&event).unwrap();
590
591		let exp = r#"{"finalized":1}"#;
592		assert_eq!(ser, exp);
593
594		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
595		assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
596	}
597}