pop-fork 0.13.0

Library for forking live Substrate chains.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
// SPDX-License-Identifier: GPL-3.0

//! Remote storage layer for lazy-loading state from live chains.
//!
//! This module provides the [`RemoteStorageLayer`] which transparently fetches storage
//! from a live chain via RPC when values aren't in the local cache. This enables
//! "lazy forking" where state is fetched on-demand rather than requiring a full sync.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                    RemoteStorageLayer                            │
//! │                                                                   │
//! │   get(key) ─────► Cache Hit? ──── Yes ────► Return cached value │
//! │                        │                                         │
//! │                        No                                        │
//! │                        │                                         │
//! │                        ▼                                         │
//! │                 Fetch from RPC                                   │
//! │                        │                                         │
//! │                        ▼                                         │
//! │                 Store in cache                                   │
//! │                        │                                         │
//! │                        ▼                                         │
//! │                 Return value                                     │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Example
//!
//! ```ignore
//! use pop_fork::{ForkRpcClient, RemoteStorageLayer, StorageCache};
//!
//! let rpc = ForkRpcClient::connect(&"wss://rpc.polkadot.io".parse()?).await?;
//! let cache = StorageCache::in_memory().await?;
//! let block_hash = rpc.finalized_head().await?;
//!
//! let storage = RemoteStorageLayer::new(rpc, cache);
//!
//! // First call fetches from RPC and caches
//! let value = storage.get(block_hash, &key).await?;
//!
//! // Second call returns cached value (no RPC call)
//! let value = storage.get(block_hash, &key).await?;
//! ```

use crate::{
	ForkRpcClient, StorageCache,
	error::{RemoteStorageError, RpcClientError},
	models::BlockRow,
};
use std::sync::{
	Arc,
	atomic::{AtomicUsize, Ordering},
};
use subxt::{Metadata, config::substrate::H256, ext::codec::Encode};

/// Default number of keys to fetch per RPC call during prefix scans.
///
/// This balances RPC overhead (fewer calls = better) against memory usage
/// and response latency. 1000 keys typically fits well within RPC response limits.
/// Used by [`RemoteStorageLayer::get_keys`] and the speculative prefetch in
/// [`RemoteStorageLayer::get`].
const DEFAULT_PREFETCH_PAGE_SIZE: u32 = 1000;

/// Minimum key length (bytes) for speculative prefix prefetch.
///
/// Polkadot SDK storage keys are composed of twox128(pallet) + twox128(item) = 32 bytes.
/// Keys shorter than this are pallet-level prefixes rather than storage item keys,
/// so speculative prefix scans on them would be too broad.
const MIN_STORAGE_KEY_PREFIX_LEN: usize = 32;

/// Counters tracking cache hits vs RPC misses for performance analysis.
///
/// All counters are atomic and shared across clones of the same `RemoteStorageLayer`.
/// Use [`RemoteStorageLayer::reset_stats`] to zero them before a phase, and
/// [`RemoteStorageLayer::stats`] to read the snapshot.
///
/// Counters are diagnostics only; they are read and written with relaxed
/// ordering and provide no synchronization guarantees.
#[derive(Debug, Default)]
pub struct StorageStats {
	/// Number of `get()` calls served from cache (no RPC).
	pub cache_hits: AtomicUsize,
	/// Number of `get()` calls that triggered a speculative prefetch and the
	/// prefetch covered the requested key (cache hit after prefetch).
	pub prefetch_hits: AtomicUsize,
	/// Number of `get()` calls that fell through to an individual `state_getStorage` RPC.
	pub rpc_misses: AtomicUsize,
	/// Number of `next_key()` calls served from cache.
	pub next_key_cache: AtomicUsize,
	/// Number of `next_key()` calls that hit RPC.
	pub next_key_rpc: AtomicUsize,
}

/// Snapshot of [`StorageStats`] counters at a point in time.
///
/// Produced by [`RemoteStorageLayer::stats`]; the fields mirror the atomic
/// counters in [`StorageStats`] as plain integers.
#[derive(Debug, Clone, Default)]
pub struct StorageStatsSnapshot {
	/// `get()` calls served from cache (no RPC).
	pub cache_hits: usize,
	/// `get()` calls satisfied by a speculative prefix prefetch.
	pub prefetch_hits: usize,
	/// `get()` calls that fell through to an individual storage RPC.
	pub rpc_misses: usize,
	/// `next_key()` calls answered from cache.
	pub next_key_cache: usize,
	/// `next_key()` calls that hit RPC.
	pub next_key_rpc: usize,
}

impl std::fmt::Display for StorageStatsSnapshot {
	/// Render a one-line summary of get/next_key traffic with per-source breakdown.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		// Aggregate totals so the line reads at a glance.
		let gets = self.cache_hits + self.prefetch_hits + self.rpc_misses;
		let nexts = self.next_key_cache + self.next_key_rpc;
		write!(
			f,
			"get: {gets} total ({} cache, {} prefetch, {} rpc) | next_key: {nexts} total ({} cache, {} rpc)",
			self.cache_hits,
			self.prefetch_hits,
			self.rpc_misses,
			self.next_key_cache,
			self.next_key_rpc,
		)
	}
}

/// Remote storage layer that lazily fetches state from a live chain.
///
/// Provides a cache-through abstraction: reads check the local cache first,
/// and only fetch from the remote RPC when the value isn't cached. Fetched
/// values are automatically cached for subsequent reads.
///
/// # Cloning
///
/// `RemoteStorageLayer` is cheap to clone. Both `ForkRpcClient` and `StorageCache`
/// use internal reference counting (connection pools/Arc), so cloning just increments
/// reference counts.
///
/// # Thread Safety
///
/// The layer is `Send + Sync` and can be shared across async tasks. The underlying
/// cache handles concurrent access safely.
#[derive(Clone, Debug)]
pub struct RemoteStorageLayer {
	/// RPC client connected to the live chain; consulted only on cache misses.
	rpc: ForkRpcClient,
	/// Persistent cache checked before every RPC round-trip.
	cache: StorageCache,
	/// Hit/miss counters shared (via `Arc`) across all clones of this layer.
	stats: Arc<StorageStats>,
}

impl RemoteStorageLayer {
	/// Create a new remote storage layer.
	///
	/// # Arguments
	/// * `rpc` - RPC client connected to the live chain
	/// * `cache` - Storage cache for persisting fetched values
	pub fn new(rpc: ForkRpcClient, cache: StorageCache) -> Self {
		// Fresh (zeroed) counters; clones of this layer share them via the Arc.
		let stats = Arc::new(StorageStats::default());
		Self { rpc, cache, stats }
	}

	/// Get a reference to the underlying RPC client.
	///
	/// Lets callers issue raw RPC calls that bypass the caching layer.
	pub fn rpc(&self) -> &ForkRpcClient {
		&self.rpc
	}

	/// Get a reference to the underlying cache.
	///
	/// Useful for direct cache inspection (e.g. in tests).
	pub fn cache(&self) -> &StorageCache {
		&self.cache
	}

	/// Get the RPC endpoint URL this layer is connected to.
	pub fn endpoint(&self) -> &url::Url {
		self.rpc.endpoint()
	}

	/// Take a snapshot of the current storage access counters.
	pub fn stats(&self) -> StorageStatsSnapshot {
		StorageStatsSnapshot {
			cache_hits: self.stats.cache_hits.load(Ordering::Relaxed),
			prefetch_hits: self.stats.prefetch_hits.load(Ordering::Relaxed),
			rpc_misses: self.stats.rpc_misses.load(Ordering::Relaxed),
			next_key_cache: self.stats.next_key_cache.load(Ordering::Relaxed),
			next_key_rpc: self.stats.next_key_rpc.load(Ordering::Relaxed),
		}
	}

	/// Reset all storage access counters to zero.
	pub fn reset_stats(&self) {
		// Zero every counter; relaxed ordering matches how they are updated.
		let counters = [
			&self.stats.cache_hits,
			&self.stats.prefetch_hits,
			&self.stats.rpc_misses,
			&self.stats.next_key_cache,
			&self.stats.next_key_rpc,
		];
		for counter in counters {
			counter.store(0, Ordering::Relaxed);
		}
	}

	/// Get a storage value, fetching from RPC if not cached.
	///
	/// # Returns
	/// * `Ok(Some(value))` - Storage exists with value
	/// * `Ok(None)` - Storage key doesn't exist (empty)
	/// * `Err(_)` - RPC or cache error
	///
	/// # Caching Behavior
	/// - If the key is in cache, returns the cached value immediately
	/// - If not cached and the key is >= 32 bytes, speculatively prefetches the first page of keys
	///   sharing the same 32-byte prefix (pallet hash + storage item hash). This converts hundreds
	///   of individual RPCs into a handful of bulk fetches without risking a full scan of large
	///   maps.
	/// - Falls back to individual RPC fetch if the key is short or the speculative prefetch didn't
	///   cover it (key beyond first page).
	/// - Empty storage (key exists but has no value) is cached as `None`
	pub async fn get(
		&self,
		block_hash: H256,
		key: &[u8],
	) -> Result<Option<Vec<u8>>, RemoteStorageError> {
		// Check cache first. The cache distinguishes "not cached" (outer None)
		// from "cached as empty" (Some(None)), so a hit is returned as-is.
		if let Some(cached) = self.cache.get_storage(block_hash, key).await? {
			self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
			return Ok(cached);
		}

		// Speculative prefix prefetch: if the key is at least 32 bytes (pallet hash +
		// storage item hash), bulk-fetch the FIRST PAGE of keys sharing that prefix.
		// Only fetches one page to avoid blocking on large maps (e.g., Account maps
		// with thousands of entries). This still captures the majority of runtime
		// reads since most storage items have fewer than 1000 keys.
		//
		// Errors are non-fatal: speculative prefetch is an optimization. If the
		// connection drops mid-prefetch, we fall through to the individual fetch
		// below which has its own retry logic.
		if key.len() >= MIN_STORAGE_KEY_PREFIX_LEN {
			let prefix = &key[..MIN_STORAGE_KEY_PREFIX_LEN];
			// Only start a prefetch when no scan (complete or partial) exists yet.
			let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;
			if progress.is_none() {
				match self
					.prefetch_prefix_single_page(block_hash, prefix, DEFAULT_PREFETCH_PAGE_SIZE)
					.await
				{
					Ok(_) => {
						// Check cache again, the prefetch likely fetched our key
						if let Some(cached) = self.cache.get_storage(block_hash, key).await? {
							self.stats.prefetch_hits.fetch_add(1, Ordering::Relaxed);
							return Ok(cached);
						}
					},
					Err(e) => {
						log::debug!(
							"Speculative prefetch failed (non-fatal), falling through to individual fetch: {e}"
						);
					},
				}
			}
		}

		// Fallback: fetch individual key from RPC (with reconnect-retry).
		// The miss is counted before issuing the RPC, so failed fetches are included.
		self.stats.rpc_misses.fetch_add(1, Ordering::Relaxed);
		let value = match self.rpc.storage(key, block_hash).await {
			Ok(v) => v,
			Err(_) => {
				self.rpc.reconnect().await?;
				self.rpc.storage(key, block_hash).await?
			},
		};

		// Cache the result (including empty values)
		self.cache.set_storage(block_hash, key, value.as_deref()).await?;

		Ok(value)
	}

	/// Get multiple storage values in a batch, fetching uncached keys from RPC.
	///
	/// # Arguments
	/// * `block_hash` - The hash of the block being queried.
	/// * `keys` - Slice of storage keys to fetch (as byte slices to avoid unnecessary allocations)
	///
	/// # Returns
	/// A vector of optional values, in the same order as the input keys.
	///
	/// # Caching Behavior
	/// - Checks cache for all keys first
	/// - Only fetches uncached keys from RPC
	/// - Caches all fetched values (including empty ones)
	/// - Returns results in the same order as input keys
	pub async fn get_batch(
		&self,
		block_hash: H256,
		keys: &[&[u8]],
	) -> Result<Vec<Option<Vec<u8>>>, RemoteStorageError> {
		if keys.is_empty() {
			return Ok(vec![]);
		}

		// Probe the cache for every requested key in one round-trip.
		let cached = self.cache.get_storage_batch(block_hash, keys).await?;

		// Collect the positions (and keys) of all cache misses.
		let (missing_positions, missing_keys): (Vec<usize>, Vec<&[u8]>) = cached
			.iter()
			.enumerate()
			.filter(|(_, entry)| entry.is_none())
			.map(|(pos, _)| (pos, keys[pos]))
			.unzip();

		// Fully cached: flatten the Option<Option<_>> entries and return.
		if missing_keys.is_empty() {
			return Ok(cached.into_iter().map(|entry| entry.flatten()).collect());
		}

		// Fetch the misses from RPC, retrying once after a reconnect.
		let fetched = match self.rpc.storage_batch(&missing_keys, block_hash).await {
			Ok(values) => values,
			Err(_) => {
				self.rpc.reconnect().await?;
				self.rpc.storage_batch(&missing_keys, block_hash).await?
			},
		};

		// Persist everything just fetched, empty values included.
		let entries: Vec<(&[u8], Option<&[u8]>)> = missing_keys
			.iter()
			.zip(fetched.iter())
			.map(|(key, value)| (*key, value.as_deref()))
			.collect();

		if !entries.is_empty() {
			self.cache.set_storage_batch(block_hash, &entries).await?;
		}

		// Merge: start from the cached view, overlay the freshly fetched values.
		let mut merged: Vec<Option<Vec<u8>>> =
			cached.into_iter().map(|entry| entry.flatten()).collect();

		for (fetch_idx, pos) in missing_positions.into_iter().enumerate() {
			merged[pos] = fetched[fetch_idx].clone();
		}

		Ok(merged)
	}

	/// Prefetch a range of storage keys by prefix (resumable).
	///
	/// Fetches all keys matching the prefix and caches their values.
	/// This operation is resumable - if interrupted, calling it again will
	/// continue from where it left off.
	///
	/// # Arguments
	/// * `block_hash` - Block hash to query at
	/// * `prefix` - Storage key prefix to match
	/// * `page_size` - Number of keys to fetch per RPC call
	///
	/// # Returns
	/// The total number of keys for this prefix (including previously cached).
	pub async fn prefetch_prefix(
		&self,
		block_hash: H256,
		prefix: &[u8],
		page_size: u32,
	) -> Result<usize, RemoteStorageError> {
		// Check existing progress
		let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;

		if let Some(ref p) = progress &&
			p.is_complete
		{
			// Already done - return cached count
			return Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?);
		}

		// Resume from last scanned key if we have progress
		let mut start_key = progress.and_then(|p| p.last_scanned_key);

		loop {
			// Get next page of keys (with reconnect-retry)
			let mut keys = match self
				.rpc
				.storage_keys_paged(prefix, page_size, start_key.as_deref(), block_hash)
				.await
			{
				Ok(v) => v,
				Err(_) => {
					self.rpc.reconnect().await?;
					self.rpc
						.storage_keys_paged(prefix, page_size, start_key.as_deref(), block_hash)
						.await?
				},
			};

			if keys.is_empty() {
				// No keys found - mark as complete if this is the first page
				if start_key.is_none() {
					// Empty prefix, mark complete with empty marker
					self.cache.update_prefix_scan(block_hash, prefix, prefix, true).await?;
				}
				break;
			}

			// Determine pagination state before consuming keys
			let is_last_page = keys.len() < page_size as usize;

			// Fetch values for these keys (with reconnect-retry)
			let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
			let values = match self.rpc.storage_batch(&key_refs, block_hash).await {
				Ok(v) => v,
				Err(_) => {
					self.rpc.reconnect().await?;
					self.rpc.storage_batch(&key_refs, block_hash).await?
				},
			};

			// Cache all key-value pairs
			let cache_entries: Vec<(&[u8], Option<&[u8]>)> =
				key_refs.iter().zip(values.iter()).map(|(k, v)| (*k, v.as_deref())).collect();

			self.cache.set_storage_batch(block_hash, &cache_entries).await?;

			// Update progress with the last key from this page.
			// `Vec::pop` is O(1), unlike `into_iter().last()` which walks every
			// element; the popped key doubles as the next page's cursor without
			// any cloning.
			let last_key = keys.pop();
			if let Some(ref key) = last_key {
				self.cache.update_prefix_scan(block_hash, prefix, key, is_last_page).await?;
			}

			if is_last_page {
				break;
			}

			// Set up for next page (last_key is already owned, no extra allocation)
			start_key = last_key;
		}

		// Return total count (includes any previously cached keys)
		Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?)
	}

	/// Fetch a single page of keys for a prefix and cache their values.
	///
	/// Unlike [`prefetch_prefix`](Self::prefetch_prefix), this fetches only the first
	/// page of keys (up to `page_size`) without looping through subsequent pages.
	/// This keeps the cost bounded regardless of how many keys exist under the prefix.
	///
	/// Records scan progress so that subsequent calls to `prefetch_prefix` can
	/// resume from where this left off.
	///
	/// # Returns
	/// The number of keys fetched on this page; the total cached count when the
	/// scan was already complete; or `0` when a scan is already in progress or
	/// the prefix has no keys.
	pub async fn prefetch_prefix_single_page(
		&self,
		block_hash: H256,
		prefix: &[u8],
		page_size: u32,
	) -> Result<usize, RemoteStorageError> {
		// Check existing progress
		let progress = self.cache.get_prefix_scan_progress(block_hash, prefix).await?;

		if let Some(ref p) = progress {
			if p.is_complete {
				return Ok(self.cache.count_keys_by_prefix(block_hash, prefix).await?);
			}
			// A scan is already in progress (from a concurrent call or prior run),
			// don't start another one.
			return Ok(0);
		}

		// Fetch first page of keys (with reconnect-retry)
		let mut keys = match self.rpc.storage_keys_paged(prefix, page_size, None, block_hash).await
		{
			Ok(v) => v,
			Err(_) => {
				self.rpc.reconnect().await?;
				self.rpc.storage_keys_paged(prefix, page_size, None, block_hash).await?
			},
		};

		if keys.is_empty() {
			// No keys under this prefix: mark the scan complete with an empty marker.
			self.cache.update_prefix_scan(block_hash, prefix, prefix, true).await?;
			return Ok(0);
		}

		// A short page means there is nothing beyond it.
		let is_last_page = keys.len() < page_size as usize;

		// Fetch values for these keys (with reconnect-retry)
		let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
		let values = match self.rpc.storage_batch(&key_refs, block_hash).await {
			Ok(v) => v,
			Err(_) => {
				self.rpc.reconnect().await?;
				self.rpc.storage_batch(&key_refs, block_hash).await?
			},
		};

		// Cache all key-value pairs
		let cache_entries: Vec<(&[u8], Option<&[u8]>)> =
			key_refs.iter().zip(values.iter()).map(|(k, v)| (*k, v.as_deref())).collect();

		self.cache.set_storage_batch(block_hash, &cache_entries).await?;

		// Record progress using the last key of the page. `Vec::pop` is O(1),
		// unlike `into_iter().last()` which walks the whole vector.
		let count = keys.len();
		if let Some(last_key) = keys.pop() {
			self.cache.update_prefix_scan(block_hash, prefix, &last_key, is_last_page).await?;
		}

		Ok(count)
	}

	/// Get all keys for a prefix, fetching from RPC if not fully cached.
	///
	/// This is a convenience method that:
	/// 1. Ensures the prefix is fully scanned (calls [`Self::prefetch_prefix`] if needed)
	/// 2. Returns all cached keys matching the prefix
	///
	/// Useful for enumerating all entries in a storage map (e.g., all accounts
	/// in a balances pallet).
	///
	/// # Arguments
	/// * `block_hash` - Block hash to query at
	/// * `prefix` - Storage key prefix to match (typically a pallet + storage item prefix)
	///
	/// # Returns
	/// All keys matching the prefix at the specified block hash.
	///
	/// # Performance
	/// First call may be slow if the prefix hasn't been scanned yet.
	/// Subsequent calls return cached data immediately.
	pub async fn get_keys(
		&self,
		block_hash: H256,
		prefix: &[u8],
	) -> Result<Vec<Vec<u8>>, RemoteStorageError> {
		// Make sure every key under this prefix has been pulled into the cache.
		let _total = self.prefetch_prefix(block_hash, prefix, DEFAULT_PREFETCH_PAGE_SIZE).await?;

		// The cache now holds the complete key set; serve the answer locally.
		let keys = self.cache.get_keys_by_prefix(block_hash, prefix).await?;
		Ok(keys)
	}

	/// Fetch a block by number from the remote RPC and cache it.
	///
	/// This method fetches the block data for the given block number and caches
	/// the block metadata in the cache.
	///
	/// # Arguments
	/// * `block_number` - The block number to fetch and cache
	///
	/// # Returns
	/// * `Ok(Some(block_row))` - Block was fetched and cached successfully
	/// * `Ok(None)` - Block number doesn't exist
	/// * `Err(_)` - RPC or cache error
	///
	/// # Caching Behavior
	/// - Fetches block hash and data from block number using `chain_getBlockHash` and
	///   `chain_getBlock`
	/// - Caches block metadata (hash, number, parent_hash, header) in the cache
	/// - If block is already cached, this will update the cache entry
	pub async fn fetch_and_cache_block_by_number(
		&self,
		block_number: u32,
	) -> Result<Option<BlockRow>, RemoteStorageError> {
		// One RPC round-trip yields both the hash and the full block.
		let Some((block_hash, block)) = self.rpc.block_by_number(block_number).await? else {
			return Ok(None);
		};

		// Keep the parent link and encode the header for persistence.
		let parent_hash = block.header.parent_hash;
		let header_encoded = block.header.encode();

		// Persist the block metadata for future local lookups.
		self.cache
			.cache_block(block_hash, block_number, parent_hash, &header_encoded)
			.await?;

		// Mirror what was just cached back to the caller.
		Ok(Some(BlockRow {
			hash: block_hash.as_bytes().to_vec(),
			number: block_number as i64,
			parent_hash: parent_hash.as_bytes().to_vec(),
			header: header_encoded,
		}))
	}

	/// Get the next key after the given key that starts with the prefix.
	///
	/// This method is used for key enumeration during runtime execution.
	/// Before hitting the RPC, it checks whether a complete prefix scan exists
	/// in the cache for the queried prefix (or parent prefixes at 32 or 16 bytes).
	/// If so, the answer is served from the local SQLite cache, avoiding an RPC
	/// round-trip entirely.
	///
	/// # Arguments
	/// * `block_hash` - Block hash to query at
	/// * `prefix` - Storage key prefix to match
	/// * `key` - The current key; returns the next key after this one
	///
	/// # Returns
	/// * `Ok(Some(key))` - The next key after `key` that starts with `prefix`
	/// * `Ok(None)` - No more keys with this prefix
	pub async fn next_key(
		&self,
		block_hash: H256,
		prefix: &[u8],
		key: &[u8],
	) -> Result<Option<Vec<u8>>, RemoteStorageError> {
		// Check if we have a complete prefix scan that covers this query.
		// Try the exact prefix first, then common parent lengths (32-byte = pallet+item,
		// 16-byte = pallet-only).
		let candidate_lengths: &[usize] = &[prefix.len(), 32, 16];
		for &len in candidate_lengths {
			if len > prefix.len() {
				continue;
			}
			let candidate = &prefix[..len];
			if let Some(progress) =
				self.cache.get_prefix_scan_progress(block_hash, candidate).await? &&
				progress.is_complete
			{
				self.stats.next_key_cache.fetch_add(1, Ordering::Relaxed);
				return Ok(self.cache.next_key_from_cache(block_hash, prefix, key).await?);
			}
		}

		// Fallback: fetch from RPC (with reconnect-retry)
		self.stats.next_key_rpc.fetch_add(1, Ordering::Relaxed);
		let keys = match self.rpc.storage_keys_paged(prefix, 1, Some(key), block_hash).await {
			Ok(v) => v,
			Err(_) => {
				self.rpc.reconnect().await?;
				self.rpc.storage_keys_paged(prefix, 1, Some(key), block_hash).await?
			},
		};
		Ok(keys.into_iter().next())
	}

	// ============================================================================
	// Block and header fetching methods
	// ============================================================================
	// These methods provide access to block data from the remote chain,
	// allowing Blockchain to delegate remote queries without directly
	// interfacing with ForkRpcClient.

	/// Get block body (extrinsics) by hash from the remote chain.
	///
	/// # Returns
	/// * `Ok(Some(extrinsics))` - Block found, returns list of encoded extrinsics
	/// * `Ok(None)` - Block not found
	pub async fn block_body(&self, hash: H256) -> Result<Option<Vec<Vec<u8>>>, RemoteStorageError> {
		// Map the optional block into its extrinsics, copying each into an owned Vec.
		let maybe_block = self.rpc.block_by_hash(hash).await?;
		Ok(maybe_block.map(|block| {
			block
				.extrinsics
				.into_iter()
				.map(|ext| ext.0.to_vec())
				.collect::<Vec<Vec<u8>>>()
		}))
	}

	/// Get block header by hash from the remote chain.
	///
	/// # Returns
	/// * `Ok(Some(header_bytes))` - Encoded header bytes
	/// * `Ok(None)` - Block not found on the remote chain
	/// * `Err(..)` - Transport/connection error (caller should retry or reconnect)
	pub async fn block_header(&self, hash: H256) -> Result<Option<Vec<u8>>, RemoteStorageError> {
		match self.rpc.header(hash).await {
			Ok(header) => Ok(Some(header.encode())),
			// Header not found (RPC returned null): legitimate "not found"
			// NOTE(review): assumes `ForkRpcClient::header` surfaces a null response
			// as `InvalidResponse` - confirm, otherwise genuine decode errors would
			// be silently reported as "not found" here.
			Err(RpcClientError::InvalidResponse(_)) => Ok(None),
			// Connection/transport errors must be propagated so callers can reconnect
			Err(e) => Err(e.into()),
		}
	}

	/// Get block hash by block number from the remote chain.
	///
	/// # Returns
	/// * `Ok(Some(hash))` - Block hash at the given number
	/// * `Ok(None)` - Block number not found
	pub async fn block_hash_by_number(
		&self,
		block_number: u32,
	) -> Result<Option<H256>, RemoteStorageError> {
		// Thin pass-through: only the error type needs converting.
		self.rpc.block_hash_at(block_number).await.map_err(Into::into)
	}

	/// Get block number by hash from the remote chain.
	///
	/// This method checks the persistent SQLite cache first before hitting RPC.
	/// Results are cached for future lookups.
	///
	/// # Returns
	/// * `Ok(Some(number))` - Block number for the given hash
	/// * `Ok(None)` - Block not found
	pub async fn block_number_by_hash(
		&self,
		hash: H256,
	) -> Result<Option<u32>, RemoteStorageError> {
		// Serve from the persistent cache when the block row is already known.
		if let Some(row) = self.cache.get_block(hash).await? {
			return Ok(Some(row.number as u32));
		}

		// Not cached: fetch the full block from the remote node.
		let Some(block) = self.rpc.block_by_hash(hash).await? else {
			return Ok(None);
		};

		let number = block.header.number;
		let parent = block.header.parent_hash;
		let encoded = block.header.encode();

		// Remember the block so the next lookup stays local.
		self.cache.cache_block(hash, number, parent, &encoded).await?;

		Ok(Some(number))
	}

	/// Get parent hash of a block from the remote chain.
	///
	/// This method checks the persistent SQLite cache first before hitting RPC.
	/// Results are cached for future lookups.
	///
	/// # Returns
	/// * `Ok(Some(parent_hash))` - Parent hash of the block
	/// * `Ok(None)` - Block not found
	pub async fn parent_hash(&self, hash: H256) -> Result<Option<H256>, RemoteStorageError> {
		// Serve from the persistent cache when the block row is already known.
		if let Some(row) = self.cache.get_block(hash).await? {
			return Ok(Some(H256::from_slice(&row.parent_hash)));
		}

		// Not cached: fetch the full block from the remote node.
		let Some(block) = self.rpc.block_by_hash(hash).await? else {
			return Ok(None);
		};

		let number = block.header.number;
		let parent = block.header.parent_hash;
		let encoded = block.header.encode();

		// Remember the block so the next lookup stays local.
		self.cache.cache_block(hash, number, parent, &encoded).await?;

		Ok(Some(parent))
	}

	/// Get full block data (hash and block) by number from the remote chain.
	///
	/// # Returns
	/// * `Ok(Some((hash, block)))` - Block found
	/// * `Ok(None)` - Block number not found
	pub async fn block_by_number(
		&self,
		block_number: u32,
	) -> Result<
		Option<(H256, subxt::backend::legacy::rpc_methods::Block<subxt::SubstrateConfig>)>,
		RemoteStorageError,
	> {
		// Thin pass-through: only the error type needs converting.
		self.rpc.block_by_number(block_number).await.map_err(Into::into)
	}

	/// Get the latest finalized block hash from the remote chain.
	pub async fn finalized_head(&self) -> Result<H256, RemoteStorageError> {
		self.rpc.finalized_head().await.map_err(Into::into)
	}

	/// Get decoded metadata at a specific block from the remote chain.
	pub async fn metadata(&self, block_hash: H256) -> Result<Metadata, RemoteStorageError> {
		self.rpc.metadata(block_hash).await.map_err(Into::into)
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	// Unit checks for error Display formatting; no live node required.
	#[test]
	fn error_display_rpc() {
		use crate::error::RpcClientError;
		let inner = RpcClientError::InvalidResponse("test".to_string());
		let err = RemoteStorageError::Rpc(inner);
		assert!(err.to_string().contains("RPC error"));
	}

	#[test]
	fn error_display_cache() {
		use crate::error::CacheError;
		let inner = CacheError::DataCorruption("test".to_string());
		let err = RemoteStorageError::Cache(inner);
		assert!(err.to_string().contains("Cache error"));
	}

	/// Integration tests that spawn local test nodes.
	///
	/// These tests are run sequentially via nextest configuration to avoid
	/// concurrent node downloads causing race conditions.
	mod sequential {
		use crate::testing::{
			TestContext,
			constants::{SYSTEM_NUMBER_KEY, SYSTEM_PALLET_PREFIX, SYSTEM_PARENT_HASH_KEY},
		};
		use std::time::Duration;

		#[tokio::test(flavor = "multi_thread")]
		async fn get_fetches_and_caches() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();
			let hash = ctx.block_hash();
			let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();

			// Initial lookup goes through RPC and populates the cache.
			let fetched = layer.get(hash, &number_key).await.unwrap();
			assert!(fetched.is_some(), "System::Number should exist");

			// The cache must now hold the exact value that was fetched.
			let cached = layer.cache().get_storage(hash, &number_key).await.unwrap();
			assert!(cached.is_some(), "Value should be cached after first get");
			assert_eq!(cached.unwrap(), fetched);

			// A repeat lookup is served from cache and agrees with the first.
			let repeated = layer.get(hash, &number_key).await.unwrap();
			assert_eq!(fetched, repeated);
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn get_caches_empty_values() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();
			let hash = ctx.block_hash();

			// A key guaranteed to be absent from storage.
			let missing_key = b"this_key_definitely_does_not_exist_12345";

			// Looking up an absent key over RPC yields None.
			let fetched = layer.get(hash, missing_key).await.unwrap();
			assert!(fetched.is_none(), "Nonexistent key should return None");

			// The absence itself is recorded in the cache as Some(None).
			let cached = layer.cache().get_storage(hash, missing_key).await.unwrap();
			assert_eq!(cached, Some(None), "Empty value should be cached as Some(None)");
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn get_batch_fetches_mixed() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();
			let hash = ctx.block_hash();

			// Two keys that exist plus one that does not.
			let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
			let parent_key = hex::decode(SYSTEM_PARENT_HASH_KEY).unwrap();
			let missing_key = b"nonexistent_key".to_vec();
			let keys: Vec<&[u8]> =
				vec![number_key.as_slice(), parent_key.as_slice(), missing_key.as_slice()];

			let results = layer.get_batch(hash, &keys).await.unwrap();

			assert_eq!(results.len(), 3);
			assert!(results[0].is_some(), "System::Number should exist");
			assert!(results[1].is_some(), "System::ParentHash should exist");
			assert!(results[2].is_none(), "Nonexistent key should be None");

			// Every key, present or absent, must have been recorded in the cache.
			for (i, key) in keys.iter().enumerate() {
				let cached = layer.cache().get_storage(hash, key).await.unwrap();
				assert!(cached.is_some(), "Key {} should be cached", i);
			}
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn get_batch_uses_cache() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();
			let hash = ctx.block_hash();

			let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
			let parent_key = hex::decode(SYSTEM_PARENT_HASH_KEY).unwrap();

			// Warm the cache with the first key only.
			let warmed = layer.get(hash, &number_key).await.unwrap();

			// Batch over one cached and one uncached key.
			let keys: Vec<&[u8]> = vec![number_key.as_slice(), parent_key.as_slice()];
			let results = layer.get_batch(hash, &keys).await.unwrap();

			assert_eq!(results.len(), 2);
			assert_eq!(results[0], warmed, "Cached value should match");
			assert!(results[1].is_some(), "Uncached value should be fetched");
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn prefetch_prefix() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();
			let hash = ctx.block_hash();

			// Bulk-load every System pallet storage item, 5 keys per RPC page.
			let prefix = hex::decode(SYSTEM_PALLET_PREFIX).unwrap();
			let prefetched = layer.prefetch_prefix(hash, &prefix, 5).await.unwrap();
			assert!(prefetched > 0, "Should have prefetched some keys");

			// A known System key must now be answerable from the cache alone.
			let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
			let cached = layer.cache().get_storage(hash, &number_key).await.unwrap();
			assert!(cached.is_some(), "Prefetched key should be cached");
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn layer_is_cloneable() {
			let ctx = TestContext::for_remote().await;
			let original = ctx.remote();
			let hash = ctx.block_hash();

			// A clone shares the same underlying cache with the original.
			let duplicate = original.clone();

			// Both handles must resolve the same key to the same value.
			let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
			let via_original = original.get(hash, &number_key).await.unwrap();
			let via_duplicate = duplicate.get(hash, &number_key).await.unwrap();
			assert_eq!(via_original, via_duplicate);
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn accessor_methods() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();

			// The context's block hash must be a real (non-zero) hash.
			assert!(!ctx.block_hash().is_zero());
			// The RPC endpoint of the local test node must be a WebSocket URL.
			assert!(layer.rpc().endpoint().as_str().starts_with("ws://"));
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn fetch_and_cache_block_by_number_caches_block() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();

			// Resolve the current finalized block number via the RPC client.
			let head_hash = layer.rpc().finalized_head().await.unwrap();
			let head_number = layer.rpc().header(head_hash).await.unwrap().number;

			// Nothing for this block should be cached yet.
			assert!(layer.cache().get_block_by_number(head_number).await.unwrap().is_none());

			// Fetching must return a populated block row.
			let fetched = layer.fetch_and_cache_block_by_number(head_number).await.unwrap();
			assert!(fetched.is_some());
			let row = fetched.unwrap();
			assert_eq!(row.number, head_number as i64);
			assert_eq!(row.hash.len(), 32);
			assert_eq!(row.parent_hash.len(), 32);
			assert!(!row.header.is_empty());

			// The cache must now hold an entry identical to the fetched row.
			let cached = layer.cache().get_block_by_number(head_number).await.unwrap();
			assert!(cached.is_some());
			let cached_row = cached.unwrap();
			assert_eq!(cached_row.number, row.number);
			assert_eq!(cached_row.hash, row.hash);
			assert_eq!(cached_row.parent_hash, row.parent_hash);
			assert_eq!(cached_row.header, row.header);
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn fetch_and_cache_block_by_number_non_existent() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();

			// u32::MAX can never be a finalized block on a fresh test chain.
			let missing = u32::MAX;
			let fetched = layer.fetch_and_cache_block_by_number(missing).await.unwrap();
			assert!(fetched.is_none(), "Non-existent block should return None");

			// A miss must not leave anything behind in the cache.
			let cached = layer.cache().get_block_by_number(missing).await.unwrap();
			assert!(cached.is_none(), "Non-existent block should not be cached");
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn fetch_and_cache_block_by_number_multiple_blocks() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();

			// Wait for some blocks to be finalized. Use the async sleep so we
			// yield to the tokio runtime instead of blocking a worker thread
			// for the whole wait (std::thread::sleep stalls the executor).
			tokio::time::sleep(Duration::from_secs(30)).await;

			// Get finalized block number
			let finalized_hash = layer.rpc().finalized_head().await.unwrap();
			let finalized_header = layer.rpc().header(finalized_hash).await.unwrap();
			let finalized_number = finalized_header.number;

			// Fetch and cache multiple blocks (at most blocks 0..=3)
			let max_blocks = finalized_number.min(3);
			for block_num in 0..=max_blocks {
				let result =
					layer.fetch_and_cache_block_by_number(block_num).await.unwrap().unwrap();

				assert_eq!(result.number, block_num as i64);

				// Verify each fetched block also landed in the cache
				let cached = layer.cache().get_block_by_number(block_num).await.unwrap().unwrap();
				assert_eq!(cached.number, result.number);
				assert_eq!(cached.hash, result.hash);
			}
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn fetch_and_cache_block_by_number_idempotent() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();

			// Fetch the genesis block twice; the second call should be served
			// without changing the observable result.
			let first = layer.fetch_and_cache_block_by_number(0u32).await.unwrap().unwrap();
			let second = layer.fetch_and_cache_block_by_number(0u32).await.unwrap().unwrap();

			// Repeated fetches must yield an identical row.
			assert_eq!(first.number, second.number);
			assert_eq!(first.hash, second.hash);
			assert_eq!(first.parent_hash, second.parent_hash);
			assert_eq!(first.header, second.header);
		}

		#[tokio::test(flavor = "multi_thread")]
		async fn fetch_and_cache_block_by_number_verifies_parent_chain() {
			let ctx = TestContext::for_remote().await;
			let layer = ctx.remote();

			// Wait for some blocks to be finalized. Use the async sleep so we
			// yield to the tokio runtime instead of blocking a worker thread
			// for the whole wait (std::thread::sleep stalls the executor).
			tokio::time::sleep(Duration::from_secs(30)).await;

			// Get finalized block number
			let finalized_hash = layer.rpc().finalized_head().await.unwrap();
			let finalized_header = layer.rpc().header(finalized_hash).await.unwrap();
			let finalized_number = finalized_header.number;

			// Fetch consecutive blocks and verify parent hash chain
			let max_blocks = finalized_number.min(3);
			let mut previous_hash: Option<Vec<u8>> = None;

			for block_num in 0..=max_blocks {
				let block_row =
					layer.fetch_and_cache_block_by_number(block_num).await.unwrap().unwrap();

				// Verify parent hash matches previous block hash (except for genesis)
				if let Some(prev_hash) = previous_hash {
					assert_eq!(
						block_row.parent_hash,
						prev_hash,
						"Block {} parent hash should match block {} hash",
						block_num,
						block_num - 1
					);
				}

				previous_hash = Some(block_row.hash.clone());
			}
		}
	}
}