use crate::{
Block, BlockBuilder, BlockBuilderError, BlockError, BlockForkPoint, CacheError, ExecutorConfig,
ExecutorError, ForkRpcClient, InherentProvider, RuntimeExecutor, StorageCache,
builder::ApplyExtrinsicResult,
create_next_header_with_slot, default_providers,
strings::{
inherent::{parachain::storage_keys, timestamp::slot_duration},
txpool::{runtime_api, transaction_source},
},
};
use scale::Decode;
use std::{
path::Path,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use subxt::config::substrate::H256;
use tokio::sync::{OnceCell, RwLock, broadcast};
use url::Url;
/// Minimum seconds between `debug`-level "reconnecting" log lines; more
/// frequent reconnect attempts are logged at `trace` instead (see
/// `Blockchain::reconnect_upstream`).
const RECONNECT_LOG_DEBOUNCE_SECS: u64 = 30;
/// A block body: one encoded extrinsic per inner `Vec<u8>`.
pub type BlockBody = Vec<Vec<u8>>;
/// SCALE-decodable mirror of the runtime's transaction-validity result, as
/// returned by the `TaggedTransactionQueue` validate call (see
/// `Blockchain::validate_extrinsic`).
#[derive(Debug, Clone, Decode)]
pub enum TransactionValidity {
    /// The extrinsic is valid and carries its validity metadata.
    #[codec(index = 0)]
    Ok(ValidTransaction),
    /// The extrinsic is invalid or its validity could not be determined.
    #[codec(index = 1)]
    Err(TransactionValidityError),
}
/// Validity metadata for an accepted extrinsic, SCALE-decoded from the
/// runtime's response.
#[derive(Debug, Clone, Decode)]
pub struct ValidTransaction {
    /// Inclusion priority; higher values are preferred by block builders.
    pub priority: u64,
    /// Tags that must be satisfied before this transaction can be included.
    pub requires: Vec<Vec<u8>>,
    /// Tags this transaction provides once included.
    pub provides: Vec<Vec<u8>>,
    /// How many blocks the validity result remains meaningful for.
    pub longevity: u64,
    /// Whether the transaction should be gossiped to peers.
    pub propagate: bool,
}
/// Why an extrinsic failed validation: definitively invalid, or of unknown
/// validity. SCALE-decoded from the runtime's response.
#[derive(Debug, Clone, Decode)]
pub enum TransactionValidityError {
    /// The transaction is known to be invalid.
    #[codec(index = 0)]
    Invalid(InvalidTransaction),
    /// The transaction's validity could not be determined.
    #[codec(index = 1)]
    Unknown(UnknownTransaction),
}
/// Concrete invalidity reasons. Codec indices must stay aligned with the
/// runtime's `InvalidTransaction` encoding.
#[derive(Debug, Clone, Decode)]
pub enum InvalidTransaction {
    /// The call itself failed validation.
    #[codec(index = 0)]
    Call,
    /// The sender cannot pay the transaction fees.
    #[codec(index = 1)]
    Payment,
    /// The nonce is too high (transaction from the future).
    #[codec(index = 2)]
    Future,
    /// The nonce is too low (already used).
    #[codec(index = 3)]
    Stale,
    /// A mandatory inherent was invalid.
    #[codec(index = 4)]
    BadMandatory,
    /// A mandatory dispatch failed.
    #[codec(index = 5)]
    MandatoryDispatch,
    /// The signature is invalid.
    #[codec(index = 6)]
    BadSigner,
    /// Runtime-specific invalidity code.
    #[codec(index = 7)]
    Custom(u8),
}
/// Reasons a transaction's validity could not be determined. Codec indices
/// must stay aligned with the runtime's `UnknownTransaction` encoding.
#[derive(Debug, Clone, Decode)]
pub enum UnknownTransaction {
    /// Some dependency (e.g. an account or tag) could not be looked up.
    #[codec(index = 0)]
    CannotLookup,
    /// No validator is configured for unsigned transactions.
    #[codec(index = 1)]
    NoUnsignedValidator,
    /// Runtime-specific unknown-validity code.
    #[codec(index = 2)]
    Custom(u8),
}
impl TransactionValidityError {
pub fn reason(&self) -> String {
match self {
Self::Invalid(inv) => match inv {
InvalidTransaction::Call => "Call failed".into(),
InvalidTransaction::Payment => "Insufficient funds for fees".into(),
InvalidTransaction::Future => "Nonce too high".into(),
InvalidTransaction::Stale => "Nonce too low (already used)".into(),
InvalidTransaction::BadMandatory => "Bad mandatory inherent".into(),
InvalidTransaction::MandatoryDispatch => "Mandatory dispatch failed".into(),
InvalidTransaction::BadSigner => "Invalid signature".into(),
InvalidTransaction::Custom(code) => format!("Custom error: {code}"),
},
Self::Unknown(unk) => match unk {
UnknownTransaction::CannotLookup => "Cannot lookup validity".into(),
UnknownTransaction::NoUnsignedValidator => "No unsigned validator".into(),
UnknownTransaction::Custom(code) => format!("Custom unknown: {code}"),
},
}
}
pub fn is_unknown(&self) -> bool {
matches!(self, Self::Unknown(_))
}
}
/// Outcome of `Blockchain::build_block`: the new block plus a per-extrinsic
/// breakdown of what made it in.
#[derive(Debug, Clone)]
pub struct BuildBlockResult {
    /// The newly built block (now the chain head).
    pub block: Block,
    /// Encoded extrinsics that were successfully applied.
    pub included: Vec<Vec<u8>>,
    /// Extrinsics whose dispatch failed, with the failure reason.
    pub failed: Vec<FailedExtrinsic>,
}
/// An extrinsic that failed to apply during block building.
#[derive(Debug, Clone)]
pub struct FailedExtrinsic {
    /// The original encoded extrinsic bytes.
    pub extrinsic: Vec<u8>,
    /// Human-readable dispatch error.
    pub reason: String,
}
/// Capacity of the `BlockchainEvent` broadcast channel.
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Events emitted by [`Blockchain`] over its broadcast channel (see
/// `subscribe_events`). Currently only new-block notifications.
#[derive(Debug, Clone)]
pub enum BlockchainEvent {
    /// A new block was appended to the local fork.
    NewBlock {
        /// Hash of the new block.
        hash: H256,
        /// Number of the new block.
        number: u32,
        /// Hash of its parent.
        parent_hash: H256,
        /// Raw header bytes of the new block.
        header: Vec<u8>,
        /// Storage keys written by this block (from its storage diff).
        modified_keys: Vec<Vec<u8>>,
    },
}
/// Unified error type for [`Blockchain`] operations; transparently wraps the
/// errors of the underlying subsystems.
#[derive(Debug, thiserror::Error)]
pub enum BlockchainError {
    /// Block loading / chain-walking errors.
    #[error(transparent)]
    Block(#[from] BlockError),
    /// Block-building errors.
    #[error(transparent)]
    Builder(#[from] BlockBuilderError),
    /// Storage-cache errors.
    #[error(transparent)]
    Cache(#[from] CacheError),
    /// Runtime-execution errors.
    #[error(transparent)]
    Executor(#[from] ExecutorError),
}
/// Kind of chain that was forked, detected from runtime metadata (presence of
/// the `ParachainSystem` pallet — see `detect_chain_type`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainType {
    /// A relay chain (no `ParachainSystem` pallet).
    RelayChain,
    /// A parachain, with its para id (0 when the id could not be read).
    Parachain {
        para_id: u32,
    },
}
/// A local fork of a live chain: blocks are built locally on top of a remote
/// fork point, with storage lazily fetched from the upstream node.
pub struct Blockchain {
    /// Current head of the local fork; each `Block` links to its parent.
    head: RwLock<Block>,
    /// Providers supplying inherents when building blocks.
    inherent_providers: Vec<Arc<dyn InherentProvider>>,
    /// Runtime spec_name, resolved at fork time.
    chain_name: String,
    /// Relay chain vs parachain, detected from metadata at fork time.
    chain_type: ChainType,
    /// Hash of the remote block we forked from.
    fork_point_hash: H256,
    /// Number of the remote block we forked from.
    fork_point_number: u32,
    /// Config used to (re)create executors, e.g. after runtime upgrades.
    executor_config: ExecutorConfig,
    /// Executor for the current runtime; replaced on runtime upgrade.
    executor: RwLock<RuntimeExecutor>,
    /// Pre-compiled WASM prototype, taken before a call and returned after
    /// (only while the head is unchanged) to avoid recompilation.
    warm_prototype: tokio::sync::Mutex<Option<smoldot::executor::host::HostVmPrototype>>,
    /// Remote storage layer backed by the upstream RPC endpoint.
    remote: crate::RemoteStorageLayer,
    /// One-shot guard ensuring the storage prefetch runs at most once.
    prefetch_done: OnceCell<()>,
    /// Slot duration in ms; 0 means "not yet determined".
    cached_slot_duration: AtomicU64,
    /// Broadcast sender for `BlockchainEvent`s.
    event_tx: broadcast::Sender<BlockchainEvent>,
    /// Memoized genesis hash (hex string with 0x prefix).
    genesis_hash_cache: OnceCell<String>,
    /// Memoized `system_properties` result (None when the fetch failed).
    chain_properties_cache: OnceCell<Option<serde_json::Value>>,
    /// Unix-millis timestamp of the last debug-level reconnect log line,
    /// used to debounce reconnect logging.
    last_reconnect_log: AtomicU64,
}
impl Blockchain {
    /// Fork at the upstream's latest finalized block with the default
    /// executor configuration.
    pub async fn fork(
        endpoint: &Url,
        cache_path: Option<&Path>,
    ) -> Result<Arc<Self>, BlockchainError> {
        Self::fork_with_config(endpoint, cache_path, None, ExecutorConfig::default()).await
    }
    /// Fork at an explicit fork point (or the latest finalized block when
    /// `fork_point` is `None`) with the default executor configuration.
    pub async fn fork_at(
        endpoint: &Url,
        cache_path: Option<&Path>,
        fork_point: Option<BlockForkPoint>,
    ) -> Result<Arc<Self>, BlockchainError> {
        Self::fork_with_config(endpoint, cache_path, fork_point, ExecutorConfig::default()).await
    }
    /// Fully parameterized fork constructor.
    ///
    /// Resolves the fork point (defaulting to the upstream's latest finalized
    /// head), loads the fork block, detects chain type and name, wires up
    /// inherent providers, executor and event channel, and — outside of
    /// `cfg(test)` — spawns `warmup` in the background.
    pub async fn fork_with_config(
        endpoint: &Url,
        cache_path: Option<&Path>,
        fork_point: Option<BlockForkPoint>,
        executor_config: ExecutorConfig,
    ) -> Result<Arc<Self>, BlockchainError> {
        let cache = StorageCache::open(cache_path).await?;
        // No explicit fork point: use the upstream's latest finalized head.
        let fork_point = match fork_point {
            Some(fp) => fp,
            None => {
                let rpc =
                    crate::ForkRpcClient::connect(endpoint).await.map_err(BlockError::from)?;
                let finalized = rpc.finalized_head().await.map_err(BlockError::from)?;
                BlockForkPoint::Hash(finalized)
            },
        };
        let fork_block = Block::fork_point(endpoint, cache, fork_point).await?;
        let fork_point_hash = fork_block.hash;
        let fork_point_number = fork_block.number;
        let chain_type = Self::detect_chain_type(&fork_block).await?;
        let chain_name = Self::get_chain_name(&fork_block).await?;
        // `default_providers` returns a different provider set for parachains.
        let is_parachain = matches!(chain_type, ChainType::Parachain { .. });
        let inherent_providers = default_providers(is_parachain)
            .into_iter()
            .map(|p| Arc::from(p) as Arc<dyn InherentProvider>)
            .collect();
        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
        let remote = fork_block.storage().remote().clone();
        let runtime_code = fork_block.runtime_code().await?;
        let executor = RuntimeExecutor::with_config(runtime_code, None, executor_config.clone())?;
        log::debug!("Forked at block #{fork_point_number} (0x{})", hex::encode(fork_point_hash));
        let blockchain = Arc::new(Self {
            head: RwLock::new(fork_block),
            inherent_providers,
            chain_name,
            chain_type,
            fork_point_hash,
            fork_point_number,
            executor_config,
            executor: RwLock::new(executor),
            warm_prototype: tokio::sync::Mutex::new(None),
            prefetch_done: OnceCell::new(),
            cached_slot_duration: AtomicU64::new(0),
            remote,
            event_tx,
            genesis_hash_cache: OnceCell::new(),
            chain_properties_cache: OnceCell::new(),
            last_reconnect_log: AtomicU64::new(0),
        });
        // Warm caches in the background; skipped under test so tests stay
        // deterministic.
        #[cfg(not(test))]
        {
            let bc = Arc::clone(&blockchain);
            tokio::spawn(async move { bc.warmup().await });
        }
        Ok(blockchain)
    }
    /// Background warmup: compile the WASM prototype, resolve the slot
    /// duration (Aura API, then Babe constant, then a per-chain fallback),
    /// prefetch storage, and warm the inherent providers. All steps are
    /// best-effort; failures only log.
    pub async fn warmup(self: &Arc<Self>) {
        let warmup_start = std::time::Instant::now();
        log::debug!("[Blockchain] Background warmup starting...");
        let executor = self.executor.read().await.clone();
        match executor.create_prototype() {
            Ok(proto) => {
                log::debug!(
                    "[Blockchain] Warmup: WASM prototype compiled ({:?})",
                    warmup_start.elapsed()
                );
                let head = self.head.read().await;
                // Try the Aura runtime API first; it returns the slot
                // duration directly as a SCALE-encoded u64.
                let (result, returned_proto) = executor
                    .call_with_prototype(
                        Some(proto),
                        crate::strings::inherent::timestamp::slot_duration::AURA_API_METHOD,
                        &[],
                        head.storage(),
                    )
                    .await;
                let aura_duration =
                    result.ok().and_then(|r| u64::decode(&mut r.output.as_slice()).ok());
                let duration = if let Some(d) = aura_duration {
                    d
                } else {
                    // No Aura: fall back to Babe's ExpectedBlockTime constant
                    // from metadata, then to a hard-coded per-chain-type value.
                    let metadata = head.metadata().await.ok();
                    let babe_duration = metadata.as_ref().and_then(|m| {
                        use crate::strings::inherent::timestamp::slot_duration;
                        m.pallet_by_name(slot_duration::BABE_PALLET)?
                            .constant_by_name(slot_duration::BABE_EXPECTED_BLOCK_TIME)
                            .and_then(|c| u64::decode(&mut &c.value()[..]).ok())
                    });
                    babe_duration.unwrap_or(match self.chain_type {
                        ChainType::RelayChain => slot_duration::RELAY_CHAIN_FALLBACK_MS,
                        ChainType::Parachain { .. } => slot_duration::PARACHAIN_FALLBACK_MS,
                    })
                };
                // Release the head read lock before touching other locks.
                drop(head);
                self.cached_slot_duration.store(duration, Ordering::Release);
                // Park the prototype for reuse by the next runtime call.
                *self.warm_prototype.lock().await = returned_proto;
                log::debug!(
                    "[Blockchain] Warmup: slot_duration={duration}ms ({:?})",
                    warmup_start.elapsed()
                );
            },
            Err(e) => log::warn!("[Blockchain] Warmup: prototype compilation failed: {e}"),
        }
        self.ensure_prefetched().await;
        log::debug!("[Blockchain] Warmup: prefetch done ({:?})", warmup_start.elapsed());
        let head = self.head.read().await.clone();
        for provider in &self.inherent_providers {
            provider.warmup(&head, &executor).await;
        }
        log::debug!("[Blockchain] Background warmup complete ({:?})", warmup_start.elapsed());
    }
    /// Run `do_prefetch` exactly once per instance; concurrent callers await
    /// the same in-flight run. Prefetch failures are logged and swallowed —
    /// prefetching is purely an optimization.
    async fn ensure_prefetched(&self) {
        self.prefetch_done
            .get_or_init(|| async {
                if let Err(e) = self.do_prefetch().await {
                    log::warn!("[Blockchain] Storage prefetch failed (non-fatal): {e}");
                }
            })
            .await;
    }
async fn do_prefetch(&self) -> Result<(), BlockchainError> {
let head = self.head.read().await;
let metadata = head.metadata().await?;
let block_hash = head.storage().fork_block_hash();
let mut value_keys: Vec<Vec<u8>> = Vec::new();
let mut pallet_prefixes: Vec<Vec<u8>> = Vec::new();
for pallet in metadata.pallets() {
let pallet_hash = sp_core::twox_128(pallet.name().as_bytes());
if let Some(storage) = pallet.storage() {
for entry in storage.entries() {
if matches!(
entry.entry_type(),
subxt::metadata::types::StorageEntryType::Plain(_)
) {
let entry_hash = sp_core::twox_128(entry.name().as_bytes());
value_keys.push([pallet_hash.as_slice(), entry_hash.as_slice()].concat());
}
}
pallet_prefixes.push(pallet_hash.to_vec());
}
}
if !value_keys.is_empty() {
let key_refs: Vec<&[u8]> = value_keys.iter().map(|k| k.as_slice()).collect();
if let Err(e) = self.remote.get_batch(block_hash, &key_refs).await {
log::debug!(
"[Blockchain] Warmup: StorageValue batch fetch failed (non-fatal): {e}"
);
}
}
let page_size = crate::strings::builder::PREFETCH_PAGE_SIZE;
let scan_futures: Vec<_> = pallet_prefixes
.iter()
.map(|prefix| self.remote.prefetch_prefix_single_page(block_hash, prefix, page_size))
.collect();
let scan_results = futures::future::join_all(scan_futures).await;
let mut scan_keys = 0usize;
for count in scan_results.into_iter().flatten() {
scan_keys += count;
}
log::debug!(
"[Blockchain] Prefetched {} StorageValue + {} map keys ({} pallets)",
value_keys.len(),
scan_keys,
pallet_prefixes.len(),
);
Ok(())
}
    /// Runtime spec_name resolved at fork time.
    pub fn chain_name(&self) -> &str {
        &self.chain_name
    }
    /// Relay chain vs parachain, detected at fork time.
    pub fn chain_type(&self) -> &ChainType {
        &self.chain_type
    }
    /// Hash of the remote block this fork is based on.
    pub fn fork_point(&self) -> H256 {
        self.fork_point_hash
    }
    /// Number of the remote block this fork is based on.
    pub fn fork_point_number(&self) -> u32 {
        self.fork_point_number
    }
    /// Upstream RPC endpoint URL.
    pub fn endpoint(&self) -> &Url {
        self.remote.endpoint()
    }
    /// The chain's genesis hash as a `0x`-prefixed hex string, memoized on
    /// first success.
    ///
    /// # Errors
    /// Fails if block 0 cannot be resolved; an unknown block 0 is surfaced
    /// as `BlockError::RuntimeCodeNotFound` (reusing the existing variant).
    pub async fn genesis_hash(&self) -> Result<String, BlockchainError> {
        self.genesis_hash_cache
            .get_or_try_init(|| async {
                match self.block_hash_at(0).await? {
                    Some(hash) => Ok(format!("0x{}", hex::encode(hash.as_bytes()))),
                    None => Err(BlockchainError::Block(BlockError::RuntimeCodeNotFound)),
                }
            })
            .await
            .cloned()
    }
pub async fn chain_properties(&self) -> Option<serde_json::Value> {
self.chain_properties_cache
.get_or_init(|| async {
match ForkRpcClient::connect(self.endpoint()).await {
Ok(client) => match client.system_properties().await {
Ok(system_props) => serde_json::to_value(system_props).ok(),
Err(_) => None,
},
Err(_) => None,
}
})
.await
.clone()
}
    /// Subscribe to `BlockchainEvent`s (new-block notifications).
    pub fn subscribe_events(&self) -> broadcast::Receiver<BlockchainEvent> {
        self.event_tx.subscribe()
    }
    /// Clone of the current head block.
    pub async fn head(&self) -> Block {
        self.head.read().await.clone()
    }
    /// Number of the current head block.
    pub async fn head_number(&self) -> u32 {
        self.head.read().await.number
    }
    /// Hash of the current head block.
    pub async fn head_hash(&self) -> H256 {
        self.head.read().await.hash
    }
    /// Extrinsics of the block with `hash`, or `None` if the block is
    /// unknown. Locally built blocks are answered from the in-memory fork
    /// chain; everything else is fetched upstream, retrying once after a
    /// reconnect.
    pub async fn block_body(&self, hash: H256) -> Result<Option<BlockBody>, BlockchainError> {
        let head = self.head.read().await;
        let mut current: Option<&Block> = Some(&head);
        while let Some(block) = current {
            if block.hash == hash {
                // The parent-less link is the fork-point block: fall through
                // to the remote lookup for it (NOTE(review): presumably its
                // body was never built locally — confirm against
                // Block::fork_point).
                if block.parent.is_none() {
                    break;
                }
                return Ok(Some(block.extrinsics.clone()));
            }
            current = block.parent.as_deref();
        }
        drop(head);
        match self.remote.block_body(hash).await {
            Ok(body) => Ok(body),
            Err(first_err) =>
                if self.reconnect_upstream().await {
                    Ok(self.remote.block_body(hash).await.map_err(BlockError::from)?)
                } else {
                    Err(BlockchainError::Block(BlockError::from(first_err)))
                },
        }
    }
    /// Raw header bytes of the block with `hash`, or `None` if unknown.
    /// Answered from the local fork chain when possible (headers are held
    /// locally for every link, including the fork point), otherwise fetched
    /// upstream with one reconnect retry.
    pub async fn block_header(&self, hash: H256) -> Result<Option<Vec<u8>>, BlockchainError> {
        let head = self.head.read().await;
        let mut current: Option<&Block> = Some(&head);
        while let Some(block) = current {
            if block.hash == hash {
                return Ok(Some(block.header.clone()));
            }
            current = block.parent.as_deref();
        }
        drop(head);
        match self.remote.block_header(hash).await {
            Ok(header) => Ok(header),
            Err(first_err) =>
                if self.reconnect_upstream().await {
                    Ok(self.remote.block_header(hash).await.map_err(BlockError::from)?)
                } else {
                    Err(BlockchainError::Block(BlockError::from(first_err)))
                },
        }
    }
    /// Hash of the block at `block_number`, or `None` if it is beyond the
    /// local head and unknown upstream. Numbers above the head short-circuit
    /// to `None`; locally built blocks are answered from the fork chain, and
    /// older numbers are resolved upstream with one reconnect retry.
    pub async fn block_hash_at(&self, block_number: u32) -> Result<Option<H256>, BlockchainError> {
        let head = self.head.read().await;
        if head.number < block_number {
            return Ok(None);
        }
        let mut current: Option<&Block> = Some(&head);
        while let Some(block) = current {
            if block.number == block_number {
                return Ok(Some(block.hash));
            }
            if block.parent.is_none() {
                break;
            }
            current = block.parent.as_deref();
        }
        drop(head);
        match self.remote.block_hash_by_number(block_number).await {
            Ok(hash) => Ok(hash),
            Err(first_err) =>
                if self.reconnect_upstream().await {
                    Ok(self
                        .remote
                        .block_hash_by_number(block_number)
                        .await
                        .map_err(BlockError::from)?)
                } else {
                    Err(BlockchainError::Block(BlockError::from(first_err)))
                },
        }
    }
pub async fn block_number_by_hash(&self, hash: H256) -> Result<Option<u32>, BlockchainError> {
let head = self.head.read().await;
let mut current: Option<&Block> = Some(&head);
while let Some(block) = current {
if block.hash == hash {
return Ok(Some(block.number));
}
current = block.parent.as_deref();
}
drop(head);
match self.remote.block_number_by_hash(hash).await {
Ok(number) => Ok(number),
Err(first_err) =>
if self.reconnect_upstream().await {
Ok(self.remote.block_number_by_hash(hash).await.map_err(BlockError::from)?)
} else {
Err(BlockchainError::Block(BlockError::from(first_err)))
},
}
}
    /// Parent hash of the block with `hash`, or `None` if the block is
    /// unknown. Answered from the local fork chain when possible, otherwise
    /// fetched upstream with one reconnect retry.
    pub async fn block_parent_hash(&self, hash: H256) -> Result<Option<H256>, BlockchainError> {
        let head = self.head.read().await;
        let mut current: Option<&Block> = Some(&head);
        while let Some(block) = current {
            if block.hash == hash {
                return Ok(Some(block.parent_hash));
            }
            current = block.parent.as_deref();
        }
        drop(head);
        match self.remote.parent_hash(hash).await {
            Ok(parent) => Ok(parent),
            Err(first_err) =>
                if self.reconnect_upstream().await {
                    Ok(self.remote.parent_hash(hash).await.map_err(BlockError::from)?)
                } else {
                    Err(BlockchainError::Block(BlockError::from(first_err)))
                },
        }
    }
    /// Build a new block on top of the current head containing the given
    /// extrinsics, advance the head, and broadcast a `NewBlock` event.
    ///
    /// Dispatch-failed extrinsics are collected in `failed` rather than
    /// aborting the build. On a runtime upgrade the executor is recreated,
    /// the cached slot duration reset, and provider caches invalidated.
    ///
    /// # Errors
    /// Fails with `BlockError::ConcurrentBlockBuild` when the head moved
    /// under us between reading the parent and committing the new block.
    pub async fn build_block(
        &self,
        extrinsics: BlockBody,
    ) -> Result<BuildBlockResult, BlockchainError> {
        let (parent_block, parent_hash) = {
            let head = self.head.read().await;
            let parent_hash = head.hash;
            (head.clone(), parent_hash)
        };
        let executor = self.executor.read().await.clone();
        // Take the warm prototype for this build; it is restored (or
        // discarded on runtime upgrade) after finalize.
        // NOTE(review): on the ConcurrentBlockBuild early-return below the
        // taken prototype is not put back — next call recompiles. Confirm
        // this is acceptable.
        let warm_prototype = self.warm_prototype.lock().await.take();
        let header = create_next_header_with_slot(
            &parent_block,
            &executor,
            vec![],
            // 0 means the slot duration was never resolved; pass None so the
            // header builder determines it itself.
            match self.cached_slot_duration.load(Ordering::Acquire) {
                0 => None,
                d => Some(d),
            },
        )
        .await?;
        // Wrap the shared providers so the builder can own Box<dyn …>.
        let providers: Vec<Box<dyn InherentProvider>> = self
            .inherent_providers
            .iter()
            .map(|p| Box::new(ArcProvider(Arc::clone(p))) as Box<dyn InherentProvider>)
            .collect();
        self.ensure_prefetched().await;
        let mut builder = BlockBuilder::new(
            parent_block,
            executor,
            header,
            providers,
            warm_prototype,
            // NOTE(review): flag meaning not visible here — confirm against
            // BlockBuilder::new.
            true,
        );
        builder.initialize().await?;
        builder.apply_inherents().await?;
        let mut included = Vec::new();
        let mut failed = Vec::new();
        for extrinsic in extrinsics {
            match builder.apply_extrinsic(extrinsic.clone()).await? {
                ApplyExtrinsicResult::Success { .. } => {
                    included.push(extrinsic);
                },
                ApplyExtrinsicResult::DispatchFailed { error } => {
                    failed.push(FailedExtrinsic { extrinsic, reason: error });
                },
            }
        }
        let runtime_upgraded = builder.runtime_upgraded();
        let (new_block, returned_prototype) = builder.finalize().await?;
        // After an upgrade, rebuild the executor from the new runtime code.
        // Failures here are logged but non-fatal: the old executor stays.
        let new_executor = if runtime_upgraded {
            log::debug!("[Blockchain] Runtime upgrade detected, recreating executor");
            match new_block.runtime_code().await {
                Ok(code) =>
                    match RuntimeExecutor::with_config(code, None, self.executor_config.clone()) {
                        Ok(executor) => Some(executor),
                        Err(e) => {
                            log::error!(
                                "[Blockchain] Failed to recreate executor after runtime upgrade: {e}. \
                                 Subsequent runtime calls may fail until the next successful upgrade."
                            );
                            None
                        },
                    },
                Err(e) => {
                    log::error!(
                        "[Blockchain] Failed to get runtime code after upgrade: {e}. \
                         Subsequent runtime calls may fail until the next successful upgrade."
                    );
                    None
                },
            }
        } else {
            None
        };
        {
            // Commit: only if the head is still the parent we built on.
            let mut head = self.head.write().await;
            if head.hash != parent_hash {
                return Err(BlockchainError::Block(BlockError::ConcurrentBlockBuild));
            }
            *head = new_block.clone();
            if let Some(executor) = new_executor {
                *self.executor.write().await = executor;
            }
            if runtime_upgraded {
                // Force slot duration re-detection under the new runtime.
                self.cached_slot_duration.store(0, Ordering::Release);
            }
        }
        if runtime_upgraded {
            // The old prototype targets the old runtime: drop it, and let
            // providers rebuild their caches.
            *self.warm_prototype.lock().await = None;
            for provider in &self.inherent_providers {
                provider.invalidate_cache();
            }
        } else {
            *self.warm_prototype.lock().await = returned_prototype;
        }
        let modified_keys: Vec<Vec<u8>> = new_block
            .storage()
            .diff()
            .map(|diff| diff.into_iter().map(|(k, _)| k).collect())
            .unwrap_or_default();
        let subscribers = self.event_tx.receiver_count();
        log::debug!(
            "[Blockchain] Emitting NewBlock #{} event ({} modified keys, {} subscribers, {} header bytes)",
            new_block.number,
            modified_keys.len(),
            subscribers,
            new_block.header.len(),
        );
        // Send fails only when there are no subscribers; that is fine.
        let _ = self.event_tx.send(BlockchainEvent::NewBlock {
            hash: new_block.hash,
            number: new_block.number,
            parent_hash: new_block.parent_hash,
            header: new_block.header.clone(),
            modified_keys,
        });
        Ok(BuildBlockResult { block: new_block, included, failed })
    }
pub async fn build_empty_block(&self) -> Result<Block, BlockchainError> {
self.build_block(vec![]).await.map(|result| result.block)
}
    /// Execute a runtime API call against the current head.
    pub async fn call(&self, method: &str, args: &[u8]) -> Result<Vec<u8>, BlockchainError> {
        let head_hash = self.head_hash().await;
        // The head is always resolvable (it is either still the head or on
        // the local parent chain), so the Option is never None here.
        self.call_at_block(head_hash, method, args)
            .await
            .map(|result| result.expect("head_hash always exists; qed;"))
    }
    /// Read a storage value at the current head.
    pub async fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, BlockchainError> {
        let block_number = self.head.read().await.number;
        self.get_storage_value(block_number, key).await
    }
    /// Read a storage value as of a specific block number.
    pub async fn storage_at(
        &self,
        block_number: u32,
        key: &[u8],
    ) -> Result<Option<Vec<u8>>, BlockchainError> {
        self.get_storage_value(block_number, key).await
    }
    /// Paged storage-key listing under `prefix`, for RPC compatibility.
    ///
    /// For blocks built locally after the fork point the keys come from the
    /// local storage layer (filtered to strictly-greater than `start_key`,
    /// capped at `count`); for fork-point-or-older blocks the request is
    /// proxied to the upstream node, with one reconnect retry.
    pub async fn storage_keys_paged(
        &self,
        prefix: &[u8],
        count: u32,
        start_key: Option<&[u8]>,
        at: Option<H256>,
    ) -> Result<Vec<Vec<u8>>, BlockchainError> {
        // Default to the current head when no block hash is given.
        let block_hash = match at {
            Some(h) => h,
            None => self.head_hash().await,
        };
        log::debug!(
            "storage_keys_paged: prefix=0x{} count={} start_key={} at={:?}",
            hex::encode(prefix),
            count,
            start_key
                .map(|k| format!("0x{}", hex::encode(k)))
                .unwrap_or_else(|| "None".into()),
            block_hash,
        );
        let block_number = self.block_number_by_hash(block_hash).await?;
        if let Some(n) = block_number.filter(|&n| n > self.fork_point_number) {
            // Fork-local block: list keys from the layered local storage.
            let all_keys = {
                let head = self.head.read().await;
                head.storage()
                    .keys_by_prefix(prefix, n)
                    .await
                    .map_err(|e| BlockchainError::Block(BlockError::Storage(e)))?
            };
            let keys: Vec<Vec<u8>> = all_keys
                .into_iter()
                .filter(|k| start_key.is_none_or(|sk| k.as_slice() > sk))
                .take(count as usize)
                .collect();
            log::debug!("storage_keys_paged: returned {} keys (fork-local)", keys.len());
            Ok(keys)
        } else {
            // Fork point or earlier: ask the upstream node directly.
            let head = self.head.read().await;
            let rpc = head.storage().remote().rpc();
            match rpc.storage_keys_paged(prefix, count, start_key, block_hash).await {
                Ok(keys) => {
                    log::debug!("storage_keys_paged: returned {} keys", keys.len());
                    Ok(keys)
                },
                Err(first_err) => {
                    // Release the head lock before attempting a reconnect.
                    drop(head);
                    if self.reconnect_upstream().await {
                        let head = self.head.read().await;
                        let rpc = head.storage().remote().rpc();
                        let keys = rpc
                            .storage_keys_paged(prefix, count, start_key, block_hash)
                            .await
                            .map_err(|e| BlockchainError::Block(BlockError::Rpc(e)))?;
                        log::debug!(
                            "storage_keys_paged: returned {} keys (after reconnect)",
                            keys.len()
                        );
                        Ok(keys)
                    } else {
                        Err(BlockchainError::Block(BlockError::Rpc(first_err)))
                    }
                },
            }
        }
    }
    /// Unpaged storage-key listing under `prefix` at block `at`: fork-local
    /// blocks are answered from local storage, fork-point-or-older blocks
    /// from the remote layer (no reconnect retry here).
    pub async fn storage_keys_by_prefix(
        &self,
        prefix: &[u8],
        at: H256,
    ) -> Result<Vec<Vec<u8>>, BlockchainError> {
        log::debug!(
            "storage_keys_by_prefix: prefix=0x{} ({} bytes) at={:?}",
            hex::encode(prefix),
            prefix.len(),
            at,
        );
        let block_number = self.block_number_by_hash(at).await?;
        let keys = if let Some(n) = block_number.filter(|&n| n > self.fork_point_number) {
            // Block was built locally after the fork point.
            let head = self.head.read().await;
            head.storage()
                .keys_by_prefix(prefix, n)
                .await
                .map_err(|e| BlockchainError::Block(BlockError::Storage(e)))?
        } else {
            // Fork point or earlier: delegate to the remote storage layer.
            let head = self.head.read().await;
            head.storage()
                .remote()
                .get_keys(at, prefix)
                .await
                .map_err(|e| BlockchainError::Block(BlockError::RemoteStorage(e)))?
        };
        log::debug!(
            "storage_keys_by_prefix: returned {} keys for prefix=0x{}",
            keys.len(),
            hex::encode(prefix)
        );
        Ok(keys)
    }
    /// Shared storage-read path for `storage` / `storage_at`: read `key` as
    /// of `block_number`, retrying once after a reconnect on failure.
    async fn get_storage_value(
        &self,
        block_number: u32,
        key: &[u8],
    ) -> Result<Option<Vec<u8>>, BlockchainError> {
        let head = self.head.read().await;
        match head.storage().get(block_number, key).await {
            // NOTE(review): if `get` returns an owned entry, `.clone()` on
            // `v.value` looks redundant (`v.value` could be moved) — confirm
            // against the storage layer's return type.
            Ok(value) => Ok(value.and_then(|v| v.value.clone())),
            Err(first_err) => {
                if self.reconnect_upstream().await {
                    let value =
                        head.storage().get(block_number, key).await.map_err(BlockError::from)?;
                    Ok(value.and_then(|v| v.value.clone()))
                } else {
                    Err(BlockchainError::Block(BlockError::from(first_err)))
                }
            },
        }
    }
async fn detect_chain_type(block: &Block) -> Result<ChainType, BlockchainError> {
let metadata = block.metadata().await?;
if metadata.pallet_by_name("ParachainSystem").is_some() {
let para_id = Self::get_para_id(block).await.unwrap_or(0);
Ok(ChainType::Parachain { para_id })
} else {
Ok(ChainType::RelayChain)
}
}
async fn get_para_id(block: &Block) -> Option<u32> {
use scale::Decode;
let pallet_hash = sp_core::twox_128(storage_keys::PARACHAIN_INFO_PALLET);
let storage_hash = sp_core::twox_128(storage_keys::PARACHAIN_ID);
let key: Vec<u8> = [pallet_hash.as_slice(), storage_hash.as_slice()].concat();
let value = block.storage().get(block.number, &key).await.ok().flatten()?;
value.value.as_ref().map(|value| u32::decode(&mut value.as_slice()).ok())?
}
async fn get_chain_name(block: &Block) -> Result<String, BlockchainError> {
let runtime_code = block.runtime_code().await?;
let executor = RuntimeExecutor::new(runtime_code, None)?;
let version = executor.runtime_version()?;
Ok(version.spec_name)
}
    /// Execute a runtime API call at a specific block, or `None` when the
    /// block is unknown.
    ///
    /// Calls at the head reuse the shared executor and warm prototype (the
    /// prototype is only put back if the head did not move during the call).
    /// Other blocks get a throwaway executor over that block's runtime code.
    pub async fn call_at_block(
        &self,
        hash: H256,
        method: &str,
        args: &[u8],
    ) -> Result<Option<Vec<u8>>, BlockchainError> {
        let head_block = {
            let head = self.head.read().await;
            (hash == head.hash).then(|| head.clone())
        };
        if let Some(head_block) = head_block {
            let pre_call_hash = head_block.hash;
            let executor = self.executor.read().await.clone();
            let warm_prototype = self.warm_prototype.lock().await.take();
            let (result, returned_prototype) = executor
                .call_with_prototype(warm_prototype, method, args, head_block.storage())
                .await;
            // Only restore the prototype if the head is unchanged; a new
            // head could imply a new runtime.
            if self.head.read().await.hash == pre_call_hash {
                *self.warm_prototype.lock().await = returned_prototype;
            }
            return Ok(Some(result?.output));
        }
        // Not the head: find the block locally or mock one for remote state.
        let block = self.find_or_create_block_for_call(hash).await?;
        let Some(block) = block else {
            return Ok(None);
        };
        let runtime_code = block.runtime_code().await?;
        let executor =
            RuntimeExecutor::with_config(runtime_code, None, self.executor_config.clone())?;
        let result = executor.call(method, args, block.storage()).await?;
        Ok(Some(result.output))
    }
    /// Batch-read multiple storage keys at block `at` from the remote layer,
    /// retrying once after a reconnect. Results are positionally aligned with
    /// `keys`.
    pub async fn storage_batch(
        &self,
        at: H256,
        keys: &[&[u8]],
    ) -> Result<Vec<Option<Vec<u8>>>, BlockchainError> {
        match self.remote.get_batch(at, keys).await {
            Ok(result) => Ok(result),
            Err(first_err) => {
                if self.reconnect_upstream().await {
                    self.remote
                        .get_batch(at, keys)
                        .await
                        .map_err(|e| BlockchainError::Block(BlockError::RemoteStorage(e)))
                } else {
                    Err(BlockchainError::Block(BlockError::RemoteStorage(first_err)))
                }
            },
        }
    }
    /// Proxy a `state_call` straight to the upstream node at block `at`
    /// (no local execution), retrying once after a reconnect.
    pub async fn proxy_state_call(
        &self,
        method: &str,
        args: &[u8],
        at: H256,
    ) -> Result<Vec<u8>, BlockchainError> {
        let rpc = self.remote.rpc();
        match rpc.state_call(method, args, Some(at)).await {
            Ok(result) => Ok(result),
            Err(first_err) => {
                if self.reconnect_upstream().await {
                    rpc.state_call(method, args, Some(at))
                        .await
                        .map_err(|e| BlockchainError::Block(BlockError::from(e)))
                } else {
                    Err(BlockchainError::Block(BlockError::from(first_err)))
                }
            },
        }
    }
    /// Attempt to reconnect the upstream RPC client; returns `true` on
    /// success. Logs at `debug` at most once per
    /// `RECONNECT_LOG_DEBOUNCE_SECS`, and at `trace` otherwise, to avoid
    /// flooding the log during outages.
    async fn reconnect_upstream(&self) -> bool {
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let last = self.last_reconnect_log.load(Ordering::Relaxed);
        let elapsed_secs = now_ms.saturating_sub(last) / 1000;
        if elapsed_secs >= RECONNECT_LOG_DEBOUNCE_SECS {
            self.last_reconnect_log.store(now_ms, Ordering::Relaxed);
            log::debug!(
                "Upstream connection lost, reconnecting to {}",
                self.remote.rpc().endpoint()
            );
        } else {
            log::trace!(
                "Upstream connection lost, reconnecting to {}",
                self.remote.rpc().endpoint()
            );
        }
        self.remote.rpc().reconnect().await.is_ok()
    }
    /// Validate an extrinsic against the current head via the runtime's
    /// tagged-transaction-queue API.
    ///
    /// Execution or decode failures are mapped to
    /// `Unknown(CannotLookup)`; a runtime-reported rejection is returned as
    /// the decoded `TransactionValidityError`.
    pub async fn validate_extrinsic(
        &self,
        extrinsic: &[u8],
    ) -> Result<ValidTransaction, TransactionValidityError> {
        let head = self.head.read().await.clone();
        // Call args: 1-byte transaction source ++ extrinsic ++ 32-byte
        // block hash.
        let mut args = Vec::with_capacity(1 + extrinsic.len() + 32);
        args.push(transaction_source::EXTERNAL);
        args.extend(extrinsic);
        args.extend(head.hash.as_bytes());
        let pre_call_hash = head.hash;
        let executor = self.executor.read().await.clone();
        let warm_prototype = self.warm_prototype.lock().await.take();
        let (result, returned_prototype) = executor
            .call_with_prototype(
                warm_prototype,
                runtime_api::TAGGED_TRANSACTION_QUEUE_VALIDATE,
                &args,
                head.storage(),
            )
            .await;
        // Only restore the warm prototype if the head did not move during
        // the call (a new head could imply a new runtime).
        if self.head.read().await.hash == pre_call_hash {
            *self.warm_prototype.lock().await = returned_prototype;
        }
        let result = result
            .map_err(|_| TransactionValidityError::Unknown(UnknownTransaction::CannotLookup))?;
        let validity = TransactionValidity::decode(&mut result.output.as_slice())
            .map_err(|_| TransactionValidityError::Unknown(UnknownTransaction::CannotLookup))?;
        match validity {
            TransactionValidity::Ok(valid) => Ok(valid),
            TransactionValidity::Err(err) => Err(err),
        }
    }
    /// Resolve `hash` to a block usable for runtime calls: a clone from the
    /// local fork chain when present, otherwise a mocked block over the
    /// current storage at the remote-resolved number. `None` when the hash is
    /// unknown upstream too.
    async fn find_or_create_block_for_call(
        &self,
        hash: H256,
    ) -> Result<Option<Block>, BlockchainError> {
        let head = self.head.read().await;
        let mut current: Option<&Block> = Some(&head);
        while let Some(block) = current {
            if block.hash == hash {
                return Ok(Some(block.clone()));
            }
            if block.parent.is_none() {
                break;
            }
            current = block.parent.as_deref();
        }
        let block_number =
            match self.remote.block_number_by_hash(hash).await.map_err(BlockError::from)? {
                Some(number) => number,
                None => return Ok(None),
            };
        Ok(Some(Block::mocked_for_call(hash, block_number, head.storage().clone())))
    }
#[cfg(test)]
pub async fn set_storage_for_testing(&self, key: &[u8], value: Option<&[u8]>) {
let mut head = self.head.write().await;
head.storage_mut().set(key, value).unwrap();
}
    /// Fund the well-known dev accounts on the fork and, when the runtime has
    /// a `Sudo` pallet, install the first dev account as the sudo key.
    ///
    /// Account set is chosen by the chain's `isEthereum` system property.
    /// Existing account records get their free balance patched; missing ones
    /// get a fresh account record with `DEV_BALANCE`.
    pub async fn initialize_dev_accounts(&self) -> Result<(), BlockchainError> {
        use crate::dev::{
            DEV_BALANCE, ETHEREUM_DEV_ACCOUNTS, SUBSTRATE_DEV_ACCOUNTS, account_storage_key,
            build_account_info, patch_free_balance, sudo_key_storage_key,
        };
        // Ethereum-style chains use different dev addresses.
        let is_ethereum = self
            .chain_properties()
            .await
            .and_then(|props| props.get("isEthereum")?.as_bool())
            .unwrap_or(false);
        let mut head = self.head.write().await;
        let accounts: Vec<(&str, Vec<u8>)> = if is_ethereum {
            ETHEREUM_DEV_ACCOUNTS.iter().map(|(n, a)| (*n, a.to_vec())).collect()
        } else {
            SUBSTRATE_DEV_ACCOUNTS.iter().map(|(n, a)| (*n, a.to_vec())).collect()
        };
        let keys: Vec<Vec<u8>> = accounts.iter().map(|(_, a)| account_storage_key(a)).collect();
        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
        // Fetch existing account records so non-balance fields are preserved.
        let existing_values = head
            .storage()
            .remote()
            .get_batch(self.fork_point_hash, &key_refs)
            .await
            .map_err(BlockError::from)?;
        let entries: Vec<(&[u8], Option<Vec<u8>>)> = keys
            .iter()
            .zip(existing_values.iter())
            .map(|(key, existing)| {
                let value = match existing {
                    Some(data) => patch_free_balance(data, DEV_BALANCE),
                    None => build_account_info(DEV_BALANCE),
                };
                (key.as_slice(), Some(value))
            })
            .collect();
        let batch: Vec<(&[u8], Option<&[u8]>)> =
            entries.iter().map(|(k, v)| (*k, v.as_deref())).collect();
        head.storage_mut().set_batch_initial(&batch).map_err(BlockError::from)?;
        for (name, addr) in &accounts {
            log::debug!("Funded dev account: {name} (0x{})", hex::encode(addr));
        }
        let metadata = head.metadata().await?;
        if metadata.pallet_by_name("Sudo").is_some() {
            // First dev account becomes root.
            let key = sudo_key_storage_key();
            let sudo_account = &accounts[0].1;
            head.storage_mut()
                .set_initial(&key, Some(sudo_account))
                .map_err(BlockError::from)?;
            log::debug!("Set {} as sudo key (0x{})", accounts[0].0, hex::encode(&accounts[0].1));
        }
        Ok(())
    }
pub async fn clear_local_storage(&self) -> Result<(), CacheError> {
let head = self.head.read().await;
head.storage().cache().clear_local_storage().await
}
}
/// Adapter that lets a shared `Arc<dyn InherentProvider>` be handed to
/// `BlockBuilder`, which takes `Box<dyn InherentProvider>` (see
/// `build_block`). All trait methods delegate to the inner provider.
struct ArcProvider(Arc<dyn InherentProvider>);
#[async_trait::async_trait]
impl InherentProvider for ArcProvider {
    fn identifier(&self) -> &'static str {
        self.0.identifier()
    }
    async fn provide(
        &self,
        parent: &Block,
        executor: &RuntimeExecutor,
    ) -> Result<Vec<Vec<u8>>, BlockBuilderError> {
        self.0.provide(parent, executor).await
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn blockchain_error_display() {
let err = BlockchainError::Block(BlockError::RuntimeCodeNotFound);
assert!(err.to_string().contains("Runtime code not found"));
}
mod sequential {
use super::*;
use crate::testing::{
TestContext,
accounts::{ALICE, BOB},
constants::TRANSFER_AMOUNT,
helpers::{account_storage_key, build_mock_signed_extrinsic_v4, decode_free_balance},
};
        // Forking with no explicit fork point: head must equal the fork point.
        #[tokio::test(flavor = "multi_thread")]
        async fn fork_creates_blockchain_with_correct_fork_point() {
            let ctx = TestContext::minimal().await;
            let blockchain =
                Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
            // NOTE(review): this assertion is a tautology (always true);
            // likely meant to just sanity-check the number is readable.
            assert!(blockchain.fork_point_number() > 0 || blockchain.fork_point_number() == 0);
            assert_ne!(blockchain.fork_point(), H256::zero());
            assert_eq!(blockchain.head_number().await, blockchain.fork_point_number());
            assert_eq!(blockchain.head_hash().await, blockchain.fork_point());
        }
        // Forking at an explicit block number must land exactly there.
        #[tokio::test(flavor = "multi_thread")]
        async fn fork_at_creates_blockchain_at_specific_block() {
            let ctx = TestContext::minimal().await;
            let blockchain =
                Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
            let fork_number = blockchain.fork_point_number();
            let blockchain2 = Blockchain::fork_at(&ctx.endpoint, None, Some(fork_number.into()))
                .await
                .expect("Failed to fork at specific block");
            assert_eq!(blockchain2.fork_point_number(), fork_number);
        }
        // An unreachable endpoint must produce an error, not hang or panic.
        #[tokio::test(flavor = "multi_thread")]
        async fn fork_with_invalid_endpoint_fails() {
            let invalid_endpoint: Url = "ws://localhost:19999".parse().unwrap();
            let result = Blockchain::fork(&invalid_endpoint, None).await;
            assert!(result.is_err());
        }
        // A block number far beyond the chain tip must be rejected.
        #[tokio::test(flavor = "multi_thread")]
        async fn fork_at_with_invalid_block_number_fails() {
            let ctx = TestContext::minimal().await;
            let result = Blockchain::fork_at(&ctx.endpoint, None, Some(u32::MAX.into())).await;
            assert!(result.is_err());
        }
        // The minimal test chain has no ParachainSystem pallet, so detection
        // must classify it as a relay chain.
        #[tokio::test(flavor = "multi_thread")]
        async fn fork_detects_relay_chain_type() {
            let ctx = TestContext::minimal().await;
            let blockchain =
                Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
            assert_eq!(*blockchain.chain_type(), ChainType::RelayChain);
        }
        // The chain name (runtime spec_name) must be resolved at fork time.
        #[tokio::test(flavor = "multi_thread")]
        async fn fork_retrieves_chain_name() {
            let ctx = TestContext::minimal().await;
            let blockchain =
                Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
            assert!(!blockchain.chain_name().is_empty());
        }
        // One empty block: number advances by one and parent linkage is set.
        #[tokio::test(flavor = "multi_thread")]
        async fn build_empty_block_advances_chain() {
            let ctx = TestContext::minimal().await;
            let blockchain =
                Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
            let initial_number = blockchain.head_number().await;
            let initial_hash = blockchain.head_hash().await;
            let new_block =
                blockchain.build_empty_block().await.expect("Failed to build empty block");
            assert_eq!(new_block.number, initial_number + 1);
            assert_eq!(blockchain.head_number().await, initial_number + 1);
            assert_ne!(blockchain.head_hash().await, initial_hash);
            assert_eq!(new_block.parent_hash, initial_hash);
        }
        // Several blocks in sequence: numbers increase monotonically from the
        // fork point.
        #[tokio::test(flavor = "multi_thread")]
        async fn build_multiple_empty_blocks_creates_chain() {
            let ctx = TestContext::minimal().await;
            let blockchain =
                Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
            let fork_number = blockchain.fork_point_number();
            for i in 1..=3 {
                let block =
                    blockchain.build_empty_block().await.expect("Failed to build empty block");
                assert_eq!(block.number, fork_number + i);
            }
            assert_eq!(blockchain.head_number().await, fork_number + 3);
        }
#[tokio::test(flavor = "multi_thread")]
async fn storage_returns_value_for_existing_key() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // twox128("System") ++ twox128("Number") is the well-known System::Number key.
    let key: Vec<u8> =
        [sp_core::twox_128(b"System").as_slice(), sp_core::twox_128(b"Number").as_slice()]
            .concat();
    let value = chain.storage(&key).await.expect("Failed to query storage");
    assert!(value.is_some());
}
#[tokio::test(flavor = "multi_thread")]
async fn storage_returns_none_for_nonexistent_key() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // A key that matches no storage item resolves to None, not an error.
    let value = chain.storage(b"nonexistent_key_12345").await.expect("Failed to query storage");
    assert!(value.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn storage_at_queries_specific_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let fork_number = chain.fork_point_number();
    // Advance the head so the fork point becomes a historical block.
    chain.build_empty_block().await.expect("Failed to build block");
    // twox128("System") ++ twox128("Number") addresses System::Number.
    let key: Vec<u8> =
        [sp_core::twox_128(b"System").as_slice(), sp_core::twox_128(b"Number").as_slice()]
            .concat();
    let value =
        chain.storage_at(fork_number, &key).await.expect("Failed to query storage at block");
    assert!(value.is_some());
}
#[tokio::test(flavor = "multi_thread")]
async fn call_executes_runtime_api() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // Core_version takes no arguments and always returns encoded version info.
    let output = chain.call("Core_version", &[]).await.expect("Failed to call runtime API");
    assert!(!output.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn head_returns_current_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // head() must agree with the individual head_number()/head_hash() accessors.
    let current = chain.head().await;
    assert_eq!(current.number, chain.head_number().await);
    assert_eq!(current.hash, chain.head_hash().await);
}
#[tokio::test(flavor = "multi_thread")]
async fn head_updates_after_building_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let before = chain.head().await;
    chain.build_empty_block().await.expect("Failed to build block");
    let after = chain.head().await;
    // The head must advance by one and link back to the old head.
    assert_eq!(after.number, before.number + 1);
    assert_ne!(after.hash, before.hash);
    assert_eq!(after.parent_hash, before.hash);
}
#[tokio::test(flavor = "multi_thread")]
async fn build_block_with_signed_transfer_updates_balances() {
    // Builds a block containing a mock-signed Balances::transfer_keep_alive and
    // verifies both account balances change accordingly.
    use crate::{ExecutorConfig, SignatureMockMode};
    use scale::{Compact, Encode};
    let ctx = TestContext::minimal().await;
    // Mock signature verification so the fabricated extrinsic is accepted.
    let config = ExecutorConfig {
        signature_mock: SignatureMockMode::AlwaysValid,
        ..Default::default()
    };
    let blockchain = Blockchain::fork_with_config(&ctx.endpoint, None, None, config)
        .await
        .expect("Failed to fork blockchain");
    let alice_key = account_storage_key(&ALICE);
    let bob_key = account_storage_key(&BOB);
    let head = blockchain.head().await;
    let head_number_before = head.number;
    let metadata = head.metadata().await.expect("Failed to get metadata");
    // Read both balances at the pre-transfer block.
    let alice_balance_before = blockchain
        .storage_at(head_number_before, &alice_key)
        .await
        .expect("Failed to get Alice balance")
        .map(|v| decode_free_balance(&v))
        .expect("Alice should have a balance");
    let bob_balance_before = blockchain
        .storage_at(head_number_before, &bob_key)
        .await
        .expect("Failed to get Bob balance")
        .map(|v| decode_free_balance(&v))
        .expect("Bob should have a balance");
    // Resolve pallet/call indices from metadata instead of hard-coding them.
    let balances_pallet =
        metadata.pallet_by_name("Balances").expect("Balances pallet should exist");
    let pallet_index = balances_pallet.index();
    let transfer_call = balances_pallet
        .call_variant_by_name("transfer_keep_alive")
        .expect("transfer_keep_alive call should exist");
    let call_index = transfer_call.index;
    // Call data layout: pallet index, call index, MultiAddress::Id discriminant
    // (0x00), 32-byte destination, compact-encoded amount.
    let mut call_data = vec![pallet_index, call_index];
    call_data.push(0x00);
    call_data.extend(BOB);
    call_data.extend(Compact(TRANSFER_AMOUNT).encode());
    let extrinsic = build_mock_signed_extrinsic_v4(&call_data);
    let result = blockchain
        .build_block(vec![extrinsic])
        .await
        .expect("Failed to build block with transfer");
    let new_block = result.block;
    assert_eq!(new_block.number, head_number_before + 1);
    // Re-read balances at the new block and verify the transfer took effect.
    let alice_balance_after = blockchain
        .storage_at(new_block.number, &alice_key)
        .await
        .expect("Failed to get Alice balance after")
        .map(|v| decode_free_balance(&v))
        .expect("Alice should still have a balance");
    let bob_balance_after = blockchain
        .storage_at(new_block.number, &bob_key)
        .await
        .expect("Failed to get Bob balance after")
        .map(|v| decode_free_balance(&v))
        .expect("Bob should still have a balance");
    assert!(
        alice_balance_after < alice_balance_before,
        "Alice balance should decrease after transfer"
    );
    assert_eq!(
        bob_balance_after,
        bob_balance_before + TRANSFER_AMOUNT,
        "Bob should receive exactly the transfer amount"
    );
    // Fees may legitimately be zero on some test chains, so the transfer amount
    // itself is the only guaranteed lower bound on Alice's total debit.
    // (The previous message claimed "plus fees", which the `>=` check never
    // actually enforced.)
    assert!(
        alice_balance_before - alice_balance_after >= TRANSFER_AMOUNT,
        "Alice should have paid at least the transfer amount"
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn block_body_returns_extrinsics_for_head() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let built = chain.build_empty_block().await.expect("Failed to build block");
    // The freshly built head must expose its body, including inherents.
    let body = chain.block_body(built.hash).await.expect("Failed to get block body");
    assert!(body.is_some(), "Should return body for head hash");
    assert!(!body.unwrap().is_empty(), "Built block should have inherent extrinsics");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_body_returns_extrinsics_for_parent_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let first = chain.build_empty_block().await.expect("Failed to build block 1");
    let _second = chain.build_empty_block().await.expect("Failed to build block 2");
    // Bodies must remain retrievable once the block is no longer the head.
    let body = chain.block_body(first.hash).await.expect("Failed to get block body");
    assert!(body.is_some(), "Should return body for parent block");
    assert!(!body.unwrap().is_empty(), "Parent block should have inherent extrinsics");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_body_returns_extrinsics_for_fork_point_from_remote() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // The fork-point block lives on the remote node, not in the local overlay.
    let body = chain.block_body(chain.fork_point()).await.expect("Failed to get block body");
    assert!(body.is_some(), "Should return body for fork point from remote");
    assert!(!body.unwrap().is_empty(), "Should contain body");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_body_returns_none_for_unknown_hash() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // A hash naming no local or remote block yields Ok(None), not an error.
    let missing = H256::from([0xde; 32]);
    let body = chain.block_body(missing).await.expect("Failed to query block body");
    assert!(body.is_none(), "Should return None for unknown hash");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_header_returns_header_for_head() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let built = chain.build_empty_block().await.expect("Failed to build block");
    // A freshly built block must expose its encoded header.
    let header = chain.block_header(built.hash).await.expect("Failed to get block header");
    assert!(header.is_some(), "Should return header for head hash");
    assert!(!header.unwrap().is_empty(), "Built block should have a header");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_header_returns_header_for_different_blocks() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let first = chain.build_empty_block().await.expect("Failed to build block 1");
    let second = chain.build_empty_block().await.expect("Failed to build block 2");
    // Distinct blocks must serialize to distinct headers.
    let first_header =
        chain.block_header(first.hash).await.expect("Failed to get block header").unwrap();
    let second_header =
        chain.block_header(second.hash).await.expect("Failed to get block header").unwrap();
    assert_ne!(first_header, second_header);
}
#[tokio::test(flavor = "multi_thread")]
async fn block_header_returns_header_for_fork_point() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // The fork-point header is served from the remote node.
    let header =
        chain.block_header(chain.fork_point()).await.expect("Failed to get block header");
    assert!(header.is_some(), "Should return header for fork point from remote");
    assert!(!header.unwrap().is_empty(), "Should contain header");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_header_returns_none_for_unknown_hash() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // An unknown hash yields Ok(None), not an error.
    let missing = H256::from([0xde; 32]);
    let header = chain.block_header(missing).await.expect("Failed to query block header");
    assert!(header.is_none(), "Should return None for unknown hash");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_header_returns_header_for_historical_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let fork_number = chain.fork_point_number();
    // Only meaningful when the fork point has an ancestor.
    if fork_number == 0 {
        return;
    }
    let ancestor_hash = chain
        .block_hash_at(fork_number - 1)
        .await
        .expect("Failed to get historical hash")
        .expect("Historical block should exist");
    let header = chain.block_header(ancestor_hash).await.expect("Failed to get block header");
    assert!(header.is_some(), "Should return header for historical block");
    assert!(!header.unwrap().is_empty(), "Historical block should have a header");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_hash_at_returns_hash_for_head() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let built = chain.build_empty_block().await.expect("Failed to build block");
    // Looking up the head's number must yield the head's hash.
    let found = chain.block_hash_at(built.number).await.expect("Failed to get block hash");
    assert!(found.is_some(), "Should return hash for head block number");
    assert_eq!(found.unwrap(), built.hash, "Hash should match head block hash");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_hash_at_returns_hash_for_parent_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let first = chain.build_empty_block().await.expect("Failed to build block 1");
    let _second = chain.build_empty_block().await.expect("Failed to build block 2");
    // Number-to-hash lookup must work for blocks behind the current head.
    let found = chain.block_hash_at(first.number).await.expect("Failed to get block hash");
    assert!(found.is_some(), "Should return hash for parent block number");
    assert_eq!(found.unwrap(), first.hash, "Hash should match first block hash");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_hash_at_returns_hash_for_fork_point() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let expected_hash = chain.fork_point();
    // The fork-point number must map back to the fork-point hash.
    let found = chain
        .block_hash_at(chain.fork_point_number())
        .await
        .expect("Failed to get block hash");
    assert!(found.is_some(), "Should return hash for fork point");
    assert_eq!(found.unwrap(), expected_hash, "Hash should match fork point hash");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_hash_at_returns_hash_for_block_before_fork_point() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let fork_number = chain.fork_point_number();
    // Only meaningful when an ancestor of the fork point exists.
    if fork_number == 0 {
        return;
    }
    let found = chain.block_hash_at(fork_number - 1).await.expect("Failed to get block hash");
    assert!(found.is_some(), "Should return hash for block before fork point");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_hash_at_returns_none_for_future_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // A number far past the head names no block yet.
    let future_number = chain.head_number().await + 100;
    let found = chain.block_hash_at(future_number).await.expect("Failed to query block hash");
    assert!(found.is_none(), "Should return None for future block number");
}
#[tokio::test(flavor = "multi_thread")]
async fn block_number_by_hash_returns_number_for_head() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let built = chain.build_empty_block().await.unwrap();
    // The head's hash must resolve back to its number.
    let found =
        chain.block_number_by_hash(built.hash).await.expect("Failed to query block number");
    assert_eq!(found, Some(built.number));
}
#[tokio::test(flavor = "multi_thread")]
async fn block_number_by_hash_returns_number_for_parent() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let first = chain.build_empty_block().await.unwrap();
    let _second = chain.build_empty_block().await.unwrap();
    // Lookup must still work for a block behind the head.
    let found =
        chain.block_number_by_hash(first.hash).await.expect("Failed to query block number");
    assert_eq!(found, Some(first.number));
}
#[tokio::test(flavor = "multi_thread")]
async fn block_number_by_hash_returns_number_for_fork_point() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // The fork-point hash must resolve to the fork-point number.
    let found = chain
        .block_number_by_hash(chain.fork_point())
        .await
        .expect("Failed to query block number");
    assert_eq!(found, Some(chain.fork_point_number()));
}
#[tokio::test(flavor = "multi_thread")]
async fn block_number_by_hash_returns_none_for_unknown() {
    // Looking up a hash that names no block must yield Ok(None).
    let ctx = TestContext::minimal().await;
    let blockchain =
        Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // Use the same recognizably-bogus hash as the sibling unknown-hash tests;
    // an all-zero hash is a weaker probe since it can collide with default or
    // placeholder values.
    let unknown_hash = H256::from([0xde; 32]);
    let number = blockchain
        .block_number_by_hash(unknown_hash)
        .await
        .expect("Failed to query block number");
    assert!(number.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn block_number_by_hash_returns_number_for_historical_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let fork_number = chain.fork_point_number();
    // Only meaningful when the fork point has an ancestor.
    if fork_number == 0 {
        return;
    }
    let ancestor_hash = chain
        .block_hash_at(fork_number - 1)
        .await
        .expect("Failed to query block hash")
        .expect("Block should exist");
    let found = chain
        .block_number_by_hash(ancestor_hash)
        .await
        .expect("Failed to query block number");
    assert_eq!(found, Some(fork_number - 1));
}
#[tokio::test(flavor = "multi_thread")]
async fn call_at_block_executes_at_head() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // Advance a few blocks so the head is well past the fork point.
    for _ in 0..3 {
        chain.build_empty_block().await.unwrap();
    }
    let head_hash = chain.head_hash().await;
    let result = chain
        .call_at_block(head_hash, "Core_version", &[])
        .await
        .expect("Failed to call runtime API");
    assert!(result.is_some(), "Should return result for head hash");
    assert!(!result.unwrap().is_empty(), "Result should not be empty");
}
#[tokio::test(flavor = "multi_thread")]
async fn call_at_block_executes_at_fork_point() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // Advance past the fork point so the call targets a historical state.
    for _ in 0..3 {
        chain.build_empty_block().await.unwrap();
    }
    let result = chain
        .call_at_block(chain.fork_point(), "Core_version", &[])
        .await
        .expect("Failed to call runtime API");
    assert!(result.is_some(), "Should return result for fork point hash");
    assert!(!result.unwrap().is_empty(), "Result should not be empty");
}
#[tokio::test(flavor = "multi_thread")]
async fn call_at_block_executes_at_parent_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let first = chain.build_empty_block().await.expect("Failed to build block 1");
    let _second = chain.build_empty_block().await.expect("Failed to build block 2");
    // Runtime calls must work against a locally built non-head block.
    let result = chain
        .call_at_block(first.hash, "Core_version", &[])
        .await
        .expect("Failed to call runtime API");
    assert!(result.is_some(), "Should return result for parent block hash");
    assert!(!result.unwrap().is_empty(), "Result should not be empty");
}
#[tokio::test(flavor = "multi_thread")]
async fn call_at_block_returns_none_for_unknown_hash() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // Calling against a hash that names no block yields Ok(None).
    let missing = H256::from([0xde; 32]);
    let result =
        chain.call_at_block(missing, "Core_version", &[]).await.expect("Failed to query");
    assert!(result.is_none(), "Should return None for unknown hash");
}
#[tokio::test(flavor = "multi_thread")]
async fn call_at_block_executes_at_historical_block() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let fork_number = chain.fork_point_number();
    // Only meaningful when a block before the fork point exists.
    if fork_number == 0 {
        return;
    }
    let ancestor_hash = chain
        .block_hash_at(fork_number - 1)
        .await
        .expect("Failed to get historical hash")
        .expect("Historical block should exist");
    let result = chain
        .call_at_block(ancestor_hash, "Core_version", &[])
        .await
        .expect("Failed to call runtime API");
    assert!(result.is_some(), "Should return result for historical block");
    assert!(!result.unwrap().is_empty(), "Result should not be empty");
}
#[tokio::test(flavor = "multi_thread")]
async fn call_at_block_does_not_persist_storage() {
    // Verifies that `call_at_block` runs the runtime against a throwaway
    // overlay: a state-mutating runtime call must leave persistent storage
    // untouched.
    use crate::{DigestItem, consensus_engine, create_next_header};
    let ctx = TestContext::minimal().await;
    let blockchain =
        Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    let head = blockchain.head().await;
    let head_hash = head.hash;
    let head_number = head.number;
    // Storage key for System::Number: twox128("System") ++ twox128("Number").
    let system_number_key: Vec<u8> =
        [sp_core::twox_128(b"System").as_slice(), sp_core::twox_128(b"Number").as_slice()]
            .concat();
    // Current block number, decoded from its little-endian u32 storage encoding.
    let number_before = blockchain
        .storage(&system_number_key)
        .await
        .expect("Failed to get System::Number")
        .map(|v| {
            u32::from_le_bytes(v.try_into().expect("System::Number should be 4 bytes"))
        })
        .expect("System::Number should exist");
    // Core_initialize_block sets System::Number to head + 1 inside the runtime;
    // if call_at_block leaked its overlay, that change would be visible below.
    // The AURA pre-runtime digest carries a slot number (0 is sufficient here).
    let header = create_next_header(
        &head,
        vec![DigestItem::PreRuntime(consensus_engine::AURA, 0u64.to_le_bytes().to_vec())],
    );
    let result = blockchain
        .call_at_block(head_hash, "Core_initialize_block", &header)
        .await
        .expect("Core_initialize_block call failed");
    assert!(result.is_some(), "Block should exist");
    // Re-read the same key from persistent storage after the call.
    let number_after = blockchain
        .storage(&system_number_key)
        .await
        .expect("Failed to get System::Number after")
        .map(|v| {
            u32::from_le_bytes(v.try_into().expect("System::Number should be 4 bytes"))
        })
        .expect("System::Number should still exist");
    assert_eq!(
        number_before,
        number_after,
        "System::Number should NOT be modified by call_at_block. \
        Before: {}, After: {} (would have been {} if persisted)",
        number_before,
        number_after,
        head_number + 1
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn validate_extrinsic_accepts_valid_transfer() {
    use crate::{ExecutorConfig, SignatureMockMode};
    use scale::{Compact, Encode};
    let ctx = TestContext::minimal().await;
    // Mock signature checking so a fabricated signed extrinsic validates.
    let config =
        ExecutorConfig { signature_mock: SignatureMockMode::AlwaysValid, ..Default::default() };
    let chain = Blockchain::fork_with_config(&ctx.endpoint, None, None, config)
        .await
        .expect("Failed to fork blockchain");
    let metadata = chain.head().await.metadata().await.expect("Failed to get metadata");
    let balances = metadata.pallet_by_name("Balances").expect("Balances pallet");
    let transfer =
        balances.call_variant_by_name("transfer_keep_alive").expect("transfer_keep_alive");
    // Call data: pallet index, call index, MultiAddress::Id(BOB), compact amount.
    let mut call_data = vec![balances.index(), transfer.index, 0x00];
    call_data.extend(BOB);
    call_data.extend(Compact(TRANSFER_AMOUNT).encode());
    let extrinsic = build_mock_signed_extrinsic_v4(&call_data);
    let result = chain.validate_extrinsic(&extrinsic).await;
    assert!(result.is_ok(), "Valid extrinsic should pass validation: {:?}", result);
}
#[tokio::test(flavor = "multi_thread")]
async fn validate_extrinsic_rejects_garbage() {
    let ctx = TestContext::minimal().await;
    let chain = Blockchain::fork(&ctx.endpoint, None).await.expect("Failed to fork blockchain");
    // Bytes that are not even a valid extrinsic envelope must be rejected.
    let garbage = vec![0xde, 0xad, 0xbe, 0xef];
    let outcome = chain.validate_extrinsic(&garbage).await;
    assert!(outcome.is_err(), "Garbage should fail validation");
}
#[tokio::test(flavor = "multi_thread")]
async fn build_block_result_tracks_included_extrinsics() {
    use crate::{ExecutorConfig, SignatureMockMode};
    use scale::{Compact, Encode};
    let ctx = TestContext::minimal().await;
    // Accept any signature so the mock extrinsic is applied successfully.
    let config =
        ExecutorConfig { signature_mock: SignatureMockMode::AlwaysValid, ..Default::default() };
    let chain = Blockchain::fork_with_config(&ctx.endpoint, None, None, config)
        .await
        .expect("Failed to fork");
    let metadata = chain.head().await.metadata().await.expect("Failed to get metadata");
    let balances = metadata.pallet_by_name("Balances").expect("Balances pallet");
    let transfer =
        balances.call_variant_by_name("transfer_keep_alive").expect("transfer_keep_alive");
    // Call data: pallet index, call index, MultiAddress::Id(BOB), compact amount.
    let mut call_data = vec![balances.index(), transfer.index, 0x00];
    call_data.extend(BOB);
    call_data.extend(Compact(TRANSFER_AMOUNT).encode());
    let extrinsic = build_mock_signed_extrinsic_v4(&call_data);
    let result =
        chain.build_block(vec![extrinsic.clone()]).await.expect("Failed to build block");
    // Exactly the submitted extrinsic must be reported as included.
    assert_eq!(result.included.len(), 1, "Should have 1 included extrinsic");
    assert!(result.failed.is_empty(), "Should have no failed extrinsics");
    assert_eq!(result.included[0], extrinsic);
}
#[tokio::test(flavor = "multi_thread")]
async fn build_block_result_tracks_failed_extrinsics() {
    // Submits a transfer signed by an account that was never endowed: the block
    // itself must still build, but the extrinsic must land in `failed`.
    use crate::{ExecutorConfig, SignatureMockMode};
    use scale::{Compact, Encode};
    let ctx = TestContext::minimal().await;
    // Signatures are mocked as valid so the failure comes from payment,
    // not from signature verification.
    let config = ExecutorConfig {
        signature_mock: SignatureMockMode::AlwaysValid,
        ..Default::default()
    };
    let blockchain = Blockchain::fork_with_config(&ctx.endpoint, None, None, config)
        .await
        .expect("Failed to fork");
    let head = blockchain.head().await;
    let metadata = head.metadata().await.expect("Failed to get metadata");
    // Resolve pallet/call indices from metadata.
    let balances_pallet = metadata.pallet_by_name("Balances").expect("Balances pallet");
    let pallet_index = balances_pallet.index();
    let transfer_call = balances_pallet
        .call_variant_by_name("transfer_keep_alive")
        .expect("transfer_keep_alive");
    let call_index = transfer_call.index;
    // An account id that holds no funds, so it cannot pay fees or transfer.
    let unfunded_account: [u8; 32] = [0x99; 32];
    let recipient = BOB;
    let amount: u128 = 1_000_000_000_000_000;
    // Call data: pallet index, call index, MultiAddress::Id (0x00), dest, amount.
    let mut call_data = vec![pallet_index, call_index];
    call_data.push(0x00); call_data.extend(recipient);
    call_data.extend(Compact(amount).encode());
    // Hand-rolled signed extrinsic, SCALE length-prefixed at the end.
    let extrinsic = {
        let mut inner = Vec::new();
        // 0x84 = extrinsic version 4 with the signed bit set;
        // 0x00 = MultiAddress::Id discriminant, followed by the 32-byte signer.
        inner.push(0x84); inner.push(0x00); inner.extend(unfunded_account);
        // Zeroed signature bytes, then era/nonce/tip zeros and a trailing 0x00
        // before the call data.
        // NOTE(review): the exact SignedExtra layout (signature discriminant,
        // era, Compact nonce, Compact tip, extra mode byte) is assumed here —
        // confirm against the runtime's signed-extension set if decoding breaks.
        inner.extend([0u8; 64]); inner.push(0x00); inner.extend(Compact(0u64).encode()); inner.extend(Compact(0u128).encode()); inner.push(0x00); inner.extend(&call_data);
        // Compact length prefix wraps the payload into a complete extrinsic.
        let mut final_ext = Compact(inner.len() as u32).encode();
        final_ext.extend(inner);
        final_ext
    };
    let result = blockchain
        .build_block(vec![extrinsic.clone()])
        .await
        .expect("Build should succeed even with failed extrinsics");
    // The failed extrinsic must be tracked, not silently dropped or included.
    assert!(
        result.failed.len() == 1,
        "Failed extrinsic should be tracked. Included: {}, Failed: {}",
        result.included.len(),
        result.failed.len()
    );
    assert!(result.included.is_empty(), "Failed extrinsic should not be in included list");
    assert_eq!(result.failed[0].extrinsic, extrinsic);
}
}
#[test]
fn transaction_validity_error_reason_returns_correct_strings() {
    // Each validity error maps to a fixed human-readable reason string.
    let cases = [
        (
            TransactionValidityError::Invalid(InvalidTransaction::Stale),
            "Nonce too low (already used)",
        ),
        (
            TransactionValidityError::Invalid(InvalidTransaction::Payment),
            "Insufficient funds for fees",
        ),
        (
            TransactionValidityError::Unknown(UnknownTransaction::CannotLookup),
            "Cannot lookup validity",
        ),
    ];
    for (err, expected) in cases {
        assert_eq!(err.reason(), expected);
    }
}
#[test]
fn transaction_validity_error_is_unknown_distinguishes_types() {
    // Invalid variants are not "unknown"; Unknown variants are.
    assert!(!TransactionValidityError::Invalid(InvalidTransaction::Stale).is_unknown());
    assert!(TransactionValidityError::Unknown(UnknownTransaction::CannotLookup).is_unknown());
}
#[test]
fn transaction_validity_types_can_be_decoded() {
    // Round-trips hand-written SCALE bytes through the TransactionValidity
    // decoder for both the Ok and Err variants.
    use scale::Decode;
    // SCALE layout of TransactionValidity::Ok(ValidTransaction):
    //   0x00                 -> Ok variant index
    //   01 00 00 00 00 00 00 00 -> priority = 1u64 (little-endian)
    //   0x00                 -> requires: empty Vec (compact length 0)
    //   0x00                 -> provides: empty Vec (compact length 0)
    //   40 00 00 00 00 00 00 00 -> longevity = 64u64 (little-endian)
    //   0x01                 -> propagate = true
    let valid_bytes = [
        0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, ];
    let validity = TransactionValidity::decode(&mut valid_bytes.as_slice())
        .expect("Should decode valid transaction");
    match validity {
        TransactionValidity::Ok(valid) => {
            assert_eq!(valid.priority, 1);
            assert!(valid.requires.is_empty());
            assert!(valid.provides.is_empty());
            assert_eq!(valid.longevity, 64);
            assert!(valid.propagate);
        },
        TransactionValidity::Err(_) => panic!("Expected Ok variant"),
    }
    // SCALE layout of Err(Invalid(Stale)):
    //   0x01 -> Err variant, 0x00 -> Invalid variant, 0x03 -> Stale variant.
    let invalid_bytes = [
        0x01, 0x00, 0x03, ];
    let validity = TransactionValidity::decode(&mut invalid_bytes.as_slice())
        .expect("Should decode invalid transaction");
    match validity {
        TransactionValidity::Ok(_) => panic!("Expected Err variant"),
        TransactionValidity::Err(err) => {
            assert_eq!(err.reason(), "Nonce too low (already used)");
        },
    }
}
}