1use crate::{
52 Block, BlockBuilder, BlockBuilderError, BlockError, BlockForkPoint, CacheError, ExecutorConfig,
53 ExecutorError, ForkRpcClient, InherentProvider, RuntimeExecutor, StorageCache,
54 builder::ApplyExtrinsicResult,
55 create_next_header_with_slot, default_providers,
56 strings::{
57 inherent::{parachain::storage_keys, timestamp::slot_duration},
58 txpool::{runtime_api, transaction_source},
59 },
60};
61use scale::Decode;
62use scale_info::{PortableRegistry, TypeDef, TypeDefPrimitive};
63use std::{
64 path::Path,
65 sync::{
66 Arc,
67 atomic::{AtomicU64, Ordering},
68 },
69};
70use subxt::config::substrate::H256;
71use tokio::sync::{OnceCell, RwLock, broadcast};
72use url::Url;
73
/// Minimum number of seconds between `debug`-level "upstream connection lost" log lines.
/// Reconnect attempts that happen inside this window are logged at `trace` level instead.
const RECONNECT_LOG_DEBOUNCE_SECS: u64 = 30;
/// Maximum recursion depth allowed while walking the type registry to infer an account id length.
const TYPE_RESOLVE_MAX_DEPTH: usize = 8;
78
79fn infer_account_id_len_from_type(
80 registry: &PortableRegistry,
81 ty_id: u32,
82 depth: usize,
83) -> Option<usize> {
84 if depth > TYPE_RESOLVE_MAX_DEPTH {
85 return None;
86 }
87
88 let ty = registry.resolve(ty_id)?;
89
90 if let Some(last_segment) = ty.path.segments.last().map(String::as_str) {
91 match last_segment {
92 "AccountId20" | "H160" => return Some(20),
93 "AccountId32" | "H256" => return Some(32),
94 _ => {},
95 }
96 }
97
98 match &ty.type_def {
99 TypeDef::Array(array) => {
100 if matches!(array.len, 20 | 32) {
101 let element_ty = registry.resolve(array.type_param.id)?;
102 if matches!(element_ty.type_def, TypeDef::Primitive(TypeDefPrimitive::U8)) {
103 return Some(array.len as usize);
104 }
105 }
106 None
107 },
108 TypeDef::Composite(composite) if composite.fields.len() == 1 =>
109 infer_account_id_len_from_type(registry, composite.fields[0].ty.id, depth + 1),
110 TypeDef::Variant(variant) => {
111 let mut inferred_len = None;
112 for var in &variant.variants {
113 for field in &var.fields {
114 if let Some(len) =
115 infer_account_id_len_from_type(registry, field.ty.id, depth + 1)
116 {
117 if inferred_len.is_some() && inferred_len != Some(len) {
118 return None;
119 }
120 inferred_len = Some(len);
121 }
122 }
123 }
124 inferred_len
125 },
126 _ => None,
127 }
128}
129
130fn sudo_account_id_len(metadata: &subxt::Metadata) -> Option<usize> {
131 let sudo_pallet = metadata.pallet_by_name("Sudo")?;
132 let sudo_storage = sudo_pallet.storage()?;
133 let sudo_key = sudo_storage.entry_by_name("Key")?;
134 infer_account_id_len_from_type(metadata.types(), sudo_key.entry_type().value_ty(), 0)
135}
136
/// Picks the dev account to use as the sudo key.
///
/// When `expected_len` is `Some`, the first account whose byte length equals it is chosen.
/// Otherwise, or when no account has that length, the first account in `accounts` is used.
///
/// Returns the account's name together with its raw bytes.
///
/// # Panics
///
/// Panics if `accounts` is empty.
fn select_sudo_account<'a>(
    accounts: &'a [(&'a str, Vec<u8>)],
    expected_len: Option<usize>,
) -> (&'a str, &'a [u8]) {
    let (name, account) = expected_len
        .and_then(|len| accounts.iter().find(|(_, account)| account.len() == len))
        .unwrap_or_else(|| {
            accounts.first().expect("initialize_dev_accounts requires dev accounts")
        });
    (*name, account.as_slice())
}
148
/// The body of a block: a list of extrinsics, each held as a byte vector.
pub type BlockBody = Vec<Vec<u8>>;
150
151#[derive(Debug, Clone, Decode)]
157pub enum TransactionValidity {
158 #[codec(index = 0)]
160 Ok(ValidTransaction),
161 #[codec(index = 1)]
163 Err(TransactionValidityError),
164}
165
166#[derive(Debug, Clone, Decode)]
168pub struct ValidTransaction {
169 pub priority: u64,
171 pub requires: Vec<Vec<u8>>,
173 pub provides: Vec<Vec<u8>>,
175 pub longevity: u64,
177 pub propagate: bool,
179}
180
181#[derive(Debug, Clone, Decode)]
183pub enum TransactionValidityError {
184 #[codec(index = 0)]
186 Invalid(InvalidTransaction),
187 #[codec(index = 1)]
189 Unknown(UnknownTransaction),
190}
191
192#[derive(Debug, Clone, Decode)]
194pub enum InvalidTransaction {
195 #[codec(index = 0)]
197 Call,
198 #[codec(index = 1)]
200 Payment,
201 #[codec(index = 2)]
203 Future,
204 #[codec(index = 3)]
206 Stale,
207 #[codec(index = 4)]
209 BadMandatory,
210 #[codec(index = 5)]
212 MandatoryDispatch,
213 #[codec(index = 6)]
215 BadSigner,
216 #[codec(index = 7)]
218 Custom(u8),
219}
220
221#[derive(Debug, Clone, Decode)]
223pub enum UnknownTransaction {
224 #[codec(index = 0)]
226 CannotLookup,
227 #[codec(index = 1)]
229 NoUnsignedValidator,
230 #[codec(index = 2)]
232 Custom(u8),
233}
234
235impl TransactionValidityError {
236 pub fn reason(&self) -> String {
238 match self {
239 Self::Invalid(inv) => match inv {
240 InvalidTransaction::Call => "Call failed".into(),
241 InvalidTransaction::Payment => "Insufficient funds for fees".into(),
242 InvalidTransaction::Future => "Nonce too high".into(),
243 InvalidTransaction::Stale => "Nonce too low (already used)".into(),
244 InvalidTransaction::BadMandatory => "Bad mandatory inherent".into(),
245 InvalidTransaction::MandatoryDispatch => "Mandatory dispatch failed".into(),
246 InvalidTransaction::BadSigner => "Invalid signature".into(),
247 InvalidTransaction::Custom(code) => format!("Custom error: {code}"),
248 },
249 Self::Unknown(unk) => match unk {
250 UnknownTransaction::CannotLookup => "Cannot lookup validity".into(),
251 UnknownTransaction::NoUnsignedValidator => "No unsigned validator".into(),
252 UnknownTransaction::Custom(code) => format!("Custom unknown: {code}"),
253 },
254 }
255 }
256
257 pub fn is_unknown(&self) -> bool {
259 matches!(self, Self::Unknown(_))
260 }
261}
262
263#[derive(Debug, Clone)]
265pub struct BuildBlockResult {
266 pub block: Block,
268 pub included: Vec<Vec<u8>>,
270 pub failed: Vec<FailedExtrinsic>,
272}
273
/// An extrinsic whose dispatch failed while building a block.
#[derive(Debug, Clone)]
pub struct FailedExtrinsic {
    /// The extrinsic bytes as submitted.
    pub extrinsic: Vec<u8>,
    /// Description of why the dispatch failed.
    pub reason: String,
}
282
/// Capacity of the broadcast channel that carries [`BlockchainEvent`]s to subscribers.
const EVENT_CHANNEL_CAPACITY: usize = 256;
285
286#[derive(Debug, Clone)]
291pub enum BlockchainEvent {
292 NewBlock {
294 hash: H256,
296 number: u32,
298 parent_hash: H256,
300 header: Vec<u8>,
302 modified_keys: Vec<Vec<u8>>,
304 },
305}
306
307#[derive(Debug, thiserror::Error)]
309pub enum BlockchainError {
310 #[error(transparent)]
312 Block(#[from] BlockError),
313
314 #[error(transparent)]
316 Builder(#[from] BlockBuilderError),
317
318 #[error(transparent)]
320 Cache(#[from] CacheError),
321
322 #[error(transparent)]
324 Executor(#[from] ExecutorError),
325}
326
/// Kind of chain that was forked.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainType {
    /// A chain whose metadata has no `ParachainSystem` pallet.
    RelayChain,
    /// A chain whose metadata contains the `ParachainSystem` pallet.
    Parachain {
        /// The parachain id read from storage (`0` when it could not be read).
        para_id: u32,
    },
}
338
339pub struct Blockchain {
383 head: RwLock<Block>,
385
386 inherent_providers: Vec<Arc<dyn InherentProvider>>,
388
389 chain_name: String,
391
392 chain_type: ChainType,
394
395 fork_point_hash: H256,
397
398 fork_point_number: u32,
400
401 executor_config: ExecutorConfig,
403
404 executor: RwLock<RuntimeExecutor>,
410
411 warm_prototype: tokio::sync::Mutex<Option<smoldot::executor::host::HostVmPrototype>>,
418
419 remote: crate::RemoteStorageLayer,
425
426 prefetch_done: OnceCell<()>,
433
434 cached_slot_duration: AtomicU64,
441
442 event_tx: broadcast::Sender<BlockchainEvent>,
447
448 genesis_hash_cache: OnceCell<String>,
453
454 chain_properties_cache: OnceCell<Option<serde_json::Value>>,
459
460 last_reconnect_log: AtomicU64,
467}
468
469impl Blockchain {
470 pub async fn fork(
501 endpoint: &Url,
502 cache_path: Option<&Path>,
503 ) -> Result<Arc<Self>, BlockchainError> {
504 Self::fork_with_config(endpoint, cache_path, None, ExecutorConfig::default()).await
505 }
506
507 pub async fn fork_at(
529 endpoint: &Url,
530 cache_path: Option<&Path>,
531 fork_point: Option<BlockForkPoint>,
532 ) -> Result<Arc<Self>, BlockchainError> {
533 Self::fork_with_config(endpoint, cache_path, fork_point, ExecutorConfig::default()).await
534 }
535
536 pub async fn fork_with_config(
562 endpoint: &Url,
563 cache_path: Option<&Path>,
564 fork_point: Option<BlockForkPoint>,
565 executor_config: ExecutorConfig,
566 ) -> Result<Arc<Self>, BlockchainError> {
567 let cache = StorageCache::open(cache_path).await?;
569
570 let fork_point = match fork_point {
572 Some(fp) => fp,
573 None => {
574 let rpc =
576 crate::ForkRpcClient::connect(endpoint).await.map_err(BlockError::from)?;
577 let finalized = rpc.finalized_head().await.map_err(BlockError::from)?;
578 BlockForkPoint::Hash(finalized)
579 },
580 };
581
582 let fork_block = Block::fork_point(endpoint, cache, fork_point).await?;
584 let fork_point_hash = fork_block.hash;
585 let fork_point_number = fork_block.number;
586
587 let chain_type = Self::detect_chain_type(&fork_block).await?;
589
590 let chain_name = Self::get_chain_name(&fork_block).await?;
592
593 let is_parachain = matches!(chain_type, ChainType::Parachain { .. });
595 let inherent_providers = default_providers(is_parachain)
596 .into_iter()
597 .map(|p| Arc::from(p) as Arc<dyn InherentProvider>)
598 .collect();
599
600 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
602
603 let remote = fork_block.storage().remote().clone();
605
606 let runtime_code = fork_block.runtime_code().await?;
609 let executor = RuntimeExecutor::with_config(runtime_code, None, executor_config.clone())?;
610
611 log::debug!("Forked at block #{fork_point_number} (0x{})", hex::encode(fork_point_hash));
612
613 let blockchain = Arc::new(Self {
614 head: RwLock::new(fork_block),
615 inherent_providers,
616 chain_name,
617 chain_type,
618 fork_point_hash,
619 fork_point_number,
620 executor_config,
621 executor: RwLock::new(executor),
622 warm_prototype: tokio::sync::Mutex::new(None),
623 prefetch_done: OnceCell::new(),
624 cached_slot_duration: AtomicU64::new(0),
625 remote,
626 event_tx,
627 genesis_hash_cache: OnceCell::new(),
628 chain_properties_cache: OnceCell::new(),
629 last_reconnect_log: AtomicU64::new(0),
630 });
631
632 #[cfg(all(not(test), not(feature = "integration-tests")))]
638 {
639 let bc = Arc::clone(&blockchain);
640 tokio::spawn(async move { bc.warmup().await });
641 }
642
643 Ok(blockchain)
644 }
645
646 pub async fn warmup(self: &Arc<Self>) {
665 let warmup_start = std::time::Instant::now();
666 log::debug!("[Blockchain] Background warmup starting...");
667
668 let executor = self.executor.read().await.clone();
674 match executor.create_prototype() {
675 Ok(proto) => {
676 log::debug!(
677 "[Blockchain] Warmup: WASM prototype compiled ({:?})",
678 warmup_start.elapsed()
679 );
680
681 let head = self.head.read().await;
683 let (result, returned_proto) = executor
684 .call_with_prototype(
685 Some(proto),
686 crate::strings::inherent::timestamp::slot_duration::AURA_API_METHOD,
687 &[],
688 head.storage(),
689 )
690 .await;
691
692 let aura_duration =
693 result.ok().and_then(|r| u64::decode(&mut r.output.as_slice()).ok());
694
695 let duration = if let Some(d) = aura_duration {
699 d
700 } else {
701 let metadata = head.metadata().await.ok();
702 let babe_duration = metadata.as_ref().and_then(|m| {
703 use crate::strings::inherent::timestamp::slot_duration;
704 m.pallet_by_name(slot_duration::BABE_PALLET)?
705 .constant_by_name(slot_duration::BABE_EXPECTED_BLOCK_TIME)
706 .and_then(|c| u64::decode(&mut &c.value()[..]).ok())
707 });
708 babe_duration.unwrap_or(match self.chain_type {
709 ChainType::RelayChain => slot_duration::RELAY_CHAIN_FALLBACK_MS,
710 ChainType::Parachain { .. } => slot_duration::PARACHAIN_FALLBACK_MS,
711 })
712 };
713 drop(head);
714
715 self.cached_slot_duration.store(duration, Ordering::Release);
716 *self.warm_prototype.lock().await = returned_proto;
718 log::debug!(
719 "[Blockchain] Warmup: slot_duration={duration}ms ({:?})",
720 warmup_start.elapsed()
721 );
722 },
723 Err(e) => log::warn!("[Blockchain] Warmup: prototype compilation failed: {e}"),
724 }
725
726 self.ensure_prefetched().await;
728 log::debug!("[Blockchain] Warmup: prefetch done ({:?})", warmup_start.elapsed());
729
730 let head = self.head.read().await.clone();
732 for provider in &self.inherent_providers {
733 provider.warmup(&head, &executor).await;
734 }
735
736 log::debug!("[Blockchain] Background warmup complete ({:?})", warmup_start.elapsed());
737 }
738
739 async fn ensure_prefetched(&self) {
746 self.prefetch_done
747 .get_or_init(|| async {
748 if let Err(e) = self.do_prefetch().await {
749 log::warn!("[Blockchain] Storage prefetch failed (non-fatal): {e}");
750 }
751 })
752 .await;
753 }
754
755 async fn do_prefetch(&self) -> Result<(), BlockchainError> {
760 let head = self.head.read().await;
761 let metadata = head.metadata().await?;
762 let block_hash = head.storage().fork_block_hash();
763
764 let mut value_keys: Vec<Vec<u8>> = Vec::new();
766 let mut pallet_prefixes: Vec<Vec<u8>> = Vec::new();
767
768 for pallet in metadata.pallets() {
769 let pallet_hash = sp_core::twox_128(pallet.name().as_bytes());
770 if let Some(storage) = pallet.storage() {
771 for entry in storage.entries() {
772 if matches!(
773 entry.entry_type(),
774 subxt::metadata::types::StorageEntryType::Plain(_)
775 ) {
776 let entry_hash = sp_core::twox_128(entry.name().as_bytes());
777 value_keys.push([pallet_hash.as_slice(), entry_hash.as_slice()].concat());
778 }
779 }
780 pallet_prefixes.push(pallet_hash.to_vec());
781 }
782 }
783
784 if !value_keys.is_empty() {
786 let key_refs: Vec<&[u8]> = value_keys.iter().map(|k| k.as_slice()).collect();
787 if let Err(e) = self.remote.get_batch(block_hash, &key_refs).await {
788 log::debug!(
789 "[Blockchain] Warmup: StorageValue batch fetch failed (non-fatal): {e}"
790 );
791 }
792 }
793
794 let page_size = crate::strings::builder::PREFETCH_PAGE_SIZE;
796 let scan_futures: Vec<_> = pallet_prefixes
797 .iter()
798 .map(|prefix| self.remote.prefetch_prefix_single_page(block_hash, prefix, page_size))
799 .collect();
800 let scan_results = futures::future::join_all(scan_futures).await;
801 let mut scan_keys = 0usize;
802 for count in scan_results.into_iter().flatten() {
803 scan_keys += count;
804 }
805
806 log::debug!(
807 "[Blockchain] Prefetched {} StorageValue + {} map keys ({} pallets)",
808 value_keys.len(),
809 scan_keys,
810 pallet_prefixes.len(),
811 );
812
813 Ok(())
814 }
815
816 pub fn chain_name(&self) -> &str {
818 &self.chain_name
819 }
820
821 pub fn chain_type(&self) -> &ChainType {
823 &self.chain_type
824 }
825
826 pub fn fork_point(&self) -> H256 {
828 self.fork_point_hash
829 }
830
831 pub fn fork_point_number(&self) -> u32 {
833 self.fork_point_number
834 }
835
836 pub fn endpoint(&self) -> &Url {
838 self.remote.endpoint()
839 }
840
841 pub async fn genesis_hash(&self) -> Result<String, BlockchainError> {
851 self.genesis_hash_cache
852 .get_or_try_init(|| async {
853 match self.block_hash_at(0).await? {
854 Some(hash) => Ok(format!("0x{}", hex::encode(hash.as_bytes()))),
855 None => Err(BlockchainError::Block(BlockError::RuntimeCodeNotFound)),
856 }
857 })
858 .await
859 .cloned()
860 }
861
862 pub async fn chain_properties(&self) -> Option<serde_json::Value> {
872 self.chain_properties_cache
873 .get_or_init(|| async {
874 match ForkRpcClient::connect(self.endpoint()).await {
875 Ok(client) => match client.system_properties().await {
876 Ok(system_props) => serde_json::to_value(system_props).ok(),
877 Err(_) => None,
878 },
879 Err(_) => None,
880 }
881 })
882 .await
883 .clone()
884 }
885
886 pub fn subscribe_events(&self) -> broadcast::Receiver<BlockchainEvent> {
906 self.event_tx.subscribe()
907 }
908
909 pub async fn head(&self) -> Block {
911 self.head.read().await.clone()
912 }
913
914 pub async fn head_number(&self) -> u32 {
916 self.head.read().await.number
917 }
918
919 pub async fn head_hash(&self) -> H256 {
921 self.head.read().await.hash
922 }
923
924 pub async fn block_body(&self, hash: H256) -> Result<Option<BlockBody>, BlockchainError> {
939 let head = self.head.read().await;
942
943 let mut current: Option<&Block> = Some(&head);
945 while let Some(block) = current {
946 if block.hash == hash {
947 return Ok(Some(block.extrinsics.clone()));
948 }
949 current = block.parent.as_deref();
950 }
951 drop(head);
952
953 match self.remote.block_body(hash).await {
955 Ok(body) => Ok(body),
956 Err(first_err) =>
957 if self.reconnect_upstream().await {
958 Ok(self.remote.block_body(hash).await.map_err(BlockError::from)?)
959 } else {
960 Err(BlockchainError::Block(BlockError::from(first_err)))
961 },
962 }
963 }
964
965 pub async fn block_header(&self, hash: H256) -> Result<Option<Vec<u8>>, BlockchainError> {
980 let head = self.head.read().await;
981
982 let mut current: Option<&Block> = Some(&head);
984 while let Some(block) = current {
985 if block.hash == hash {
986 return Ok(Some(block.header.clone()));
987 }
988 current = block.parent.as_deref();
989 }
990 drop(head);
991
992 match self.remote.block_header(hash).await {
994 Ok(header) => Ok(header),
995 Err(first_err) =>
996 if self.reconnect_upstream().await {
997 Ok(self.remote.block_header(hash).await.map_err(BlockError::from)?)
998 } else {
999 Err(BlockchainError::Block(BlockError::from(first_err)))
1000 },
1001 }
1002 }
1003
1004 pub async fn block_hash_at(&self, block_number: u32) -> Result<Option<H256>, BlockchainError> {
1019 let head = self.head.read().await;
1021
1022 if head.number < block_number {
1023 return Ok(None);
1024 }
1025
1026 let mut current: Option<&Block> = Some(&head);
1028 while let Some(block) = current {
1029 if block.number == block_number {
1030 return Ok(Some(block.hash));
1031 }
1032
1033 if block.parent.is_none() {
1034 break;
1035 }
1036
1037 current = block.parent.as_deref();
1038 }
1039 drop(head);
1040
1041 match self.remote.block_hash_by_number(block_number).await {
1043 Ok(hash) => Ok(hash),
1044 Err(first_err) =>
1045 if self.reconnect_upstream().await {
1046 Ok(self
1047 .remote
1048 .block_hash_by_number(block_number)
1049 .await
1050 .map_err(BlockError::from)?)
1051 } else {
1052 Err(BlockchainError::Block(BlockError::from(first_err)))
1053 },
1054 }
1055 }
1056
1057 pub async fn block_number_by_hash(&self, hash: H256) -> Result<Option<u32>, BlockchainError> {
1071 let head = self.head.read().await;
1073 let mut current: Option<&Block> = Some(&head);
1074 while let Some(block) = current {
1075 if block.hash == hash {
1076 return Ok(Some(block.number));
1077 }
1078 current = block.parent.as_deref();
1079 }
1080 drop(head);
1081
1082 match self.remote.block_number_by_hash(hash).await {
1084 Ok(number) => Ok(number),
1085 Err(first_err) =>
1086 if self.reconnect_upstream().await {
1087 Ok(self.remote.block_number_by_hash(hash).await.map_err(BlockError::from)?)
1088 } else {
1089 Err(BlockchainError::Block(BlockError::from(first_err)))
1090 },
1091 }
1092 }
1093
1094 pub async fn block_parent_hash(&self, hash: H256) -> Result<Option<H256>, BlockchainError> {
1108 let head = self.head.read().await;
1110 let mut current: Option<&Block> = Some(&head);
1111 while let Some(block) = current {
1112 if block.hash == hash {
1113 return Ok(Some(block.parent_hash));
1114 }
1115 current = block.parent.as_deref();
1116 }
1117 drop(head);
1118
1119 match self.remote.parent_hash(hash).await {
1121 Ok(parent) => Ok(parent),
1122 Err(first_err) =>
1123 if self.reconnect_upstream().await {
1124 Ok(self.remote.parent_hash(hash).await.map_err(BlockError::from)?)
1125 } else {
1126 Err(BlockchainError::Block(BlockError::from(first_err)))
1127 },
1128 }
1129 }
1130
1131 pub async fn build_block(
1157 &self,
1158 extrinsics: BlockBody,
1159 ) -> Result<BuildBlockResult, BlockchainError> {
1160 let (parent_block, parent_hash) = {
1162 let head = self.head.read().await;
1163 let parent_hash = head.hash;
1164 (head.clone(), parent_hash)
1165 }; let executor = self.executor.read().await.clone();
1170
1171 let warm_prototype = self.warm_prototype.lock().await.take();
1173
1174 let header = create_next_header_with_slot(
1177 &parent_block,
1178 &executor,
1179 vec![],
1180 match self.cached_slot_duration.load(Ordering::Acquire) {
1181 0 => None,
1182 d => Some(d),
1183 },
1184 )
1185 .await?;
1186
1187 let providers: Vec<Box<dyn InherentProvider>> = self
1189 .inherent_providers
1190 .iter()
1191 .map(|p| Box::new(ArcProvider(Arc::clone(p))) as Box<dyn InherentProvider>)
1192 .collect();
1193
1194 self.ensure_prefetched().await;
1200
1201 let mut builder = BlockBuilder::new(
1203 parent_block,
1204 executor,
1205 header,
1206 providers,
1207 warm_prototype,
1208 true, );
1210
1211 builder.initialize().await?;
1213
1214 builder.apply_inherents().await?;
1216
1217 let mut included = Vec::new();
1219 let mut failed = Vec::new();
1220
1221 for extrinsic in extrinsics {
1223 match builder.apply_extrinsic(extrinsic.clone()).await? {
1224 ApplyExtrinsicResult::Success { .. } => {
1225 included.push(extrinsic);
1226 },
1227 ApplyExtrinsicResult::DispatchFailed { error } => {
1228 failed.push(FailedExtrinsic { extrinsic, reason: error });
1229 },
1230 }
1231 }
1232
1233 let runtime_upgraded = builder.runtime_upgraded();
1236
1237 let (new_block, returned_prototype) = builder.finalize().await?;
1239
1240 let new_executor = if runtime_upgraded {
1245 log::debug!("[Blockchain] Runtime upgrade detected, recreating executor");
1246 match new_block.runtime_code().await {
1247 Ok(code) => {
1248 match RuntimeExecutor::with_config(code, None, self.executor_config.clone()) {
1249 Ok(executor) => Some(executor),
1250 Err(e) => {
1251 log::error!(
1252 "[Blockchain] Failed to recreate executor after runtime upgrade: {e}. \
1253 Subsequent runtime calls may fail until the next successful upgrade."
1254 );
1255 None
1256 },
1257 }
1258 },
1259 Err(e) => {
1260 log::error!(
1261 "[Blockchain] Failed to get runtime code after upgrade: {e}. \
1262 Subsequent runtime calls may fail until the next successful upgrade."
1263 );
1264 None
1265 },
1266 }
1267 } else {
1268 None
1269 };
1270
1271 {
1275 let mut head = self.head.write().await;
1276 if head.hash != parent_hash {
1278 return Err(BlockchainError::Block(BlockError::ConcurrentBlockBuild));
1279 }
1280 *head = new_block.clone();
1281
1282 if let Some(executor) = new_executor {
1285 *self.executor.write().await = executor;
1286 }
1287 if runtime_upgraded {
1288 self.cached_slot_duration.store(0, Ordering::Release);
1290 }
1291 }
1292
1293 if runtime_upgraded {
1295 *self.warm_prototype.lock().await = None;
1296 for provider in &self.inherent_providers {
1299 provider.invalidate_cache();
1300 }
1301 } else {
1302 *self.warm_prototype.lock().await = returned_prototype;
1303 }
1304
1305 let modified_keys: Vec<Vec<u8>> = new_block
1307 .storage()
1308 .diff()
1309 .map(|diff| diff.into_iter().map(|(k, _)| k).collect())
1310 .unwrap_or_default();
1311
1312 let subscribers = self.event_tx.receiver_count();
1314 log::debug!(
1315 "[Blockchain] Emitting NewBlock #{} event ({} modified keys, {} subscribers, {} header bytes)",
1316 new_block.number,
1317 modified_keys.len(),
1318 subscribers,
1319 new_block.header.len(),
1320 );
1321 let _ = self.event_tx.send(BlockchainEvent::NewBlock {
1322 hash: new_block.hash,
1323 number: new_block.number,
1324 parent_hash: new_block.parent_hash,
1325 header: new_block.header.clone(),
1326 modified_keys,
1327 });
1328
1329 Ok(BuildBlockResult { block: new_block, included, failed })
1330 }
1331
1332 pub async fn build_empty_block(&self) -> Result<Block, BlockchainError> {
1341 self.build_block(vec![]).await.map(|result| result.block)
1342 }
1343
1344 pub async fn call(&self, method: &str, args: &[u8]) -> Result<Vec<u8>, BlockchainError> {
1355 let head_hash = self.head_hash().await;
1356 self.call_at_block(head_hash, method, args)
1357 .await
1358 .map(|result| result.expect("head_hash always exists; qed;"))
1359 }
1360
1361 pub async fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, BlockchainError> {
1371 let block_number = self.head.read().await.number;
1372 self.get_storage_value(block_number, key).await
1373 }
1374
1375 pub async fn storage_at(
1386 &self,
1387 block_number: u32,
1388 key: &[u8],
1389 ) -> Result<Option<Vec<u8>>, BlockchainError> {
1390 self.get_storage_value(block_number, key).await
1391 }
1392
1393 pub async fn storage_keys_paged(
1402 &self,
1403 prefix: &[u8],
1404 count: u32,
1405 start_key: Option<&[u8]>,
1406 at: Option<H256>,
1407 ) -> Result<Vec<Vec<u8>>, BlockchainError> {
1408 let block_hash = match at {
1409 Some(h) => h,
1410 None => self.head_hash().await,
1411 };
1412 log::debug!(
1413 "storage_keys_paged: prefix=0x{} count={} start_key={} at={:?}",
1414 hex::encode(prefix),
1415 count,
1416 start_key
1417 .map(|k| format!("0x{}", hex::encode(k)))
1418 .unwrap_or_else(|| "None".into()),
1419 block_hash,
1420 );
1421
1422 let block_number = self.block_number_by_hash(block_hash).await?;
1423
1424 if let Some(n) = block_number.filter(|&n| n > self.fork_point_number) {
1425 let all_keys = {
1427 let head = self.head.read().await;
1428 head.storage()
1429 .keys_by_prefix(prefix, n)
1430 .await
1431 .map_err(|e| BlockchainError::Block(BlockError::Storage(e)))?
1432 };
1433 let keys: Vec<Vec<u8>> = all_keys
1435 .into_iter()
1436 .filter(|k| start_key.is_none_or(|sk| k.as_slice() > sk))
1437 .take(count as usize)
1438 .collect();
1439 log::debug!("storage_keys_paged: returned {} keys (fork-local)", keys.len());
1440 Ok(keys)
1441 } else {
1442 let head = self.head.read().await;
1443 let rpc = head.storage().remote().rpc();
1444 match rpc.storage_keys_paged(prefix, count, start_key, block_hash).await {
1445 Ok(keys) => {
1446 log::debug!("storage_keys_paged: returned {} keys", keys.len());
1447 Ok(keys)
1448 },
1449 Err(first_err) => {
1450 drop(head);
1451 if self.reconnect_upstream().await {
1452 let head = self.head.read().await;
1453 let rpc = head.storage().remote().rpc();
1454 let keys = rpc
1455 .storage_keys_paged(prefix, count, start_key, block_hash)
1456 .await
1457 .map_err(|e| BlockchainError::Block(BlockError::Rpc(e)))?;
1458 log::debug!(
1459 "storage_keys_paged: returned {} keys (after reconnect)",
1460 keys.len()
1461 );
1462 Ok(keys)
1463 } else {
1464 Err(BlockchainError::Block(BlockError::Rpc(first_err)))
1465 }
1466 },
1467 }
1468 }
1469 }
1470
1471 pub async fn storage_keys_by_prefix(
1480 &self,
1481 prefix: &[u8],
1482 at: H256,
1483 ) -> Result<Vec<Vec<u8>>, BlockchainError> {
1484 log::debug!(
1485 "storage_keys_by_prefix: prefix=0x{} ({} bytes) at={:?}",
1486 hex::encode(prefix),
1487 prefix.len(),
1488 at,
1489 );
1490
1491 let block_number = self.block_number_by_hash(at).await?;
1492
1493 let keys = if let Some(n) = block_number.filter(|&n| n > self.fork_point_number) {
1494 let head = self.head.read().await;
1496 head.storage()
1497 .keys_by_prefix(prefix, n)
1498 .await
1499 .map_err(|e| BlockchainError::Block(BlockError::Storage(e)))?
1500 } else {
1501 let head = self.head.read().await;
1502 head.storage()
1503 .remote()
1504 .get_keys(at, prefix)
1505 .await
1506 .map_err(|e| BlockchainError::Block(BlockError::RemoteStorage(e)))?
1507 };
1508
1509 log::debug!(
1510 "storage_keys_by_prefix: returned {} keys for prefix=0x{}",
1511 keys.len(),
1512 hex::encode(prefix)
1513 );
1514 Ok(keys)
1515 }
1516
1517 async fn get_storage_value(
1523 &self,
1524 block_number: u32,
1525 key: &[u8],
1526 ) -> Result<Option<Vec<u8>>, BlockchainError> {
1527 let head = self.head.read().await;
1528 match head.storage().get(block_number, key).await {
1529 Ok(value) => Ok(value.and_then(|v| v.value.clone())),
1530 Err(first_err) => {
1531 if self.reconnect_upstream().await {
1533 let value =
1534 head.storage().get(block_number, key).await.map_err(BlockError::from)?;
1535 Ok(value.and_then(|v| v.value.clone()))
1536 } else {
1537 Err(BlockchainError::Block(BlockError::from(first_err)))
1538 }
1539 },
1540 }
1541 }
1542
1543 async fn detect_chain_type(block: &Block) -> Result<ChainType, BlockchainError> {
1545 let metadata = block.metadata().await?;
1546
1547 if metadata.pallet_by_name("ParachainSystem").is_some() {
1549 let para_id = Self::get_para_id(block).await.unwrap_or(0);
1551 Ok(ChainType::Parachain { para_id })
1552 } else {
1553 Ok(ChainType::RelayChain)
1554 }
1555 }
1556
1557 async fn get_para_id(block: &Block) -> Option<u32> {
1561 use scale::Decode;
1562
1563 let pallet_hash = sp_core::twox_128(storage_keys::PARACHAIN_INFO_PALLET);
1565 let storage_hash = sp_core::twox_128(storage_keys::PARACHAIN_ID);
1566 let key: Vec<u8> = [pallet_hash.as_slice(), storage_hash.as_slice()].concat();
1567
1568 let value = block.storage().get(block.number, &key).await.ok().flatten()?;
1570
1571 value.value.as_ref().map(|value| u32::decode(&mut value.as_slice()).ok())?
1572 }
1573
1574 async fn get_chain_name(block: &Block) -> Result<String, BlockchainError> {
1576 let runtime_code = block.runtime_code().await?;
1578 let executor = RuntimeExecutor::new(runtime_code, None)?;
1579
1580 let version = executor.runtime_version()?;
1582 Ok(version.spec_name)
1583 }
1584
1585 pub async fn call_at_block(
1599 &self,
1600 hash: H256,
1601 method: &str,
1602 args: &[u8],
1603 ) -> Result<Option<Vec<u8>>, BlockchainError> {
1604 let head_block = {
1606 let head = self.head.read().await;
1607 (hash == head.hash).then(|| head.clone())
1608 };
1609 if let Some(head_block) = head_block {
1610 let pre_call_hash = head_block.hash;
1611 let executor = self.executor.read().await.clone();
1612 let warm_prototype = self.warm_prototype.lock().await.take();
1613 let (result, returned_prototype) = executor
1614 .call_with_prototype(warm_prototype, method, args, head_block.storage())
1615 .await;
1616 if self.head.read().await.hash == pre_call_hash {
1620 *self.warm_prototype.lock().await = returned_prototype;
1621 }
1622 return Ok(Some(result?.output));
1623 }
1624
1625 let block = self.find_or_create_block_for_call(hash).await?;
1627
1628 let Some(block) = block else {
1629 return Ok(None); };
1631
1632 let runtime_code = block.runtime_code().await?;
1633 let executor =
1634 RuntimeExecutor::with_config(runtime_code, None, self.executor_config.clone())?;
1635 let result = executor.call(method, args, block.storage()).await?;
1636 Ok(Some(result.output))
1637 }
1638
1639 pub async fn storage_batch(
1647 &self,
1648 at: H256,
1649 keys: &[&[u8]],
1650 ) -> Result<Vec<Option<Vec<u8>>>, BlockchainError> {
1651 match self.remote.get_batch(at, keys).await {
1652 Ok(result) => Ok(result),
1653 Err(first_err) => {
1654 if self.reconnect_upstream().await {
1656 self.remote
1657 .get_batch(at, keys)
1658 .await
1659 .map_err(|e| BlockchainError::Block(BlockError::RemoteStorage(e)))
1660 } else {
1661 Err(BlockchainError::Block(BlockError::RemoteStorage(first_err)))
1662 }
1663 },
1664 }
1665 }
1666
1667 pub async fn proxy_state_call(
1675 &self,
1676 method: &str,
1677 args: &[u8],
1678 at: H256,
1679 ) -> Result<Vec<u8>, BlockchainError> {
1680 let rpc = self.remote.rpc();
1681 match rpc.state_call(method, args, Some(at)).await {
1682 Ok(result) => Ok(result),
1683 Err(first_err) => {
1684 if self.reconnect_upstream().await {
1686 rpc.state_call(method, args, Some(at))
1687 .await
1688 .map_err(|e| BlockchainError::Block(BlockError::from(e)))
1689 } else {
1690 Err(BlockchainError::Block(BlockError::from(first_err)))
1691 }
1692 },
1693 }
1694 }
1695
1696 async fn reconnect_upstream(&self) -> bool {
1704 let now_ms = std::time::SystemTime::now()
1705 .duration_since(std::time::UNIX_EPOCH)
1706 .map(|d| d.as_millis() as u64)
1707 .unwrap_or(0);
1708 let last = self.last_reconnect_log.load(Ordering::Relaxed);
1709 let elapsed_secs = now_ms.saturating_sub(last) / 1000;
1710
1711 if elapsed_secs >= RECONNECT_LOG_DEBOUNCE_SECS {
1712 self.last_reconnect_log.store(now_ms, Ordering::Relaxed);
1713 log::debug!(
1714 "Upstream connection lost, reconnecting to {}",
1715 self.remote.rpc().endpoint()
1716 );
1717 } else {
1718 log::trace!(
1719 "Upstream connection lost, reconnecting to {}",
1720 self.remote.rpc().endpoint()
1721 );
1722 }
1723
1724 self.remote.rpc().reconnect().await.is_ok()
1725 }
1726
1727 pub async fn validate_extrinsic(
1755 &self,
1756 extrinsic: &[u8],
1757 ) -> Result<ValidTransaction, TransactionValidityError> {
1758 let head = self.head.read().await.clone();
1761
1762 let mut args = Vec::with_capacity(1 + extrinsic.len() + 32);
1767 args.push(transaction_source::EXTERNAL);
1768 args.extend(extrinsic);
1769 args.extend(head.hash.as_bytes());
1770
1771 let pre_call_hash = head.hash;
1773 let executor = self.executor.read().await.clone();
1774 let warm_prototype = self.warm_prototype.lock().await.take();
1775
1776 let (result, returned_prototype) = executor
1778 .call_with_prototype(
1779 warm_prototype,
1780 runtime_api::TAGGED_TRANSACTION_QUEUE_VALIDATE,
1781 &args,
1782 head.storage(),
1783 )
1784 .await;
1785 if self.head.read().await.hash == pre_call_hash {
1787 *self.warm_prototype.lock().await = returned_prototype;
1788 }
1789
1790 let result = result
1791 .map_err(|_| TransactionValidityError::Unknown(UnknownTransaction::CannotLookup))?;
1792
1793 let validity = TransactionValidity::decode(&mut result.output.as_slice())
1795 .map_err(|_| TransactionValidityError::Unknown(UnknownTransaction::CannotLookup))?;
1796
1797 match validity {
1798 TransactionValidity::Ok(valid) => Ok(valid),
1799 TransactionValidity::Err(err) => Err(err),
1800 }
1801 }
1802
1803 async fn find_or_create_block_for_call(
1809 &self,
1810 hash: H256,
1811 ) -> Result<Option<Block>, BlockchainError> {
1812 let head = self.head.read().await;
1813
1814 let mut current: Option<&Block> = Some(&head);
1816 while let Some(block) = current {
1817 if block.hash == hash {
1818 return Ok(Some(block.clone()));
1819 }
1820 if block.parent.is_none() {
1822 break;
1823 }
1824 current = block.parent.as_deref();
1825 }
1826
1827 let block_number =
1829 match self.remote.block_number_by_hash(hash).await.map_err(BlockError::from)? {
1830 Some(number) => number,
1831 None => return Ok(None), };
1833
1834 Ok(Some(Block::mocked_for_call(hash, block_number, head.storage().clone())))
1837 }
1838
/// Writes `value` under `key` directly into the head block's storage.
///
/// Compiled only for tests or when the `integration-tests` feature is enabled.
/// NOTE(review): `None` presumably marks the key as deleted — confirm against the
/// storage layer's `set`.
///
/// # Panics
///
/// Panics if the storage `set` call returns an error.
#[cfg(any(test, feature = "integration-tests"))]
pub async fn set_storage_for_testing(&self, key: &[u8], value: Option<&[u8]>) {
    // The write guard is a temporary and is released at the end of this statement.
    self.head.write().await.storage_mut().set(key, value).unwrap();
}
1857
1858 pub async fn initialize_dev_accounts(&self) -> Result<(), BlockchainError> {
1872 use crate::dev::{
1873 DEV_BALANCE, ETHEREUM_DEV_ACCOUNTS, SUBSTRATE_DEV_ACCOUNTS, account_storage_key,
1874 build_account_info, patch_free_balance, sudo_key_storage_key,
1875 };
1876
1877 let is_ethereum = self
1879 .chain_properties()
1880 .await
1881 .and_then(|props| props.get("isEthereum")?.as_bool())
1882 .unwrap_or(false);
1883
1884 let mut head = self.head.write().await;
1885
1886 let mut accounts: Vec<(&str, Vec<u8>)> =
1890 SUBSTRATE_DEV_ACCOUNTS.iter().map(|(n, a)| (*n, a.to_vec())).collect();
1891 if is_ethereum {
1892 accounts.extend(ETHEREUM_DEV_ACCOUNTS.iter().map(|(n, a)| (*n, a.to_vec())));
1893 }
1894
1895 let keys: Vec<Vec<u8>> = accounts.iter().map(|(_, a)| account_storage_key(a)).collect();
1897 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
1898
1899 let existing_values = head
1901 .storage()
1902 .remote()
1903 .get_batch(self.fork_point_hash, &key_refs)
1904 .await
1905 .map_err(BlockError::from)?;
1906
1907 let entries: Vec<(&[u8], Option<Vec<u8>>)> = keys
1909 .iter()
1910 .zip(existing_values.iter())
1911 .map(|(key, existing)| {
1912 let value = match existing {
1913 Some(data) => patch_free_balance(data, DEV_BALANCE),
1914 None => build_account_info(DEV_BALANCE),
1915 };
1916 (key.as_slice(), Some(value))
1917 })
1918 .collect();
1919
1920 let batch: Vec<(&[u8], Option<&[u8]>)> =
1921 entries.iter().map(|(k, v)| (*k, v.as_deref())).collect();
1922 head.storage_mut().set_batch_initial(&batch).map_err(BlockError::from)?;
1923
1924 for (name, addr) in &accounts {
1925 log::debug!("Funded dev account: {name} (0x{})", hex::encode(addr));
1926 }
1927
1928 let metadata = head.metadata().await?;
1930 if metadata.pallet_by_name("Sudo").is_some() {
1931 let key = sudo_key_storage_key();
1932 let expected_len = sudo_account_id_len(metadata.as_ref());
1933 let (sudo_name, sudo_account) = select_sudo_account(&accounts, expected_len);
1934 head.storage_mut()
1935 .set_initial(&key, Some(sudo_account))
1936 .map_err(BlockError::from)?;
1937 log::debug!("Set {} as sudo key (0x{})", sudo_name, hex::encode(sudo_account));
1938 }
1939
1940 Ok(())
1941 }
1942
1943 pub async fn clear_local_storage(&self) -> Result<(), CacheError> {
1955 let head = self.head.read().await;
1956 head.storage().cache().clear_local_storage().await
1957 }
1958}
1959
1960struct ArcProvider(Arc<dyn InherentProvider>);
1965
1966#[async_trait::async_trait]
1967impl InherentProvider for ArcProvider {
1968 fn identifier(&self) -> &'static str {
1969 self.0.identifier()
1970 }
1971
1972 async fn provide(
1973 &self,
1974 parent: &Block,
1975 executor: &RuntimeExecutor,
1976 ) -> Result<Vec<Vec<u8>>, BlockBuilderError> {
1977 self.0.provide(parent, executor).await
1978 }
1979}
1980
#[cfg(test)]
mod tests {
    use super::*;

    /// The rendered error mentions the missing runtime code.
    #[test]
    fn blockchain_error_display() {
        let rendered = BlockchainError::Block(BlockError::RuntimeCodeNotFound).to_string();
        assert!(rendered.contains("Runtime code not found"));
    }

    /// Each error variant maps to its human-readable reason.
    #[test]
    fn transaction_validity_error_reason_returns_correct_strings() {
        let cases = [
            (
                TransactionValidityError::Invalid(InvalidTransaction::Stale),
                "Nonce too low (already used)",
            ),
            (
                TransactionValidityError::Invalid(InvalidTransaction::Payment),
                "Insufficient funds for fees",
            ),
            (
                TransactionValidityError::Unknown(UnknownTransaction::CannotLookup),
                "Cannot lookup validity",
            ),
        ];

        for (error, expected_reason) in cases {
            assert_eq!(error.reason(), expected_reason);
        }
    }

    /// `is_unknown` is true only for the `Unknown` family of errors.
    #[test]
    fn transaction_validity_error_is_unknown_distinguishes_types() {
        assert!(!TransactionValidityError::Invalid(InvalidTransaction::Stale).is_unknown());
        assert!(TransactionValidityError::Unknown(UnknownTransaction::CannotLookup).is_unknown());
    }

    /// Hand-written encodings decode into the expected `Ok` and `Err` variants.
    #[test]
    fn transaction_validity_types_can_be_decoded() {
        use scale::Decode;

        // Field layout as implied by the assertions below — TODO confirm against the
        // `ValidTransaction` definition.
        let ok_encoding: [u8; 20] = [
            0x00, // `Ok` variant
            0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // priority = 1
            0x00, // requires: empty
            0x00, // provides: empty
            0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // longevity = 64
            0x01, // propagate = true
        ];
        let decoded_ok = TransactionValidity::decode(&mut ok_encoding.as_slice())
            .expect("Should decode valid transaction");
        let TransactionValidity::Ok(valid) = decoded_ok else {
            panic!("Expected Ok variant");
        };
        assert_eq!(valid.priority, 1);
        assert!(valid.requires.is_empty());
        assert!(valid.provides.is_empty());
        assert_eq!(valid.longevity, 64);
        assert!(valid.propagate);

        // Decodes to an error whose reason is the "nonce too low" message.
        let err_encoding: [u8; 3] = [0x01, 0x00, 0x03];
        let decoded_err = TransactionValidity::decode(&mut err_encoding.as_slice())
            .expect("Should decode invalid transaction");
        let TransactionValidity::Err(rejection) = decoded_err else {
            panic!("Expected Err variant");
        };
        assert_eq!(rejection.reason(), "Nonce too low (already used)");
    }

    /// With a length hint, the first account of that length is chosen.
    #[test]
    fn select_sudo_account_prefers_matching_account_length() {
        let accounts =
            vec![("Alice", vec![0u8; 32]), ("Bob", vec![1u8; 32]), ("Alith", vec![2u8; 20])];

        let cases = [(Some(20), "Alith", 20usize), (Some(32), "Alice", 32usize)];
        for (length_hint, expected_name, expected_size) in cases {
            let (name, account) = select_sudo_account(&accounts, length_hint);
            assert_eq!(name, expected_name);
            assert_eq!(account.len(), expected_size);
        }
    }

    /// Without a usable length hint, the first account in the list is chosen.
    #[test]
    fn select_sudo_account_falls_back_to_first_account() {
        let accounts = vec![("Alice", vec![0u8; 32]), ("Alith", vec![2u8; 20])];

        // `None` means no hint at all; 64 matches no account in the list.
        for length_hint in [None, Some(64)] {
            let (name, account) = select_sudo_account(&accounts, length_hint);
            assert_eq!(name, "Alice");
            assert_eq!(account.len(), 32);
        }
    }
}