// pop_fork/remote.rs
1// SPDX-License-Identifier: GPL-3.0
2
3//! Remote storage layer for lazy-loading state from live chains.
4//!
5//! This module provides the [`RemoteStorageLayer`] which transparently fetches storage
6//! from a live chain via RPC when values aren't in the local cache. This enables
7//! "lazy forking" where state is fetched on-demand rather than requiring a full sync.
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────────┐
13//! │ RemoteStorageLayer │
14//! │ │
15//! │ get(key) ─────► Cache Hit? ──── Yes ────► Return cached value │
16//! │ │ │
17//! │ No │
18//! │ │ │
19//! │ ▼ │
20//! │ Fetch from RPC │
21//! │ │ │
22//! │ ▼ │
23//! │ Store in cache │
24//! │ │ │
25//! │ ▼ │
26//! │ Return value │
27//! └─────────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # Example
31//!
32//! ```ignore
33//! use pop_fork::{ForkRpcClient, RemoteStorageLayer, StorageCache};
34//!
35//! let rpc = ForkRpcClient::connect(&"wss://rpc.polkadot.io".parse()?).await?;
36//! let cache = StorageCache::in_memory().await?;
37//! let block_hash = rpc.finalized_head().await?;
38//!
39//! let storage = RemoteStorageLayer::new(rpc, cache);
40//!
41//! // First call fetches from RPC and caches
42//! let value = storage.get(block_hash, &key).await?;
43//!
44//! // Second call returns cached value (no RPC call)
45//! let value = storage.get(block_hash, &key).await?;
46//! ```
47
48use crate::{
49 ForkRpcClient, StorageCache,
50 error::{RemoteStorageError, RpcClientError},
51 models::BlockRow,
52};
53use std::sync::{
54 Arc,
55 atomic::{AtomicUsize, Ordering},
56};
57use subxt::{Metadata, config::substrate::H256, ext::codec::Encode};
58
/// Default number of keys to fetch per RPC call during prefix scans.
///
/// This balances RPC overhead (fewer calls = better) against memory usage
/// and response latency. 1000 keys typically fits well within RPC response limits.
/// Used by [`RemoteStorageLayer::get`] (speculative prefetch) and
/// [`RemoteStorageLayer::get_keys`].
const DEFAULT_PREFETCH_PAGE_SIZE: u32 = 1000;

/// Minimum key length (bytes) for speculative prefix prefetch.
///
/// Polkadot SDK storage keys are composed of twox128(pallet) + twox128(item) = 32 bytes.
/// Keys shorter than this are pallet-level prefixes rather than storage item keys,
/// so speculative prefix scans on them would be too broad.
const MIN_STORAGE_KEY_PREFIX_LEN: usize = 32;
71
/// Counters tracking cache hits vs RPC misses for performance analysis.
///
/// All counters are atomic and shared across clones of the same `RemoteStorageLayer`.
/// All reads and writes use `Ordering::Relaxed`: each counter is an independent
/// diagnostic, not a synchronization primitive, so no cross-counter consistency
/// is guaranteed.
/// Use [`RemoteStorageLayer::reset_stats`] to zero them before a phase, and
/// [`RemoteStorageLayer::stats`] to read the snapshot.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Number of `get()` calls served from cache (no RPC).
    pub cache_hits: AtomicUsize,
    /// Number of `get()` calls that triggered a speculative prefetch and the
    /// prefetch covered the requested key (cache hit after prefetch).
    pub prefetch_hits: AtomicUsize,
    /// Number of `get()` calls that fell through to an individual `state_getStorage` RPC.
    pub rpc_misses: AtomicUsize,
    /// Number of `next_key()` calls served from cache.
    pub next_key_cache: AtomicUsize,
    /// Number of `next_key()` calls that hit RPC.
    pub next_key_rpc: AtomicUsize,
}
91
/// Snapshot of [`StorageStats`] counters at a point in time.
#[derive(Debug, Clone, Default)]
pub struct StorageStatsSnapshot {
    /// Number of `get()` calls served from cache (no RPC).
    pub cache_hits: usize,
    /// Number of `get()` calls answered by a speculative prefix prefetch.
    pub prefetch_hits: usize,
    /// Number of `get()` calls that fell through to an individual RPC fetch.
    pub rpc_misses: usize,
    /// Number of `next_key()` calls served from cache.
    pub next_key_cache: usize,
    /// Number of `next_key()` calls that hit RPC.
    pub next_key_rpc: usize,
}
101
102impl std::fmt::Display for StorageStatsSnapshot {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 let total_get = self.cache_hits + self.prefetch_hits + self.rpc_misses;
105 let total_next = self.next_key_cache + self.next_key_rpc;
106 write!(
107 f,
108 "get: {} total ({} cache, {} prefetch, {} rpc) | next_key: {} total ({} cache, {} rpc)",
109 total_get,
110 self.cache_hits,
111 self.prefetch_hits,
112 self.rpc_misses,
113 total_next,
114 self.next_key_cache,
115 self.next_key_rpc,
116 )
117 }
118}
119
120/// Remote storage layer that lazily fetches state from a live chain.
121///
122/// Provides a cache-through abstraction: reads check the local cache first,
123/// and only fetch from the remote RPC when the value isn't cached. Fetched
124/// values are automatically cached for subsequent reads.
125///
126/// # Cloning
127///
128/// `RemoteStorageLayer` is cheap to clone. Both `ForkRpcClient` and `StorageCache`
129/// use internal reference counting (connection pools/Arc), so cloning just increments
130/// reference counts.
131///
132/// # Thread Safety
133///
134/// The layer is `Send + Sync` and can be shared across async tasks. The underlying
135/// cache handles concurrent access safely.
136#[derive(Clone, Debug)]
137pub struct RemoteStorageLayer {
138 rpc: ForkRpcClient,
139 cache: StorageCache,
140 stats: Arc<StorageStats>,
141}
142
143impl RemoteStorageLayer {
144 /// Create a new remote storage layer.
145 ///
146 /// # Arguments
147 /// * `rpc` - RPC client connected to the live chain
148 /// * `cache` - Storage cache for persisting fetched values
149 pub fn new(rpc: ForkRpcClient, cache: StorageCache) -> Self {
150 Self { rpc, cache, stats: Arc::new(StorageStats::default()) }
151 }
152
    /// Get a reference to the underlying RPC client.
    pub fn rpc(&self) -> &ForkRpcClient {
        &self.rpc
    }

    /// Get a reference to the underlying cache.
    pub fn cache(&self) -> &StorageCache {
        &self.cache
    }

    /// Get the RPC endpoint URL this layer is connected to
    /// (delegates to the underlying client's `endpoint`).
    pub fn endpoint(&self) -> &url::Url {
        self.rpc.endpoint()
    }
167
168 /// Take a snapshot of the current storage access counters.
169 pub fn stats(&self) -> StorageStatsSnapshot {
170 StorageStatsSnapshot {
171 cache_hits: self.stats.cache_hits.load(Ordering::Relaxed),
172 prefetch_hits: self.stats.prefetch_hits.load(Ordering::Relaxed),
173 rpc_misses: self.stats.rpc_misses.load(Ordering::Relaxed),
174 next_key_cache: self.stats.next_key_cache.load(Ordering::Relaxed),
175 next_key_rpc: self.stats.next_key_rpc.load(Ordering::Relaxed),
176 }
177 }
178
179 /// Reset all storage access counters to zero.
180 pub fn reset_stats(&self) {
181 self.stats.cache_hits.store(0, Ordering::Relaxed);
182 self.stats.prefetch_hits.store(0, Ordering::Relaxed);
183 self.stats.rpc_misses.store(0, Ordering::Relaxed);
184 self.stats.next_key_cache.store(0, Ordering::Relaxed);
185 self.stats.next_key_rpc.store(0, Ordering::Relaxed);
186 }
187
    /// Get a storage value, fetching from RPC if not cached.
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Storage exists with value
    /// * `Ok(None)` - Storage key doesn't exist (empty)
    /// * `Err(_)` - RPC or cache error
    ///
    /// # Caching Behavior
    /// - If the key is in cache, returns the cached value immediately
    /// - If not cached and the key is >= 32 bytes, speculatively prefetches the first page of keys
    ///   sharing the same 32-byte prefix (pallet hash + storage item hash). This converts hundreds
    ///   of individual RPCs into a handful of bulk fetches without risking a full scan of large
    ///   maps.
    /// - Falls back to individual RPC fetch if the key is short or the speculative prefetch didn't
    ///   cover it (key beyond first page).
    /// - Empty storage (key exists but has no value) is cached as `None`
    pub async fn get(
        &self,
        block_hash: H256,
        key: &[u8],
    ) -> Result<Option<Vec<u8>>, RemoteStorageError> {
        // Check cache first. The outer Option is "is this key known to the cache";
        // the inner Option<Vec<u8>> distinguishes a value from known-empty storage.
        if let Some(cached) = self.cache.get_storage(block_hash, key).await? {
            self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(cached);
        }

        // Speculative prefix prefetch: if the key is at least 32 bytes (pallet hash +
        // storage item hash), bulk-fetch the FIRST PAGE of keys sharing that prefix.
        // Only fetches one page to avoid blocking on large maps (e.g., Account maps
        // with thousands of entries). This still captures the majority of runtime
        // reads since most storage items have fewer than 1000 keys.
        //
        // Errors are non-fatal: speculative prefetch is an optimization. If the
        // connection drops mid-prefetch, we fall through to the individual fetch
        // below which has its own retry logic.
        if key.len() >= MIN_STORAGE_KEY_PREFIX_LEN {
            let prefix = &key[..MIN_STORAGE_KEY_PREFIX_LEN];
            // Only start a speculative scan when no scan (complete or partial)
            // has been recorded for this prefix yet.
            let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;
            if progress.is_none() {
                match self
                    .prefetch_prefix_single_page(block_hash, prefix, DEFAULT_PREFETCH_PAGE_SIZE)
                    .await
                {
                    Ok(_) => {
                        // Check cache again, the prefetch likely fetched our key
                        if let Some(cached) = self.cache.get_storage(block_hash, key).await? {
                            self.stats.prefetch_hits.fetch_add(1, Ordering::Relaxed);
                            return Ok(cached);
                        }
                    },
                    Err(e) => {
                        log::debug!(
                            "Speculative prefetch failed (non-fatal), falling through to individual fetch: {e}"
                        );
                    },
                }
            }
        }

        // Fallback: fetch individual key from RPC. One reconnect attempt is made;
        // a failure after reconnecting is propagated to the caller.
        self.stats.rpc_misses.fetch_add(1, Ordering::Relaxed);
        let value = match self.rpc.storage(key, block_hash).await {
            Ok(v) => v,
            Err(_) => {
                self.rpc.reconnect().await?;
                self.rpc.storage(key, block_hash).await?
            },
        };

        // Cache the result (including empty values)
        self.cache.set_storage(block_hash, key, value.as_deref()).await?;

        Ok(value)
    }
263
264 /// Get multiple storage values in a batch, fetching uncached keys from RPC.
265 ///
266 /// # Arguments
267 /// * `block_hash` - The hash of the block being queried.
268 /// * `keys` - Slice of storage keys to fetch (as byte slices to avoid unnecessary allocations)
269 ///
270 /// # Returns
271 /// A vector of optional values, in the same order as the input keys.
272 ///
273 /// # Caching Behavior
274 /// - Checks cache for all keys first
275 /// - Only fetches uncached keys from RPC
276 /// - Caches all fetched values (including empty ones)
277 /// - Returns results in the same order as input keys
278 pub async fn get_batch(
279 &self,
280 block_hash: H256,
281 keys: &[&[u8]],
282 ) -> Result<Vec<Option<Vec<u8>>>, RemoteStorageError> {
283 if keys.is_empty() {
284 return Ok(vec![]);
285 }
286
287 // Check cache for all keys
288 let cached_results = self.cache.get_storage_batch(block_hash, keys).await?;
289
290 // Find which keys need to be fetched
291 let mut uncached_indices: Vec<usize> = Vec::new();
292 let mut uncached_keys: Vec<&[u8]> = Vec::new();
293
294 for (i, cached) in cached_results.iter().enumerate() {
295 if cached.is_none() {
296 uncached_indices.push(i);
297 uncached_keys.push(keys[i]);
298 }
299 }
300
301 // If everything was cached, return immediately
302 if uncached_keys.is_empty() {
303 return Ok(cached_results.into_iter().map(|c| c.flatten()).collect());
304 }
305
306 // Fetch uncached keys from RPC (with reconnect-retry)
307 let fetched_values = match self.rpc.storage_batch(&uncached_keys, block_hash).await {
308 Ok(v) => v,
309 Err(_) => {
310 self.rpc.reconnect().await?;
311 self.rpc.storage_batch(&uncached_keys, block_hash).await?
312 },
313 };
314
315 // Cache fetched values
316 let cache_entries: Vec<(&[u8], Option<&[u8]>)> = uncached_keys
317 .iter()
318 .zip(fetched_values.iter())
319 .map(|(k, v)| (*k, v.as_deref()))
320 .collect();
321
322 if !cache_entries.is_empty() {
323 self.cache.set_storage_batch(block_hash, &cache_entries).await?;
324 }
325
326 // Build final result, merging cached and fetched values
327 let mut results: Vec<Option<Vec<u8>>> =
328 cached_results.into_iter().map(|c| c.flatten()).collect();
329
330 for (i, idx) in uncached_indices.into_iter().enumerate() {
331 results[idx] = fetched_values[i].clone();
332 }
333
334 Ok(results)
335 }
336
    /// Prefetch a range of storage keys by prefix (resumable).
    ///
    /// Fetches all keys matching the prefix and caches their values.
    /// This operation is resumable - if interrupted, calling it again will
    /// continue from where it left off.
    ///
    /// # Arguments
    /// * `block_hash` - Block hash to query at
    /// * `prefix` - Storage key prefix to match
    /// * `page_size` - Number of keys to fetch per RPC call
    ///
    /// # Returns
    /// The total number of keys for this prefix (including previously cached).
    pub async fn prefetch_prefix(
        &self,
        block_hash: H256,
        prefix: &[u8],
        page_size: u32,
    ) -> Result<usize, RemoteStorageError> {
        // Check existing progress
        let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;

        if let Some(ref p) = progress &&
            p.is_complete
        {
            // Already done - return cached count
            return Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?);
        }

        // Resume from last scanned key if we have progress
        let mut start_key = progress.and_then(|p| p.last_scanned_key);

        loop {
            // Get next page of keys (with reconnect-retry)
            let keys = match self
                .rpc
                .storage_keys_paged(prefix, page_size, start_key.as_deref(), block_hash)
                .await
            {
                Ok(v) => v,
                Err(_) => {
                    self.rpc.reconnect().await?;
                    self.rpc
                        .storage_keys_paged(prefix, page_size, start_key.as_deref(), block_hash)
                        .await?
                },
            };

            if keys.is_empty() {
                // No keys found - mark as complete if this is the first page
                if start_key.is_none() {
                    // Empty prefix: record completion using the prefix itself as the
                    // marker key, since there is no real "last scanned key" to store.
                    self.cache.update_prefix_scan(block_hash, prefix, prefix, true).await?;
                }
                break;
            }

            // Determine pagination state before consuming keys.
            // A short page (fewer than page_size keys) means this is the last page.
            let is_last_page = keys.len() < page_size as usize;

            // Fetch values for these keys (with reconnect-retry)
            let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
            let values = match self.rpc.storage_batch(&key_refs, block_hash).await {
                Ok(v) => v,
                Err(_) => {
                    self.rpc.reconnect().await?;
                    self.rpc.storage_batch(&key_refs, block_hash).await?
                },
            };

            // Cache all key-value pairs
            let cache_entries: Vec<(&[u8], Option<&[u8]>)> =
                key_refs.iter().zip(values.iter()).map(|(k, v)| (*k, v.as_deref())).collect();

            self.cache.set_storage_batch(block_hash, &cache_entries).await?;

            // Update progress with the last key from this page.
            // We consume keys here to avoid cloning for the next iteration's start_key.
            let last_key = keys.into_iter().last();
            if let Some(ref key) = last_key {
                self.cache.update_prefix_scan(block_hash, prefix, key, is_last_page).await?;
            }

            if is_last_page {
                break;
            }

            // Set up for next page (last_key is already owned, no extra allocation)
            start_key = last_key;
        }

        // Return total count (includes any previously cached keys)
        Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?)
    }
431
432 /// Fetch a single page of keys for a prefix and cache their values.
433 ///
434 /// Unlike [`prefetch_prefix`](Self::prefetch_prefix), this fetches only the first
435 /// page of keys (up to `page_size`) without looping through subsequent pages.
436 /// This keeps the cost bounded regardless of how many keys exist under the prefix.
437 ///
438 /// Records scan progress so that subsequent calls to `prefetch_prefix` can
439 /// resume from where this left off.
440 pub async fn prefetch_prefix_single_page(
441 &self,
442 block_hash: H256,
443 prefix: &[u8],
444 page_size: u32,
445 ) -> Result<usize, RemoteStorageError> {
446 // Check existing progress
447 let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;
448
449 if let Some(ref p) = progress {
450 if p.is_complete {
451 return Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?);
452 }
453 // A scan is already in progress (from a concurrent call or prior run),
454 // don't start another one.
455 return Ok(0);
456 }
457
458 // Fetch first page of keys (with reconnect-retry)
459 let keys = match self.rpc.storage_keys_paged(prefix, page_size, None, block_hash).await {
460 Ok(v) => v,
461 Err(_) => {
462 self.rpc.reconnect().await?;
463 self.rpc.storage_keys_paged(prefix, page_size, None, block_hash).await?
464 },
465 };
466
467 if keys.is_empty() {
468 self.cache.update_prefix_scan(block_hash, prefix, prefix, true).await?;
469 return Ok(0);
470 }
471
472 let is_last_page = keys.len() < page_size as usize;
473
474 // Fetch values for these keys (with reconnect-retry)
475 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
476 let values = match self.rpc.storage_batch(&key_refs, block_hash).await {
477 Ok(v) => v,
478 Err(_) => {
479 self.rpc.reconnect().await?;
480 self.rpc.storage_batch(&key_refs, block_hash).await?
481 },
482 };
483
484 // Cache all key-value pairs
485 let cache_entries: Vec<(&[u8], Option<&[u8]>)> =
486 key_refs.iter().zip(values.iter()).map(|(k, v)| (*k, v.as_deref())).collect();
487
488 self.cache.set_storage_batch(block_hash, &cache_entries).await?;
489
490 let count = keys.len();
491 if let Some(last_key) = keys.into_iter().last() {
492 self.cache
493 .update_prefix_scan(block_hash, prefix, &last_key, is_last_page)
494 .await?;
495 }
496
497 Ok(count)
498 }
499
500 /// Get all keys for a prefix, fetching from RPC if not fully cached.
501 ///
502 /// This is a convenience method that:
503 /// 1. Ensures the prefix is fully scanned (calls [`Self::prefetch_prefix`] if needed)
504 /// 2. Returns all cached keys matching the prefix
505 ///
506 /// Useful for enumerating all entries in a storage map (e.g., all accounts
507 /// in a balances pallet).
508 ///
509 /// # Arguments
510 /// * `block_hash` - Block hash to query at
511 /// * `prefix` - Storage key prefix to match (typically a pallet + storage item prefix)
512 ///
513 /// # Returns
514 /// All keys matching the prefix at the specified block hash.
515 ///
516 /// # Performance
517 /// First call may be slow if the prefix hasn't been scanned yet.
518 /// Subsequent calls return cached data immediately.
519 pub async fn get_keys(
520 &self,
521 block_hash: H256,
522 prefix: &[u8],
523 ) -> Result<Vec<Vec<u8>>, RemoteStorageError> {
524 // Ensure prefix is fully scanned
525 self.prefetch_prefix(block_hash, prefix, DEFAULT_PREFETCH_PAGE_SIZE).await?;
526
527 // Return from cache
528 Ok(self.cache.get_keys_by_prefix(block_hash, prefix).await?)
529 }
530
531 /// Fetch a block by number from the remote RPC and cache it.
532 ///
533 /// This method fetches the block data for the given block number and caches
534 /// the block metadata in the cache.
535 ///
536 /// # Arguments
537 /// * `block_number` - The block number to fetch and cache
538 ///
539 /// # Returns
540 /// * `Ok(Some(block_row))` - Block was fetched and cached successfully
541 /// * `Ok(None)` - Block number doesn't exist
542 /// * `Err(_)` - RPC or cache error
543 ///
544 /// # Caching Behavior
545 /// - Fetches block hash and data from block number using `chain_getBlockHash` and
546 /// `chain_getBlock`
547 /// - Caches block metadata (hash, number, parent_hash, header) in the cache
548 /// - If block is already cached, this will update the cache entry
549 pub async fn fetch_and_cache_block_by_number(
550 &self,
551 block_number: u32,
552 ) -> Result<Option<BlockRow>, RemoteStorageError> {
553 // Get block hash and full block data in one call
554 let (block_hash, block) = match self.rpc.block_by_number(block_number).await? {
555 Some((hash, block)) => (hash, block),
556 None => return Ok(None),
557 };
558
559 // Extract header and parent hash
560 let header = block.header;
561 let parent_hash = header.parent_hash;
562 let header_encoded = header.encode();
563
564 // Cache the block
565 self.cache
566 .cache_block(block_hash, block_number, parent_hash, &header_encoded)
567 .await?;
568
569 // Return the cached block row
570 Ok(Some(BlockRow {
571 hash: block_hash.as_bytes().to_vec(),
572 number: block_number as i64,
573 parent_hash: parent_hash.as_bytes().to_vec(),
574 header: header_encoded,
575 }))
576 }
577
578 /// Get the next key after the given key that starts with the prefix.
579 ///
580 /// This method is used for key enumeration during runtime execution.
581 /// Before hitting the RPC, it checks whether a complete prefix scan exists
582 /// in the cache for the queried prefix (or parent prefixes at 32 or 16 bytes).
583 /// If so, the answer is served from the local SQLite cache, avoiding an RPC
584 /// round-trip entirely.
585 ///
586 /// # Arguments
587 /// * `block_hash` - Block hash to query at
588 /// * `prefix` - Storage key prefix to match
589 /// * `key` - The current key; returns the next key after this one
590 ///
591 /// # Returns
592 /// * `Ok(Some(key))` - The next key after `key` that starts with `prefix`
593 /// * `Ok(None)` - No more keys with this prefix
594 pub async fn next_key(
595 &self,
596 block_hash: H256,
597 prefix: &[u8],
598 key: &[u8],
599 ) -> Result<Option<Vec<u8>>, RemoteStorageError> {
600 // Check if we have a complete prefix scan that covers this query.
601 // Try the exact prefix first, then common parent lengths (32-byte = pallet+item,
602 // 16-byte = pallet-only).
603 let candidate_lengths: &[usize] = &[prefix.len(), 32, 16];
604 for &len in candidate_lengths {
605 if len > prefix.len() {
606 continue;
607 }
608 let candidate = &prefix[..len];
609 if let Some(progress) =
610 self.cache.get_prefix_scan_progress(block_hash, candidate).await? &&
611 progress.is_complete
612 {
613 self.stats.next_key_cache.fetch_add(1, Ordering::Relaxed);
614 return Ok(self.cache.next_key_from_cache(block_hash, prefix, key).await?);
615 }
616 }
617
618 // Fallback: fetch from RPC (with reconnect-retry)
619 self.stats.next_key_rpc.fetch_add(1, Ordering::Relaxed);
620 let keys = match self.rpc.storage_keys_paged(prefix, 1, Some(key), block_hash).await {
621 Ok(v) => v,
622 Err(_) => {
623 self.rpc.reconnect().await?;
624 self.rpc.storage_keys_paged(prefix, 1, Some(key), block_hash).await?
625 },
626 };
627 Ok(keys.into_iter().next())
628 }
629
630 // ============================================================================
631 // Block and header fetching methods
632 // ============================================================================
633 // These methods provide access to block data from the remote chain,
634 // allowing Blockchain to delegate remote queries without directly
635 // interfacing with ForkRpcClient.
636
637 /// Get block body (extrinsics) by hash from the remote chain.
638 ///
639 /// # Returns
640 /// * `Ok(Some(extrinsics))` - Block found, returns list of encoded extrinsics
641 /// * `Ok(None)` - Block not found
642 pub async fn block_body(&self, hash: H256) -> Result<Option<Vec<Vec<u8>>>, RemoteStorageError> {
643 match self.rpc.block_by_hash(hash).await? {
644 Some(block) => {
645 let extrinsics = block.extrinsics.into_iter().map(|ext| ext.0.to_vec()).collect();
646 Ok(Some(extrinsics))
647 },
648 None => Ok(None),
649 }
650 }
651
652 /// Get block header by hash from the remote chain.
653 ///
654 /// # Returns
655 /// * `Ok(Some(header_bytes))` - Encoded header bytes
656 /// * `Ok(None)` - Block not found on the remote chain
657 /// * `Err(..)` - Transport/connection error (caller should retry or reconnect)
658 pub async fn block_header(&self, hash: H256) -> Result<Option<Vec<u8>>, RemoteStorageError> {
659 match self.rpc.header(hash).await {
660 Ok(header) => Ok(Some(header.encode())),
661 // Header not found (RPC returned null): legitimate "not found"
662 Err(RpcClientError::InvalidResponse(_)) => Ok(None),
663 // Connection/transport errors must be propagated so callers can reconnect
664 Err(e) => Err(e.into()),
665 }
666 }
667
668 /// Get block hash by block number from the remote chain.
669 ///
670 /// # Returns
671 /// * `Ok(Some(hash))` - Block hash at the given number
672 /// * `Ok(None)` - Block number not found
673 pub async fn block_hash_by_number(
674 &self,
675 block_number: u32,
676 ) -> Result<Option<H256>, RemoteStorageError> {
677 Ok(self.rpc.block_hash_at(block_number).await?)
678 }
679
680 /// Get block number by hash from the remote chain.
681 ///
682 /// This method checks the persistent SQLite cache first before hitting RPC.
683 /// Results are cached for future lookups.
684 ///
685 /// # Returns
686 /// * `Ok(Some(number))` - Block number for the given hash
687 /// * `Ok(None)` - Block not found
688 pub async fn block_number_by_hash(
689 &self,
690 hash: H256,
691 ) -> Result<Option<u32>, RemoteStorageError> {
692 // Check cache first
693 if let Some(block) = self.cache.get_block(hash).await? {
694 return Ok(Some(block.number as u32));
695 }
696
697 // Fetch from RPC
698 match self.rpc.block_by_hash(hash).await? {
699 Some(block) => {
700 let number = block.header.number;
701 let parent_hash = block.header.parent_hash;
702 let header_encoded = block.header.encode();
703
704 // Cache for future lookups
705 self.cache.cache_block(hash, number, parent_hash, &header_encoded).await?;
706
707 Ok(Some(number))
708 },
709 None => Ok(None),
710 }
711 }
712
713 /// Get parent hash of a block from the remote chain.
714 ///
715 /// This method checks the persistent SQLite cache first before hitting RPC.
716 /// Results are cached for future lookups.
717 ///
718 /// # Returns
719 /// * `Ok(Some(parent_hash))` - Parent hash of the block
720 /// * `Ok(None)` - Block not found
721 pub async fn parent_hash(&self, hash: H256) -> Result<Option<H256>, RemoteStorageError> {
722 // Check cache first
723 if let Some(block) = self.cache.get_block(hash).await? {
724 let parent_hash = H256::from_slice(&block.parent_hash);
725 return Ok(Some(parent_hash));
726 }
727
728 // Fetch from RPC
729 match self.rpc.block_by_hash(hash).await? {
730 Some(block) => {
731 let number = block.header.number;
732 let parent_hash = block.header.parent_hash;
733 let header_encoded = block.header.encode();
734
735 // Cache for future lookups
736 self.cache.cache_block(hash, number, parent_hash, &header_encoded).await?;
737
738 Ok(Some(parent_hash))
739 },
740 None => Ok(None),
741 }
742 }
743
744 /// Get full block data (hash and block) by number from the remote chain.
745 ///
746 /// # Returns
747 /// * `Ok(Some((hash, block)))` - Block found
748 /// * `Ok(None)` - Block number not found
749 pub async fn block_by_number(
750 &self,
751 block_number: u32,
752 ) -> Result<
753 Option<(H256, subxt::backend::legacy::rpc_methods::Block<subxt::SubstrateConfig>)>,
754 RemoteStorageError,
755 > {
756 Ok(self.rpc.block_by_number(block_number).await?)
757 }
758
759 /// Get the latest finalized block hash from the remote chain.
760 pub async fn finalized_head(&self) -> Result<H256, RemoteStorageError> {
761 Ok(self.rpc.finalized_head().await?)
762 }
763
764 /// Get decoded metadata at a specific block from the remote chain.
765 pub async fn metadata(&self, block_hash: H256) -> Result<Metadata, RemoteStorageError> {
766 Ok(self.rpc.metadata(block_hash).await?)
767 }
768}
769
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::{CacheError, RpcClientError};

    // Error variants must surface their category in the Display output.
    #[test]
    fn error_display_rpc() {
        let err = RemoteStorageError::Rpc(RpcClientError::InvalidResponse("test".into()));
        assert!(err.to_string().contains("RPC error"));
    }

    #[test]
    fn error_display_cache() {
        let err = RemoteStorageError::Cache(CacheError::DataCorruption("test".into()));
        assert!(err.to_string().contains("Cache error"));
    }
}
789}