soil_client/transaction_pool/mod.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Transaction pool client facing API.
8#![warn(missing_docs)]
9
10pub mod error;
11
12use async_trait::async_trait;
13use codec::Codec;
14use futures::Stream;
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
17use subsoil::core::offchain::TransactionPoolExt;
18use subsoil::runtime::traits::{Block as BlockT, Member};
19
20const LOG_TARGET: &str = "txpool::api";
21
22pub use subsoil::runtime::transaction_validity::{
23 TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
24 TransactionValidityError,
25};
26
27/// Transaction pool status.
28#[derive(Debug, Clone)]
29pub struct PoolStatus {
30 /// Number of transactions in the ready queue.
31 pub ready: usize,
32 /// Sum of bytes of ready transaction encodings.
33 pub ready_bytes: usize,
34 /// Number of transactions in the future queue.
35 pub future: usize,
36 /// Sum of bytes of ready transaction encodings.
37 pub future_bytes: usize,
38}
39
40impl PoolStatus {
41 /// Returns true if there are no transactions in the pool.
42 pub fn is_empty(&self) -> bool {
43 self.ready == 0 && self.future == 0
44 }
45}
46
47/// Possible transaction status events.
48///
49/// These events are being emitted by `TransactionPool` watchers,
50/// which are also exposed over RPC.
51///
52/// The status events can be grouped based on their kinds as:
53/// 1. Entering/Moving within the pool:
54/// - [Future](TransactionStatus::Future)
55/// - [Ready](TransactionStatus::Ready)
56/// 2. Inside `Ready` queue:
57/// - [Broadcast](TransactionStatus::Broadcast)
58/// 3. Leaving the pool:
59/// - [InBlock](TransactionStatus::InBlock)
60/// - [Invalid](TransactionStatus::Invalid)
61/// - [Usurped](TransactionStatus::Usurped)
62/// - [Dropped](TransactionStatus::Dropped)
63/// 4. Re-entering the pool:
64/// - [Retracted](TransactionStatus::Retracted)
65/// 5. Block finalized:
66/// - [Finalized](TransactionStatus::Finalized)
67/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
68///
69/// Transactions are first placed in either the `Ready` or `Future` queues of the transaction pool.
70/// Substrate validates the transaction before it enters the pool.
71///
72/// A transaction is placed in the `Future` queue if it will become valid at a future time.
73/// For example, submitting a transaction with a higher account nonce than the current
74/// expected nonce will place the transaction in the `Future` queue.
75///
76/// The events will always be received in the order described above, however
77/// there might be cases where transactions alternate between `Future` and `Ready`
78/// pool, and are `Broadcast` in the meantime.
79///
80/// There is also only single event causing the transaction to leave the pool.
81/// I.e. only one of the listed ones should be triggered.
82///
83/// Note that there are conditions that may cause transactions to reappear in the pool.
84/// 1. Due to possible forks, the transaction that ends up being in included
85/// in one block, may later re-enter the pool or be marked as invalid.
86/// 2. Transaction `Dropped` at one point, may later re-enter the pool if some other
87/// transactions are removed. A `Dropped` transaction may re-enter the pool only if it is
88/// resubmitted.
89/// 3. `Invalid` transaction may become valid at some point in the future.
90/// (Note that runtimes are encouraged to use `UnknownValidity` to inform the pool about
91/// such case). An `Invalid` transaction may re-enter the pool only if it is resubmitted.
92/// 4. `Retracted` transactions might be included in some next block.
93///
94/// The `FinalityTimeout` event will be emitted when the block did not reach finality
95/// within 512 blocks. This either indicates that finality is not available for your chain,
96/// or that finality gadget is lagging behind. If you choose to wait for finality longer, you can
97/// re-subscribe for a particular transaction hash manually again.
98///
99/// ### Last Event
100///
101/// The stream is considered finished when one of the following events happen:
102/// - [Finalized](TransactionStatus::Finalized)
103/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
104/// - [Usurped](TransactionStatus::Usurped)
105/// - [Invalid](TransactionStatus::Invalid)
106/// - [Dropped](TransactionStatus::Dropped)
107///
108/// See [`TransactionStatus::is_final`] for more details.
109///
110/// ### Resubmit Transactions
111///
112/// Users might resubmit the transaction at a later time for the following events:
113/// - [FinalityTimeout](TransactionStatus::FinalityTimeout)
114/// - [Invalid](TransactionStatus::Invalid)
115/// - [Dropped](TransactionStatus::Dropped)
116///
117/// See [`TransactionStatus::is_retriable`] for more details.
118#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
119#[serde(rename_all = "camelCase")]
120pub enum TransactionStatus<Hash, BlockHash> {
121 /// Transaction is part of the future queue.
122 Future,
123 /// Transaction is part of the ready queue.
124 Ready,
125 /// The transaction has been broadcast to the given peers.
126 Broadcast(Vec<String>),
127 /// Transaction has been included in block with given hash
128 /// at the given position.
129 #[serde(with = "v1_compatible")]
130 InBlock((BlockHash, TxIndex)),
131 /// The block this transaction was included in has been retracted.
132 Retracted(BlockHash),
133 /// Maximum number of finality watchers has been reached,
134 /// old watchers are being removed.
135 FinalityTimeout(BlockHash),
136 /// Transaction has been finalized by a finality-gadget, e.g. GRANDPA.
137 #[serde(with = "v1_compatible")]
138 Finalized((BlockHash, TxIndex)),
139 /// Transaction has been replaced in the pool, by another transaction
140 /// that provides the same tags. (e.g. same (sender, nonce)).
141 Usurped(Hash),
142 /// Transaction has been dropped from the pool because of the limit.
143 Dropped,
144 /// Transaction is no longer valid in the current state.
145 Invalid,
146}
147
148impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
149 /// Returns true if this is the last event emitted by [`TransactionStatusStream`].
150 pub fn is_final(&self) -> bool {
151 // The state must be kept in sync with `crate::graph::Sender`.
152 match self {
153 Self::Usurped(_)
154 | Self::Finalized(_)
155 | Self::FinalityTimeout(_)
156 | Self::Invalid
157 | Self::Dropped => true,
158 _ => false,
159 }
160 }
161
162 /// Returns true if the transaction could be re-submitted to the pool in the future.
163 ///
164 /// For example, `TransactionStatus::Dropped` is retriable, because the transaction
165 /// may enter the pool if there is space for it in the future.
166 pub fn is_retriable(&self) -> bool {
167 match self {
168 // The number of finality watchers has been reached.
169 Self::FinalityTimeout(_) |
170 // An invalid transaction might be valid at a later time.
171 Self::Invalid |
172 // The transaction was dropped because of the limits of the pool.
173 // It can reenter the pool when other transactions are removed / finalized.
174 Self::Dropped => true,
175 _ => false,
176 }
177 }
178}
179
180/// The stream of transaction events.
181pub type TransactionStatusStream<Hash, BlockHash> =
182 dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;
183
184/// The import notification event stream.
185pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;
186
187/// Transaction hash type for a pool.
188pub type TxHash<P> = <P as TransactionPool>::Hash;
189/// Block hash type for a pool.
190pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
191/// Transaction type for a pool.
192pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
193/// Type of transactions event stream for a pool.
194pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
195/// Transaction type for a local pool.
196pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
197/// Transaction's index within the block in which it was included.
198pub type TxIndex = usize;
199/// Map containing validity errors associated with transaction hashes. Used to report invalid
200/// transactions to the pool.
201pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>;
202
203/// In-pool transaction interface.
204///
205/// The pool is container of transactions that are implementing this trait.
206/// See `subsoil::runtime::ValidTransaction` for details about every field.
207pub trait InPoolTransaction {
208 /// Transaction type.
209 type Transaction;
210 /// Transaction hash type.
211 type Hash;
212
213 /// Get the reference to the transaction data.
214 fn data(&self) -> &Self::Transaction;
215 /// Get hash of the transaction.
216 fn hash(&self) -> &Self::Hash;
217 /// Get priority of the transaction.
218 fn priority(&self) -> &TransactionPriority;
219 /// Get longevity of the transaction.
220 fn longevity(&self) -> &TransactionLongevity;
221 /// Get transaction dependencies.
222 fn requires(&self) -> &[TransactionTag];
223 /// Get tags that transaction provides.
224 fn provides(&self) -> &[TransactionTag];
225 /// Return a flag indicating if the transaction should be propagated to other peers.
226 fn is_propagable(&self) -> bool;
227}
228
229/// Transaction pool interface.
230#[async_trait]
231pub trait TransactionPool: Send + Sync {
232 /// Block type.
233 type Block: BlockT;
234 /// Transaction hash type.
235 type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
236 /// In-pool transaction type.
237 type InPoolTransaction: InPoolTransaction<
238 Transaction = Arc<TransactionFor<Self>>,
239 Hash = TxHash<Self>,
240 >;
241 /// Error type.
242 type Error: From<crate::transaction_pool::error::Error>
243 + crate::transaction_pool::error::IntoPoolError;
244
245 // *** RPC
246
247 /// Asynchronously imports a bunch of unverified transactions to the pool.
248 async fn submit_at(
249 &self,
250 at: <Self::Block as BlockT>::Hash,
251 source: TransactionSource,
252 xts: Vec<TransactionFor<Self>>,
253 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
254
255 /// Asynchronously imports one unverified transaction to the pool.
256 async fn submit_one(
257 &self,
258 at: <Self::Block as BlockT>::Hash,
259 source: TransactionSource,
260 xt: TransactionFor<Self>,
261 ) -> Result<TxHash<Self>, Self::Error>;
262
263 /// Asynchronously imports a single transaction and starts to watch their progress in the
264 /// pool.
265 async fn submit_and_watch(
266 &self,
267 at: <Self::Block as BlockT>::Hash,
268 source: TransactionSource,
269 xt: TransactionFor<Self>,
270 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;
271
272 // *** Block production / Networking
273 /// Get an iterator for ready transactions ordered by priority.
274 ///
275 /// Guaranteed to resolve only when transaction pool got updated at `at` block.
276 /// Guaranteed to resolve immediately when `None` is passed.
277 async fn ready_at(
278 &self,
279 at: <Self::Block as BlockT>::Hash,
280 ) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
281
282 /// Get an iterator for ready transactions ordered by priority.
283 fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
284
285 // *** Block production
286 /// Reports invalid transactions to the transaction pool.
287 ///
288 /// This function takes a map where the key is a transaction hash and the value is an
289 /// optional error encountered during the transaction execution, possibly within a specific
290 /// block.
291 ///
292 /// The transaction pool implementation decides which transactions to remove. Transactions
293 /// removed from the pool will be notified with `TransactionStatus::Invalid` event (if
294 /// `submit_and_watch` was used for submission).
295 ///
296 /// If the error associated to transaction is `None`, the transaction will be forcibly removed
297 /// from the pool.
298 ///
299 /// The optional `at` parameter provides additional context regarding the block where the error
300 /// occurred.
301 ///
302 /// Function returns the transactions actually removed from the pool.
303 async fn report_invalid(
304 &self,
305 at: Option<<Self::Block as BlockT>::Hash>,
306 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
307 ) -> Vec<Arc<Self::InPoolTransaction>>;
308
309 // *** logging
310 /// Get futures transaction list.
311 fn futures(&self) -> Vec<Self::InPoolTransaction>;
312
313 /// Returns pool status.
314 fn status(&self) -> PoolStatus;
315
316 // *** logging / RPC / networking
317 /// Return an event stream of transactions imported to the pool.
318 fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
319
320 // *** networking
321 /// Notify the pool about transactions broadcast.
322 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
323
324 /// Returns transaction hash
325 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
326
327 /// Return specific ready transaction by hash, if there is one.
328 fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
329
330 /// Asynchronously returns a set of ready transaction at given block within given timeout.
331 ///
332 /// If the timeout is hit during method execution, then the best effort (without executing full
333 /// maintain process) set of ready transactions for given block is returned.
334 async fn ready_at_with_timeout(
335 &self,
336 at: <Self::Block as BlockT>::Hash,
337 timeout: std::time::Duration,
338 ) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
339}
340
341/// An iterator of ready transactions.
342///
343/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
344/// last-returned element as invalid.
345///
346/// The implementation is then allowed, for performance reasons, to change the elements
347/// returned next, by e.g. skipping elements that are known to depend on the reported
348/// transaction, which yields them invalid as well.
349pub trait ReadyTransactions: Iterator {
350 /// Report given transaction as invalid.
351 ///
352 /// This might affect subsequent elements returned by the iterator, so dependent transactions
353 /// are skipped for performance reasons.
354 fn report_invalid(&mut self, _tx: &Self::Item);
355}
356
357/// A no-op implementation for an empty iterator.
358impl<T> ReadyTransactions for std::iter::Empty<T> {
359 fn report_invalid(&mut self, _tx: &T) {}
360}
361
362/// Events that the transaction pool listens for.
363#[derive(Debug)]
364pub enum ChainEvent<B: BlockT> {
365 /// New best block have been added to the chain.
366 NewBestBlock {
367 /// Hash of the block.
368 hash: B::Hash,
369 /// Tree route from old best to new best parent that was calculated on import.
370 ///
371 /// If `None`, no re-org happened on import.
372 tree_route: Option<Arc<crate::blockchain::TreeRoute<B>>>,
373 },
374 /// An existing block has been finalized.
375 Finalized {
376 /// Hash of just finalized block.
377 hash: B::Hash,
378 /// Path from old finalized to new finalized parent.
379 tree_route: Arc<[B::Hash]>,
380 },
381}
382
383impl<B: BlockT> ChainEvent<B> {
384 /// Returns the block hash associated to the event.
385 pub fn hash(&self) -> B::Hash {
386 match self {
387 Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
388 }
389 }
390
391 /// Is `self == Self::Finalized`?
392 pub fn is_finalized(&self) -> bool {
393 matches!(self, Self::Finalized { .. })
394 }
395}
396
397/// Trait for transaction pool maintenance.
398#[async_trait]
399pub trait MaintainedTransactionPool: TransactionPool {
400 /// Perform maintenance
401 async fn maintain(&self, event: ChainEvent<Self::Block>);
402}
403
404/// Transaction pool interface for submitting local transactions that exposes a
405/// blocking interface for submission.
406pub trait LocalTransactionPool: Send + Sync {
407 /// Block type.
408 type Block: BlockT;
409 /// Transaction hash type.
410 type Hash: Hash + Eq + Member + Serialize;
411 /// Error type.
412 type Error: From<crate::transaction_pool::error::Error>
413 + crate::transaction_pool::error::IntoPoolError;
414
415 /// Submits the given local unverified transaction to the pool blocking the
416 /// current thread for any necessary pre-verification.
417 /// NOTE: It MUST NOT be used for transactions that originate from the
418 /// network or RPC, since the validation is performed with
419 /// `TransactionSource::Local`.
420 fn submit_local(
421 &self,
422 at: <Self::Block as BlockT>::Hash,
423 xt: LocalTransactionFor<Self>,
424 ) -> Result<Self::Hash, Self::Error>;
425}
426
427impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
428 type Block = T::Block;
429
430 type Hash = T::Hash;
431
432 type Error = T::Error;
433
434 fn submit_local(
435 &self,
436 at: <Self::Block as BlockT>::Hash,
437 xt: LocalTransactionFor<Self>,
438 ) -> Result<Self::Hash, Self::Error> {
439 (**self).submit_local(at, xt)
440 }
441}
442
443/// An abstraction for [`LocalTransactionPool`]
444///
445/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without
446/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
447/// the wrapping in a `Arc`.
448trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
449 /// Submit transaction.
450 ///
451 /// The transaction will end up in the pool and be propagated to others.
452 fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
453}
454
455impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
456 fn submit_at(
457 &self,
458 at: <TPool::Block as BlockT>::Hash,
459 extrinsic: <TPool::Block as BlockT>::Extrinsic,
460 ) -> Result<(), ()> {
461 log::trace!(
462 target: LOG_TARGET,
463 "(offchain call) Submitting a transaction to the pool: {:?}",
464 extrinsic
465 );
466
467 let result = self.submit_local(at, extrinsic);
468
469 result.map(|_| ()).map_err(|e| {
470 log::warn!(
471 target: LOG_TARGET,
472 "(offchain call) Error submitting a transaction to the pool: {}",
473 e
474 )
475 })
476 }
477}
478
479/// Factory for creating [`TransactionPoolExt`]s.
480///
481/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
482/// the wasm execution environment to send transactions from an offchain call to the runtime.
483#[derive(Clone)]
484pub struct OffchainTransactionPoolFactory<Block: BlockT> {
485 pool: Arc<dyn OffchainSubmitTransaction<Block>>,
486}
487
488impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
489 /// Creates a new instance using the given `tx_pool`.
490 pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
491 Self { pool: Arc::new(tx_pool) as Arc<_> }
492 }
493
494 /// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
495 ///
496 /// Transactions that are being submitted by this instance will be submitted with `block_hash`
497 /// as context for validation.
498 pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
499 TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
500 }
501}
502
503/// Wraps a `pool` and `block_hash` to implement [`subsoil::core::offchain::TransactionPool`].
504struct OffchainTransactionPool<Block: BlockT> {
505 block_hash: Block::Hash,
506 pool: Arc<dyn OffchainSubmitTransaction<Block>>,
507}
508
509impl<Block: BlockT> subsoil::core::offchain::TransactionPool for OffchainTransactionPool<Block> {
510 fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
511 let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
512 Ok(t) => t,
513 Err(e) => {
514 log::error!(
515 target: LOG_TARGET,
516 "Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
517 );
518
519 return Err(());
520 },
521 };
522
523 self.pool.submit_at(self.block_hash, extrinsic)
524 }
525}
526
527/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
528mod v1_compatible {
529 use serde::{Deserialize, Deserializer, Serialize, Serializer};
530
531 pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
532 where
533 S: Serializer,
534 H: Serialize,
535 {
536 let (hash, _) = data;
537 serde::Serialize::serialize(&hash, serializer)
538 }
539
540 pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
541 where
542 D: Deserializer<'de>,
543 H: Deserialize<'de>,
544 {
545 let hash: H = serde::Deserialize::deserialize(deserializer)?;
546 Ok((hash, 0))
547 }
548}
549
550/// Transaction pool that rejects all submitted transactions.
551///
552/// Could be used for example in tests.
553pub struct RejectAllTxPool<Block>(PhantomData<Block>);
554
555impl<Block> Default for RejectAllTxPool<Block> {
556 fn default() -> Self {
557 Self(PhantomData)
558 }
559}
560
561impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
562 type Block = Block;
563
564 type Hash = Block::Hash;
565
566 type Error = error::Error;
567
568 fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
569 Err(error::Error::ImmediatelyDropped)
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576
577 #[test]
578 fn tx_status_compatibility() {
579 let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
580 let ser = serde_json::to_string(&event).unwrap();
581
582 let exp = r#"{"inBlock":1}"#;
583 assert_eq!(ser, exp);
584
585 let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
586 assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
587
588 let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
589 let ser = serde_json::to_string(&event).unwrap();
590
591 let exp = r#"{"finalized":1}"#;
592 assert_eq!(ser, exp);
593
594 let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
595 assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
596 }
597}