sc_transaction_pool_api/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction pool client facing API.
20#![warn(missing_docs)]
21
22pub mod error;
23
24use async_trait::async_trait;
25use codec::Codec;
26use futures::Stream;
27use serde::{de::DeserializeOwned, Deserialize, Serialize};
28use sp_core::offchain::TransactionPoolExt;
29use sp_runtime::traits::{Block as BlockT, Member};
30use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
31
32const LOG_TARGET: &str = "txpool::api";
33
34pub use sp_runtime::transaction_validity::{
35	TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
36	TransactionValidityError,
37};
38
39/// Transaction pool status.
40#[derive(Debug, Clone)]
41pub struct PoolStatus {
42	/// Number of transactions in the ready queue.
43	pub ready: usize,
44	/// Sum of bytes of ready transaction encodings.
45	pub ready_bytes: usize,
46	/// Number of transactions in the future queue.
47	pub future: usize,
48	/// Sum of bytes of ready transaction encodings.
49	pub future_bytes: usize,
50}
51
52impl PoolStatus {
53	/// Returns true if there are no transactions in the pool.
54	pub fn is_empty(&self) -> bool {
55		self.ready == 0 && self.future == 0
56	}
57}
58
59/// Possible transaction status events.
60///
61/// These events are being emitted by `TransactionPool` watchers,
62/// which are also exposed over RPC.
63///
64/// The status events can be grouped based on their kinds as:
65/// 1. Entering/Moving within the pool:
66/// 		- [Future](TransactionStatus::Future)
67/// 		- [Ready](TransactionStatus::Ready)
68/// 2. Inside `Ready` queue:
69/// 		- [Broadcast](TransactionStatus::Broadcast)
70/// 3. Leaving the pool:
71/// 		- [InBlock](TransactionStatus::InBlock)
72/// 		- [Invalid](TransactionStatus::Invalid)
73/// 		- [Usurped](TransactionStatus::Usurped)
74/// 		- [Dropped](TransactionStatus::Dropped)
75/// 	4. Re-entering the pool:
76/// 		- [Retracted](TransactionStatus::Retracted)
77/// 	5. Block finalized:
78/// 		- [Finalized](TransactionStatus::Finalized)
79/// 		- [FinalityTimeout](TransactionStatus::FinalityTimeout)
80///
81/// Transactions are first placed in either the `Ready` or `Future` queues of the transaction pool.
82/// Substrate validates the transaction before it enters the pool.
83///
84/// A transaction is placed in the `Future` queue if it will become valid at a future time.
85/// For example, submitting a transaction with a higher account nonce than the current
86/// expected nonce will place the transaction in the `Future` queue.
87///
88/// The events will always be received in the order described above, however
89/// there might be cases where transactions alternate between `Future` and `Ready`
90/// pool, and are `Broadcast` in the meantime.
91///
92/// There is also only single event causing the transaction to leave the pool.
93/// I.e. only one of the listed ones should be triggered.
94///
95/// Note that there are conditions that may cause transactions to reappear in the pool.
96/// 1. Due to possible forks, the transaction that ends up being in included
97/// in one block, may later re-enter the pool or be marked as invalid.
98/// 2. Transaction `Dropped` at one point, may later re-enter the pool if some other
99/// transactions are removed. A `Dropped` transaction may re-enter the pool only if it is
100/// resubmitted.
101/// 3. `Invalid` transaction may become valid at some point in the future.
102/// (Note that runtimes are encouraged to use `UnknownValidity` to inform the pool about
103/// such case). An `Invalid` transaction may re-enter the pool only if it is resubmitted.
104/// 4. `Retracted` transactions might be included in some next block.
105///
106/// The `FinalityTimeout` event will be emitted when the block did not reach finality
107/// within 512 blocks. This either indicates that finality is not available for your chain,
108/// or that finality gadget is lagging behind. If you choose to wait for finality longer, you can
109/// re-subscribe for a particular transaction hash manually again.
110///
111/// ### Last Event
112///
113/// The stream is considered finished when one of the following events happen:
114/// - [Finalized](TransactionStatus::Finalized)
115/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
116/// - [Usurped](TransactionStatus::Usurped)
117/// - [Invalid](TransactionStatus::Invalid)
118/// - [Dropped](TransactionStatus::Dropped)
119///
120/// See [`TransactionStatus::is_final`] for more details.
121///
122/// ### Resubmit Transactions
123///
124/// Users might resubmit the transaction at a later time for the following events:
125/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
126/// - [Invalid](TransactionStatus::Invalid)
127/// - [Dropped](TransactionStatus::Dropped)
128///
129/// See [`TransactionStatus::is_retriable`] for more details.
130#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
131#[serde(rename_all = "camelCase")]
132pub enum TransactionStatus<Hash, BlockHash> {
133	/// Transaction is part of the future queue.
134	Future,
135	/// Transaction is part of the ready queue.
136	Ready,
137	/// The transaction has been broadcast to the given peers.
138	Broadcast(Vec<String>),
139	/// Transaction has been included in block with given hash
140	/// at the given position.
141	#[serde(with = "v1_compatible")]
142	InBlock((BlockHash, TxIndex)),
143	/// The block this transaction was included in has been retracted.
144	Retracted(BlockHash),
145	/// Maximum number of finality watchers has been reached,
146	/// old watchers are being removed.
147	FinalityTimeout(BlockHash),
148	/// Transaction has been finalized by a finality-gadget, e.g. GRANDPA.
149	#[serde(with = "v1_compatible")]
150	Finalized((BlockHash, TxIndex)),
151	/// Transaction has been replaced in the pool, by another transaction
152	/// that provides the same tags. (e.g. same (sender, nonce)).
153	Usurped(Hash),
154	/// Transaction has been dropped from the pool because of the limit.
155	Dropped,
156	/// Transaction is no longer valid in the current state.
157	Invalid,
158}
159
160impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
161	/// Returns true if this is the last event emitted by [`TransactionStatusStream`].
162	pub fn is_final(&self) -> bool {
163		// The state must be kept in sync with `crate::graph::Sender`.
164		match self {
165			Self::Usurped(_) |
166			Self::Finalized(_) |
167			Self::FinalityTimeout(_) |
168			Self::Invalid |
169			Self::Dropped => true,
170			_ => false,
171		}
172	}
173
174	/// Returns true if the transaction could be re-submitted to the pool in the future.
175	///
176	/// For example, `TransactionStatus::Dropped` is retriable, because the transaction
177	/// may enter the pool if there is space for it in the future.
178	pub fn is_retriable(&self) -> bool {
179		match self {
180			// The number of finality watchers has been reached.
181			Self::FinalityTimeout(_) |
182			// An invalid transaction might be valid at a later time.
183			Self::Invalid |
184			// The transaction was dropped because of the limits of the pool.
185			// It can reenter the pool when other transactions are removed / finalized.
186			Self::Dropped => true,
187			_ => false,
188		}
189	}
190}
191
192/// The stream of transaction events.
193pub type TransactionStatusStream<Hash, BlockHash> =
194	dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;
195
196/// The import notification event stream.
197pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;
198
199/// Transaction hash type for a pool.
200pub type TxHash<P> = <P as TransactionPool>::Hash;
201/// Block hash type for a pool.
202pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
203/// Transaction type for a pool.
204pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
205/// Type of transactions event stream for a pool.
206pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
207/// Transaction type for a local pool.
208pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
209/// Transaction's index within the block in which it was included.
210pub type TxIndex = usize;
211/// Map containing validity errors associated with transaction hashes. Used to report invalid
212/// transactions to the pool.
213pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>;
214
215/// In-pool transaction interface.
216///
217/// The pool is container of transactions that are implementing this trait.
218/// See `sp_runtime::ValidTransaction` for details about every field.
219pub trait InPoolTransaction {
220	/// Transaction type.
221	type Transaction;
222	/// Transaction hash type.
223	type Hash;
224
225	/// Get the reference to the transaction data.
226	fn data(&self) -> &Self::Transaction;
227	/// Get hash of the transaction.
228	fn hash(&self) -> &Self::Hash;
229	/// Get priority of the transaction.
230	fn priority(&self) -> &TransactionPriority;
231	/// Get longevity of the transaction.
232	fn longevity(&self) -> &TransactionLongevity;
233	/// Get transaction dependencies.
234	fn requires(&self) -> &[TransactionTag];
235	/// Get tags that transaction provides.
236	fn provides(&self) -> &[TransactionTag];
237	/// Return a flag indicating if the transaction should be propagated to other peers.
238	fn is_propagable(&self) -> bool;
239}
240
241/// Transaction pool interface.
242#[async_trait]
243pub trait TransactionPool: Send + Sync {
244	/// Block type.
245	type Block: BlockT;
246	/// Transaction hash type.
247	type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
248	/// In-pool transaction type.
249	type InPoolTransaction: InPoolTransaction<
250		Transaction = Arc<TransactionFor<Self>>,
251		Hash = TxHash<Self>,
252	>;
253	/// Error type.
254	type Error: From<crate::error::Error> + crate::error::IntoPoolError;
255
256	// *** RPC
257
258	/// Asynchronously imports a bunch of unverified transactions to the pool.
259	async fn submit_at(
260		&self,
261		at: <Self::Block as BlockT>::Hash,
262		source: TransactionSource,
263		xts: Vec<TransactionFor<Self>>,
264	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
265
266	/// Asynchronously imports one unverified transaction to the pool.
267	async fn submit_one(
268		&self,
269		at: <Self::Block as BlockT>::Hash,
270		source: TransactionSource,
271		xt: TransactionFor<Self>,
272	) -> Result<TxHash<Self>, Self::Error>;
273
274	/// Asynchronously imports a single transaction and starts to watch their progress in the
275	/// pool.
276	async fn submit_and_watch(
277		&self,
278		at: <Self::Block as BlockT>::Hash,
279		source: TransactionSource,
280		xt: TransactionFor<Self>,
281	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;
282
283	// *** Block production / Networking
284	/// Get an iterator for ready transactions ordered by priority.
285	///
286	/// Guaranteed to resolve only when transaction pool got updated at `at` block.
287	/// Guaranteed to resolve immediately when `None` is passed.
288	async fn ready_at(
289		&self,
290		at: <Self::Block as BlockT>::Hash,
291	) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
292
293	/// Get an iterator for ready transactions ordered by priority.
294	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
295
296	// *** Block production
297	/// Reports invalid transactions to the transaction pool.
298	///
299	/// This function takes a map where the key is a transaction hash and the value is an
300	/// optional error encountered during the transaction execution, possibly within a specific
301	/// block.
302	///
303	/// The transaction pool implementation decides which transactions to remove. Transactions
304	/// removed from the pool will be notified with `TransactionStatus::Invalid` event (if
305	/// `submit_and_watch` was used for submission).
306	///
307	/// If the error associated to transaction is `None`, the transaction will be forcibly removed
308	/// from the pool.
309	///
310	/// The optional `at` parameter provides additional context regarding the block where the error
311	/// occurred.
312	///
313	/// Function returns the transactions actually removed from the pool.
314	async fn report_invalid(
315		&self,
316		at: Option<<Self::Block as BlockT>::Hash>,
317		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
318	) -> Vec<Arc<Self::InPoolTransaction>>;
319
320	// *** logging
321	/// Get futures transaction list.
322	fn futures(&self) -> Vec<Self::InPoolTransaction>;
323
324	/// Returns pool status.
325	fn status(&self) -> PoolStatus;
326
327	// *** logging / RPC / networking
328	/// Return an event stream of transactions imported to the pool.
329	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
330
331	// *** networking
332	/// Notify the pool about transactions broadcast.
333	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
334
335	/// Returns transaction hash
336	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
337
338	/// Return specific ready transaction by hash, if there is one.
339	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
340
341	/// Asynchronously returns a set of ready transaction at given block within given timeout.
342	///
343	/// If the timeout is hit during method execution, then the best effort (without executing full
344	/// maintain process) set of ready transactions for given block is returned.
345	async fn ready_at_with_timeout(
346		&self,
347		at: <Self::Block as BlockT>::Hash,
348		timeout: std::time::Duration,
349	) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
350}
351
352/// An iterator of ready transactions.
353///
354/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
355/// last-returned element as invalid.
356///
357/// The implementation is then allowed, for performance reasons, to change the elements
358/// returned next, by e.g.  skipping elements that are known to depend on the reported
359/// transaction, which yields them invalid as well.
360pub trait ReadyTransactions: Iterator {
361	/// Report given transaction as invalid.
362	///
363	/// This might affect subsequent elements returned by the iterator, so dependent transactions
364	/// are skipped for performance reasons.
365	fn report_invalid(&mut self, _tx: &Self::Item);
366}
367
368/// A no-op implementation for an empty iterator.
369impl<T> ReadyTransactions for std::iter::Empty<T> {
370	fn report_invalid(&mut self, _tx: &T) {}
371}
372
373/// Events that the transaction pool listens for.
374#[derive(Debug)]
375pub enum ChainEvent<B: BlockT> {
376	/// New best block have been added to the chain.
377	NewBestBlock {
378		/// Hash of the block.
379		hash: B::Hash,
380		/// Tree route from old best to new best parent that was calculated on import.
381		///
382		/// If `None`, no re-org happened on import.
383		tree_route: Option<Arc<sp_blockchain::TreeRoute<B>>>,
384	},
385	/// An existing block has been finalized.
386	Finalized {
387		/// Hash of just finalized block.
388		hash: B::Hash,
389		/// Path from old finalized to new finalized parent.
390		tree_route: Arc<[B::Hash]>,
391	},
392}
393
394impl<B: BlockT> ChainEvent<B> {
395	/// Returns the block hash associated to the event.
396	pub fn hash(&self) -> B::Hash {
397		match self {
398			Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
399		}
400	}
401
402	/// Is `self == Self::Finalized`?
403	pub fn is_finalized(&self) -> bool {
404		matches!(self, Self::Finalized { .. })
405	}
406}
407
408/// Trait for transaction pool maintenance.
409#[async_trait]
410pub trait MaintainedTransactionPool: TransactionPool {
411	/// Perform maintenance
412	async fn maintain(&self, event: ChainEvent<Self::Block>);
413}
414
415/// Transaction pool interface for submitting local transactions that exposes a
416/// blocking interface for submission.
417pub trait LocalTransactionPool: Send + Sync {
418	/// Block type.
419	type Block: BlockT;
420	/// Transaction hash type.
421	type Hash: Hash + Eq + Member + Serialize;
422	/// Error type.
423	type Error: From<crate::error::Error> + crate::error::IntoPoolError;
424
425	/// Submits the given local unverified transaction to the pool blocking the
426	/// current thread for any necessary pre-verification.
427	/// NOTE: It MUST NOT be used for transactions that originate from the
428	/// network or RPC, since the validation is performed with
429	/// `TransactionSource::Local`.
430	fn submit_local(
431		&self,
432		at: <Self::Block as BlockT>::Hash,
433		xt: LocalTransactionFor<Self>,
434	) -> Result<Self::Hash, Self::Error>;
435}
436
437impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
438	type Block = T::Block;
439
440	type Hash = T::Hash;
441
442	type Error = T::Error;
443
444	fn submit_local(
445		&self,
446		at: <Self::Block as BlockT>::Hash,
447		xt: LocalTransactionFor<Self>,
448	) -> Result<Self::Hash, Self::Error> {
449		(**self).submit_local(at, xt)
450	}
451}
452
453/// An abstraction for [`LocalTransactionPool`]
454///
455/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without
456/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
457/// the wrapping in a `Arc`.
458trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
459	/// Submit transaction.
460	///
461	/// The transaction will end up in the pool and be propagated to others.
462	fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
463}
464
465impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
466	fn submit_at(
467		&self,
468		at: <TPool::Block as BlockT>::Hash,
469		extrinsic: <TPool::Block as BlockT>::Extrinsic,
470	) -> Result<(), ()> {
471		log::trace!(
472			target: LOG_TARGET,
473			"(offchain call) Submitting a transaction to the pool: {:?}",
474			extrinsic
475		);
476
477		let result = self.submit_local(at, extrinsic);
478
479		result.map(|_| ()).map_err(|e| {
480			log::warn!(
481				target: LOG_TARGET,
482				"(offchain call) Error submitting a transaction to the pool: {}",
483				e
484			)
485		})
486	}
487}
488
489/// Factory for creating [`TransactionPoolExt`]s.
490///
491/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
492/// the wasm execution environment to send transactions from an offchain call to the  runtime.
493#[derive(Clone)]
494pub struct OffchainTransactionPoolFactory<Block: BlockT> {
495	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
496}
497
498impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
499	/// Creates a new instance using the given `tx_pool`.
500	pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
501		Self { pool: Arc::new(tx_pool) as Arc<_> }
502	}
503
504	/// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
505	///
506	/// Transactions that are being submitted by this instance will be submitted with `block_hash`
507	/// as context for validation.
508	pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
509		TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
510	}
511}
512
513/// Wraps a `pool` and `block_hash` to implement [`sp_core::offchain::TransactionPool`].
514struct OffchainTransactionPool<Block: BlockT> {
515	block_hash: Block::Hash,
516	pool: Arc<dyn OffchainSubmitTransaction<Block>>,
517}
518
519impl<Block: BlockT> sp_core::offchain::TransactionPool for OffchainTransactionPool<Block> {
520	fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
521		let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
522			Ok(t) => t,
523			Err(e) => {
524				log::error!(
525					target: LOG_TARGET,
526					"Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
527				);
528
529				return Err(())
530			},
531		};
532
533		self.pool.submit_at(self.block_hash, extrinsic)
534	}
535}
536
537/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
538mod v1_compatible {
539	use serde::{Deserialize, Deserializer, Serialize, Serializer};
540
541	pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
542	where
543		S: Serializer,
544		H: Serialize,
545	{
546		let (hash, _) = data;
547		serde::Serialize::serialize(&hash, serializer)
548	}
549
550	pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
551	where
552		D: Deserializer<'de>,
553		H: Deserialize<'de>,
554	{
555		let hash: H = serde::Deserialize::deserialize(deserializer)?;
556		Ok((hash, 0))
557	}
558}
559
560/// Transaction pool that rejects all submitted transactions.
561///
562/// Could be used for example in tests.
563pub struct RejectAllTxPool<Block>(PhantomData<Block>);
564
565impl<Block> Default for RejectAllTxPool<Block> {
566	fn default() -> Self {
567		Self(PhantomData)
568	}
569}
570
571impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
572	type Block = Block;
573
574	type Hash = Block::Hash;
575
576	type Error = error::Error;
577
578	fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
579		Err(error::Error::ImmediatelyDropped)
580	}
581}
582
583#[cfg(test)]
584mod tests {
585	use super::*;
586
587	#[test]
588	fn tx_status_compatibility() {
589		let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
590		let ser = serde_json::to_string(&event).unwrap();
591
592		let exp = r#"{"inBlock":1}"#;
593		assert_eq!(ser, exp);
594
595		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
596		assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
597
598		let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
599		let ser = serde_json::to_string(&event).unwrap();
600
601		let exp = r#"{"finalized":1}"#;
602		assert_eq!(ser, exp);
603
604		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
605		assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
606	}
607}