use super::{
import_notification_sink::MultiViewImportNotificationSink,
multi_view_listener::{MultiViewListener, TxStatusStream},
view::{View, ViewPoolObserver},
};
use crate::{
fork_aware_txpool::dropped_watcher::MultiViewDroppedWatcherController,
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
BaseSubmitOutcome, BlockHash, ExtrinsicFor, ExtrinsicHash, TransactionFor,
ValidatedPoolSubmitOutcome,
},
ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
};
use itertools::Itertools;
use parking_lot::RwLock;
use soil_client::blockchain::{HashAndNumber, TreeRoute};
use soil_client::transaction_pool::{
error::Error as PoolError, PoolStatus, TransactionTag as Tag, TxInvalidityReportMap,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
time::Instant,
};
use subsoil::runtime::{
generic::BlockId,
traits::{Block as BlockT, Header, One, Saturating},
transaction_validity::{InvalidTransaction, TransactionValidityError},
};
use tracing::{debug, instrument, trace, warn, Level};
#[derive(Clone)]
struct PendingTxSubmission<ChainApi>
where
ChainApi: graph::ChainApi,
{
xt: ExtrinsicFor<ChainApi>,
source: TimedTransactionSource,
}
type RemovalCallback<ChainApi> = Arc<
dyn Fn(
&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
ExtrinsicHash<ChainApi>,
) + Send
+ Sync,
>;
struct PendingTxRemoval<ChainApi>
where
ChainApi: graph::ChainApi,
{
xt_hash: ExtrinsicHash<ChainApi>,
listener_action: RemovalCallback<ChainApi>,
}
enum PreInsertAction<ChainApi>
where
ChainApi: graph::ChainApi,
{
SubmitTx(PendingTxSubmission<ChainApi>),
RemoveSubtree(PendingTxRemoval<ChainApi>),
}
struct PendingPreInsertTask<ChainApi>
where
ChainApi: graph::ChainApi,
{
action: PreInsertAction<ChainApi>,
processed: bool,
}
impl<ChainApi> PendingPreInsertTask<ChainApi>
where
ChainApi: graph::ChainApi,
{
fn new_submission_action(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource) -> Self {
Self {
processed: false,
action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source }),
}
}
fn new_removal_action(
xt_hash: ExtrinsicHash<ChainApi>,
listener: RemovalCallback<ChainApi>,
) -> Self {
Self {
processed: false,
action: PreInsertAction::RemoveSubtree(PendingTxRemoval {
xt_hash,
listener_action: listener,
}),
}
}
fn mark_processed(&mut self) {
self.processed = true;
}
}
pub(super) struct ViewStore<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block>,
{
pub(super) api: Arc<ChainApi>,
pub(super) active_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
pub(super) inactive_views: RwLock<HashMap<Block::Hash, Arc<View<ChainApi>>>>,
pub(super) listener: Arc<MultiViewListener<ChainApi>>,
pub(super) most_recent_view: RwLock<Option<Arc<View<ChainApi>>>>,
pub(super) dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
pub(super) import_notification_sink:
MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
pending_txs_tasks: RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingPreInsertTask<ChainApi>>>,
}
pub(super) type ViewStoreSubmitOutcome<ChainApi> =
BaseSubmitOutcome<ChainApi, TxStatusStream<ChainApi>>;
impl<ChainApi: graph::ChainApi> From<ValidatedPoolSubmitOutcome<ChainApi>>
for ViewStoreSubmitOutcome<ChainApi>
{
fn from(value: ValidatedPoolSubmitOutcome<ChainApi>) -> Self {
Self::new(value.hash(), value.priority())
}
}
impl<ChainApi, Block> ViewStore<ChainApi, Block>
where
Block: BlockT,
ChainApi: graph::ChainApi<Block = Block> + 'static,
<Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
import_notification_sink: MultiViewImportNotificationSink<
Block::Hash,
ExtrinsicHash<ChainApi>,
>,
) -> Self {
Self {
api,
active_views: Default::default(),
inactive_views: Default::default(),
listener,
most_recent_view: RwLock::from(None),
dropped_stream_controller,
import_notification_sink,
pending_txs_tasks: Default::default(),
}
}
pub(super) async fn submit(
&self,
xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)> + Clone,
) -> HashMap<Block::Hash, Vec<Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error>>> {
let submit_futures = {
let active_views = self.active_views.read();
active_views
.values()
.map(|view| {
let view = view.clone();
let xts = xts.clone();
async move {
(
view.at.hash,
view.submit_many(xts, ValidateTransactionPriority::Submitted)
.await
.into_iter()
.map(|r| r.map(Into::into))
.collect::<Vec<_>>(),
)
}
})
.collect::<Vec<_>>()
};
let results = futures::future::join_all(submit_futures).await;
HashMap::<_, _>::from_iter(results.into_iter())
}
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
let active_views = self.active_views.read().values().cloned().collect::<Vec<_>>();
let tx_hash = self.api.hash_and_length(&xt).0;
let result = active_views
.iter()
.map(|view| view.submit_local(xt.clone()))
.find_or_first(Result::is_ok);
match result {
Some(Err(error)) => {
trace!(
target: LOG_TARGET,
?tx_hash,
%error,
"submit_local failed"
);
Err(error)
},
None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
Some(Ok(r)) => Ok(r.into()),
}
}
#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::sumbit_and_watch")]
pub(super) async fn submit_and_watch(
&self,
_at: Block::Hash,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
let tx_hash = self.api.hash_and_length(&xt).0;
let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into());
};
let submit_futures = {
let active_views = self.active_views.read();
active_views
.values()
.map(|view| {
let view = view.clone();
let xt = xt.clone();
let source = source.clone();
async move {
view.submit_one(source, xt, ValidateTransactionPriority::Submitted).await
}
})
.collect::<Vec<_>>()
};
let result = futures::future::join_all(submit_futures)
.await
.into_iter()
.find_or_first(Result::is_ok);
match result {
Some(Err(error)) => {
trace!(
target: LOG_TARGET,
?tx_hash,
%error,
"submit_and_watch failed"
);
return Err(error);
},
Some(Ok(result)) => {
Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher))
},
None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
}
}
pub(super) fn status(&self) -> HashMap<Block::Hash, PoolStatus> {
self.active_views.read().iter().map(|(h, v)| (*h, v.status())).collect()
}
pub(super) fn is_empty(&self) -> bool {
self.active_views.read().is_empty() && self.inactive_views.read().is_empty()
}
pub(super) fn find_view_descendent_up_to_number(
&self,
at: &HashAndNumber<Block>,
up_to: <<Block as BlockT>::Header as Header>::Number,
) -> Option<(Arc<View<ChainApi>>, Vec<Block::Hash>)> {
let mut enacted_blocks = Vec::new();
let mut at_hash = at.hash;
let mut at_number = at.number;
while at_number >= up_to {
if let Some((view, _)) = self.get_view_at(at_hash, true) {
return Some((view, enacted_blocks));
}
enacted_blocks.push(at_hash);
let header = self.api.block_header(at_hash).ok().flatten()?;
at_hash = *header.parent_hash();
at_number = at_number.saturating_sub(One::one());
}
None
}
pub(super) fn find_best_view(
&self,
tree_route: &TreeRoute<Block>,
) -> Option<Arc<View<ChainApi>>> {
let active_views = self.active_views.read();
let best_view = {
tree_route
.retracted()
.iter()
.chain(std::iter::once(tree_route.common_block()))
.chain(tree_route.enacted().iter())
.rev()
.find(|block| active_views.contains_key(&block.hash))
};
best_view.map(|h| {
active_views
.get(&h.hash)
.expect("hash was just found in the map's keys. qed")
.clone()
})
}
pub(super) fn ready(&self) -> ReadyIteratorFor<ChainApi> {
let ready_iterator =
self.most_recent_view.read().as_ref().map(|v| v.pool.validated_pool().ready());
if let Some(ready_iterator) = ready_iterator {
return Box::new(ready_iterator);
} else {
return Box::new(std::iter::empty());
}
}
pub(super) fn futures(
&self,
) -> Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>> {
self.most_recent_view
.read()
.as_ref()
.and_then(|view| self.futures_at(view.at.hash))
.unwrap_or_default()
}
pub(super) fn futures_at(
&self,
at: Block::Hash,
) -> Option<Vec<Transaction<ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>>>> {
self.get_view_at(at, true)
.map(|(v, _)| v.pool.validated_pool().pool.read().futures().cloned().collect())
}
pub(super) async fn finalize_route(
&self,
finalized_hash: Block::Hash,
tree_route: &[Block::Hash],
) -> Vec<ExtrinsicHash<ChainApi>> {
debug!(
target: LOG_TARGET,
?finalized_hash,
?tree_route,
"finalize_route"
);
let mut finalized_transactions = Vec::new();
for block in tree_route.iter().chain(std::iter::once(&finalized_hash)) {
let extrinsics = self
.api
.block_body(*block)
.await
.unwrap_or_else(|error| {
warn!(
target: LOG_TARGET,
%error,
"Finalize route: error request"
);
None
})
.unwrap_or_default()
.iter()
.map(|e| self.api.hash_and_length(&e).0)
.collect::<Vec<_>>();
extrinsics
.iter()
.enumerate()
.for_each(|(i, tx_hash)| self.listener.transaction_finalized(*tx_hash, *block, i));
finalized_transactions.extend(extrinsics);
}
debug!(
target: LOG_TARGET,
"finalize_route: done"
);
finalized_transactions
}
pub(super) fn ready_transaction(
&self,
at: Block::Hash,
tx_hash: &ExtrinsicHash<ChainApi>,
) -> Option<TransactionFor<ChainApi>> {
self.active_views
.read()
.get(&at)
.and_then(|v| v.pool.validated_pool().ready_by_hash(tx_hash))
}
#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::insert_new_view")]
pub(super) async fn insert_new_view(
&self,
view: Arc<View<ChainApi>>,
tree_route: &TreeRoute<Block>,
) {
self.apply_pending_tx_replacements(view.clone()).await;
let start = Instant::now();
self.insert_new_view_sync(view, tree_route);
debug!(
target: LOG_TARGET,
inactive_views = ?self.inactive_views.read().keys(),
duration = ?start.elapsed(),
"insert_new_view"
);
}
pub(super) fn insert_new_view_sync(
&self,
view: Arc<View<ChainApi>>,
tree_route: &TreeRoute<Block>,
) {
{
let mut most_recent_view_lock = self.most_recent_view.write();
let mut active_views = self.active_views.write();
let mut inactive_views = self.inactive_views.write();
std::iter::once(tree_route.common_block())
.chain(tree_route.enacted().iter())
.map(|block| block.hash)
.for_each(|hash| {
active_views.remove(&hash).map(|view| {
inactive_views.insert(hash, view);
});
});
active_views.insert(view.at.hash, view.clone());
most_recent_view_lock.replace(view.clone());
};
}
pub(super) fn get_view_at(
&self,
at: Block::Hash,
allow_inactive: bool,
) -> Option<(Arc<View<ChainApi>>, bool)> {
if let Some(view) = self.active_views.read().get(&at) {
return Some((view.clone(), false));
}
if allow_inactive {
if let Some(view) = self.inactive_views.read().get(&at) {
return Some((view.clone(), true));
}
};
None
}
pub(crate) async fn handle_finalized(
&self,
finalized_hash: Block::Hash,
tree_route: &[Block::Hash],
) -> Vec<ExtrinsicHash<ChainApi>> {
let finalized_xts = self.finalize_route(finalized_hash, tree_route).await;
let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
let mut dropped_views = vec![];
{
let mut active_views = self.active_views.write();
let mut inactive_views = self.inactive_views.write();
active_views.retain(|hash, v| {
let retain = match finalized_number {
Err(_) | Ok(None) => *hash == finalized_hash,
Ok(Some(n)) if v.at.number == n => *hash == finalized_hash,
Ok(Some(n)) => v.at.number > n,
};
if !retain {
dropped_views.push(*hash);
}
retain
});
inactive_views.retain(|hash, v| {
let retain = match finalized_number {
Err(_) | Ok(None) => false,
Ok(Some(n)) => v.at.number >= n,
};
if !retain {
dropped_views.push(*hash);
}
retain
});
debug!(
target: LOG_TARGET,
inactive_views = ?inactive_views.keys(),
?dropped_views,
"handle_finalized"
);
}
self.listener.remove_stale_controllers();
self.dropped_stream_controller.remove_transactions(finalized_xts.clone());
for view in dropped_views {
self.listener.remove_view(view);
self.dropped_stream_controller.remove_view(view);
}
finalized_xts
}
pub(crate) async fn finish_background_revalidations(&self) {
let start = Instant::now();
let finish_revalidation_futures = {
let active_views = self.active_views.read();
active_views
.values()
.map(|view| {
let view = view.clone();
async move { view.finish_revalidation().await }
})
.collect::<Vec<_>>()
};
debug!(
target: LOG_TARGET,
duration = ?start.elapsed(),
"finish_background_revalidations before"
);
futures::future::join_all(finish_revalidation_futures).await;
debug!(
target: LOG_TARGET,
duration = ?start.elapsed(),
"finish_background_revalidations after"
);
}
pub(crate) fn report_invalid(
&self,
at: Option<Block::Hash>,
invalid_tx_errors: TxInvalidityReportMap<ExtrinsicHash<ChainApi>>,
) -> Vec<TransactionFor<ChainApi>> {
let mut remove_from_view = vec![];
let mut remove_from_pool = vec![];
invalid_tx_errors.into_iter().for_each(|(hash, e)| match e {
Some(TransactionValidityError::Invalid(
InvalidTransaction::Future | InvalidTransaction::Stale,
)) => {
remove_from_view.push(hash);
},
_ => {
remove_from_pool.push(hash);
},
});
at.map(|at| {
self.get_view_at(at, true)
.map(|(view, _)| view.remove_subtree(&remove_from_view, false, |_, _| {}))
});
let mut removed = vec![];
for tx_hash in &remove_from_pool {
let removed_from_pool = self.remove_transaction_subtree(*tx_hash, |_, _| {});
removed_from_pool
.iter()
.find(|tx| tx.hash == *tx_hash)
.map(|tx| removed.push(tx.clone()));
}
self.listener.transactions_invalidated(&remove_from_pool);
removed
}
pub(super) async fn replace_transaction(
&self,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
replaced: ExtrinsicHash<ChainApi>,
) {
if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
entry.insert(PendingPreInsertTask::new_submission_action(xt.clone(), source.clone()));
} else {
return;
};
let tx_hash = self.api.hash_and_length(&xt).0;
trace!(
target: LOG_TARGET,
?replaced,
?tx_hash,
"replace_transaction"
);
self.replace_transaction_in_views(source, xt, tx_hash, replaced).await;
if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
replacement.mark_processed();
}
}
#[instrument(level = Level::TRACE, skip_all, target = "txpool", name = "view_store::apply_pending_tx_replacements")]
async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
let start = Instant::now();
let mut futures = vec![];
let mut replace_count = 0;
let mut remove_count = 0;
for replacement in self.pending_txs_tasks.read().values() {
match replacement.action {
PreInsertAction::SubmitTx(ref submission) => {
replace_count += 1;
let xt_hash = self.api.hash_and_length(&submission.xt).0;
futures.push(self.replace_transaction_in_view(
view.clone(),
submission.source.clone(),
submission.xt.clone(),
xt_hash,
));
},
PreInsertAction::RemoveSubtree(ref removal) => {
remove_count += 1;
view.remove_subtree(&[removal.xt_hash], true, &*removal.listener_action);
},
}
}
let _ = futures::future::join_all(futures).await;
self.pending_txs_tasks.write().retain(|_, r| !r.processed);
debug!(
target: LOG_TARGET,
at_hash = ?view.at.hash,
replace_count,
remove_count,
duration = ?start.elapsed(),
count = ?self.pending_txs_tasks.read().len(),
"apply_pending_tx_replacements"
);
}
async fn replace_transaction_in_view(
&self,
view: Arc<View<ChainApi>>,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
tx_hash: ExtrinsicHash<ChainApi>,
) {
if let Err(error) =
view.submit_one(source, xt, ValidateTransactionPriority::Maintained).await
{
trace!(
target: LOG_TARGET,
?tx_hash,
at_hash = ?view.at.hash,
%error,
"replace_transaction: submit failed"
);
}
}
async fn replace_transaction_in_views(
&self,
source: TimedTransactionSource,
xt: ExtrinsicFor<ChainApi>,
tx_hash: ExtrinsicHash<ChainApi>,
replaced: ExtrinsicHash<ChainApi>,
) {
let submit_futures = {
let active_views = self.active_views.read();
let inactive_views = self.inactive_views.read();
active_views
.iter()
.chain(inactive_views.iter())
.filter(|(_, view)| view.is_imported(&replaced))
.map(|(_, view)| {
self.replace_transaction_in_view(
view.clone(),
source.clone(),
xt.clone(),
tx_hash,
)
})
.collect::<Vec<_>>()
};
let _results = futures::future::join_all(submit_futures).await;
}
pub(super) fn remove_transaction_subtree<F>(
&self,
xt_hash: ExtrinsicHash<ChainApi>,
listener_action: F,
) -> Vec<TransactionFor<ChainApi>>
where
F: Fn(
&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
ExtrinsicHash<ChainApi>,
) + Clone
+ Send
+ Sync
+ 'static,
{
if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) {
entry.insert(PendingPreInsertTask::new_removal_action(
xt_hash,
Arc::from(listener_action.clone()),
));
};
let mut seen = HashSet::new();
let removed = self
.active_views
.read()
.iter()
.chain(self.inactive_views.read().iter())
.filter(|(_, view)| view.is_imported(&xt_hash))
.flat_map(|(_, view)| view.remove_subtree(&[xt_hash], true, &listener_action))
.filter_map(|xt| seen.insert(xt.hash).then(|| xt.clone()))
.collect();
if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
removal_action.mark_processed();
}
removed
}
pub(crate) fn finality_stall_view_cleanup(&self, at: &HashAndNumber<Block>, threshold: usize) {
let mut dropped_views = vec![];
{
let mut active_views = self.active_views.write();
let mut inactive_views = self.inactive_views.write();
let mut f = |hash: &BlockHash<ChainApi>, v: &View<ChainApi>| -> bool {
let diff = at.number.saturating_sub(v.at.number);
if diff.into() > threshold.into() {
dropped_views.push(*hash);
false
} else {
true
}
};
active_views.retain(|h, v| f(h, v));
inactive_views.retain(|h, v| f(h, v));
}
if !dropped_views.is_empty() {
for view in dropped_views {
self.listener.remove_view(view);
self.dropped_stream_controller.remove_view(view);
}
}
}
pub(crate) fn provides_tags_from_inactive_views(
&self,
block_hashes: Vec<&HashAndNumber<Block>>,
mut xts_hashes: Vec<ExtrinsicHash<ChainApi>>,
) -> HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>> {
let mut provides_tags_map = HashMap::new();
block_hashes.into_iter().for_each(|hn| {
if let Some((view, _)) = self.get_view_at(hn.hash, true) {
let provides_tags = view.pool.validated_pool().extrinsics_tags(&xts_hashes);
let xts_provides_tags = xts_hashes
.iter()
.zip(provides_tags.into_iter())
.filter_map(|(hash, maybe_tags)| maybe_tags.map(|tags| (*hash, tags)))
.collect::<HashMap<ExtrinsicHash<ChainApi>, Vec<Tag>>>();
xts_hashes.retain(|xth| !xts_provides_tags.contains_key(xth));
provides_tags_map.extend(xts_provides_tags);
}
});
provides_tags_map
}
}