use futures::{future::join_all, FutureExt};
use itertools::Itertools;
use parking_lot::RwLock;
use soil_client::blockchain::HashAndNumber;
use soil_client::transaction_pool::{
error::IntoMetricsLabel, TransactionPriority, TransactionSource,
};
use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::{
atomic::{self, AtomicU64},
mpsc::{
channel as sync_bridge_channel, Receiver as SyncBridgeReceiver,
Sender as SyncBridgeSender,
},
Arc,
},
time::Instant,
};
use subsoil::runtime::{
traits::Block as BlockT,
transaction_validity::{InvalidTransaction, TransactionValidityError},
};
use tracing::{debug, trace};
use crate::{
common::tracing_log_xt::log_xt_trace,
graph::{self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash},
ValidateTransactionPriority, LOG_TARGET,
};
use super::{
metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener,
view_store::ViewStore,
};
mod tx_mem_pool_map;
pub(crate) const TXMEMPOOL_REVALIDATION_PERIOD: u64 = 10;
pub(crate) const TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE: usize = 1000;
const SYNC_BRIDGE_EXPECT: &str = "The mempool blocking task shall not be terminated. qed.";
#[derive(strum::Display)]
#[strum(serialize_all = "snake_case")]
pub(super) enum InvalidTxReason {
Invalid(String),
Unknown(String),
Subtree(String),
ValidationFailed(String),
}
impl InvalidTxReason {
pub(super) fn reason(&self) -> &String {
match self {
InvalidTxReason::Invalid(inner)
| InvalidTxReason::Unknown(inner)
| InvalidTxReason::Subtree(inner)
| InvalidTxReason::ValidationFailed(inner) => inner,
}
}
pub(super) fn category(&self) -> String {
self.to_string()
}
}
pub(crate) struct TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
watched: bool,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
source: TimedTransactionSource,
validated_at: AtomicU64,
priority: RwLock<Option<TransactionPriority>>,
}
impl<ChainApi, Block> TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
pub(crate) fn is_watched(&self) -> bool {
self.watched
}
fn new_unwatched(
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
validated_at: u64,
) -> Self {
Self::new(false, source, tx, bytes, validated_at)
}
fn new_watched(
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
validated_at: u64,
) -> Self {
Self::new(true, source, tx, bytes, validated_at)
}
fn new(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
validated_at: u64,
) -> Self {
Self::new_with_optional_priority(watched, source, tx, bytes, None, validated_at)
}
fn new_with_priority(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
priority: TransactionPriority,
validated_at: u64,
) -> Self {
Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority), validated_at)
}
fn new_with_optional_priority(
watched: bool,
source: TransactionSource,
tx: ExtrinsicFor<ChainApi>,
bytes: usize,
priority: Option<TransactionPriority>,
validated_at: u64,
) -> Self {
Self {
watched,
tx,
source: TimedTransactionSource::from_transaction_source(source, true),
validated_at: AtomicU64::new(validated_at),
bytes,
priority: priority.into(),
}
}
pub(crate) fn tx(&self) -> ExtrinsicFor<ChainApi> {
self.tx.clone()
}
pub(crate) fn source(&self) -> TimedTransactionSource {
self.source.clone()
}
pub(crate) fn priority(&self) -> Option<TransactionPriority> {
*self.priority.read()
}
}
impl<ChainApi, Block> std::fmt::Debug for TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TxInMemPool")
.field("watched", &self.watched)
.field("tx", &"...")
.field("bytes", &self.bytes)
.field("source", &self.source)
.field("validated_at", &self.validated_at)
.field("priority", &self.priority)
.finish()
}
}
impl<ChainApi, Block> std::cmp::PartialEq for TxInMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
fn eq(&self, other: &Self) -> bool {
self.watched == other.watched
&& self.tx == other.tx
&& self.bytes == other.bytes
&& self.source == other.source
&& *self.priority.read() == *other.priority.read()
&& self.validated_at.load(atomic::Ordering::Relaxed)
== other.validated_at.load(atomic::Ordering::Relaxed)
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
struct MempoolTxPriority(pub Option<TransactionPriority>);
impl Ord for MempoolTxPriority {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (&self.0, &other.0) {
(Some(a), Some(b)) => a.cmp(b),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
}
}
}
impl From<Option<TransactionPriority>> for MempoolTxPriority {
fn from(value: Option<TransactionPriority>) -> Self {
MempoolTxPriority(value)
}
}
impl PartialOrd for MempoolTxPriority {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<ChainApi, Block> tx_mem_pool_map::Size for Arc<TxInMemPool<ChainApi, Block>>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
fn size(&self) -> usize {
self.bytes
}
}
impl<ChainApi, Block> tx_mem_pool_map::PriorityAndTimestamp for Arc<TxInMemPool<ChainApi, Block>>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
type Priority = MempoolTxPriority;
type Timestamp = Option<Instant>;
fn priority(&self) -> Self::Priority {
TxInMemPool::priority(self).into()
}
fn timestamp(&self) -> Self::Timestamp {
self.source().timestamp
}
}
type InternalTxMemPoolMap<ChainApi, Block> = tx_mem_pool_map::SizeTrackedStore<
ExtrinsicHash<ChainApi>,
tx_mem_pool_map::PriorityKey<MempoolTxPriority, Option<Instant>>,
Arc<TxInMemPool<ChainApi, Block>>,
>;
pub type TxMemPoolBlockingTask = Pin<Box<dyn Future<Output = ()> + Send>>;
pub(super) struct TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
sync_channel: SyncBridgeSender<TxMemPoolSyncRequest<ChainApi, Block>>,
transactions: InternalTxMemPoolMap<ChainApi, Block>,
metrics: PrometheusMetrics,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
}
#[derive(Debug)]
pub(super) struct InsertionInfo<Hash> {
pub(super) hash: Hash,
pub(super) source: TimedTransactionSource,
pub(super) removed: Vec<Hash>,
}
impl<Hash> InsertionInfo<Hash> {
fn new(hash: Hash, source: TimedTransactionSource) -> Self {
Self::new_with_removed(hash, source, Default::default())
}
fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
Self { hash, source, removed }
}
}
impl<ChainApi, Block> TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
metrics: PrometheusMetrics,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
) -> (Self, TxMemPoolBlockingTask) {
let (sync_channel, rx) = sync_bridge_channel();
let task = Self::sync_bridge_task(rx);
(
Self {
api,
listener,
sync_channel,
transactions: Default::default(),
metrics,
max_transactions_count,
max_transactions_total_bytes,
},
task.boxed(),
)
}
#[cfg(test)]
fn new_test(
api: Arc<ChainApi>,
max_transactions_count: usize,
max_transactions_total_bytes: usize,
) -> Self {
let (sync_channel, rx) = sync_bridge_channel();
tokio::task::spawn_blocking(move || Self::sync_bridge_task(rx));
Self {
api,
listener: Arc::from(MultiViewListener::new_with_worker(Default::default()).0),
transactions: Default::default(),
metrics: Default::default(),
sync_channel,
max_transactions_count,
max_transactions_total_bytes,
}
}
pub(super) async fn get_by_hash(
&self,
hash: ExtrinsicHash<ChainApi>,
) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
self.transactions.read().await.get(&hash).map(Clone::clone)
}
pub async fn unwatched_and_watched_count(&self) -> (usize, usize) {
let transactions = self.transactions.read().await;
let watched_count = transactions.values().filter(|t| t.is_watched()).count();
(transactions.len() - watched_count, watched_count)
}
pub fn len(&self) -> usize {
self.transactions.len()
}
#[cfg(test)]
pub fn bytes(&self) -> usize {
return self.transactions.bytes();
}
fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool {
length > self.max_transactions_count
|| current_total_bytes > self.max_transactions_total_bytes
}
async fn try_insert(
&self,
tx_hash: ExtrinsicHash<ChainApi>,
tx: TxInMemPool<ChainApi, Block>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>
{
let mut transactions = self.transactions.write().await;
let bytes = self.transactions.bytes();
let result = match (
self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
transactions.contains_key(&tx_hash),
) {
(false, false) => {
let source = tx.source();
transactions.insert(tx_hash, Arc::from(tx));
Ok(InsertionInfo::new(tx_hash, source))
},
(_, true) => {
Err(soil_client::transaction_pool::error::Error::AlreadyImported(Box::new(tx_hash)))
},
(true, _) => Err(soil_client::transaction_pool::error::Error::ImmediatelyDropped),
};
trace!(
target: LOG_TARGET,
?tx_hash,
result_hash = ?result.as_ref().map(|r| r.hash),
"mempool::try_insert"
);
result
}
pub(super) async fn try_insert_with_replacement(
&self,
new_tx: ExtrinsicFor<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
validated_at: u64,
watched: bool,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>
{
let (hash, length) = self.api.hash_and_length(&new_tx);
let new_tx =
TxInMemPool::new_with_priority(watched, source, new_tx, length, priority, validated_at);
if new_tx.bytes > self.max_transactions_total_bytes {
return Err(soil_client::transaction_pool::error::Error::ImmediatelyDropped);
}
let mut transactions = self.transactions.write().await;
if transactions.contains_key(&hash) {
return Err(soil_client::transaction_pool::error::Error::AlreadyImported(Box::new(
hash,
)));
}
let source = new_tx.source();
let new_tx = Arc::new(new_tx);
let insertion_result = transactions.try_insert_with_replacement(
self.max_transactions_total_bytes,
hash,
new_tx,
);
debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
match insertion_result {
None => Err(soil_client::transaction_pool::error::Error::ImmediatelyDropped),
Some(to_be_removed) => Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed)),
}
}
pub(super) async fn extend_unwatched(
&self,
source: TransactionSource,
validated_at: u64,
xts: &[ExtrinsicFor<ChainApi>],
) -> Vec<
Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>,
> {
let insert_futures = xts.into_iter().map(|xt| {
let api = self.api.clone();
let xt = xt.clone();
async move {
let (hash, length) = api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_unwatched(source, xt, length, validated_at))
.await
}
});
join_all(insert_futures).await
}
pub(super) async fn push_watched(
&self,
source: TransactionSource,
validated_at: u64,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>
{
let (hash, length) = self.api.hash_and_length(&xt);
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length, validated_at))
.await
}
pub(super) async fn with_transactions<F, R>(&self, f: F) -> R
where
F: Fn(
std::collections::hash_map::Iter<
ExtrinsicHash<ChainApi>,
Arc<TxInMemPool<ChainApi, Block>>,
>,
) -> R,
{
self.transactions.read().await.with_items(f)
}
pub(super) async fn remove_transactions(&self, tx_hashes: &[ExtrinsicHash<ChainApi>]) {
log_xt_trace!(target: LOG_TARGET, tx_hashes, "mempool::remove_transaction");
let mut transactions = self.transactions.write().await;
for tx_hash in tx_hashes {
transactions.remove(tx_hash);
}
}
async fn revalidate_inner(
&self,
view_store: Arc<ViewStore<ChainApi, Block>>,
finalized_block: HashAndNumber<Block>,
) -> HashSet<ExtrinsicHash<ChainApi>> {
trace!(
target: LOG_TARGET,
?finalized_block,
"mempool::revalidate_inner"
);
let start = Instant::now();
let (total_count, to_be_validated) = {
(
self.transactions.len(),
self.with_transactions(|iter| {
iter.filter(|(_, xt)| {
let finalized_block_number = finalized_block.number.into().as_u64();
xt.validated_at.load(atomic::Ordering::Relaxed)
+ TXMEMPOOL_REVALIDATION_PERIOD
< finalized_block_number
})
.sorted_by_key(|(_, tx)| tx.validated_at.load(atomic::Ordering::Relaxed))
.take(TXMEMPOOL_MAX_REVALIDATION_BATCH_SIZE)
.map(|(k, v)| (*k, v.clone()))
.collect::<Vec<_>>()
})
.await,
)
};
let validations_futures = to_be_validated.into_iter().map(|(xt_hash, xt)| {
self.api
.validate_transaction(
finalized_block.hash,
xt.source.clone().into(),
xt.tx(),
ValidateTransactionPriority::Maintained,
)
.map(move |validation_result| {
xt.validated_at
.store(finalized_block.number.into().as_u64(), atomic::Ordering::Relaxed);
(xt_hash, validation_result)
})
});
let validation_results = futures::future::join_all(validations_futures).await;
let validated_count = validation_results.len();
let duration = start.elapsed();
let invalid_hashes = validation_results
.into_iter()
.filter_map(|(tx_hash, validation_result)| match validation_result {
Ok(Ok(_) | Err(TransactionValidityError::Invalid(InvalidTransaction::Future))) => {
None
},
Err(ref error) => {
trace!(
target: LOG_TARGET,
?tx_hash,
?validation_result,
"mempool::revalidate_inner error during revalidation"
);
Some((tx_hash, InvalidTxReason::ValidationFailed(error.label())))
},
Ok(Err(TransactionValidityError::Unknown(error))) => {
trace!(
target: LOG_TARGET,
?tx_hash,
?validation_result,
"mempool::revalidate_inner cannot determine transaction validity"
);
Some((tx_hash, InvalidTxReason::Unknown(error.as_ref().to_string())))
},
Ok(Err(TransactionValidityError::Invalid(error))) => {
trace!(
target: LOG_TARGET,
?tx_hash,
?validation_result,
"mempool::revalidate_inner transaction is invalid"
);
Some((tx_hash, InvalidTxReason::Invalid(error.as_ref().to_string())))
},
})
.collect::<Vec<_>>();
let mut invalid_hashes_subtrees = Vec::new();
for (tx, reason) in &invalid_hashes {
let txs_in_subtree = view_store
.remove_transaction_subtree(*tx, |_, _| {})
.into_iter()
.map(|tx| (tx.hash, InvalidTxReason::Subtree(reason.to_string())));
invalid_hashes_subtrees.extend(txs_in_subtree);
}
let revalidated_invalid_hashes_len = invalid_hashes.len();
let invalid_hashes = invalid_hashes
.into_iter()
.chain(invalid_hashes_subtrees)
.map(|(tx, reason)| {
self.metrics
.report(|metrics| metrics.mempool_revalidation_invalid_txs.observe(&reason, 1));
tx
})
.collect::<HashSet<_>>();
debug!(
target: LOG_TARGET,
?finalized_block,
validated_count,
total_count,
invalid_hashes_subtrees_len = invalid_hashes.len(),
revalidated_invalid_hashes_len,
?duration,
"mempool::revalidate_inner"
);
invalid_hashes
}
pub(super) async fn purge_finalized_transactions(
&self,
finalized_xts: &Vec<ExtrinsicHash<ChainApi>>,
) {
debug!(
target: LOG_TARGET,
count = finalized_xts.len(),
"purge_finalized_transactions"
);
log_xt_trace!(target: LOG_TARGET, finalized_xts, "purged finalized transactions");
let mut transactions = self.transactions.write().await;
finalized_xts.iter().for_each(|t| {
transactions.remove(t);
});
}
pub(super) async fn revalidate(
&self,
view_store: Arc<ViewStore<ChainApi, Block>>,
finalized_block: HashAndNumber<Block>,
) {
let invalid_hashes_subtrees =
self.revalidate_inner(view_store.clone(), finalized_block.clone()).await;
{
let mut transactions = self.transactions.write().await;
invalid_hashes_subtrees.iter().for_each(|tx_hash| {
transactions.remove(&tx_hash);
});
};
let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::<Vec<_>>();
self.listener.transactions_invalidated(invalid_hashes_subtrees.as_slice());
view_store
.import_notification_sink
.clean_notified_items(invalid_hashes_subtrees.as_slice());
view_store
.dropped_stream_controller
.remove_transactions(invalid_hashes_subtrees);
trace!(
target: LOG_TARGET,
?finalized_block,
"mempool::revalidate"
);
}
pub(super) async fn update_transaction_priority(
&self,
hash: ExtrinsicHash<ChainApi>,
prio: Option<TransactionPriority>,
) {
if let Some(priority) = prio {
let mut transactions = self.transactions.write().await;
transactions.update_item(&hash, |t| {
*t.priority.write() = Some(priority);
});
}
}
pub(super) async fn count_unknown_transactions<'a>(
&self,
hashes: impl Iterator<Item = &'a ExtrinsicHash<ChainApi>>,
) -> usize {
let transactions = self.transactions.read().await;
hashes.filter(|tx_hash| !transactions.contains_key(tx_hash)).count()
}
}
type ExtendUnwatchedResult<ChainApi> = Vec<
Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>,
>;
type TryInsertWithReplacementResult<ChainApi> =
Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>;
enum TxMemPoolSyncRequest<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
RemoveTransactions(
Arc<TxMemPool<ChainApi, Block>>,
Vec<ExtrinsicHash<ChainApi>>,
SyncBridgeSender<()>,
),
ExtendUnwatched(
Arc<TxMemPool<ChainApi, Block>>,
TransactionSource,
u64,
Vec<ExtrinsicFor<ChainApi>>,
SyncBridgeSender<ExtendUnwatchedResult<ChainApi>>,
),
UpdateTransactionPriority(
Arc<TxMemPool<ChainApi, Block>>,
ExtrinsicHash<ChainApi>,
Option<TransactionPriority>,
SyncBridgeSender<()>,
),
TryInsertWithReplacement(
Arc<TxMemPool<ChainApi, Block>>,
ExtrinsicFor<ChainApi>,
TransactionPriority,
TransactionSource,
u64,
bool,
SyncBridgeSender<TryInsertWithReplacementResult<ChainApi>>,
),
}
impl<ChainApi, Block> TxMemPoolSyncRequest<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
fn remove_transactions(
mempool: Arc<TxMemPool<ChainApi, Block>>,
hashes: Vec<ExtrinsicHash<ChainApi>>,
) -> (SyncBridgeReceiver<()>, Self) {
let (tx, rx) = sync_bridge_channel();
(rx, Self::RemoveTransactions(mempool, hashes, tx))
}
fn extend_unwatched(
mempool: Arc<TxMemPool<ChainApi, Block>>,
source: TransactionSource,
validated_at: u64,
xts: Vec<ExtrinsicFor<ChainApi>>,
) -> (SyncBridgeReceiver<ExtendUnwatchedResult<ChainApi>>, Self) {
let (tx, rx) = sync_bridge_channel();
(rx, Self::ExtendUnwatched(mempool, source, validated_at, xts, tx))
}
fn update_transaction_priority(
mempool: Arc<TxMemPool<ChainApi, Block>>,
hash: ExtrinsicHash<ChainApi>,
prio: Option<TransactionPriority>,
) -> (SyncBridgeReceiver<()>, Self) {
let (tx, rx) = sync_bridge_channel();
(rx, Self::UpdateTransactionPriority(mempool, hash, prio, tx))
}
fn try_insert_with_replacement(
mempool: Arc<TxMemPool<ChainApi, Block>>,
new_tx: ExtrinsicFor<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
validated_at: u64,
watched: bool,
) -> (SyncBridgeReceiver<TryInsertWithReplacementResult<ChainApi>>, Self) {
let (tx, rx) = sync_bridge_channel();
(
rx,
Self::TryInsertWithReplacement(
mempool,
new_tx,
priority,
source,
validated_at,
watched,
tx,
),
)
}
}
impl<ChainApi, Block> TxMemPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
async fn sync_bridge_task(rx: SyncBridgeReceiver<TxMemPoolSyncRequest<ChainApi, Block>>) {
for request in rx {
Self::handle_request(request).await;
}
}
async fn handle_request(request: TxMemPoolSyncRequest<ChainApi, Block>) {
match request {
TxMemPoolSyncRequest::RemoveTransactions(mempool, hashes, tx) => {
mempool.remove_transactions(&hashes).await;
if let Err(error) = tx.send(()) {
debug!(target: LOG_TARGET, ?error, "RemoveTransaction: sending response failed");
}
},
TxMemPoolSyncRequest::ExtendUnwatched(mempool, source, validated_at, txs, tx) => {
let result = mempool.extend_unwatched(source, validated_at, &txs).await;
if let Err(error) = tx.send(result) {
debug!(target: LOG_TARGET, ?error, "ExtendUnwatched: sending response failed");
}
},
TxMemPoolSyncRequest::UpdateTransactionPriority(mempool, hash, prio, tx) => {
let result = mempool.update_transaction_priority(hash, prio).await;
if let Err(error) = tx.send(result) {
debug!(target: LOG_TARGET, ?error, "UpdateTransactionPriority2: sending response failed");
}
},
TxMemPoolSyncRequest::TryInsertWithReplacement(
mempool,
new_tx,
priority,
source,
validated_at,
watched,
tx,
) => {
let result = mempool
.try_insert_with_replacement(new_tx, priority, source, validated_at, watched)
.await;
if let Err(error) = tx.send(result) {
debug!(target: LOG_TARGET, ?error, "TryInsertWithReplacementSync: sending response failed");
}
},
}
}
pub(super) fn try_insert_with_replacement_sync(
self: Arc<Self>,
new_tx: ExtrinsicFor<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
validated_at: u64,
watched: bool,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>
{
let (response, request) = TxMemPoolSyncRequest::try_insert_with_replacement(
self.clone(),
new_tx,
priority,
source,
validated_at,
watched,
);
let _ = self.sync_channel.send(request);
response.recv().expect(SYNC_BRIDGE_EXPECT)
}
pub(super) fn extend_unwatched_sync(
self: Arc<Self>,
source: TransactionSource,
validated_at: u64,
xts: Vec<ExtrinsicFor<ChainApi>>,
) -> Vec<
Result<InsertionInfo<ExtrinsicHash<ChainApi>>, soil_client::transaction_pool::error::Error>,
> {
let (response, request) =
TxMemPoolSyncRequest::extend_unwatched(self.clone(), source, validated_at, xts);
let _ = self.sync_channel.send(request);
response.recv().expect(SYNC_BRIDGE_EXPECT)
}
pub(super) fn remove_transactions_sync(
self: Arc<Self>,
tx_hashes: Vec<ExtrinsicHash<ChainApi>>,
) {
let (response, request) =
TxMemPoolSyncRequest::remove_transactions(self.clone(), tx_hashes);
let _ = self.sync_channel.send(request);
response.recv().expect(SYNC_BRIDGE_EXPECT)
}
pub(super) fn update_transaction_priority_sync(
self: Arc<Self>,
hash: ExtrinsicHash<ChainApi>,
prio: Option<TransactionPriority>,
) {
let (response, request) =
TxMemPoolSyncRequest::update_transaction_priority(self.clone(), hash, prio);
let _ = self.sync_channel.send(request);
response.recv().expect(SYNC_BRIDGE_EXPECT)
}
}
#[cfg(test)]
mod tx_mem_pool_tests {
use futures::future::join_all;
use soil_test_node_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256};
use soil_test_node_runtime_client::Sr25519Keyring::*;
use crate::{
common::tests::TestApi, fork_aware_txpool::view_store::ViewStoreSubmitOutcome,
graph::ChainApi,
};
use super::*;
fn uxt(nonce: u64) -> Extrinsic {
crate::common::tests::uxt(Transfer {
from: Alice.into(),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
})
}
#[tokio::test]
async fn extend_unwatched_obeys_limit() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().take(max).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
}
#[tokio::test]
async fn extend_unwatched_detects_already_imported() {
subsoil::tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
xts.push(xts.iter().last().unwrap().clone());
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().take(max - 1).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
soil_client::transaction_pool::error::Error::AlreadyImported(_)
));
}
#[tokio::test]
async fn push_obeys_limit() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
let xt = Arc::from(uxt(98));
let result = mempool.push_watched(TransactionSource::External, 0, xt).await;
assert!(matches!(
result.unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
let xt = Arc::from(uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt]).await;
assert!(matches!(
result.pop().unwrap().unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
}
#[tokio::test]
async fn push_detects_already_imported() {
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX);
let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
let xt0 = xts.iter().last().unwrap().clone();
let xt1 = xts.iter().next().unwrap().clone();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
let result = mempool.push_watched(TransactionSource::External, 0, xt0).await;
assert!(matches!(
result.unwrap_err(),
soil_client::transaction_pool::error::Error::AlreadyImported(_)
));
let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt1]).await;
assert!(matches!(
result.pop().unwrap().unwrap_err(),
soil_client::transaction_pool::error::Error::AlreadyImported(_)
));
}
#[tokio::test]
async fn count_works() {
subsoil::tracing::try_init_simple();
trace!(target:LOG_TARGET,line=line!(),"xxx");
let max = 100;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api, max, usize::MAX);
trace!(target:LOG_TARGET,line=line!(),"xxx");
let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
trace!(target:LOG_TARGET,line=line!(),"xxx");
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts0).await;
trace!(target:LOG_TARGET,line=line!(),"xxx");
assert!(results.iter().all(Result::is_ok));
trace!(target:LOG_TARGET,line=line!(),"xxx");
let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
trace!(target:LOG_TARGET,line=line!(),"xxx");
let results = xts1
.into_iter()
.map(|t| mempool.push_watched(TransactionSource::External, 0, t));
trace!(target:LOG_TARGET,line=line!(),"xxx");
let results = join_all(results).await;
trace!(target:LOG_TARGET,line=line!(),"xxx");
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.unwatched_and_watched_count().await, (10, 5));
}
const LARGE_XT_SIZE: usize = 1129;
fn large_uxt(x: usize) -> Extrinsic {
ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
}
#[tokio::test]
async fn push_obeys_size_limit() {
subsoil::tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
let xt = Arc::from(large_uxt(98));
let result = mempool.push_watched(TransactionSource::External, 0, xt).await;
assert!(matches!(
result.unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
let xt = Arc::from(large_uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, 0, &[xt]).await;
assert!(matches!(
result.pop().unwrap().unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
}
#[tokio::test]
async fn replacing_txs_works_for_same_tx_size() {
subsoil::tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let low_prio = 0u64;
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
.iter()
.map(|t| {
let h = api.hash_and_length(t).0;
(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
})
.unzip();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
for o in submit_outcomes {
mempool.update_transaction_priority(o.hash(), o.priority()).await;
}
let xt = Arc::from(large_uxt(98));
let hash = api.hash_and_length(&xt).0;
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
.await
.unwrap();
assert_eq!(result.hash, hash);
assert_eq!(result.removed, hashes[0..1]);
}
#[tokio::test]
async fn replacing_txs_removes_proper_size_of_txs() {
subsoil::tracing::try_init_simple();
let max = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let low_prio = 0u64;
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
.iter()
.map(|t| {
let h = api.hash_and_length(t).0;
(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
})
.unzip();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE);
for o in submit_outcomes {
mempool.update_transaction_priority(o.hash(), o.priority()).await;
}
let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build());
let (hash, length) = api.hash_and_length(&xt);
assert_eq!(length, 1130);
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
.await
.unwrap();
assert_eq!(result.hash, hash);
assert_eq!(result.removed, hashes[0..2]);
}
#[tokio::test]
async fn replacing_txs_removes_proper_size_and_prios() {
subsoil::tracing::try_init_simple();
const COUNT: usize = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let (submit_outcomes, hashes): (Vec<ViewStoreSubmitOutcome<TestApi>>, Vec<_>) = xts
.iter()
.enumerate()
.map(|(prio, t)| {
let h = api.hash_and_length(t).0;
(ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h)
})
.unzip();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
for o in submit_outcomes {
mempool.update_transaction_priority(o.hash(), o.priority()).await;
}
let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
let (hash, length) = api.hash_and_length(&xt);
assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
.await
.unwrap();
assert_eq!(result.hash, hash);
assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev()));
}
#[tokio::test]
async fn replacing_txs_skips_lower_prio_tx() {
subsoil::tracing::try_init_simple();
const COUNT: usize = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let hi_prio = 100u64;
let low_prio = 10u64;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let submit_outcomes: Vec<ViewStoreSubmitOutcome<TestApi>> = xts
.iter()
.map(|t| {
let h = api.hash_and_length(t).0;
ViewStoreSubmitOutcome::new(h, Some(hi_prio))
})
.collect::<Vec<_>>();
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
for o in submit_outcomes {
mempool.update_transaction_priority(o.hash(), o.priority()).await;
}
let xt = Arc::from(large_uxt(98));
let result = mempool
.try_insert_with_replacement(xt, low_prio, TransactionSource::External, 0, false)
.await;
assert!(matches!(
result.unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
}
#[tokio::test]
async fn replacing_txs_is_skipped_if_prios_are_not_set() {
subsoil::tracing::try_init_simple();
const COUNT: usize = 10;
let api = Arc::from(TestApi::default());
let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
let hi_prio = u64::MAX;
let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
let results = mempool.extend_unwatched(TransactionSource::External, 0, &xts).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(mempool.bytes(), total_xts_bytes);
let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
let length = api.hash_and_length(&xt).1;
assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
let result = mempool
.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, 0, false)
.await;
assert!(matches!(
result.unwrap_err(),
soil_client::transaction_pool::error::Error::ImmediatelyDropped
));
}
}