use crate::{network_service, platform::Platform, runtime_service, sync_service};
use alloc::{
borrow::ToOwned as _,
boxed::Box,
format,
string::{String, ToString as _},
sync::Arc,
vec::Vec,
};
use core::{
cmp, iter,
marker::PhantomData,
num::{NonZeroU32, NonZeroUsize},
time::Duration,
};
use futures::{channel::mpsc, lock::Mutex, prelude::*, stream::FuturesUnordered};
use itertools::Itertools as _;
use smoldot::{
header,
informant::HashDisplay,
libp2p::peer_id::PeerId,
network::protocol,
transactions::{light_pool, validate},
};
/// Configuration for a [`TransactionsService`].
pub struct Config<TPlat: Platform> {
    /// Name of the chain; used to build the log target (`tx-service-{log_name}`).
    pub log_name: String,
    /// Closure that spawns background tasks. Called once with the log target and the
    /// service's background task future.
    pub tasks_executor: Box<dyn FnMut(String, future::BoxFuture<'static, ()>) + Send>,
    /// Service used to download block bodies of blocks that might contain transactions.
    pub sync_service: Arc<sync_service::SyncService<TPlat>>,
    /// Service used to subscribe to chain updates and to validate transactions against
    /// pinned blocks' runtimes.
    pub runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
    /// Service used to gossip transactions, plus the index of the chain within that service.
    pub network_service: (Arc<network_service::NetworkService<TPlat>>, usize),
    /// Maximum number of transactions simultaneously tracked by the pool; submissions past
    /// this limit are dropped with [`DropReason::MaxPendingTransactionsReached`].
    pub max_pending_transactions: NonZeroU32,
    /// Maximum number of block-body downloads in flight at any given time.
    pub max_concurrent_downloads: NonZeroU32,
    /// Maximum number of transaction validations in flight at any given time.
    pub max_concurrent_validations: NonZeroU32,
}
/// Handle to the transactions service. Communicates with the background task spawned in
/// [`TransactionsService::new`] through a channel.
pub struct TransactionsService<TPlat> {
    // Sender to the background task; wrapped in a `Mutex` so that `&self` methods can send.
    to_background: Mutex<mpsc::Sender<ToBackground>>,
    // Ties the handle to the `TPlat` platform without storing any `TPlat` value.
    platform: PhantomData<fn() -> TPlat>,
}
impl<TPlat: Platform> TransactionsService<TPlat> {
    /// Builds the service: spawns the background task through `config.tasks_executor` and
    /// returns a handle connected to it.
    pub async fn new(mut config: Config<TPlat>) -> Self {
        let log_target = format!("tx-service-{}", config.log_name);
        let (to_background, from_foreground) = mpsc::channel(8);

        // The background task works with `usize` limits; saturate if the conversion
        // from `u32` can't fit (only relevant on exotic platforms).
        let max_downloads =
            usize::try_from(config.max_concurrent_downloads.get()).unwrap_or(usize::max_value());
        let max_pending =
            usize::try_from(config.max_pending_transactions.get()).unwrap_or(usize::max_value());
        let max_validations =
            usize::try_from(config.max_concurrent_validations.get()).unwrap_or(usize::max_value());

        let task = background_task::<TPlat>(
            log_target.clone(),
            config.sync_service,
            config.runtime_service,
            config.network_service.0,
            config.network_service.1,
            from_foreground,
            max_downloads,
            max_pending,
            max_validations,
        );
        (config.tasks_executor)(log_target, Box::pin(task));

        TransactionsService {
            to_background: Mutex::new(to_background),
            platform: PhantomData,
        }
    }

    /// Submits a transaction and returns a channel reporting its status updates.
    ///
    /// `channel_size` is the capacity of the returned channel; updates that don't fit are
    /// silently discarded by the background task.
    pub async fn submit_and_watch_transaction(
        &self,
        transaction_bytes: Vec<u8>,
        channel_size: usize,
    ) -> mpsc::Receiver<TransactionStatus> {
        let (updates_report, rx) = mpsc::channel(channel_size);
        let message = ToBackground::SubmitTransaction {
            transaction_bytes,
            updates_report: Some(updates_report),
        };
        // Panics only if the background task has shut down, which is considered a bug.
        self.to_background.lock().await.send(message).await.unwrap();
        rx
    }

    /// Submits a transaction without watching its status.
    pub async fn submit_transaction(&self, transaction_bytes: Vec<u8>) {
        let message = ToBackground::SubmitTransaction {
            transaction_bytes,
            updates_report: None,
        };
        self.to_background.lock().await.send(message).await.unwrap();
    }
}
/// Status update sent to the channels registered for a transaction.
#[derive(Debug, Clone)]
pub enum TransactionStatus {
    /// Transaction has been announced to the given peers over the network.
    Broadcast(Vec<PeerId>),
    /// The inclusion of the transaction in the best chain has changed.
    IncludedBlockUpdate {
        /// `Some((block_hash, index_in_body))` if the transaction is now included in a
        /// best-chain block; `None` if it has been retracted.
        block_hash: Option<([u8; 32], u32)>,
    },
    /// Transaction is no longer tracked. Terminal status.
    Dropped(DropReason),
}
/// Reason why a transaction stopped being tracked. See [`TransactionStatus::Dropped`].
#[derive(Debug, Clone)]
pub enum DropReason {
    /// Transaction was included at the given index in the body of a now-finalized block.
    Finalized { block_hash: [u8; 32], index: u32 },
    /// The service lost track of the chain (runtime-service subscription had to be rebuilt),
    /// so the fate of the transaction is unknown.
    GapInChain,
    /// Pool was already holding the configured maximum number of transactions.
    MaxPendingTransactionsReached,
    /// Runtime reported the transaction as invalid.
    Invalid(validate::TransactionValidityError),
    /// The validation process itself failed (not the transaction).
    ValidateError(ValidateTransactionError),
}
/// Error that can happen while executing the validation runtime call.
#[derive(Debug, derive_more::Display, Clone)]
pub enum ValidateTransactionError {
    /// Error performing the runtime call itself.
    #[display(fmt = "{}", _0)]
    Call(runtime_service::RuntimeCallError),
    /// Error produced by the validation logic.
    #[display(fmt = "{}", _0)]
    Validation(validate::Error),
    /// The runtime asked for the key following another key, which this service refuses to
    /// answer.
    NextKeyForbidden,
}
/// Outcome of a failed validation, as stored in the pool: either the transaction is invalid,
/// or validating it failed.
#[derive(Debug, Clone)]
enum InvalidOrError {
    /// Runtime reported the transaction as invalid.
    Invalid(validate::TransactionValidityError),
    /// The validation process errored.
    ValidateError(ValidateTransactionError),
}
/// Error returned by [`validate_transaction`].
#[derive(Debug, Clone)]
enum ValidationError {
    /// Transaction invalid or validation failed; see [`InvalidOrError`].
    InvalidOrError(InvalidOrError),
    /// The runtime-service subscription is stale; the caller must rebuild its channels.
    ObsoleteSubscription,
}
/// Message sent from the public API to the background task.
enum ToBackground {
    SubmitTransaction {
        /// SCALE-encoded transaction to add to the pool.
        transaction_bytes: Vec<u8>,
        /// Channel to report status updates on, if the submitter wants to watch.
        updates_report: Option<mpsc::Sender<TransactionStatus>>,
    },
}
/// Background task of the transactions service. Runs until `from_foreground` is closed.
///
/// Maintains a [`light_pool::LightPool`] of pending transactions synchronized with the chain
/// reported by the runtime service, and drives three kinds of asynchronous work: block-body
/// downloads, transaction validations, and periodic transaction re-announces.
async fn background_task<TPlat: Platform>(
    log_target: String,
    sync_service: Arc<sync_service::SyncService<TPlat>>,
    runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
    network_service: Arc<network_service::NetworkService<TPlat>>,
    network_chain_index: usize,
    mut from_foreground: mpsc::Receiver<ToBackground>,
    max_concurrent_downloads: usize,
    max_pending_transactions: usize,
    max_concurrent_validations: usize,
) {
    // Initial capacities for the pool's containers.
    let transactions_capacity = cmp::min(8, max_pending_transactions);
    let blocks_capacity = 32;

    let mut worker = Worker {
        sync_service,
        runtime_service,
        network_service,
        network_chain_index,
        pending_transactions: light_pool::LightPool::new(light_pool::Config {
            transactions_capacity,
            blocks_capacity,
            // Dummy value; the pool is rebuilt with the real finalized hash as soon as the
            // runtime-service subscription below reports it.
            finalized_block_hash: [0; 32],
        }),
        block_downloads: FuturesUnordered::new(),
        validations_in_progress: FuturesUnordered::new(),
        next_reannounce: FuturesUnordered::new(),
        max_concurrent_downloads,
        max_pending_transactions,
    };

    // Each iteration of this outer loop (re-)subscribes to the runtime service and rebuilds
    // all state derived from the previous subscription.
    'channels_rebuild: loop {
        let mut subscribe_all = worker
            .runtime_service
            .subscribe_all(
                "transactions-service",
                32,
                NonZeroUsize::new(usize::max_value()).unwrap(),
            )
            .await;

        let initial_finalized_block_hash = header::hash_from_scale_encoded_header(
            &subscribe_all.finalized_block_scale_encoded_header,
        );

        // Any transaction tracked so far can no longer be followed reliably: report a gap.
        for (_, pending) in worker.pending_transactions.transactions_iter_mut() {
            pending.update_status(TransactionStatus::Dropped(DropReason::GapInChain));
        }

        // Formatted list of the dropped transactions, for the `Reset` log message below.
        let dropped_transactions = worker
            .pending_transactions
            .transactions_iter()
            .map(|(tx_id, _)| {
                HashDisplay(worker.pending_transactions.scale_encoding(tx_id).unwrap())
            })
            .join(",");

        // Fresh pool rooted at the subscription's finalized block.
        worker.pending_transactions = light_pool::LightPool::new(light_pool::Config {
            transactions_capacity,
            blocks_capacity,
            finalized_block_hash: initial_finalized_block_hash,
        });

        // Seed the pool with the current non-finalized chain.
        for block in subscribe_all.non_finalized_blocks_ancestry_order {
            let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
            worker.pending_transactions.add_block(
                hash,
                &block.parent_hash,
                Block {
                    scale_encoded_header: block.scale_encoded_header,
                    failed_downloads: 0,
                    downloading: false,
                },
            );
            if block.is_new_best {
                worker.set_best_block(&log_target, &hash);
            }
        }

        // In-flight futures refer to the previous subscription/pool; discard them all.
        worker.block_downloads.clear();
        worker.validations_in_progress.clear();
        worker.next_reannounce.clear();

        log::debug!(
            target: &log_target,
            "Reset(new_finalized={}. dropped-transactions={{{}}})",
            HashDisplay(&initial_finalized_block_hash),
            dropped_transactions
        );

        // Main processing loop of a single subscription.
        loop {
            // If the oldest tracked block lags too far behind finality, rebuild everything.
            if worker.pending_transactions.oldest_block_finality_lag() >= 32 {
                continue 'channels_rebuild;
            }

            // Start as many validations as the concurrency limit allows.
            while worker.validations_in_progress.len() < max_concurrent_validations {
                // Pick an unvalidated transaction that isn't already being validated.
                let to_start_validate = worker
                    .pending_transactions
                    .unvalidated_transactions()
                    .filter(|(_, tx)| tx.validation_in_progress.is_none())
                    .next()
                    .map(|(tx_id, ..)| tx_id);
                let to_start_validate = match to_start_validate {
                    Some(tx_id) => tx_id,
                    None => break,
                };

                let validation_future = {
                    // Validation is performed against the current best block.
                    let block_hash = *worker.pending_transactions.best_block_hash();
                    let scale_encoded_header =
                        match worker.pending_transactions.block_user_data(&block_hash) {
                            Some(b) => b.scale_encoded_header.clone(),
                            None => break,
                        };
                    let runtime_service = worker.runtime_service.clone();
                    let log_target = log_target.clone();
                    let relay_chain_sync_subscription_id = subscribe_all.new_blocks.id();
                    let scale_encoded_transaction = worker
                        .pending_transactions
                        .scale_encoding(to_start_validate)
                        .unwrap()
                        .to_owned();
                    async move {
                        let result = validate_transaction(
                            &log_target,
                            &runtime_service,
                            relay_chain_sync_subscription_id,
                            block_hash,
                            &scale_encoded_header,
                            scale_encoded_transaction,
                            validate::TransactionSource::External,
                        )
                        .await;
                        (block_hash, result)
                    }
                };

                // Split the future in two: the driving half is polled through
                // `validations_in_progress`, while the result handle is stored on the
                // transaction itself so it can be retrieved (or dropped) later.
                let (to_execute, result_rx) = validation_future.remote_handle();
                worker
                    .validations_in_progress
                    .push(to_execute.map(move |()| to_start_validate).boxed());
                let tx = worker
                    .pending_transactions
                    .transaction_user_data_mut(to_start_validate)
                    .unwrap();
                debug_assert!(tx.validation_in_progress.is_none());
                tx.validation_in_progress = Some(result_rx);
            }

            // Remove and report transactions known to be invalid against the finalized block.
            loop {
                let (tx_id, _, error) = match worker
                    .pending_transactions
                    .invalid_transactions_finalized_block()
                    .next()
                {
                    Some(v) => v,
                    None => break,
                };
                // Clone to release the borrow on the pool before removing the transaction.
                let error = error.clone();
                let (tx_body, mut transaction) =
                    worker.pending_transactions.remove_transaction(tx_id);
                log::debug!(
                    target: &log_target,
                    "Discarded(tx_hash={}, error={:?})",
                    HashDisplay(&blake2_hash(&tx_body)),
                    error,
                );
                transaction.update_status(TransactionStatus::Dropped(match error {
                    InvalidOrError::Invalid(err) => DropReason::Invalid(err),
                    InvalidOrError::ValidateError(err) => DropReason::ValidateError(err),
                }));
            }

            // Start as many block-body downloads as the concurrency limit allows.
            while worker.block_downloads.len() < worker.max_concurrent_downloads {
                let block_hash_number = worker
                    .pending_transactions
                    .missing_block_bodies()
                    .find(|(_, block)| {
                        // Skip blocks whose download is already in progress.
                        if block.downloading {
                            return false;
                        }
                        // Don't retry blocks whose download has already failed once.
                        if block.failed_downloads >= 1 {
                            return false;
                        }
                        true
                    })
                    .map(|(hash, block)| {
                        let decoded = header::decode(
                            &block.scale_encoded_header,
                            worker.sync_service.block_number_bytes(),
                        )
                        .unwrap();
                        (*hash, decoded.number)
                    });
                let (block_hash, block_number) = match block_hash_number {
                    Some(b) => b,
                    None => break,
                };
                worker.block_downloads.push({
                    let download_future = worker.sync_service.clone().block_query(
                        block_number,
                        block_hash,
                        protocol::BlocksRequestFields {
                            body: true,
                            header: true,
                            justifications: false,
                        },
                        3,
                        Duration::from_secs(8),
                        NonZeroU32::new(3).unwrap(),
                    );
                    // `body` was requested above, hence the `unwrap()` on it.
                    async move { (block_hash, download_future.await.map(|b| b.body.unwrap())) }
                        .boxed()
                });
                worker
                    .pending_transactions
                    .block_user_data_mut(&block_hash)
                    .unwrap()
                    .downloading = true;
                log::debug!(
                    target: &log_target,
                    "BlockDownloads <= Start(block={})",
                    HashDisplay(&block_hash)
                );
            }

            // Prune finalized blocks whose body is known; report and drop their transactions,
            // and unpin the blocks from the runtime service.
            for block in worker.pending_transactions.prune_finalized_with_body() {
                subscribe_all
                    .new_blocks
                    .unpin_block(&block.block_hash)
                    .await;
                log::debug!(
                    target: &log_target,
                    "Finalized(block={}, body-transactions={{{}}})",
                    HashDisplay(&block.block_hash),
                    block
                        .included_transactions
                        .iter()
                        .map(|tx| HashDisplay(&blake2_hash(&tx.scale_encoding)).to_string())
                        .join(", ")
                );
                debug_assert!(!block.user_data.downloading);
                for mut tx in block.included_transactions {
                    let body_index = u32::try_from(tx.index_in_block).unwrap();
                    tx.user_data
                        .update_status(TransactionStatus::Dropped(DropReason::Finalized {
                            block_hash: block.block_hash,
                            index: body_index,
                        }));
                }
            }

            // Wait for the next event of any kind.
            futures::select! {
                // Chain update from the runtime service.
                notification = subscribe_all.new_blocks.next().fuse() => {
                    match notification {
                        Some(runtime_service::Notification::Block(new_block)) => {
                            let hash = header::hash_from_scale_encoded_header(&new_block.scale_encoded_header);
                            worker.pending_transactions.add_block(
                                header::hash_from_scale_encoded_header(&new_block.scale_encoded_header),
                                &new_block.parent_hash,
                                Block {
                                    scale_encoded_header: new_block.scale_encoded_header,
                                    failed_downloads: 0,
                                    downloading: false,
                                },
                            );
                            if new_block.is_new_best {
                                worker.set_best_block(&log_target, &hash);
                            }
                        },
                        Some(runtime_service::Notification::Finalized { hash, best_block_hash, .. }) => {
                            worker.set_best_block(&log_target, &best_block_hash);
                            for pruned in worker
                                .pending_transactions
                                .set_finalized_block(&hash)
                            {
                                subscribe_all.new_blocks.unpin_block(&pruned.0).await;
                            }
                        },
                        Some(runtime_service::Notification::BestBlockChanged { hash }) => {
                            worker.set_best_block(&log_target, &hash);
                        },
                        // Subscription channel closed: rebuild it from scratch.
                        None => continue 'channels_rebuild
                    }
                },

                // A block-body download has finished, successfully or not.
                download = worker.block_downloads.select_next_some() => {
                    let (block_hash, block_body) = download;
                    let mut block = match worker.pending_transactions.block_user_data_mut(&block_hash) {
                        Some(b) => b,
                        None => {
                            // Block has since left the pool; the result is irrelevant.
                            continue
                        },
                    };
                    debug_assert!(block.downloading);
                    block.downloading = false;
                    if block_body.is_err() {
                        block.failed_downloads = block.failed_downloads.saturating_add(1);
                    }
                    if let Ok(block_body) = block_body {
                        let block_body_size = block_body.len();
                        // Feed the body to the pool; it returns which tracked transactions
                        // turn out to be included in this block.
                        let included_transactions = worker
                            .pending_transactions
                            .set_block_body(&block_hash, block_body.into_iter())
                            .collect::<Vec<_>>();
                        log::debug!(
                            target: &log_target,
                            "BlockDownloads => Success(block={}, included-transactions={{{}}})",
                            HashDisplay(&block_hash),
                            included_transactions.iter()
                                .map(|(id, _)| HashDisplay(&blake2_hash(worker.pending_transactions.scale_encoding(*id).unwrap())).to_string())
                                .join(", ")
                        );
                        for (tx_id, body_index) in included_transactions {
                            debug_assert!(body_index < block_body_size);
                            let tx = worker.pending_transactions.transaction_user_data_mut(tx_id).unwrap();
                            let body_index = u32::try_from(body_index).unwrap();
                            tx.update_status(TransactionStatus::IncludedBlockUpdate { block_hash: Some((block_hash, body_index)) });
                        }
                    } else {
                        log::debug!(
                            target: &log_target,
                            "BlockDownloads => Failed(block={})",
                            HashDisplay(&block_hash)
                        );
                    }
                },

                // A re-announce timer has fired for a transaction.
                maybe_reannounce_tx_id = worker.next_reannounce.select_next_some() => {
                    // Transaction may have been dropped since the timer was armed.
                    if worker.pending_transactions.transaction_user_data(maybe_reannounce_tx_id).is_none() {
                        continue;
                    }
                    // Don't announce transactions that are already included, or not known
                    // to be valid against the current best block.
                    if worker.pending_transactions.is_included_best_chain(maybe_reannounce_tx_id) ||
                        !worker.pending_transactions.is_valid_against_best_block(maybe_reannounce_tx_id)
                    {
                        continue;
                    }
                    let now = TPlat::now();
                    let tx = worker.pending_transactions.transaction_user_data_mut(maybe_reannounce_tx_id).unwrap();
                    if tx.when_reannounce > now {
                        continue;
                    }
                    // Re-arm the timer for 5 seconds from now.
                    tx.when_reannounce = now + Duration::from_secs(5);
                    worker.next_reannounce.push(async move {
                        TPlat::sleep(Duration::from_secs(5)).await;
                        maybe_reannounce_tx_id
                    }.boxed());
                    let peers_sent = worker.network_service
                        .clone()
                        .announce_transaction(
                            worker.network_chain_index,
                            worker.pending_transactions.scale_encoding(maybe_reannounce_tx_id).unwrap()
                        )
                        .await;
                    log::debug!(
                        target: &log_target,
                        "NetworkService <= Announced(tx={}, peers={{{}}})",
                        HashDisplay(&blake2_hash(worker.pending_transactions.scale_encoding(maybe_reannounce_tx_id).unwrap())),
                        peers_sent.iter().join(", ")
                    );
                    if !peers_sent.is_empty() {
                        worker.pending_transactions
                            .transaction_user_data_mut(maybe_reannounce_tx_id).unwrap()
                            .update_status(TransactionStatus::Broadcast(peers_sent));
                    }
                },

                // A validation future has finished.
                maybe_validated_tx_id = worker.validations_in_progress.select_next_some() => {
                    // Fetch the result from the handle stored on the transaction; the
                    // transaction or the handle may be gone, in which case ignore.
                    let (block_hash, validation_result) = match worker.pending_transactions.transaction_user_data_mut(maybe_validated_tx_id) {
                        None => continue,
                        Some(tx) => match tx.validation_in_progress.as_mut().and_then(|f| f.now_or_never()) {
                            None => continue,
                            Some(result) => {
                                tx.validation_in_progress = None;
                                result
                            },
                        },
                    };
                    let tx_hash = blake2_hash(worker.pending_transactions.scale_encoding(maybe_validated_tx_id).unwrap());
                    // Result is only meaningful if the block it was produced against is
                    // still known to the pool.
                    if !worker.pending_transactions.has_block(&block_hash) {
                        log::debug!(
                            target: &log_target,
                            "TxValidations => ObsoleteBlock(tx={}, block={})",
                            HashDisplay(&tx_hash),
                            HashDisplay(&block_hash)
                        );
                        continue;
                    }
                    let validation_result = match validation_result {
                        Ok(result) => {
                            log::debug!(
                                target: &log_target,
                                "TxValidations => Success(tx={}, block={}, priority={}, longevity={}, propagate={:?})",
                                HashDisplay(&tx_hash),
                                HashDisplay(&block_hash),
                                result.priority,
                                result.longevity,
                                result.propagate,
                            );
                            log::info!(
                                target: &log_target,
                                "Successfully validated transaction {}",
                                HashDisplay(&tx_hash)
                            );
                            // Schedule an immediate (re-)announce of the now-valid transaction.
                            worker.next_reannounce.push(async move {
                                maybe_validated_tx_id
                            }.boxed());
                            Ok(result)
                        }
                        Err(ValidationError::ObsoleteSubscription) => {
                            // Runtime service subscription is dead: rebuild everything.
                            continue 'channels_rebuild
                        }
                        Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(error))) => {
                            log::debug!(
                                target: &log_target,
                                "TxValidations => Invalid(tx={}, block={}, error={:?})",
                                HashDisplay(&tx_hash),
                                HashDisplay(&block_hash),
                                error,
                            );
                            log::warn!(
                                target: &log_target,
                                "Transaction {} invalid against block {}: {}",
                                HashDisplay(&tx_hash),
                                HashDisplay(&block_hash),
                                error,
                            );
                            Err(InvalidOrError::Invalid(error))
                        }
                        Err(ValidationError::InvalidOrError(InvalidOrError::ValidateError(error))) => {
                            log::debug!(
                                target: &log_target,
                                "TxValidations => Error(tx={}, block={}, error={:?})",
                                HashDisplay(&tx_hash),
                                HashDisplay(&block_hash),
                                error,
                            );
                            log::warn!(
                                target: &log_target,
                                "Failed to validate transaction {}: {}",
                                HashDisplay(&tx_hash),
                                error
                            );
                            Err(InvalidOrError::ValidateError(error))
                        }
                    };
                    worker.pending_transactions
                        .set_validation_result(maybe_validated_tx_id, &block_hash, validation_result);
                },

                // Message from the public API.
                message = from_foreground.next().fuse() => {
                    let message = match message {
                        Some(msg) => msg,
                        // All service handles are gone: shut down the task.
                        None => return,
                    };
                    match message {
                        ToBackground::SubmitTransaction {
                            transaction_bytes,
                            updates_report,
                        } => {
                            // If the same transaction is already tracked, just attach the
                            // new watcher to it instead of adding a duplicate.
                            let existing_tx_id = worker.pending_transactions
                                .find_transaction(&transaction_bytes)
                                .next();
                            if let Some(existing_tx_id) = existing_tx_id {
                                let existing_tx = worker.pending_transactions
                                    .transaction_user_data_mut(existing_tx_id)
                                    .unwrap();
                                if let Some(updates_report) = updates_report {
                                    existing_tx.add_status_update(updates_report);
                                }
                                continue;
                            }
                            // Enforce the pool size limit.
                            if worker.pending_transactions.num_transactions() >= worker.max_pending_transactions {
                                if let Some(mut updates_report) = updates_report {
                                    let _ = updates_report.try_send(TransactionStatus::Dropped(DropReason::MaxPendingTransactionsReached));
                                }
                                continue;
                            }
                            worker
                                .pending_transactions
                                .add_unvalidated(transaction_bytes, PendingTransaction {
                                    when_reannounce: TPlat::now(),
                                    status_update: {
                                        let mut vec = Vec::with_capacity(1);
                                        if let Some(updates_report) = updates_report {
                                            vec.push(updates_report);
                                        }
                                        vec
                                    },
                                    latest_status: None,
                                    validation_in_progress: None,
                                });
                        }
                    }
                }
            }
        }
    }
}
/// State of the background task.
struct Worker<TPlat: Platform> {
    // Used to download block bodies.
    sync_service: Arc<sync_service::SyncService<TPlat>>,
    // Used to subscribe to the chain and validate transactions.
    runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
    // Used to announce transactions to peers.
    network_service: Arc<network_service::NetworkService<TPlat>>,
    // Index of the chain within `network_service`.
    network_chain_index: usize,
    // Pool of pending transactions, indexed by the chain of tracked blocks.
    pending_transactions: light_pool::LightPool<PendingTransaction<TPlat>, Block, InvalidOrError>,
    // Maximum number of transactions allowed in `pending_transactions`.
    max_pending_transactions: usize,
    // In-flight block-body downloads; each future resolves to the block hash and either the
    // body or a download error.
    block_downloads:
        FuturesUnordered<future::BoxFuture<'static, ([u8; 32], Result<Vec<Vec<u8>>, ()>)>>,
    // Driving halves of the validation futures; each resolves to the id of the validated
    // transaction (the result itself is retrieved through the transaction's `RemoteHandle`).
    validations_in_progress:
        FuturesUnordered<future::BoxFuture<'static, light_pool::TransactionId>>,
    // Timers that fire when a transaction should be (re-)announced on the network.
    next_reannounce: FuturesUnordered<future::BoxFuture<'static, light_pool::TransactionId>>,
    // Maximum number of entries allowed in `block_downloads` at any time.
    max_concurrent_downloads: usize,
}
impl<TPlat: Platform> Worker<TPlat> {
    /// Updates the pool's best block and reports the resulting inclusion/retraction status
    /// changes to the affected transactions.
    fn set_best_block(&mut self, log_target: &str, new_best_block_hash: &[u8; 32]) {
        let updates = self
            .pending_transactions
            .set_best_block(new_best_block_hash);

        log::debug!(
            target: log_target,
            "BestChainUpdate(new-best-block={}, included-transactions={{{}}}, retracted-transactions={{{}}})",
            HashDisplay(new_best_block_hash),
            updates.included_transactions.iter()
                .map(|(id, _, _)| HashDisplay(&blake2_hash(self.pending_transactions.scale_encoding(*id).unwrap())).to_string())
                .join(", "),
            updates.retracted_transactions.iter()
                .map(|(id, _, _)| HashDisplay(&blake2_hash(self.pending_transactions.scale_encoding(*id).unwrap())).to_string())
                .join(", ")
        );

        // Transactions no longer on the best chain: report them as not included anymore.
        for (retracted_id, _, _) in updates.retracted_transactions {
            self.pending_transactions
                .transaction_user_data_mut(retracted_id)
                .unwrap()
                .update_status(TransactionStatus::IncludedBlockUpdate { block_hash: None });
        }

        // Transactions newly on the best chain: report the block and body index they are in.
        for (included_id, included_block, body_index) in updates.included_transactions {
            let body_index = u32::try_from(body_index).unwrap();
            self.pending_transactions
                .transaction_user_data_mut(included_id)
                .unwrap()
                .update_status(TransactionStatus::IncludedBlockUpdate {
                    block_hash: Some((included_block, body_index)),
                });
        }
    }
}
/// Per-block data stored in the pool.
struct Block {
    // SCALE-encoded header of the block; used to extract its number when downloading.
    scale_encoded_header: Vec<u8>,
    // Number of times downloading this block's body has failed; blocks with at least one
    // failure are not retried.
    failed_downloads: u8,
    // `true` while a body download for this block is in flight.
    downloading: bool,
}
/// Per-transaction data stored in the pool.
struct PendingTransaction<TPlat: Platform> {
    // Earliest moment at which the transaction may be (re-)announced on the network.
    when_reannounce: TPlat::Instant,
    // Channels on which status updates are reported. Senders whose channel is closed are
    // removed when an update fails to be delivered.
    status_update: Vec<mpsc::Sender<TransactionStatus>>,
    // Most recent status, re-sent to newly-attached watchers.
    latest_status: Option<TransactionStatus>,
    // Handle to the in-flight validation, if any; yields the hash of the block validated
    // against and the validation outcome.
    validation_in_progress: Option<
        future::RemoteHandle<(
            [u8; 32],
            Result<validate::ValidTransaction, ValidationError>,
        )>,
    >,
}
impl<TPlat: Platform> PendingTransaction<TPlat> {
    /// Registers a new watcher for this transaction. The latest known status (if any) is
    /// immediately sent on the channel; if that send fails the channel is discarded.
    fn add_status_update(&mut self, mut channel: mpsc::Sender<TransactionStatus>) {
        if let Some(latest_status) = &self.latest_status {
            if channel.try_send(latest_status.clone()).is_err() {
                return;
            }
        }
        self.status_update.push(channel);
    }

    /// Sends `status` to every registered watcher, dropping the channels whose send fails
    /// (typically because the receiver is gone), and records it as the latest status.
    fn update_status(&mut self, status: TransactionStatus) {
        // Each channel is taken out with `swap_remove` and pushed back only if the send
        // succeeds. Iterating the indices in REVERSE is essential: `swap_remove(n)` moves
        // the last element into slot `n`, and a successful send appends the channel back at
        // the end. With a forward iteration that element at the end would be visited again
        // (duplicate send) while the element swapped into an already-visited slot would be
        // skipped entirely. Going from the back, every original element is visited exactly
        // once.
        for n in (0..self.status_update.len()).rev() {
            let mut channel = self.status_update.swap_remove(n);
            if channel.try_send(status.clone()).is_ok() {
                self.status_update.push(channel);
            }
        }
        self.latest_status = Some(status);
    }
}
/// Validates `scale_encoded_transaction` by calling the validation runtime function of the
/// given pinned block.
///
/// Returns [`ValidationError::ObsoleteSubscription`] if the runtime-service subscription is
/// no longer valid, in which case the caller is expected to rebuild its subscription.
async fn validate_transaction<TPlat: Platform>(
    log_target: &str,
    relay_chain_sync: &Arc<runtime_service::RuntimeService<TPlat>>,
    relay_chain_sync_subscription_id: runtime_service::SubscriptionId,
    block_hash: [u8; 32],
    block_scale_encoded_header: &[u8],
    scale_encoded_transaction: impl AsRef<[u8]> + Clone,
    source: validate::TransactionSource,
) -> Result<validate::ValidTransaction, ValidationError> {
    // Lock the runtime of the pinned block for the duration of the call.
    let runtime_lock = match relay_chain_sync
        .pinned_block_runtime_lock(relay_chain_sync_subscription_id, &block_hash)
        .await
    {
        Ok(l) => l,
        Err(runtime_service::PinnedBlockRuntimeLockError::ObsoleteSubscription) => {
            return Err(ValidationError::ObsoleteSubscription)
        }
    };

    log::debug!(
        target: log_target,
        "TxValidations <= Start(tx={}, block={}, block_height={})",
        HashDisplay(&blake2_hash(scale_encoded_transaction.as_ref())),
        HashDisplay(runtime_lock.block_hash()),
        header::decode(
            block_scale_encoded_header,
            relay_chain_sync.block_number_bytes()
        )
        .ok()
        .map(|h| format!("#{}", h.number))
        .unwrap_or_else(|| "unknown".to_owned())
    );

    let block_hash = *runtime_lock.block_hash();
    // Start the runtime call with the SCALE-encoded validation parameters.
    let (runtime_call_lock, runtime) = runtime_lock
        .start(
            validate::VALIDATION_FUNCTION_NAME,
            validate::validate_transaction_runtime_parameters_v3(
                iter::once(scale_encoded_transaction.as_ref()),
                source,
                &block_hash,
            ),
            1,
            Duration::from_secs(8),
            NonZeroU32::new(1).unwrap(),
        )
        .await
        .map_err(ValidateTransactionError::Call)
        .map_err(InvalidOrError::ValidateError)
        .map_err(ValidationError::InvalidOrError)?;

    let mut validation_in_progress = validate::validate_transaction(validate::Config {
        runtime,
        scale_encoded_header: block_scale_encoded_header,
        block_number_bytes: relay_chain_sync.block_number_bytes(),
        scale_encoded_transaction: iter::once(scale_encoded_transaction),
        source,
    });

    // Drive the validation state machine, answering its storage requests from the runtime
    // call lock. Every exit path must hand the virtual machine back via `unlock`.
    loop {
        match validation_in_progress {
            // Runtime call succeeded and the transaction is valid.
            validate::Query::Finished {
                result: Ok(Ok(success)),
                virtual_machine,
            } => {
                runtime_call_lock.unlock(virtual_machine);
                break Ok(success);
            }
            // Runtime call succeeded but the transaction is invalid.
            validate::Query::Finished {
                result: Ok(Err(invalid)),
                virtual_machine,
            } => {
                runtime_call_lock.unlock(virtual_machine);
                break Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(
                    invalid,
                )));
            }
            // The validation process itself errored.
            validate::Query::Finished {
                result: Err(error),
                virtual_machine,
            } => {
                runtime_call_lock.unlock(virtual_machine);
                break Err(ValidationError::InvalidOrError(
                    InvalidOrError::ValidateError(ValidateTransactionError::Validation(error)),
                ));
            }
            // Runtime asks for a storage value.
            validate::Query::StorageGet(get) => {
                let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) {
                    Ok(v) => v,
                    Err(err) => {
                        runtime_call_lock.unlock(validate::Query::StorageGet(get).into_prototype());
                        return Err(ValidationError::InvalidOrError(
                            InvalidOrError::ValidateError(ValidateTransactionError::Call(err)),
                        ));
                    }
                };
                validation_in_progress = get.inject_value(storage_value.map(iter::once));
            }
            // Runtime asks for the key following another key; deliberately unsupported here.
            validate::Query::NextKey(nk) => {
                runtime_call_lock.unlock(validate::Query::NextKey(nk).into_prototype());
                break Err(ValidationError::InvalidOrError(
                    InvalidOrError::ValidateError(ValidateTransactionError::NextKeyForbidden),
                ));
            }
            // Runtime asks for all keys with a given prefix.
            validate::Query::PrefixKeys(prefix) => {
                let rq_prefix = prefix.prefix().as_ref().to_owned();
                let result = runtime_call_lock
                    .storage_prefix_keys_ordered(&rq_prefix)
                    .map(|i| i.map(|v| v.as_ref().to_owned()).collect::<Vec<_>>());
                match result {
                    Ok(v) => validation_in_progress = prefix.inject_keys_ordered(v.into_iter()),
                    Err(err) => {
                        runtime_call_lock
                            .unlock(validate::Query::PrefixKeys(prefix).into_prototype());
                        return Err(ValidationError::InvalidOrError(
                            InvalidOrError::ValidateError(ValidateTransactionError::Call(err)),
                        ));
                    }
                }
            }
        }
    }
}
/// Computes the 32-byte BLAKE2b hash of `bytes`.
fn blake2_hash(bytes: &[u8]) -> [u8; 32] {
    let hash = blake2_rfc::blake2b::blake2b(32, &[], bytes);
    let mut out = [0u8; 32];
    // `blake2b` was asked for exactly 32 bytes, so the lengths always match.
    out.copy_from_slice(hash.as_bytes());
    out
}