use crate::{log, network_service, platform::PlatformRef, runtime_service, sync_service};
use alloc::{
borrow::ToOwned as _,
boxed::Box,
format,
string::{String, ToString as _},
sync::Arc,
vec::Vec,
};
use core::{cmp, iter, num::NonZero, pin, time::Duration};
use futures_channel::oneshot;
use futures_lite::FutureExt as _;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt as _, StreamExt as _, future};
use itertools::Itertools as _;
use smoldot::{
header,
informant::HashDisplay,
libp2p::peer_id::PeerId,
network::codec,
transactions::{light_pool, validate},
};
/// Configuration for a [`TransactionsService`].
pub struct Config<TPlat: PlatformRef> {
/// Name used for logging. The log target of the service is `tx-service-` followed by this name.
pub log_name: String,
/// Access to the platform's capabilities. Used to spawn the background task, to sleep, to
/// obtain the current time, and to emit logs.
pub platform: TPlat,
/// Service the background task uses to download block bodies.
pub sync_service: Arc<sync_service::SyncService<TPlat>>,
/// Service the background task uses to subscribe to blocks and to perform the runtime calls
/// that validate transactions.
pub runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
/// Service the background task uses to announce transactions to peers.
pub network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
/// Maximum number of transactions held in the pool at any time. A submission received while
/// the pool holds this many transactions is immediately dropped with
/// [`DropReason::MaxPendingTransactionsReached`].
pub max_pending_transactions: NonZero<u32>,
/// Maximum number of block body downloads in progress at any time.
pub max_concurrent_downloads: NonZero<u32>,
/// Maximum number of transaction validations in progress at any time.
pub max_concurrent_validations: NonZero<u32>,
}
/// Foreground handle of the transactions service. Submissions are forwarded over a channel to a
/// background task spawned on the platform.
pub struct TransactionsService<TPlat: PlatformRef> {
// Sending side of the channel to the current background task. Kept behind a mutex because the
// sender is replaced when the background task is found to have stopped and gets re-spawned.
to_background: async_lock::Mutex<async_channel::Sender<ToBackground>>,
// Configuration of the background task, kept so that the task can be spawned again.
background_task_config: BackgroundTaskConfig<TPlat>,
}
impl<TPlat: PlatformRef> TransactionsService<TPlat> {
pub fn new(config: Config<TPlat>) -> Self {
let log_target = format!("tx-service-{}", config.log_name);
let (to_background, from_foreground) = async_channel::bounded(8);
let background_task_config = BackgroundTaskConfig {
log_target: log_target.clone(),
platform: config.platform.clone(),
sync_service: config.sync_service,
runtime_service: config.runtime_service,
network_service: config.network_service,
max_concurrent_downloads: usize::try_from(config.max_concurrent_downloads.get())
.unwrap_or(usize::MAX),
max_pending_transactions: usize::try_from(config.max_pending_transactions.get())
.unwrap_or(usize::MAX),
max_concurrent_validations: usize::try_from(config.max_concurrent_validations.get())
.unwrap_or(usize::MAX),
};
let task = Box::pin(background_task::<TPlat>(
background_task_config.clone(),
from_foreground,
));
config.platform.spawn_task(log_target.clone().into(), {
let platform = config.platform.clone();
async move {
task.await;
log!(&platform, Debug, &log_target, "shutdown");
}
});
TransactionsService {
to_background: async_lock::Mutex::new(to_background),
background_task_config,
}
}
pub async fn submit_and_watch_transaction(
&self,
transaction_bytes: Vec<u8>,
channel_size: usize,
detached: bool,
) -> TransactionWatcher {
let (updates_report, rx) = async_channel::bounded(channel_size);
self.send_to_background(ToBackground::SubmitTransaction {
transaction_bytes,
updates_report: Some((updates_report, detached)),
})
.await;
TransactionWatcher {
rx,
has_yielded_drop_reason: false,
_dummy_keep_alive: self.to_background.lock().await.clone(),
}
}
pub async fn submit_transaction(&self, transaction_bytes: Vec<u8>) {
self.send_to_background(ToBackground::SubmitTransaction {
transaction_bytes,
updates_report: None,
})
.await;
}
async fn send_to_background(&self, message: ToBackground) {
let mut lock = self.to_background.lock().await;
if lock.is_closed() {
let log_target = self.background_task_config.log_target.clone();
let (tx, rx) = async_channel::bounded(8);
let platform = self.background_task_config.platform.clone();
let task = background_task(self.background_task_config.clone(), rx);
self.background_task_config.platform.spawn_task(
log_target.clone().into(),
async move {
platform.sleep(Duration::from_secs(2)).await;
log!(&platform, Debug, &log_target, "restart");
task.await;
log!(&platform, Debug, &log_target, "shutdown");
},
);
*lock = tx;
}
let _ = lock.send(message).await;
}
}
/// Returned by [`TransactionsService::submit_and_watch_transaction`]. Yields the status updates
/// of the submitted transaction.
#[pin_project::pin_project]
pub struct TransactionWatcher {
// Channel on which the background task sends the status updates.
#[pin]
rx: async_channel::Receiver<TransactionStatus>,
// `true` once `next` has returned a `Dropped` status. Every later call to `next` returns `None`.
has_yielded_drop_reason: bool,
// Clone of the sender towards the background task. Never read.
// NOTE(review): presumably held so that the channel to the background task stays open for as
// long as a watcher is alive — confirm.
_dummy_keep_alive: async_channel::Sender<ToBackground>,
}
impl TransactionWatcher {
    /// Returns the next status update of the transaction.
    ///
    /// The last update yielded is always a [`TransactionStatus::Dropped`], after which this
    /// function returns `None`. If the channel from the background task closes before a
    /// `Dropped` status was received, a `Dropped` status with [`DropReason::Crashed`] is
    /// generated locally.
    pub async fn next(self: pin::Pin<&mut Self>) -> Option<TransactionStatus> {
        let mut this = self.project();

        if !*this.has_yielded_drop_reason {
            // A closed channel is reported to the user as a crash of the background task.
            let status = this
                .rx
                .next()
                .await
                .unwrap_or(TransactionStatus::Dropped(DropReason::Crashed));

            // Remember that the final status has been handed out.
            if matches!(status, TransactionStatus::Dropped(_)) {
                *this.has_yielded_drop_reason = true;
            }

            return Some(status);
        }

        // Nothing is expected to be sent after a `Dropped` status.
        debug_assert!(this.rx.is_closed() || this.rx.next().await.is_none());
        None
    }
}
/// Update on the state of a transaction in the service.
#[derive(Debug, Clone)]
pub enum TransactionStatus {
/// The transaction has been announced on the network. Contains the peers it was sent to.
/// Only reported when that list is not empty.
Broadcast(Vec<PeerId>),
/// A validation of the transaction has succeeded.
Validated,
/// The inclusion status of the transaction has changed.
IncludedBlockUpdate {
/// If `Some`, hash of a block whose body contains the transaction and index of the
/// transaction within that body. `None` is reported when a best block update retracts
/// the transaction.
block_hash: Option<([u8; 32], u32)>,
},
/// The service no longer tracks the transaction. This is always the last status reported.
Dropped(DropReason),
}
/// Reason why a transaction is no longer tracked by the service.
/// See [`TransactionStatus::Dropped`].
#[derive(Debug, Clone)]
pub enum DropReason {
/// The transaction is part of the body of a finalized block. `index` is its position within
/// that body.
Finalized { block_hash: [u8; 32], index: u32 },
/// The service had to (re)subscribe to the runtime service and reset its pool. Reported both
/// for transactions that were in the pool at that moment and for transactions submitted
/// while the subscription was being established.
GapInChain,
/// The pool already held the configured maximum number of transactions when the transaction
/// was submitted.
MaxPendingTransactionsReached,
/// The runtime has reported the transaction as invalid.
Invalid(validate::TransactionValidityError),
/// The validation of the transaction could not be carried out.
ValidateError(ValidateTransactionError),
/// The channel from the background task was closed before any other drop reason was
/// received. Generated by [`TransactionWatcher::next`] itself.
Crashed,
}
/// Failure to perform the runtime call that validates a transaction, or to decode its output.
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum ValidateTransactionError {
/// The runtime service reported `RuntimeCallError::InvalidRuntime` for the call.
InvalidRuntime(runtime_service::RuntimeError),
/// The runtime service reported `RuntimeCallError::ApiVersionRequirementUnfulfilled`. The call
/// is made with a requirement of version 3 of the `TaggedTransactionQueue` API.
ApiVersionRequirementUnfulfilled,
/// The runtime service reported `RuntimeCallError::Crash` for the call.
Crash,
/// The runtime service reported `RuntimeCallError::Execution` for the call.
#[display("Error during the execution of the runtime: {_0}")]
Execution(runtime_service::RuntimeCallExecutionError),
/// The runtime service reported `RuntimeCallError::Inaccessible` for the call.
#[display("Error trying to access the storage required for the runtime call")]
Inaccessible(#[error(not(source))] Vec<runtime_service::RuntimeCallInaccessibleError>),
/// The output of the runtime call could not be decoded.
OutputDecodeError(validate::DecodeError),
}
/// Unsuccessful outcome of a validation, as stored in the pool. Converted to the matching
/// [`DropReason`] when the transaction gets discarded.
#[derive(Debug, Clone)]
enum InvalidOrError {
/// The runtime has reported the transaction as invalid.
Invalid(validate::TransactionValidityError),
/// The validation itself has failed.
ValidateError(ValidateTransactionError),
}
/// Error returned by `validate_transaction`.
#[derive(Debug, Clone)]
enum ValidationError {
/// The transaction is invalid or its validation has failed.
InvalidOrError(InvalidOrError),
/// The runtime service reported the subscription used to pin the block as obsolete. The
/// background task reacts by re-subscribing.
ObsoleteSubscription,
}
/// Message sent from the [`TransactionsService`] to its background task.
enum ToBackground {
/// Adds a transaction to the pool.
SubmitTransaction {
/// SCALE-encoded transaction.
transaction_bytes: Vec<u8>,
/// If `Some`, channel on which to report the status updates of the transaction, and
/// whether the transaction is detached (`true`). `None` means that the transaction isn't
/// watched; it is then treated as detached.
updates_report: Option<(async_channel::Sender<TransactionStatus>, bool)>,
},
}
/// Configuration of the background task. Cloneable so that the task can be spawned again.
#[derive(Clone)]
struct BackgroundTaskConfig<TPlat: PlatformRef> {
// Target to use for all the logs of the task.
log_target: String,
// Access to the platform's capabilities.
platform: TPlat,
// Used to download block bodies.
sync_service: Arc<sync_service::SyncService<TPlat>>,
// Used to subscribe to blocks and to perform runtime calls.
runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
// Used to announce transactions to peers.
network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
// Maximum number of block body downloads in progress at any time.
max_concurrent_downloads: usize,
// Maximum number of transactions in the pool at any time.
max_pending_transactions: usize,
// Maximum number of transaction validations in progress at any time.
max_concurrent_validations: usize,
}
/// Background task of the transactions service.
///
/// Owns the pool of pending transactions. Subscribes to the runtime service to track blocks,
/// validates the pending transactions, downloads the bodies of blocks, announces transactions
/// on the network, and reports status updates to the watchers.
///
/// Returns when `from_foreground` is closed.
async fn background_task<TPlat: PlatformRef>(
config: BackgroundTaskConfig<TPlat>,
from_foreground: async_channel::Receiver<ToBackground>,
) {
// Capacities passed to the configuration of the pool every time the pool is (re)created.
let transactions_capacity = cmp::min(8, config.max_pending_transactions);
let blocks_capacity = 32;
let mut from_foreground = pin::pin!(from_foreground);
let mut worker = Worker {
platform: config.platform,
sync_service: config.sync_service,
runtime_service: config.runtime_service,
network_service: config.network_service,
// The all-zero finalized block hash is a placeholder: the pool is replaced at the start of
// every iteration of `'channels_rebuild`, before it is used for anything else.
pending_transactions: light_pool::LightPool::new(light_pool::Config {
transactions_capacity,
blocks_capacity,
finalized_block_hash: [0; 32], }),
block_downloads: FuturesUnordered::new(),
validations_in_progress: FuturesUnordered::new(),
next_reannounce: FuturesUnordered::new(),
max_concurrent_downloads: config.max_concurrent_downloads,
max_pending_transactions: config.max_pending_transactions,
};
// Each iteration of this loop (re)subscribes to the runtime service and resets the pool. It
// is re-entered when the finality lag of the pool grows too large, when the subscription
// ends, or when a validation reports the subscription as obsolete.
'channels_rebuild: loop {
let mut subscribe_all = {
let sub_future = async {
Some(
worker
.runtime_service
.subscribe_all(32, NonZero::<usize>::new(usize::MAX).unwrap())
.await,
)
};
let from_foreground = &mut from_foreground;
// While the subscription is being established, submissions are not added to the pool:
// watched transactions are immediately reported as dropped with `GapInChain`, and
// non-watched transactions are discarded.
let messages_process = async move {
loop {
match from_foreground.next().await {
Some(ToBackground::SubmitTransaction {
updates_report: Some(updates_report),
..
}) => {
let _ = updates_report
.0
.send(TransactionStatus::Dropped(DropReason::GapInChain))
.await;
}
Some(ToBackground::SubmitTransaction { .. }) => {}
None => break None,
}
}
};
// `messages_process` only finishes, with `None`, when the foreground channel is closed,
// in which case the task stops.
match sub_future.or(messages_process).await {
Some(s) => s,
None => return,
}
};
let initial_finalized_block_hash = header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
);
// Every transaction still in the pool is reported as dropped, as the pool is about to be
// replaced.
for (_, pending) in worker.pending_transactions.transactions_iter_mut() {
pending.update_status(TransactionStatus::Dropped(DropReason::GapInChain));
}
// List of the dropped transactions, for the "reset" log entry below.
// NOTE(review): this passes the transaction bytes themselves to `HashDisplay`, whereas the
// other logs of this file pass `blake2_hash` of the bytes — confirm whether intended.
let dropped_transactions = worker
.pending_transactions
.transactions_iter()
.map(|(tx_id, _)| {
HashDisplay(worker.pending_transactions.scale_encoding(tx_id).unwrap())
})
.join(",");
// Start again from an empty pool whose finalized block is the one of the new subscription.
worker.pending_transactions = light_pool::LightPool::new(light_pool::Config {
transactions_capacity,
blocks_capacity,
finalized_block_hash: initial_finalized_block_hash,
});
// Insert the non-finalized blocks reported by the subscription.
for block in subscribe_all.non_finalized_blocks_ancestry_order {
let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
worker.pending_transactions.add_block(
hash,
&block.parent_hash,
Block {
scale_encoded_header: block.scale_encoded_header,
failed_downloads: 0,
downloading: false,
},
);
if block.is_new_best {
worker.set_best_block(&config.log_target, &hash);
}
}
// Discard the futures that were started for the previous pool.
worker.block_downloads.clear();
worker.validations_in_progress.clear();
worker.next_reannounce.clear();
log!(
&worker.platform,
Debug,
&config.log_target,
"reset",
new_finalized = HashDisplay(&initial_finalized_block_hash),
subscription_id = ?subscribe_all.new_blocks.id(),
dropped_transactions
);
// Main loop. Each iteration first brings the pool up to date (clean-up, starting
// validations and downloads, processing finalized blocks), then waits for one event and
// handles it.
loop {
// Re-subscribe if the finality lag reported by the pool has reached 32.
if worker.pending_transactions.oldest_block_finality_lag() >= 32 {
continue 'channels_rebuild;
}
// Remove the transactions that are not detached and that have no open status channel
// left (which includes having no status channel at all).
while let Some(tx_id) = {
let id = worker
.pending_transactions
.transactions_iter()
.find(|(_, tx)| {
!tx.status_update.iter().any(|s| !s.is_closed()) && !tx.detached
})
.map(|(id, _)| id);
id
} {
worker.pending_transactions.remove_transaction(tx_id);
}
// Start validating transactions, up to `max_concurrent_validations` at a time.
while worker.validations_in_progress.len() < config.max_concurrent_validations {
// Pick an unvalidated transaction that isn't being validated already.
let to_start_validate = worker
.pending_transactions
.unvalidated_transactions()
.find(|(_, tx)| tx.validation_in_progress.is_none())
.map(|(tx_id, ..)| tx_id);
let to_start_validate = match to_start_validate {
Some(tx_id) => tx_id,
None => break,
};
// The validation is performed against the current best block of the pool. Nothing is
// started if the pool holds no data for that block.
let validation_future = {
let block_hash = *worker.pending_transactions.best_block_hash();
let scale_encoded_header =
match worker.pending_transactions.block_user_data(&block_hash) {
Some(b) => b.scale_encoded_header.clone(),
None => break,
};
let runtime_service = worker.runtime_service.clone();
let platform = worker.platform.clone();
let log_target = config.log_target.clone();
let relay_chain_sync_subscription_id = subscribe_all.new_blocks.id();
let scale_encoded_transaction = worker
.pending_transactions
.scale_encoding(to_start_validate)
.unwrap()
.to_owned();
async move {
let result = validate_transaction(
&platform,
&log_target,
&runtime_service,
relay_chain_sync_subscription_id,
block_hash,
&scale_encoded_header,
scale_encoded_transaction,
validate::TransactionSource::External,
)
.await;
(block_hash, result)
}
};
// The outcome of the validation travels through a oneshot channel whose receiver is
// stored in the transaction. The future pushed to `validations_in_progress` only
// yields the identifier of the transaction.
let (result_tx, result_rx) = oneshot::channel();
worker
.validations_in_progress
.push(Box::pin(validation_future.map(move |result| {
let _ = result_tx.send(result);
to_start_validate
})));
let tx = worker
.pending_transactions
.transaction_user_data_mut(to_start_validate)
.unwrap();
debug_assert!(tx.validation_in_progress.is_none());
tx.validation_in_progress = Some(result_rx);
}
// Remove, one by one, the transactions that the pool reports as invalid against the
// finalized block, and report them as dropped.
loop {
let (tx_id, _, error) = match worker
.pending_transactions
.invalid_transactions_finalized_block()
.next()
{
Some(v) => v,
None => break,
};
let error = error.clone();
let (tx_body, mut transaction) =
worker.pending_transactions.remove_transaction(tx_id);
log!(
&worker.platform,
Debug,
&config.log_target,
"discarded",
tx_hash = HashDisplay(&blake2_hash(&tx_body)),
?error
);
transaction.update_status(TransactionStatus::Dropped(match error {
InvalidOrError::Invalid(err) => DropReason::Invalid(err),
InvalidOrError::ValidateError(err) => DropReason::ValidateError(err),
}));
}
// Start downloading block bodies, up to `max_concurrent_downloads` at a time.
while worker.block_downloads.len() < worker.max_concurrent_downloads {
// Pick a block whose body is missing, skipping the blocks already being downloaded and
// the blocks whose download has already failed once.
let block_hash_number = worker
.pending_transactions
.missing_block_bodies()
.find(|(_, block)| {
if block.downloading {
return false;
}
if block.failed_downloads >= 1 {
return false;
}
true
})
.map(|(hash, block)| {
let decoded = header::decode(
&block.scale_encoded_header,
worker.sync_service.block_number_bytes(),
)
.unwrap();
(*hash, decoded.number)
});
let (block_hash, block_number) = match block_hash_number {
Some(b) => b,
None => break,
};
// A download counts as failed if the query fails or if the response has no body.
worker.block_downloads.push({
let download_future = worker.sync_service.clone().block_query(
block_number,
block_hash,
codec::BlocksRequestFields {
body: true,
header: true, justifications: false,
},
3,
Duration::from_secs(8),
NonZero::<u32>::new(3).unwrap(),
);
Box::pin(async move {
(
block_hash,
download_future.await.and_then(|b| b.body.ok_or(())),
)
})
});
worker
.pending_transactions
.block_user_data_mut(&block_hash)
.unwrap()
.downloading = true;
log!(
&worker.platform,
Debug,
&config.log_target,
"blocks-download-started",
block = HashDisplay(&block_hash)
);
}
// Process the blocks returned by `prune_finalized_with_body`: unpin them from the
// subscription and report the transactions of their bodies as dropped with `Finalized`.
for block in worker.pending_transactions.prune_finalized_with_body() {
log!(
&worker.platform,
Debug,
&config.log_target,
"finalized",
block = HashDisplay(&block.block_hash),
body_transactions = block
.included_transactions
.iter()
.map(|tx| HashDisplay(&blake2_hash(&tx.scale_encoding)).to_string())
.join(", ")
);
subscribe_all.new_blocks.unpin_block(block.block_hash).await;
debug_assert!(!block.user_data.downloading);
for mut tx in block.included_transactions {
let body_index = u32::try_from(tx.index_in_block).unwrap();
tx.user_data
.update_status(TransactionStatus::Dropped(DropReason::Finalized {
block_hash: block.block_hash,
index: body_index,
}));
}
}
// Give other tasks a chance to run before waiting for the next event.
futures_lite::future::yield_now().await;
// Event that the task waits for.
enum WakeUpReason {
// Notification from the runtime service subscription. `None` if the subscription ended.
Notification(Option<runtime_service::Notification>),
// A block body download has finished, successfully or not.
BlockDownloadFinished([u8; 32], Result<Vec<Vec<u8>>, ()>),
// A re-announce timer has fired for the given transaction.
MustMaybeReannounce(light_pool::TransactionId),
// A validation future has finished for the given transaction.
MaybeValidated(light_pool::TransactionId),
// Message from the foreground. `None` if the channel is closed.
ForegroundMessage(Option<ToBackground>),
}
// Wait for the first event to be available. A source whose `FuturesUnordered` is empty is
// replaced with a future that never completes.
let wake_up_reason: WakeUpReason = {
async { WakeUpReason::Notification(subscribe_all.new_blocks.next().await) }
.or(async {
if !worker.block_downloads.is_empty() {
let (block_hash, result) =
worker.block_downloads.select_next_some().await;
WakeUpReason::BlockDownloadFinished(block_hash, result)
} else {
future::pending().await
}
})
.or(async {
if !worker.next_reannounce.is_empty() {
WakeUpReason::MustMaybeReannounce(
worker.next_reannounce.select_next_some().await,
)
} else {
future::pending().await
}
})
.or(async {
if !worker.validations_in_progress.is_empty() {
WakeUpReason::MaybeValidated(
worker.validations_in_progress.select_next_some().await,
)
} else {
future::pending().await
}
})
.or(async { WakeUpReason::ForegroundMessage(from_foreground.next().await) })
.await
};
match wake_up_reason {
// New block: add it to the pool, and update the best block if relevant.
WakeUpReason::Notification(Some(runtime_service::Notification::Block(
new_block,
))) => {
let hash =
header::hash_from_scale_encoded_header(&new_block.scale_encoded_header);
worker.pending_transactions.add_block(
header::hash_from_scale_encoded_header(&new_block.scale_encoded_header),
&new_block.parent_hash,
Block {
scale_encoded_header: new_block.scale_encoded_header,
failed_downloads: 0,
downloading: false,
},
);
if new_block.is_new_best {
worker.set_best_block(&config.log_target, &hash);
}
}
// New finalized block: update the best block if it changed, then unpin every block that
// `set_finalized_block` returns as pruned.
WakeUpReason::Notification(Some(runtime_service::Notification::Finalized {
hash,
best_block_hash_if_changed,
..
})) => {
if let Some(best_block_hash_if_changed) = best_block_hash_if_changed {
worker.set_best_block(&config.log_target, &best_block_hash_if_changed);
}
for pruned in worker.pending_transactions.set_finalized_block(&hash) {
log!(
&worker.platform,
Debug,
&config.log_target,
"pruned-block-discard",
block = HashDisplay(&pruned.0),
);
subscribe_all.new_blocks.unpin_block(pruned.0).await;
}
}
WakeUpReason::Notification(Some(
runtime_service::Notification::BestBlockChanged { hash },
)) => {
worker.set_best_block(&config.log_target, &hash);
}
// The subscription has ended. Re-subscribe.
WakeUpReason::Notification(None) => continue 'channels_rebuild,
WakeUpReason::BlockDownloadFinished(block_hash, mut block_body) => {
// The block might no longer be in the pool, in which case the result is ignored.
let block = match worker.pending_transactions.block_user_data_mut(&block_hash) {
Some(b) => b,
None => {
continue;
}
};
debug_assert!(block.downloading);
block.downloading = false;
// A body whose extrinsics root doesn't match the one in the header of the block is
// treated as a failed download.
if let Ok(body) = &block_body {
if header::extrinsics_root(body)
!= *header::decode(
&block.scale_encoded_header,
worker.sync_service.block_number_bytes(),
)
.unwrap()
.extrinsics_root
{
block_body = Err(());
}
}
if let Ok(block_body) = block_body {
// Give the body to the pool, then notify the transactions that the pool reports as
// included in it.
let block_body_size = block_body.len();
let included_transactions = worker
.pending_transactions
.set_block_body(&block_hash, block_body.into_iter())
.collect::<Vec<_>>();
log!(
&worker.platform,
Debug,
&config.log_target,
"blocks-download-success",
block = HashDisplay(&block_hash),
included_transactions = included_transactions
.iter()
.map(|(id, _)| HashDisplay(&blake2_hash(
worker.pending_transactions.scale_encoding(*id).unwrap()
))
.to_string())
.join(", ")
);
for (tx_id, body_index) in included_transactions {
debug_assert!(body_index < block_body_size);
let tx = worker
.pending_transactions
.transaction_user_data_mut(tx_id)
.unwrap();
let body_index = u32::try_from(body_index).unwrap();
tx.update_status(TransactionStatus::IncludedBlockUpdate {
block_hash: Some((block_hash, body_index)),
});
}
} else {
// Remember the failure, so that the download of this block isn't attempted again.
block.failed_downloads = block.failed_downloads.saturating_add(1);
log!(
&worker.platform,
Debug,
&config.log_target,
"blocks-download-failure",
block = HashDisplay(&block_hash)
);
}
}
WakeUpReason::MustMaybeReannounce(maybe_reannounce_tx_id) => {
// The transaction might have been removed from the pool in the meantime.
if worker
.pending_transactions
.transaction_user_data(maybe_reannounce_tx_id)
.is_none()
{
continue;
}
// Don't announce a transaction that is included in the best chain or that isn't valid
// against the best block.
if worker
.pending_transactions
.is_included_best_chain(maybe_reannounce_tx_id)
|| !worker
.pending_transactions
.is_valid_against_best_block(maybe_reannounce_tx_id)
{
continue;
}
// Don't announce before the scheduled time.
let now = worker.platform.now();
let tx = worker
.pending_transactions
.transaction_user_data_mut(maybe_reannounce_tx_id)
.unwrap();
if tx.when_reannounce > now {
continue;
}
// Schedule the next announcement 5 seconds from now, then announce.
tx.when_reannounce = now + Duration::from_secs(5);
worker.next_reannounce.push({
let platform = worker.platform.clone();
Box::pin(async move {
platform.sleep(Duration::from_secs(5)).await;
maybe_reannounce_tx_id
})
});
let peers_sent = worker
.network_service
.clone()
.announce_transaction(
worker
.pending_transactions
.scale_encoding(maybe_reannounce_tx_id)
.unwrap(),
)
.await;
log!(
&worker.platform,
Debug,
&config.log_target,
"announced-to-network",
transaction = HashDisplay(&blake2_hash(
worker
.pending_transactions
.scale_encoding(maybe_reannounce_tx_id)
.unwrap()
)),
peers = peers_sent.iter().join(", ")
);
// Only report a broadcast if the transaction was sent to at least one peer.
if !peers_sent.is_empty() {
worker
.pending_transactions
.transaction_user_data_mut(maybe_reannounce_tx_id)
.unwrap()
.update_status(TransactionStatus::Broadcast(peers_sent));
}
}
WakeUpReason::MaybeValidated(maybe_validated_tx_id) => {
// Fetch the outcome of the validation from the receiver stored in the transaction.
// Nothing is done if the transaction is no longer in the pool, or if no outcome is
// available.
let (block_hash, validation_result) = match worker
.pending_transactions
.transaction_user_data_mut(maybe_validated_tx_id)
{
None => continue, Some(tx) => match tx
.validation_in_progress
.as_mut()
.and_then(|f| f.now_or_never())
{
None => continue, Some(Err(_)) => unreachable!(), Some(Ok(result)) => {
tx.validation_in_progress = None;
result
}
},
};
let tx_hash = blake2_hash(
worker
.pending_transactions
.scale_encoding(maybe_validated_tx_id)
.unwrap(),
);
// Ignore the outcome if the block that the transaction was validated against is no
// longer in the pool.
if !worker.pending_transactions.has_block(&block_hash) {
log!(
&worker.platform,
Debug,
&config.log_target,
"transaction-validation-obsolete-block",
transaction = HashDisplay(&tx_hash),
block = HashDisplay(&block_hash)
);
continue;
}
// Log the outcome and convert it to the form expected by the pool.
let validation_result = match validation_result {
Ok(result) => {
log!(
&worker.platform,
Debug,
&config.log_target,
"transaction-validation-success",
transaction = HashDisplay(&tx_hash),
block = HashDisplay(&block_hash),
priority = result.priority,
longevity = result.longevity,
propagate = ?result.propagate,
);
log!(
&worker.platform,
Info,
&config.log_target,
format!(
"Successfully validated transaction {}",
HashDisplay(&tx_hash)
)
);
worker
.pending_transactions
.transaction_user_data_mut(maybe_validated_tx_id)
.unwrap_or_else(|| unreachable!())
.update_status(TransactionStatus::Validated);
// Immediately-ready future, so that the announcement of the transaction is
// considered at a following iteration.
worker
.next_reannounce
.push(Box::pin(async move { maybe_validated_tx_id }));
Ok(result)
}
// The subscription is obsolete. Re-subscribe.
Err(ValidationError::ObsoleteSubscription) => {
continue 'channels_rebuild;
}
Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(error))) => {
log!(
&worker.platform,
Debug,
&config.log_target,
"transaction-validation-invalid-tx",
transaction = HashDisplay(&tx_hash),
block = HashDisplay(&block_hash),
?error,
);
log!(
&worker.platform,
Warn,
&config.log_target,
format!(
"Transaction {} invalid against block {}: {}",
HashDisplay(&tx_hash),
HashDisplay(&block_hash),
error,
)
);
Err(InvalidOrError::Invalid(error))
}
Err(ValidationError::InvalidOrError(InvalidOrError::ValidateError(
error,
))) => {
log!(
&worker.platform,
Debug,
&config.log_target,
"transaction-validation-error",
transaction = HashDisplay(&tx_hash),
block = HashDisplay(&block_hash),
?error,
);
log!(
&worker.platform,
Warn,
&config.log_target,
format!(
"Failed to validate transaction {}: {}",
HashDisplay(&tx_hash),
error
)
);
Err(InvalidOrError::ValidateError(error))
}
};
// Store the outcome in the pool. Transactions that end up invalid against the finalized
// block are discarded at the top of the main loop.
worker.pending_transactions.set_validation_result(
maybe_validated_tx_id,
&block_hash,
validation_result,
);
}
// The foreground channel is closed. Stop the task.
WakeUpReason::ForegroundMessage(None) => return,
WakeUpReason::ForegroundMessage(Some(ToBackground::SubmitTransaction {
transaction_bytes,
updates_report,
})) => {
// If a transaction with the same bytes is already in the pool, attach the new watcher
// (if any) to it instead of inserting a duplicate.
let existing_tx_id = worker
.pending_transactions
.find_transaction(&transaction_bytes)
.next();
if let Some(existing_tx_id) = existing_tx_id {
let existing_tx = worker
.pending_transactions
.transaction_user_data_mut(existing_tx_id)
.unwrap();
if let Some((channel, detached)) = updates_report {
existing_tx.add_status_update(channel);
if detached {
existing_tx.detached = true;
}
}
continue;
}
// Refuse the transaction if the pool is full.
if worker.pending_transactions.num_transactions()
>= worker.max_pending_transactions
{
if let Some((updates_report, _)) = updates_report {
let _ = updates_report.try_send(TransactionStatus::Dropped(
DropReason::MaxPendingTransactionsReached,
));
}
continue;
}
// Insert the transaction as unvalidated. Its validation is started at the top of the
// main loop. A transaction without a watcher is always treated as detached.
worker.pending_transactions.add_unvalidated(
transaction_bytes,
PendingTransaction {
when_reannounce: worker.platform.now(),
detached: match &updates_report {
Some((_, true)) | None => true,
Some((_, false)) => false,
},
status_update: {
let mut vec = Vec::with_capacity(1);
if let Some((channel, _)) = updates_report {
vec.push(channel);
}
vec
},
latest_status: None,
validation_in_progress: None,
},
);
}
}
}
}
}
/// State of the background task.
struct Worker<TPlat: PlatformRef> {
// Access to the platform's capabilities.
platform: TPlat,
// Used to download block bodies.
sync_service: Arc<sync_service::SyncService<TPlat>>,
// Used to subscribe to blocks and to perform runtime calls.
runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
// Used to announce transactions to peers.
network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
// Pool of pending transactions and of the blocks tracked by the task.
pending_transactions: light_pool::LightPool<PendingTransaction<TPlat>, Block, InvalidOrError>,
// Maximum number of transactions in the pool at any time.
max_pending_transactions: usize,
// Block body downloads in progress. Each future yields the hash of the block and the body
// (list of transactions), or an error if the download has failed.
block_downloads:
FuturesUnordered<future::BoxFuture<'static, ([u8; 32], Result<Vec<Vec<u8>>, ()>)>>,
// Validations in progress. Each future yields the identifier of the validated transaction;
// the outcome itself is sent to `PendingTransaction::validation_in_progress`.
validations_in_progress:
FuturesUnordered<future::BoxFuture<'static, light_pool::TransactionId>>,
// Timers that yield the identifier of a transaction that should maybe be announced.
next_reannounce: FuturesUnordered<future::BoxFuture<'static, light_pool::TransactionId>>,
// Maximum number of block body downloads in progress at any time.
max_concurrent_downloads: usize,
}
impl<TPlat: PlatformRef> Worker<TPlat> {
fn set_best_block(&mut self, log_target: &str, new_best_block_hash: &[u8; 32]) {
let updates = self
.pending_transactions
.set_best_block(new_best_block_hash);
log!(
&self.platform,
Debug,
&log_target,
"best-chain-update",
new_best_block = HashDisplay(new_best_block_hash),
included_transactions = updates
.included_transactions
.iter()
.map(|(id, _, _)| HashDisplay(&blake2_hash(
self.pending_transactions.scale_encoding(*id).unwrap()
))
.to_string())
.join(", "),
retracted_transactions = updates
.retracted_transactions
.iter()
.map(|(id, _, _)| HashDisplay(&blake2_hash(
self.pending_transactions.scale_encoding(*id).unwrap()
))
.to_string())
.join(", ")
);
for (tx_id, _, _) in updates.retracted_transactions {
let tx = self
.pending_transactions
.transaction_user_data_mut(tx_id)
.unwrap();
tx.update_status(TransactionStatus::IncludedBlockUpdate { block_hash: None });
}
for (tx_id, block_hash, block_body_index) in updates.included_transactions {
let tx = self
.pending_transactions
.transaction_user_data_mut(tx_id)
.unwrap();
let block_body_index = u32::try_from(block_body_index).unwrap();
tx.update_status(TransactionStatus::IncludedBlockUpdate {
block_hash: Some((block_hash, block_body_index)),
});
}
}
}
/// Data that the background task associates with each block of the pool.
struct Block {
// SCALE-encoded header of the block.
scale_encoded_header: Vec<u8>,
// Number of times the download of the body of this block has failed. Saturates at `u8::MAX`.
failed_downloads: u8,
// `true` while a download of the body of this block is in progress.
downloading: bool,
}
/// Data that the background task associates with each transaction of the pool.
struct PendingTransaction<TPlat: PlatformRef> {
// Earliest moment at which the transaction can be announced on the network again.
when_reannounce: TPlat::Instant,
// Channels on which to send the status updates of the transaction. One per watcher.
status_update: Vec<async_channel::Sender<TransactionStatus>>,
// If `false`, the transaction is removed from the pool once it has no open status channel left.
detached: bool,
// Latest status passed to `update_status`, if any. Sent to the watchers that are added later.
latest_status: Option<TransactionStatus>,
// If `Some`, a validation of the transaction is in progress. The receiver yields the hash of
// the block that the transaction was validated against and the outcome of the validation.
validation_in_progress: Option<
oneshot::Receiver<(
[u8; 32],
Result<validate::ValidTransaction, ValidationError>,
)>,
>,
}
impl<TPlat: PlatformRef> PendingTransaction<TPlat> {
    /// Registers an additional channel on which to report the status updates of the transaction.
    ///
    /// If a status has already been reported for this transaction, it is first sent on the new
    /// channel. The channel is discarded if that send fails.
    fn add_status_update(&mut self, channel: async_channel::Sender<TransactionStatus>) {
        if let Some(latest_status) = &self.latest_status {
            if channel.try_send(latest_status.clone()).is_err() {
                return;
            }
        }

        self.status_update.push(channel);
    }

    /// Sends `status` exactly once on every registered channel and records it as the latest
    /// status of the transaction.
    ///
    /// Channels on which the send fails, because they are closed or full, are discarded.
    fn update_status(&mut self, status: TransactionStatus) {
        // `retain` visits each channel exactly once even while channels are being removed. The
        // previous index loop combined with `swap_remove` could visit a channel twice, skip
        // another, and index out of bounds once the list had shrunk.
        self.status_update
            .retain(|channel| channel.try_send(status.clone()).is_ok());

        self.latest_status = Some(status);
    }
}
async fn validate_transaction<TPlat: PlatformRef>(
platform: &TPlat,
log_target: &str,
relay_chain_sync: &Arc<runtime_service::RuntimeService<TPlat>>,
relay_chain_sync_subscription_id: runtime_service::SubscriptionId,
block_hash: [u8; 32],
block_scale_encoded_header: &[u8],
scale_encoded_transaction: impl AsRef<[u8]> + Clone,
source: validate::TransactionSource,
) -> Result<validate::ValidTransaction, ValidationError> {
log!(
platform,
Debug,
&log_target,
"transaction-validation-started",
transaction = HashDisplay(&blake2_hash(scale_encoded_transaction.as_ref())),
block = HashDisplay(&block_hash),
block_height = header::decode(
block_scale_encoded_header,
relay_chain_sync.block_number_bytes()
)
.ok()
.map(|h| format!("#{}", h.number))
.unwrap_or_else(|| "unknown".to_owned())
);
let (pinned_runtime, block_state_root_hash, block_number) = match relay_chain_sync
.pin_pinned_block_runtime(relay_chain_sync_subscription_id, block_hash)
.await
{
Ok(r) => r,
Err(runtime_service::PinPinnedBlockRuntimeError::ObsoleteSubscription) => {
return Err(ValidationError::ObsoleteSubscription);
}
Err(runtime_service::PinPinnedBlockRuntimeError::BlockNotPinned) => unreachable!(),
};
let runtime_call_future = relay_chain_sync.runtime_call(
pinned_runtime,
block_hash,
block_number,
block_state_root_hash,
validate::VALIDATION_FUNCTION_NAME.to_owned(),
Some(("TaggedTransactionQueue".to_owned(), 3..=3)),
validate::validate_transaction_runtime_parameters_v3(
iter::once(scale_encoded_transaction.as_ref()),
source,
&block_hash,
)
.fold(Vec::new(), |mut a, b| {
a.extend_from_slice(b.as_ref());
a
}),
3,
Duration::from_secs(8),
NonZero::<u32>::new(1).unwrap(),
);
let success = match runtime_call_future.await {
Ok(output) => output,
Err(runtime_service::RuntimeCallError::Execution(error)) => {
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Execution(error)),
));
}
Err(runtime_service::RuntimeCallError::Crash) => {
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Crash),
));
}
Err(runtime_service::RuntimeCallError::Inaccessible(errors)) => {
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Inaccessible(errors)),
));
}
Err(runtime_service::RuntimeCallError::InvalidRuntime(error)) => {
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::InvalidRuntime(error)),
));
}
Err(runtime_service::RuntimeCallError::ApiVersionRequirementUnfulfilled) => {
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(
ValidateTransactionError::ApiVersionRequirementUnfulfilled,
),
));
}
};
match validate::decode_validate_transaction_return_value(&success.output) {
Ok(Ok(decoded)) => Ok(decoded),
Ok(Err(err)) => Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(
err,
))),
Err(err) => Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::OutputDecodeError(err)),
)),
}
}
/// Returns the 32-byte unkeyed BLAKE2b hash of `bytes`.
fn blake2_hash(bytes: &[u8]) -> [u8; 32] {
    let digest = blake2_rfc::blake2b::blake2b(32, &[], bytes);
    let mut hash = [0u8; 32];
    // Panics if the digest isn't exactly 32 bytes long, which is the length requested above.
    hash.copy_from_slice(digest.as_bytes());
    hash
}