#![warn(missing_docs)]
pub mod error;
use async_trait::async_trait;
use codec::Codec;
use futures::Stream;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sp_core::offchain::TransactionPoolExt;
use sp_runtime::traits::{Block as BlockT, Member};
use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
const LOG_TARGET: &str = "txpool::api";
pub use sp_runtime::transaction_validity::{
TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
TransactionValidityError,
};
#[derive(Debug, Clone)]
pub struct PoolStatus {
pub ready: usize,
pub ready_bytes: usize,
pub future: usize,
pub future_bytes: usize,
}
impl PoolStatus {
pub fn is_empty(&self) -> bool {
self.ready == 0 && self.future == 0
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TransactionStatus<Hash, BlockHash> {
Future,
Ready,
Broadcast(Vec<String>),
#[serde(with = "v1_compatible")]
InBlock((BlockHash, TxIndex)),
Retracted(BlockHash),
FinalityTimeout(BlockHash),
#[serde(with = "v1_compatible")]
Finalized((BlockHash, TxIndex)),
Usurped(Hash),
Dropped,
Invalid,
}
impl<Hash, BlockHash> TransactionStatus<Hash, BlockHash> {
pub fn is_final(&self) -> bool {
match self {
Self::Usurped(_) |
Self::Finalized(_) |
Self::FinalityTimeout(_) |
Self::Invalid |
Self::Dropped => true,
_ => false,
}
}
pub fn is_retriable(&self) -> bool {
match self {
Self::FinalityTimeout(_) |
Self::Invalid |
Self::Dropped => true,
_ => false,
}
}
}
pub type TransactionStatusStream<Hash, BlockHash> =
dyn Stream<Item = TransactionStatus<Hash, BlockHash>> + Send;
pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>;
pub type TxHash<P> = <P as TransactionPool>::Hash;
pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
pub type TxIndex = usize;
pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>;
pub trait InPoolTransaction {
type Transaction;
type Hash;
fn data(&self) -> &Self::Transaction;
fn hash(&self) -> &Self::Hash;
fn priority(&self) -> &TransactionPriority;
fn longevity(&self) -> &TransactionLongevity;
fn requires(&self) -> &[TransactionTag];
fn provides(&self) -> &[TransactionTag];
fn is_propagable(&self) -> bool;
}
#[async_trait]
pub trait TransactionPool: Send + Sync {
type Block: BlockT;
type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
type InPoolTransaction: InPoolTransaction<
Transaction = Arc<TransactionFor<Self>>,
Hash = TxHash<Self>,
>;
type Error: From<crate::error::Error> + crate::error::IntoPoolError;
async fn submit_at(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
async fn submit_one(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> Result<TxHash<Self>, Self::Error>;
async fn submit_and_watch(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error>;
async fn ready_at(
&self,
at: <Self::Block as BlockT>::Hash,
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
async fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
) -> Vec<Arc<Self::InPoolTransaction>>;
fn futures(&self) -> Vec<Self::InPoolTransaction>;
fn status(&self) -> PoolStatus;
fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>>;
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
async fn ready_at_with_timeout(
&self,
at: <Self::Block as BlockT>::Hash,
timeout: std::time::Duration,
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
}
pub trait ReadyTransactions: Iterator {
fn report_invalid(&mut self, _tx: &Self::Item);
}
impl<T> ReadyTransactions for std::iter::Empty<T> {
fn report_invalid(&mut self, _tx: &T) {}
}
#[derive(Debug)]
pub enum ChainEvent<B: BlockT> {
NewBestBlock {
hash: B::Hash,
tree_route: Option<Arc<sp_blockchain::TreeRoute<B>>>,
},
Finalized {
hash: B::Hash,
tree_route: Arc<[B::Hash]>,
},
}
impl<B: BlockT> ChainEvent<B> {
pub fn hash(&self) -> B::Hash {
match self {
Self::NewBestBlock { hash, .. } | Self::Finalized { hash, .. } => *hash,
}
}
pub fn is_finalized(&self) -> bool {
matches!(self, Self::Finalized { .. })
}
}
#[async_trait]
pub trait MaintainedTransactionPool: TransactionPool {
async fn maintain(&self, event: ChainEvent<Self::Block>);
}
pub trait LocalTransactionPool: Send + Sync {
type Block: BlockT;
type Hash: Hash + Eq + Member + Serialize;
type Error: From<crate::error::Error> + crate::error::IntoPoolError;
fn submit_local(
&self,
at: <Self::Block as BlockT>::Hash,
xt: LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error>;
}
impl<T: LocalTransactionPool> LocalTransactionPool for Arc<T> {
type Block = T::Block;
type Hash = T::Hash;
type Error = T::Error;
fn submit_local(
&self,
at: <Self::Block as BlockT>::Hash,
xt: LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
(**self).submit_local(at, xt)
}
}
trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
}
impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
fn submit_at(
&self,
at: <TPool::Block as BlockT>::Hash,
extrinsic: <TPool::Block as BlockT>::Extrinsic,
) -> Result<(), ()> {
log::trace!(
target: LOG_TARGET,
"(offchain call) Submitting a transaction to the pool: {:?}",
extrinsic
);
let result = self.submit_local(at, extrinsic);
result.map(|_| ()).map_err(|e| {
log::warn!(
target: LOG_TARGET,
"(offchain call) Error submitting a transaction to the pool: {}",
e
)
})
}
}
#[derive(Clone)]
pub struct OffchainTransactionPoolFactory<Block: BlockT> {
pool: Arc<dyn OffchainSubmitTransaction<Block>>,
}
impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: T) -> Self {
Self { pool: Arc::new(tx_pool) as Arc<_> }
}
pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
}
}
struct OffchainTransactionPool<Block: BlockT> {
block_hash: Block::Hash,
pool: Arc<dyn OffchainSubmitTransaction<Block>>,
}
impl<Block: BlockT> sp_core::offchain::TransactionPool for OffchainTransactionPool<Block> {
fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
Ok(t) => t,
Err(e) => {
log::error!(
target: LOG_TARGET,
"Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
);
return Err(());
},
};
self.pool.submit_at(self.block_hash, extrinsic)
}
}
mod v1_compatible {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
H: Serialize,
{
let (hash, _) = data;
serde::Serialize::serialize(&hash, serializer)
}
pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
where
D: Deserializer<'de>,
H: Deserialize<'de>,
{
let hash: H = serde::Deserialize::deserialize(deserializer)?;
Ok((hash, 0))
}
}
pub struct RejectAllTxPool<Block>(PhantomData<Block>);
impl<Block> Default for RejectAllTxPool<Block> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<Block: BlockT> LocalTransactionPool for RejectAllTxPool<Block> {
type Block = Block;
type Hash = Block::Hash;
type Error = error::Error;
fn submit_local(&self, _: Block::Hash, _: Block::Extrinsic) -> Result<Self::Hash, Self::Error> {
Err(error::Error::ImmediatelyDropped)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tx_status_compatibility() {
let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"inBlock":1}"#;
assert_eq!(ser, exp);
let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"finalized":1}"#;
assert_eq!(ser, exp);
let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
}
}