sc_transaction_pool_api/lib.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Transaction pool client facing API.
20#![warn(missing_docs)]
21
22pub mod error;
23
24use async_trait::async_trait;
25use codec::Codec;
26use futures::Stream;
27use serde::{de::DeserializeOwned, Deserialize, Serialize};
28use sp_core::offchain::TransactionPoolExt;
29use sp_runtime::traits::{Block as BlockT, Member};
30use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
31
/// Log target passed to the `log` macros used in this file.
const LOG_TARGET: &str = "txpool::api";
33
34pub use sp_runtime::transaction_validity::{
35 TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
36 TransactionValidityError,
37};
38
/// Transaction pool status.
///
/// Plain counters describing the two queues of the pool: how many transactions each one
/// holds and how many bytes their encodings add up to.
#[derive(Debug, Clone)]
pub struct PoolStatus {
    /// Number of transactions in the ready queue.
    pub ready: usize,
    /// Sum of bytes of ready transaction encodings.
    pub ready_bytes: usize,
    /// Number of transactions in the future queue.
    pub future: usize,
    /// Sum of bytes of future transaction encodings.
    pub future_bytes: usize,
}

impl PoolStatus {
    /// Returns true if there are no transactions in the pool.
    ///
    /// Only the transaction counts (`ready` and `future`) are consulted; the byte totals
    /// do not influence the result.
    pub fn is_empty(&self) -> bool {
        self.ready == 0 && self.future == 0
    }
}
58
/// Possible transaction status events.
///
/// These events are being emitted by `TransactionPool` watchers,
/// which are also exposed over RPC.
///
/// The status events can be grouped based on their kinds as:
/// 1. Entering/Moving within the pool:
///    - [Future](TransactionStatus::Future)
///    - [Ready](TransactionStatus::Ready)
/// 2. Inside `Ready` queue:
///    - [Broadcast](TransactionStatus::Broadcast)
/// 3. Leaving the pool:
///    - [InBlock](TransactionStatus::InBlock)
///    - [Invalid](TransactionStatus::Invalid)
///    - [Usurped](TransactionStatus::Usurped)
///    - [Dropped](TransactionStatus::Dropped)
/// 4. Re-entering the pool:
///    - [Retracted](TransactionStatus::Retracted)
/// 5. Block finalized:
///    - [Finalized](TransactionStatus::Finalized)
///    - [FinalityTimeout](TransactionStatus::FinalityTimeout)
///
/// Transactions are first placed in either the `Ready` or `Future` queues of the transaction pool.
/// Substrate validates the transaction before it enters the pool.
///
/// A transaction is placed in the `Future` queue if it will become valid at a future time.
/// For example, submitting a transaction with a higher account nonce than the current
/// expected nonce will place the transaction in the `Future` queue.
///
/// The events will always be received in the order described above, however
/// there might be cases where transactions alternate between `Future` and `Ready`
/// pool, and are `Broadcast` in the meantime.
///
/// There is also only a single event causing the transaction to leave the pool.
/// I.e. only one of the listed ones should be triggered.
///
/// Note that there are conditions that may cause transactions to reappear in the pool.
/// 1. Due to possible forks, a transaction that ends up being included
///    in one block may later re-enter the pool or be marked as invalid.
/// 2. Transaction `Dropped` at one point, may later re-enter the pool if some other
///    transactions are removed. A `Dropped` transaction may re-enter the pool only if it is
///    resubmitted.
/// 3. `Invalid` transaction may become valid at some point in the future.
///    (Note that runtimes are encouraged to use `UnknownValidity` to inform the pool about
///    such case). An `Invalid` transaction may re-enter the pool only if it is resubmitted.
/// 4. `Retracted` transactions might be included in some next block.
///
/// The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
/// or that the finality gadget is lagging behind. If you choose to wait for finality longer,
/// you can re-subscribe for a particular transaction hash manually again.
///
/// ### Last Event
///
/// The stream is considered finished when one of the following events happen:
/// - [Finalized](TransactionStatus::Finalized)
/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
/// - [Usurped](TransactionStatus::Usurped)
/// - [Invalid](TransactionStatus::Invalid)
/// - [Dropped](TransactionStatus::Dropped)
///
/// See [`TransactionStatus::is_final`] for more details.
///
/// ### Resubmit Transactions
///
/// Users might resubmit the transaction at a later time for the following events:
/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
/// - [Invalid](TransactionStatus::Invalid)
/// - [Dropped](TransactionStatus::Dropped)
///
/// See [`TransactionStatus::is_retriable`] for more details.
///
/// ### Serialization
///
/// Variant names are serialized in camelCase. The `InBlock` and `Finalized` variants go
/// through the `v1_compatible` helpers: only the block hash is written, and the
/// transaction index is restored as `0` when deserializing.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TransactionStatus<Hash, BlockHash> {
    /// Transaction is part of the future queue.
    Future,
    /// Transaction is part of the ready queue.
    Ready,
    /// The transaction has been broadcast to the given peers.
    Broadcast(Vec<String>),
    /// Transaction has been included in block with given hash
    /// at the given position.
    ///
    /// The position is not part of the serialized form (see the `Serialization` section
    /// of the enum documentation).
    #[serde(with = "v1_compatible")]
    InBlock((BlockHash, TxIndex)),
    /// The block this transaction was included in has been retracted.
    Retracted(BlockHash),
    /// Maximum number of finality watchers has been reached,
    /// old watchers are being removed.
    FinalityTimeout(BlockHash),
    /// Transaction has been finalized by a finality-gadget, e.g. GRANDPA.
    ///
    /// Carries the block hash and the position of the transaction in that block; the
    /// position is not part of the serialized form.
    #[serde(with = "v1_compatible")]
    Finalized((BlockHash, TxIndex)),
    /// Transaction has been replaced in the pool, by another transaction
    /// that provides the same tags. (e.g. same (sender, nonce)).
    Usurped(Hash),
    /// Transaction has been dropped from the pool because of the limit.
    Dropped,
    /// Transaction is no longer valid in the current state.
    Invalid,
}
159
160impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
161 /// Returns true if this is the last event emitted by [`TransactionStatusStream`].
162 pub fn is_final(&self) -> bool {
163 // The state must be kept in sync with `crate::graph::Sender`.
164 match self {
165 Self::Usurped(_) |
166 Self::Finalized(_) |
167 Self::FinalityTimeout(_) |
168 Self::Invalid |
169 Self::Dropped => true,
170 _ => false,
171 }
172 }
173
174 /// Returns true if the transaction could be re-submitted to the pool in the future.
175 ///
176 /// For example, `TransactionStatus::Dropped` is retriable, because the transaction
177 /// may enter the pool if there is space for it in the future.
178 pub fn is_retriable(&self) -> bool {
179 match self {
180 // The number of finality watchers has been reached.
181 Self::FinalityTimeout(_) |
182 // An invalid transaction might be valid at a later time.
183 Self::Invalid |
184 // The transaction was dropped because of the limits of the pool.
185 // It can reenter the pool when other transactions are removed / finalized.
186 Self::Dropped => true,
187 _ => false,
188 }
189 }
190}
191
/// The stream of transaction events.
///
/// This is an unsized `dyn Stream` type, so it is used behind a pointer, e.g. the
/// `Pin<Box<_>>` returned by [`TransactionPool::submit_and_watch`].
pub type TransactionStatusStream<Hash, BlockHash> =
    dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;

/// The import notification event stream.
///
/// The receiving half of a `futures` mpsc channel carrying items of type `H`.
pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;

/// Transaction hash type for a pool.
pub type TxHash<P> = <P as TransactionPool>::Hash;
/// Block hash type for a pool.
pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
/// Transaction type for a pool.
pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
/// Type of transactions event stream for a pool.
pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
/// Transaction type for a local pool.
pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
/// Transaction's index within the block in which it was included.
pub type TxIndex = usize;
/// Map containing validity errors associated with transaction hashes. Used to report invalid
/// transactions to the pool.
///
/// The value is optional: an entry may carry no error at all (see
/// [`TransactionPool::report_invalid`] for how such entries are treated).
pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>;
214
/// In-pool transaction interface.
///
/// The pool is a container of transactions that implement this trait.
/// See `sp_runtime::ValidTransaction` for details about every field.
pub trait InPoolTransaction {
    /// Transaction type.
    type Transaction;
    /// Transaction hash type.
    type Hash;

    /// Get the reference to the transaction data.
    fn data(&self) -> &Self::Transaction;
    /// Get hash of the transaction.
    fn hash(&self) -> &Self::Hash;
    /// Get priority of the transaction.
    fn priority(&self) -> &TransactionPriority;
    /// Get longevity of the transaction.
    fn longevity(&self) -> &TransactionLongevity;
    /// Get transaction dependencies.
    fn requires(&self) -> &[TransactionTag];
    /// Get tags that transaction provides.
    fn provides(&self) -> &[TransactionTag];
    /// Return a flag indicating if the transaction should be propagated to other peers.
    fn is_propagable(&self) -> bool;
}
240
/// Transaction pool interface.
///
/// Methods are grouped by their main consumer (RPC, block production, networking,
/// logging), as marked by the section comments below.
#[async_trait]
pub trait TransactionPool: Send + Sync {
    /// Block type.
    type Block: BlockT;
    /// Transaction hash type.
    type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
    /// In-pool transaction type.
    type InPoolTransaction: InPoolTransaction<
        Transaction = Arc<TransactionFor<Self>>,
        Hash = TxHash<Self>,
    >;
    /// Error type.
    type Error: From<crate::error::Error> + crate::error::IntoPoolError;

    // *** RPC

    /// Asynchronously imports a bunch of unverified transactions to the pool.
    ///
    /// The outer `Result` covers the call as a whole; the inner vector holds one
    /// `Result` per entry of `xts`, carrying either the transaction hash or an error.
    async fn submit_at(
        &self,
        at: <Self::Block as BlockT>::Hash,
        source: TransactionSource,
        xts: Vec<TransactionFor<Self>>,
    ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;

    /// Asynchronously imports one unverified transaction to the pool.
    async fn submit_one(
        &self,
        at: <Self::Block as BlockT>::Hash,
        source: TransactionSource,
        xt: TransactionFor<Self>,
    ) -> Result<TxHash<Self>, Self::Error>;

    /// Asynchronously imports a single transaction and starts to watch its progress in the
    /// pool.
    ///
    /// On success, returns a stream of [`TransactionStatus`] events for the transaction.
    async fn submit_and_watch(
        &self,
        at: <Self::Block as BlockT>::Hash,
        source: TransactionSource,
        xt: TransactionFor<Self>,
    ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;

    // *** Block production / Networking
    /// Get an iterator for ready transactions ordered by priority.
    ///
    /// Guaranteed to resolve only when transaction pool got updated at `at` block.
    ///
    /// `at` is a plain block hash (not an `Option`), so a block must always be given;
    /// see [`Self::ready`] for the variant that takes no block and is not `async`.
    async fn ready_at(
        &self,
        at: <Self::Block as BlockT>::Hash,
    ) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;

    /// Get an iterator for ready transactions ordered by priority.
    fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;

    // *** Block production
    /// Reports invalid transactions to the transaction pool.
    ///
    /// This function takes a map where the key is a transaction hash and the value is an
    /// optional error encountered during the transaction execution, possibly within a specific
    /// block.
    ///
    /// The transaction pool implementation decides which transactions to remove. Transactions
    /// removed from the pool will be notified with `TransactionStatus::Invalid` event (if
    /// `submit_and_watch` was used for submission).
    ///
    /// If the error associated to transaction is `None`, the transaction will be forcibly removed
    /// from the pool.
    ///
    /// The optional `at` parameter provides additional context regarding the block where the error
    /// occurred.
    ///
    /// Function returns the transactions actually removed from the pool.
    async fn report_invalid(
        &self,
        at: Option<<Self::Block as BlockT>::Hash>,
        invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
    ) -> Vec<Arc<Self::InPoolTransaction>>;

    // *** logging
    /// Get the list of future transactions.
    fn futures(&self) -> Vec<Self::InPoolTransaction>;

    /// Returns pool status.
    fn status(&self) -> PoolStatus;

    // *** logging / RPC / networking
    /// Return an event stream of transactions imported to the pool.
    ///
    /// The stream items are transaction hashes.
    fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;

    // *** networking
    /// Notify the pool about transactions broadcast.
    ///
    /// `propagations` maps each transaction hash to a list of strings (presumably the
    /// peers it was sent to, cf. [`TransactionStatus::Broadcast`] - confirm against the
    /// networking caller).
    fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);

    /// Returns the hash of the given transaction.
    fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;

    /// Return specific ready transaction by hash, if there is one.
    fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;

    /// Asynchronously returns a set of ready transactions at given block within given timeout.
    ///
    /// If the timeout is hit during method execution, then the best effort (without executing full
    /// maintain process) set of ready transactions for given block is returned.
    async fn ready_at_with_timeout(
        &self,
        at: <Self::Block as BlockT>::Hash,
        timeout: std::time::Duration,
    ) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
}
351
/// An iterator of ready transactions.
///
/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
/// last-returned element as invalid.
///
/// The implementation is then allowed, for performance reasons, to change the elements
/// returned next, by e.g. skipping elements that are known to depend on the reported
/// transaction, which renders them invalid as well.
pub trait ReadyTransactions: Iterator {
    /// Report given transaction as invalid.
    ///
    /// This might affect subsequent elements returned by the iterator, so dependent transactions
    /// are skipped for performance reasons.
    fn report_invalid(&mut self, _tx: &Self::Item);
}
367
/// A no-op implementation for an empty iterator.
///
/// The report is ignored: the method body is empty.
impl<T> ReadyTransactions for std::iter::Empty<T> {
    fn report_invalid(&mut self, _tx: &T) {}
}
372
/// Events that the transaction pool listens for.
#[derive(Debug)]
pub enum ChainEvent<B: BlockT> {
    /// A new best block has been added to the chain.
    NewBestBlock {
        /// Hash of the block.
        hash: B::Hash,
        /// Tree route from old best to new best parent that was calculated on import.
        ///
        /// If `None`, no re-org happened on import.
        tree_route: Option<Arc<sp_blockchain::TreeRoute<B>>>,
    },
    /// An existing block has been finalized.
    Finalized {
        /// Hash of just finalized block.
        hash: B::Hash,
        /// Path from old finalized to new finalized parent.
        tree_route: Arc<[B::Hash]>,
    },
}
393
394impl<B: BlockT> ChainEvent<B> {
395 /// Returns the block hash associated to the event.
396 pub fn hash(&self) -> B::Hash {
397 match self {
398 Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
399 }
400 }
401
402 /// Is `self == Self::Finalized`?
403 pub fn is_finalized(&self) -> bool {
404 matches!(self, Self::Finalized { .. })
405 }
406}
407
/// Trait for transaction pool maintenance.
///
/// Extends [`TransactionPool`] with a hook that is driven by [`ChainEvent`]s.
#[async_trait]
pub trait MaintainedTransactionPool: TransactionPool {
    /// Perform maintenance in response to the given chain `event`.
    async fn maintain(&self, event: ChainEvent<Self::Block>);
}
414
/// Transaction pool interface for submitting local transactions that exposes a
/// blocking interface for submission.
pub trait LocalTransactionPool: Send + Sync {
    /// Block type.
    type Block: BlockT;
    /// Transaction hash type.
    type Hash: Hash + Eq + Member + Serialize;
    /// Error type.
    type Error: From<crate::error::Error> + crate::error::IntoPoolError;

    /// Submits the given local unverified transaction to the pool blocking the
    /// current thread for any necessary pre-verification.
    ///
    /// `at` is the hash of the block the submission is made against. On success the
    /// hash of the submitted transaction is returned.
    ///
    /// NOTE: It MUST NOT be used for transactions that originate from the
    /// network or RPC, since the validation is performed with
    /// `TransactionSource::Local`.
    fn submit_local(
        &self,
        at: <Self::Block as BlockT>::Hash,
        xt: LocalTransactionFor<Self>,
    ) -> Result<Self::Hash, Self::Error>;
}
436
437impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
438 type Block = T::Block;
439
440 type Hash = T::Hash;
441
442 type Error = T::Error;
443
444 fn submit_local(
445 &self,
446 at: <Self::Block as BlockT>::Hash,
447 xt: LocalTransactionFor<Self>,
448 ) -> Result<Self::Hash, Self::Error> {
449 (**self).submit_local(at, xt)
450 }
451}
452
/// An abstraction for [`LocalTransactionPool`]
///
/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in an `Arc` without
/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
/// the wrapping in an `Arc`.
trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
    /// Submit transaction.
    ///
    /// The transaction will end up in the pool and be propagated to others.
    ///
    /// Both the success and the error type are `()`: the caller only learns whether the
    /// submission worked.
    fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
}
464
465impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
466 fn submit_at(
467 &self,
468 at: <TPool::Block as BlockT>::Hash,
469 extrinsic: <TPool::Block as BlockT>::Extrinsic,
470 ) -> Result<(), ()> {
471 log::trace!(
472 target: LOG_TARGET,
473 "(offchain call) Submitting a transaction to the pool: {:?}",
474 extrinsic
475 );
476
477 let result = self.submit_local(at, extrinsic);
478
479 result.map(|_| ()).map_err(|e| {
480 log::warn!(
481 target: LOG_TARGET,
482 "(offchain call) Error submitting a transaction to the pool: {}",
483 e
484 )
485 })
486 }
487}
488
/// Factory for creating [`TransactionPoolExt`]s.
///
/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
/// the wasm execution environment to send transactions from an offchain call to the runtime.
///
/// Cloning the factory clones the `Arc` handle to the wrapped pool.
#[derive(Clone)]
pub struct OffchainTransactionPoolFactory<Block: BlockT> {
    // Type-erased pool handle; only the `Block` type remains visible.
    pool: Arc<dyn OffchainSubmitTransaction<Block>>,
}
497
498impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
499 /// Creates a new instance using the given `tx_pool`.
500 pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
501 Self { pool: Arc::new(tx_pool) as Arc<_> }
502 }
503
504 /// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
505 ///
506 /// Transactions that are being submitted by this instance will be submitted with `block_hash`
507 /// as context for validation.
508 pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
509 TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
510 }
511}
512
/// Wraps a `pool` and `block_hash` to implement [`sp_core::offchain::TransactionPool`].
struct OffchainTransactionPool<Block: BlockT> {
    // Block hash passed along with every submission made through this wrapper.
    block_hash: Block::Hash,
    // Type-erased handle to the pool that receives the transactions.
    pool: Arc<dyn OffchainSubmitTransaction<Block>>,
}
518
519impl<Block: BlockT> sp_core::offchain::TransactionPool for OffchainTransactionPool<Block> {
520 fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
521 let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
522 Ok(t) => t,
523 Err(e) => {
524 log::error!(
525 target: LOG_TARGET,
526 "Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
527 );
528
529 return Err(())
530 },
531 };
532
533 self.pool.submit_at(self.block_hash, extrinsic)
534 }
535}
536
537/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
538mod v1_compatible {
539 use serde::{Deserialize, Deserializer, Serialize, Serializer};
540
541 pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
542 where
543 S: Serializer,
544 H: Serialize,
545 {
546 let (hash, _) = data;
547 serde::Serialize::serialize(&hash, serializer)
548 }
549
550 pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
551 where
552 D: Deserializer<'de>,
553 H: Deserialize<'de>,
554 {
555 let hash: H = serde::Deserialize::deserialize(deserializer)?;
556 Ok((hash, 0))
557 }
558}
559
/// Transaction pool that rejects all submitted transactions.
///
/// Could be used for example in tests.
pub struct RejectAllTxPool<Block>(PhantomData<Block>);

// Written by hand instead of derived: `#[derive(Default)]` would add a `Block: Default`
// bound, which the `PhantomData` marker does not need.
impl<Block> Default for RejectAllTxPool<Block> {
    fn default() -> Self {
        RejectAllTxPool(PhantomData)
    }
}
570
571impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
572 type Block = Block;
573
574 type Hash = Block::Hash;
575
576 type Error = error::Error;
577
578 fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
579 Err(error::Error::ImmediatelyDropped)
580 }
581}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// `InBlock` and `Finalized` must serialize to the bare block hash and deserialize with
    /// the transaction index reset to `0`.
    #[test]
    fn tx_status_compatibility() {
        type Status = TransactionStatus<u8, u8>;

        // (event to encode, expected JSON, expected result of decoding that JSON)
        let cases: Vec<(Status, &str, Status)> = vec![
            (Status::InBlock((1, 2)), r#"{"inBlock":1}"#, Status::InBlock((1, 0))),
            (Status::Finalized((1, 2)), r#"{"finalized":1}"#, Status::Finalized((1, 0))),
        ];

        for (event, expected_json, expected_decoded) in cases {
            let encoded = serde_json::to_string(&event).unwrap();
            assert_eq!(encoded, expected_json);

            let decoded: Status = serde_json::from_str(expected_json).unwrap();
            assert_eq!(decoded, expected_decoded);
        }
    }
}