Skip to main content

pop_fork/
remote.rs

1// SPDX-License-Identifier: GPL-3.0
2
3//! Remote storage layer for lazy-loading state from live chains.
4//!
5//! This module provides the [`RemoteStorageLayer`] which transparently fetches storage
6//! from a live chain via RPC when values aren't in the local cache. This enables
7//! "lazy forking" where state is fetched on-demand rather than requiring a full sync.
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────────┐
13//! │                    RemoteStorageLayer                            │
14//! │                                                                   │
15//! │   get(key) ─────► Cache Hit? ──── Yes ────► Return cached value │
16//! │                        │                                         │
17//! │                        No                                        │
18//! │                        │                                         │
19//! │                        ▼                                         │
20//! │                 Fetch from RPC                                   │
21//! │                        │                                         │
22//! │                        ▼                                         │
23//! │                 Store in cache                                   │
24//! │                        │                                         │
25//! │                        ▼                                         │
26//! │                 Return value                                     │
27//! └─────────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # Example
31//!
32//! ```ignore
33//! use pop_fork::{ForkRpcClient, RemoteStorageLayer, StorageCache};
34//!
35//! let rpc = ForkRpcClient::connect(&"wss://rpc.polkadot.io".parse()?).await?;
36//! let cache = StorageCache::in_memory().await?;
37//! let block_hash = rpc.finalized_head().await?;
38//!
39//! let storage = RemoteStorageLayer::new(rpc, cache);
40//!
41//! // First call fetches from RPC and caches
42//! let value = storage.get(block_hash, &key).await?;
43//!
44//! // Second call returns cached value (no RPC call)
45//! let value = storage.get(block_hash, &key).await?;
46//! ```
47
48use crate::{
49	ForkRpcClient, StorageCache,
50	error::{RemoteStorageError, RpcClientError},
51	models::BlockRow,
52};
53use std::sync::{
54	Arc,
55	atomic::{AtomicUsize, Ordering},
56};
57use subxt::{Metadata, config::substrate::H256, ext::codec::Encode};
58
/// Default number of keys to fetch per RPC call during prefix scans.
///
/// This balances RPC overhead (fewer calls = better) against memory usage
/// and response latency. 1000 keys typically fits well within RPC response limits.
/// Used by [`RemoteStorageLayer::get_keys`] and the speculative prefetch in
/// [`RemoteStorageLayer::get`].
const DEFAULT_PREFETCH_PAGE_SIZE: u32 = 1000;

/// Minimum key length (bytes) for speculative prefix prefetch.
///
/// Polkadot SDK storage keys are composed of twox128(pallet) + twox128(item) = 32 bytes.
/// Keys shorter than this are pallet-level prefixes rather than storage item keys,
/// so speculative prefix scans on them would be too broad.
const MIN_STORAGE_KEY_PREFIX_LEN: usize = 32;
71
/// Counters tracking cache hits vs RPC misses for performance analysis.
///
/// All counters are atomic and shared across clones of the same `RemoteStorageLayer`.
/// Use [`RemoteStorageLayer::reset_stats`] to zero them before a phase, and
/// [`RemoteStorageLayer::stats`] to read the snapshot.
///
/// Counters are updated with relaxed ordering, so concurrent readers may observe
/// momentarily inconsistent totals; they are intended for diagnostics, not invariants.
#[derive(Debug, Default)]
pub struct StorageStats {
	/// Number of `get()` calls served from cache (no RPC).
	pub cache_hits: AtomicUsize,
	/// Number of `get()` calls that triggered a speculative prefetch and the
	/// prefetch covered the requested key (cache hit after prefetch).
	pub prefetch_hits: AtomicUsize,
	/// Number of `get()` calls that fell through to an individual `state_getStorage` RPC.
	pub rpc_misses: AtomicUsize,
	/// Number of `next_key()` calls served from cache.
	pub next_key_cache: AtomicUsize,
	/// Number of `next_key()` calls that hit RPC.
	pub next_key_rpc: AtomicUsize,
}
91
/// Snapshot of [`StorageStats`] counters at a point in time.
///
/// Produced by [`RemoteStorageLayer::stats`]; plain `usize` values, safe to
/// copy around and compare across phases.
#[derive(Debug, Clone, Default)]
pub struct StorageStatsSnapshot {
	/// `get()` calls served directly from cache.
	pub cache_hits: usize,
	/// `get()` calls answered after a speculative prefix prefetch.
	pub prefetch_hits: usize,
	/// `get()` calls that required an individual storage RPC.
	pub rpc_misses: usize,
	/// `next_key()` calls answered from the local cache.
	pub next_key_cache: usize,
	/// `next_key()` calls that required an RPC round-trip.
	pub next_key_rpc: usize,
}
101
102impl std::fmt::Display for StorageStatsSnapshot {
103	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104		let total_get = self.cache_hits + self.prefetch_hits + self.rpc_misses;
105		let total_next = self.next_key_cache + self.next_key_rpc;
106		write!(
107			f,
108			"get: {} total ({} cache, {} prefetch, {} rpc) | next_key: {} total ({} cache, {} rpc)",
109			total_get,
110			self.cache_hits,
111			self.prefetch_hits,
112			self.rpc_misses,
113			total_next,
114			self.next_key_cache,
115			self.next_key_rpc,
116		)
117	}
118}
119
/// Remote storage layer that lazily fetches state from a live chain.
///
/// Provides a cache-through abstraction: reads check the local cache first,
/// and only fetch from the remote RPC when the value isn't cached. Fetched
/// values are automatically cached for subsequent reads.
///
/// # Cloning
///
/// `RemoteStorageLayer` is cheap to clone. Both `ForkRpcClient` and `StorageCache`
/// use internal reference counting (connection pools/Arc), so cloning just increments
/// reference counts. The `stats` counters are likewise shared across clones.
///
/// # Thread Safety
///
/// The layer is `Send + Sync` and can be shared across async tasks. The underlying
/// cache handles concurrent access safely.
#[derive(Clone, Debug)]
pub struct RemoteStorageLayer {
	// RPC client connected to the live chain.
	rpc: ForkRpcClient,
	// Local persistent cache; consulted before every RPC.
	cache: StorageCache,
	// Shared hit/miss counters; Arc so clones report into the same stats.
	stats: Arc<StorageStats>,
}
142
143impl RemoteStorageLayer {
144	/// Create a new remote storage layer.
145	///
146	/// # Arguments
147	/// * `rpc` - RPC client connected to the live chain
148	/// * `cache` - Storage cache for persisting fetched values
149	pub fn new(rpc: ForkRpcClient, cache: StorageCache) -> Self {
150		Self { rpc, cache, stats: Arc::new(StorageStats::default()) }
151	}
152
	/// Get a reference to the underlying RPC client.
	pub fn rpc(&self) -> &ForkRpcClient {
		&self.rpc
	}

	/// Get a reference to the underlying cache.
	pub fn cache(&self) -> &StorageCache {
		&self.cache
	}

	/// Get the RPC endpoint URL this layer is connected to.
	///
	/// Delegates to the RPC client's configured endpoint.
	pub fn endpoint(&self) -> &url::Url {
		self.rpc.endpoint()
	}
167
168	/// Take a snapshot of the current storage access counters.
169	pub fn stats(&self) -> StorageStatsSnapshot {
170		StorageStatsSnapshot {
171			cache_hits: self.stats.cache_hits.load(Ordering::Relaxed),
172			prefetch_hits: self.stats.prefetch_hits.load(Ordering::Relaxed),
173			rpc_misses: self.stats.rpc_misses.load(Ordering::Relaxed),
174			next_key_cache: self.stats.next_key_cache.load(Ordering::Relaxed),
175			next_key_rpc: self.stats.next_key_rpc.load(Ordering::Relaxed),
176		}
177	}
178
179	/// Reset all storage access counters to zero.
180	pub fn reset_stats(&self) {
181		self.stats.cache_hits.store(0, Ordering::Relaxed);
182		self.stats.prefetch_hits.store(0, Ordering::Relaxed);
183		self.stats.rpc_misses.store(0, Ordering::Relaxed);
184		self.stats.next_key_cache.store(0, Ordering::Relaxed);
185		self.stats.next_key_rpc.store(0, Ordering::Relaxed);
186	}
187
	/// Get a storage value, fetching from RPC if not cached.
	///
	/// # Returns
	/// * `Ok(Some(value))` - Storage exists with value
	/// * `Ok(None)` - Storage key doesn't exist (empty)
	/// * `Err(_)` - RPC or cache error
	///
	/// # Caching Behavior
	/// - If the key is in cache, returns the cached value immediately
	/// - If not cached and the key is >= 32 bytes, speculatively prefetches the first page of keys
	///   sharing the same 32-byte prefix (pallet hash + storage item hash). This converts hundreds
	///   of individual RPCs into a handful of bulk fetches without risking a full scan of large
	///   maps.
	/// - Falls back to individual RPC fetch if the key is short or the speculative prefetch didn't
	///   cover it (key beyond first page).
	/// - Empty storage (key exists but has no value) is cached as `None`
	pub async fn get(
		&self,
		block_hash: H256,
		key: &[u8],
	) -> Result<Option<Vec<u8>>, RemoteStorageError> {
		// Check cache first. Note the double Option: the outer layer is
		// "cached or not", the inner (returned) layer is "value or empty".
		if let Some(cached) = self.cache.get_storage(block_hash, key).await? {
			self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
			return Ok(cached);
		}

		// Speculative prefix prefetch: if the key is at least 32 bytes (pallet hash +
		// storage item hash), bulk-fetch the FIRST PAGE of keys sharing that prefix.
		// Only fetches one page to avoid blocking on large maps (e.g., Account maps
		// with thousands of entries). This still captures the majority of runtime
		// reads since most storage items have fewer than 1000 keys.
		//
		// Errors are non-fatal: speculative prefetch is an optimization. If the
		// connection drops mid-prefetch, we fall through to the individual fetch
		// below which has its own retry logic.
		if key.len() >= MIN_STORAGE_KEY_PREFIX_LEN {
			let prefix = &key[..MIN_STORAGE_KEY_PREFIX_LEN];
			// Only speculate when no scan (complete or in-flight) exists for this
			// prefix; any recorded progress means a prefetch already ran or is running.
			let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;
			if progress.is_none() {
				match self
					.prefetch_prefix_single_page(block_hash, prefix, DEFAULT_PREFETCH_PAGE_SIZE)
					.await
				{
					Ok(_) => {
						// Check cache again, the prefetch likely fetched our key
						if let Some(cached) = self.cache.get_storage(block_hash, key).await? {
							self.stats.prefetch_hits.fetch_add(1, Ordering::Relaxed);
							return Ok(cached);
						}
					},
					Err(e) => {
						log::debug!(
							"Speculative prefetch failed (non-fatal), falling through to individual fetch: {e}"
						);
					},
				}
			}
		}

		// Fallback: fetch individual key from RPC (with reconnect-retry).
		// Any first-attempt error triggers one reconnect + retry; the retry's
		// error (if any) is propagated.
		self.stats.rpc_misses.fetch_add(1, Ordering::Relaxed);
		let value = match self.rpc.storage(key, block_hash).await {
			Ok(v) => v,
			Err(_) => {
				self.rpc.reconnect().await?;
				self.rpc.storage(key, block_hash).await?
			},
		};

		// Cache the result (including empty values), so absent keys also become
		// cache hits on subsequent reads.
		self.cache.set_storage(block_hash, key, value.as_deref()).await?;

		Ok(value)
	}
263
264	/// Get multiple storage values in a batch, fetching uncached keys from RPC.
265	///
266	/// # Arguments
267	/// * `block_hash` - The hash of the block being queried.
268	/// * `keys` - Slice of storage keys to fetch (as byte slices to avoid unnecessary allocations)
269	///
270	/// # Returns
271	/// A vector of optional values, in the same order as the input keys.
272	///
273	/// # Caching Behavior
274	/// - Checks cache for all keys first
275	/// - Only fetches uncached keys from RPC
276	/// - Caches all fetched values (including empty ones)
277	/// - Returns results in the same order as input keys
278	pub async fn get_batch(
279		&self,
280		block_hash: H256,
281		keys: &[&[u8]],
282	) -> Result<Vec<Option<Vec<u8>>>, RemoteStorageError> {
283		if keys.is_empty() {
284			return Ok(vec![]);
285		}
286
287		// Check cache for all keys
288		let cached_results = self.cache.get_storage_batch(block_hash, keys).await?;
289
290		// Find which keys need to be fetched
291		let mut uncached_indices: Vec<usize> = Vec::new();
292		let mut uncached_keys: Vec<&[u8]> = Vec::new();
293
294		for (i, cached) in cached_results.iter().enumerate() {
295			if cached.is_none() {
296				uncached_indices.push(i);
297				uncached_keys.push(keys[i]);
298			}
299		}
300
301		// If everything was cached, return immediately
302		if uncached_keys.is_empty() {
303			return Ok(cached_results.into_iter().map(|c| c.flatten()).collect());
304		}
305
306		// Fetch uncached keys from RPC (with reconnect-retry)
307		let fetched_values = match self.rpc.storage_batch(&uncached_keys, block_hash).await {
308			Ok(v) => v,
309			Err(_) => {
310				self.rpc.reconnect().await?;
311				self.rpc.storage_batch(&uncached_keys, block_hash).await?
312			},
313		};
314
315		// Cache fetched values
316		let cache_entries: Vec<(&[u8], Option<&[u8]>)> = uncached_keys
317			.iter()
318			.zip(fetched_values.iter())
319			.map(|(k, v)| (*k, v.as_deref()))
320			.collect();
321
322		if !cache_entries.is_empty() {
323			self.cache.set_storage_batch(block_hash, &cache_entries).await?;
324		}
325
326		// Build final result, merging cached and fetched values
327		let mut results: Vec<Option<Vec<u8>>> =
328			cached_results.into_iter().map(|c| c.flatten()).collect();
329
330		for (i, idx) in uncached_indices.into_iter().enumerate() {
331			results[idx] = fetched_values[i].clone();
332		}
333
334		Ok(results)
335	}
336
	/// Prefetch a range of storage keys by prefix (resumable).
	///
	/// Fetches all keys matching the prefix and caches their values.
	/// This operation is resumable - if interrupted, calling it again will
	/// continue from where it left off.
	///
	/// # Arguments
	/// * `block_hash` - Block hash to query at
	/// * `prefix` - Storage key prefix to match
	/// * `page_size` - Number of keys to fetch per RPC call
	///
	/// # Returns
	/// The total number of keys for this prefix (including previously cached).
	pub async fn prefetch_prefix(
		&self,
		block_hash: H256,
		prefix: &[u8],
		page_size: u32,
	) -> Result<usize, RemoteStorageError> {
		// Check existing progress
		let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;

		if let Some(ref p) = progress &&
			p.is_complete
		{
			// Already done - return cached count
			return Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?);
		}

		// Resume from last scanned key if we have progress
		let mut start_key = progress.and_then(|p| p.last_scanned_key);

		loop {
			// Get next page of keys (with reconnect-retry)
			let keys = match self
				.rpc
				.storage_keys_paged(prefix, page_size, start_key.as_deref(), block_hash)
				.await
			{
				Ok(v) => v,
				Err(_) => {
					self.rpc.reconnect().await?;
					self.rpc
						.storage_keys_paged(prefix, page_size, start_key.as_deref(), block_hash)
						.await?
				},
			};

			if keys.is_empty() {
				// No keys found - mark as complete if this is the first page
				if start_key.is_none() {
					// Empty prefix: record the prefix itself as the "last scanned key"
					// marker so the scan is remembered as complete.
					self.cache.update_prefix_scan(block_hash, prefix, prefix, true).await?;
				}
				break;
			}

			// Determine pagination state before consuming keys.
			// A short page means the RPC had no more keys to return.
			let is_last_page = keys.len() < page_size as usize;

			// Fetch values for these keys (with reconnect-retry)
			let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
			let values = match self.rpc.storage_batch(&key_refs, block_hash).await {
				Ok(v) => v,
				Err(_) => {
					self.rpc.reconnect().await?;
					self.rpc.storage_batch(&key_refs, block_hash).await?
				},
			};

			// Cache all key-value pairs
			let cache_entries: Vec<(&[u8], Option<&[u8]>)> =
				key_refs.iter().zip(values.iter()).map(|(k, v)| (*k, v.as_deref())).collect();

			self.cache.set_storage_batch(block_hash, &cache_entries).await?;

			// Update progress with the last key from this page.
			// We consume keys here to avoid cloning for the next iteration's start_key.
			// NOTE: progress is recorded AFTER the values are cached, so a crash
			// between pages re-fetches at most one page on resume.
			let last_key = keys.into_iter().last();
			if let Some(ref key) = last_key {
				self.cache.update_prefix_scan(block_hash, prefix, key, is_last_page).await?;
			}

			if is_last_page {
				break;
			}

			// Set up for next page (last_key is already owned, no extra allocation)
			start_key = last_key;
		}

		// Return total count (includes any previously cached keys)
		Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?)
	}
431
432	/// Fetch a single page of keys for a prefix and cache their values.
433	///
434	/// Unlike [`prefetch_prefix`](Self::prefetch_prefix), this fetches only the first
435	/// page of keys (up to `page_size`) without looping through subsequent pages.
436	/// This keeps the cost bounded regardless of how many keys exist under the prefix.
437	///
438	/// Records scan progress so that subsequent calls to `prefetch_prefix` can
439	/// resume from where this left off.
440	pub async fn prefetch_prefix_single_page(
441		&self,
442		block_hash: H256,
443		prefix: &[u8],
444		page_size: u32,
445	) -> Result<usize, RemoteStorageError> {
446		// Check existing progress
447		let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;
448
449		if let Some(ref p) = progress {
450			if p.is_complete {
451				return Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?);
452			}
453			// A scan is already in progress (from a concurrent call or prior run),
454			// don't start another one.
455			return Ok(0);
456		}
457
458		// Fetch first page of keys (with reconnect-retry)
459		let keys = match self.rpc.storage_keys_paged(prefix, page_size, None, block_hash).await {
460			Ok(v) => v,
461			Err(_) => {
462				self.rpc.reconnect().await?;
463				self.rpc.storage_keys_paged(prefix, page_size, None, block_hash).await?
464			},
465		};
466
467		if keys.is_empty() {
468			self.cache.update_prefix_scan(block_hash, prefix, prefix, true).await?;
469			return Ok(0);
470		}
471
472		let is_last_page = keys.len() < page_size as usize;
473
474		// Fetch values for these keys (with reconnect-retry)
475		let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
476		let values = match self.rpc.storage_batch(&key_refs, block_hash).await {
477			Ok(v) => v,
478			Err(_) => {
479				self.rpc.reconnect().await?;
480				self.rpc.storage_batch(&key_refs, block_hash).await?
481			},
482		};
483
484		// Cache all key-value pairs
485		let cache_entries: Vec<(&[u8], Option<&[u8]>)> =
486			key_refs.iter().zip(values.iter()).map(|(k, v)| (*k, v.as_deref())).collect();
487
488		self.cache.set_storage_batch(block_hash, &cache_entries).await?;
489
490		let count = keys.len();
491		if let Some(last_key) = keys.into_iter().last() {
492			self.cache
493				.update_prefix_scan(block_hash, prefix, &last_key, is_last_page)
494				.await?;
495		}
496
497		Ok(count)
498	}
499
500	/// Get all keys for a prefix, fetching from RPC if not fully cached.
501	///
502	/// This is a convenience method that:
503	/// 1. Ensures the prefix is fully scanned (calls [`Self::prefetch_prefix`] if needed)
504	/// 2. Returns all cached keys matching the prefix
505	///
506	/// Useful for enumerating all entries in a storage map (e.g., all accounts
507	/// in a balances pallet).
508	///
509	/// # Arguments
510	/// * `block_hash` - Block hash to query at
511	/// * `prefix` - Storage key prefix to match (typically a pallet + storage item prefix)
512	///
513	/// # Returns
514	/// All keys matching the prefix at the specified block hash.
515	///
516	/// # Performance
517	/// First call may be slow if the prefix hasn't been scanned yet.
518	/// Subsequent calls return cached data immediately.
519	pub async fn get_keys(
520		&self,
521		block_hash: H256,
522		prefix: &[u8],
523	) -> Result<Vec<Vec<u8>>, RemoteStorageError> {
524		// Ensure prefix is fully scanned
525		self.prefetch_prefix(block_hash, prefix, DEFAULT_PREFETCH_PAGE_SIZE).await?;
526
527		// Return from cache
528		Ok(self.cache.get_keys_by_prefix(block_hash, prefix).await?)
529	}
530
531	/// Fetch a block by number from the remote RPC and cache it.
532	///
533	/// This method fetches the block data for the given block number and caches
534	/// the block metadata in the cache.
535	///
536	/// # Arguments
537	/// * `block_number` - The block number to fetch and cache
538	///
539	/// # Returns
540	/// * `Ok(Some(block_row))` - Block was fetched and cached successfully
541	/// * `Ok(None)` - Block number doesn't exist
542	/// * `Err(_)` - RPC or cache error
543	///
544	/// # Caching Behavior
545	/// - Fetches block hash and data from block number using `chain_getBlockHash` and
546	///   `chain_getBlock`
547	/// - Caches block metadata (hash, number, parent_hash, header) in the cache
548	/// - If block is already cached, this will update the cache entry
549	pub async fn fetch_and_cache_block_by_number(
550		&self,
551		block_number: u32,
552	) -> Result<Option<BlockRow>, RemoteStorageError> {
553		// Get block hash and full block data in one call
554		let (block_hash, block) = match self.rpc.block_by_number(block_number).await? {
555			Some((hash, block)) => (hash, block),
556			None => return Ok(None),
557		};
558
559		// Extract header and parent hash
560		let header = block.header;
561		let parent_hash = header.parent_hash;
562		let header_encoded = header.encode();
563
564		// Cache the block
565		self.cache
566			.cache_block(block_hash, block_number, parent_hash, &header_encoded)
567			.await?;
568
569		// Return the cached block row
570		Ok(Some(BlockRow {
571			hash: block_hash.as_bytes().to_vec(),
572			number: block_number as i64,
573			parent_hash: parent_hash.as_bytes().to_vec(),
574			header: header_encoded,
575		}))
576	}
577
578	/// Get the next key after the given key that starts with the prefix.
579	///
580	/// This method is used for key enumeration during runtime execution.
581	/// Before hitting the RPC, it checks whether a complete prefix scan exists
582	/// in the cache for the queried prefix (or parent prefixes at 32 or 16 bytes).
583	/// If so, the answer is served from the local SQLite cache, avoiding an RPC
584	/// round-trip entirely.
585	///
586	/// # Arguments
587	/// * `block_hash` - Block hash to query at
588	/// * `prefix` - Storage key prefix to match
589	/// * `key` - The current key; returns the next key after this one
590	///
591	/// # Returns
592	/// * `Ok(Some(key))` - The next key after `key` that starts with `prefix`
593	/// * `Ok(None)` - No more keys with this prefix
594	pub async fn next_key(
595		&self,
596		block_hash: H256,
597		prefix: &[u8],
598		key: &[u8],
599	) -> Result<Option<Vec<u8>>, RemoteStorageError> {
600		// Check if we have a complete prefix scan that covers this query.
601		// Try the exact prefix first, then common parent lengths (32-byte = pallet+item,
602		// 16-byte = pallet-only).
603		let candidate_lengths: &[usize] = &[prefix.len(), 32, 16];
604		for &len in candidate_lengths {
605			if len > prefix.len() {
606				continue;
607			}
608			let candidate = &prefix[..len];
609			if let Some(progress) =
610				self.cache.get_prefix_scan_progress(block_hash, candidate).await? &&
611				progress.is_complete
612			{
613				self.stats.next_key_cache.fetch_add(1, Ordering::Relaxed);
614				return Ok(self.cache.next_key_from_cache(block_hash, prefix, key).await?);
615			}
616		}
617
618		// Fallback: fetch from RPC (with reconnect-retry)
619		self.stats.next_key_rpc.fetch_add(1, Ordering::Relaxed);
620		let keys = match self.rpc.storage_keys_paged(prefix, 1, Some(key), block_hash).await {
621			Ok(v) => v,
622			Err(_) => {
623				self.rpc.reconnect().await?;
624				self.rpc.storage_keys_paged(prefix, 1, Some(key), block_hash).await?
625			},
626		};
627		Ok(keys.into_iter().next())
628	}
629
630	// ============================================================================
631	// Block and header fetching methods
632	// ============================================================================
633	// These methods provide access to block data from the remote chain,
634	// allowing Blockchain to delegate remote queries without directly
635	// interfacing with ForkRpcClient.
636
637	/// Get block body (extrinsics) by hash from the remote chain.
638	///
639	/// # Returns
640	/// * `Ok(Some(extrinsics))` - Block found, returns list of encoded extrinsics
641	/// * `Ok(None)` - Block not found
642	pub async fn block_body(&self, hash: H256) -> Result<Option<Vec<Vec<u8>>>, RemoteStorageError> {
643		match self.rpc.block_by_hash(hash).await? {
644			Some(block) => {
645				let extrinsics = block.extrinsics.into_iter().map(|ext| ext.0.to_vec()).collect();
646				Ok(Some(extrinsics))
647			},
648			None => Ok(None),
649		}
650	}
651
	/// Get block header by hash from the remote chain.
	///
	/// # Returns
	/// * `Ok(Some(header_bytes))` - Encoded header bytes
	/// * `Ok(None)` - Block not found on the remote chain
	/// * `Err(..)` - Transport/connection error (caller should retry or reconnect)
	pub async fn block_header(&self, hash: H256) -> Result<Option<Vec<u8>>, RemoteStorageError> {
		match self.rpc.header(hash).await {
			Ok(header) => Ok(Some(header.encode())),
			// Header not found (RPC returned null): legitimate "not found"
			// NOTE(review): this assumes `InvalidResponse` is only produced for
			// null responses; if the client also uses it for malformed payloads,
			// those would be silently reported as "not found" — verify in
			// ForkRpcClient::header.
			Err(RpcClientError::InvalidResponse(_)) => Ok(None),
			// Connection/transport errors must be propagated so callers can reconnect
			Err(e) => Err(e.into()),
		}
	}
667
668	/// Get block hash by block number from the remote chain.
669	///
670	/// # Returns
671	/// * `Ok(Some(hash))` - Block hash at the given number
672	/// * `Ok(None)` - Block number not found
673	pub async fn block_hash_by_number(
674		&self,
675		block_number: u32,
676	) -> Result<Option<H256>, RemoteStorageError> {
677		Ok(self.rpc.block_hash_at(block_number).await?)
678	}
679
680	/// Get block number by hash from the remote chain.
681	///
682	/// This method checks the persistent SQLite cache first before hitting RPC.
683	/// Results are cached for future lookups.
684	///
685	/// # Returns
686	/// * `Ok(Some(number))` - Block number for the given hash
687	/// * `Ok(None)` - Block not found
688	pub async fn block_number_by_hash(
689		&self,
690		hash: H256,
691	) -> Result<Option<u32>, RemoteStorageError> {
692		// Check cache first
693		if let Some(block) = self.cache.get_block(hash).await? {
694			return Ok(Some(block.number as u32));
695		}
696
697		// Fetch from RPC
698		match self.rpc.block_by_hash(hash).await? {
699			Some(block) => {
700				let number = block.header.number;
701				let parent_hash = block.header.parent_hash;
702				let header_encoded = block.header.encode();
703
704				// Cache for future lookups
705				self.cache.cache_block(hash, number, parent_hash, &header_encoded).await?;
706
707				Ok(Some(number))
708			},
709			None => Ok(None),
710		}
711	}
712
713	/// Get parent hash of a block from the remote chain.
714	///
715	/// This method checks the persistent SQLite cache first before hitting RPC.
716	/// Results are cached for future lookups.
717	///
718	/// # Returns
719	/// * `Ok(Some(parent_hash))` - Parent hash of the block
720	/// * `Ok(None)` - Block not found
721	pub async fn parent_hash(&self, hash: H256) -> Result<Option<H256>, RemoteStorageError> {
722		// Check cache first
723		if let Some(block) = self.cache.get_block(hash).await? {
724			let parent_hash = H256::from_slice(&block.parent_hash);
725			return Ok(Some(parent_hash));
726		}
727
728		// Fetch from RPC
729		match self.rpc.block_by_hash(hash).await? {
730			Some(block) => {
731				let number = block.header.number;
732				let parent_hash = block.header.parent_hash;
733				let header_encoded = block.header.encode();
734
735				// Cache for future lookups
736				self.cache.cache_block(hash, number, parent_hash, &header_encoded).await?;
737
738				Ok(Some(parent_hash))
739			},
740			None => Ok(None),
741		}
742	}
743
744	/// Get full block data (hash and block) by number from the remote chain.
745	///
746	/// # Returns
747	/// * `Ok(Some((hash, block)))` - Block found
748	/// * `Ok(None)` - Block number not found
749	pub async fn block_by_number(
750		&self,
751		block_number: u32,
752	) -> Result<
753		Option<(H256, subxt::backend::legacy::rpc_methods::Block<subxt::SubstrateConfig>)>,
754		RemoteStorageError,
755	> {
756		Ok(self.rpc.block_by_number(block_number).await?)
757	}
758
759	/// Get the latest finalized block hash from the remote chain.
760	pub async fn finalized_head(&self) -> Result<H256, RemoteStorageError> {
761		Ok(self.rpc.finalized_head().await?)
762	}
763
764	/// Get decoded metadata at a specific block from the remote chain.
765	pub async fn metadata(&self, block_hash: H256) -> Result<Metadata, RemoteStorageError> {
766		Ok(self.rpc.metadata(block_hash).await?)
767	}
768}
769
#[cfg(test)]
mod tests {
	use super::*;
	use crate::error::{CacheError, RpcClientError};

	// The Display impls for error variants are relied on by log output; pin the
	// variant prefixes here.

	#[test]
	fn error_display_rpc() {
		let err = RemoteStorageError::Rpc(RpcClientError::InvalidResponse("test".to_string()));
		assert!(err.to_string().contains("RPC error"));
	}

	#[test]
	fn error_display_cache() {
		let err = RemoteStorageError::Cache(CacheError::DataCorruption("test".to_string()));
		assert!(err.to_string().contains("Cache error"));
	}
}
789}