Skip to main content

pop_fork/
local.rs

1// SPDX-License-Identifier: GPL-3.0
2
3//! Local storage layer for tracking modifications to forked state.
4//!
5//! This module provides the [`LocalStorageLayer`] which sits on top of a [`RemoteStorageLayer`]
6//! and tracks local modifications without mutating the underlying cached state. This enables
7//! transactional semantics where changes can be committed, discarded, or merged.
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────────┐
13//! │                    LocalStorageLayer                             │
14//! │                                                                   │
15//! │   get(key) ─────► Modified? ──── Yes ────► Return modified value│
16//! │                        │                                          │
17//! │                        No                                         │
18//! │                        │                                          │
19//! │                        ▼                                          │
20//! │                 Prefix deleted? ── Yes ───► Return None          │
21//! │                        │                                          │
22//! │                        No                                         │
23//! │                        │                                          │
24//! │                        ▼                                          │
25//! │                 Query parent layer                                │
26//! │                        │                                          │
27//! │                        ▼                                          │
28//! │                 Return value                                      │
29//! └─────────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! # Example
33//!
34//! ```ignore
35//! use pop_fork::{LocalStorageLayer, RemoteStorageLayer};
36//!
37//! let remote = RemoteStorageLayer::new(rpc, cache, block_hash);
38//! let local = LocalStorageLayer::new(remote);
39//!
40//! // Set a value locally (doesn't affect remote/cache)
41//! local.set(&key, Some(&value))?;
42//!
43//! // Read returns the modified value
44//! let value = local.get(&key).await?;
45//!
46//! // Delete all keys with a prefix
47//! local.delete_prefix(&prefix)?;
48//! ```
49
50use crate::{error::LocalStorageError, models::BlockRow, remote::RemoteStorageLayer};
51use std::{
52	collections::{BTreeMap, HashMap},
53	sync::{Arc, RwLock},
54};
55use subxt::{Metadata, config::substrate::H256};
56
57const ONE_BLOCK: u32 = 1;
58
59/// A value that can be shared accross different local storage layer instances
60#[derive(Debug, PartialEq)]
61pub struct LocalSharedValue {
62	last_modification_block: u32,
63	/// The actual value
64	pub value: Option<Vec<u8>>,
65}
66
67static EMPTY_MEMORY: [u8; 0] = [];
68impl AsRef<[u8]> for LocalSharedValue {
69	/// AsRef implementation to get the value bytes of this local shared value
70	fn as_ref(&self) -> &[u8] {
71		match &self.value {
72			Some(value) => value.as_ref(),
73			None => &EMPTY_MEMORY,
74		}
75	}
76}
77
78type SharedValue = Arc<LocalSharedValue>;
79type Modifications = HashMap<Vec<u8>, Option<SharedValue>>;
80type DeletedPrefixes = Vec<Vec<u8>>;
81type DiffLocalStorage = Vec<(Vec<u8>, Option<SharedValue>)>;
82/// Maps block number (when metadata became valid) to metadata.
83/// Used to track metadata versions across runtime upgrades.
84type MetadataVersions = BTreeMap<u32, Arc<Metadata>>;
85
86/// Local storage layer that tracks modifications on top of a remote layer.
87///
88/// Provides transactional semantics: modifications are tracked locally without
89/// affecting the underlying remote layer or cache. Changes can be inspected via
90/// [`diff`](Self::diff).
91///
92/// # Block-based Storage Strategy
93///
94/// - `latest_block_number`: Current working block number (modifications in HashMap)
95/// - Keys queried at a block higher than the last modification of that key, queried at
96///   modifications HashMap
97/// - Blocks between first_forked_block_number and latest_block_number are in local_storage table
98/// - Blocks before first_forked_block_number come from remote provider
99///
100/// # Cloning
101///
102/// `LocalStorageLayer` is cheap to clone. The underlying modifications and
103/// deleted prefixes use `Arc<RwLock<_>>`, so clones share the same state.
104///
105/// # Thread Safety
106///
107/// The layer is `Send + Sync` and can be shared across async tasks. All
108/// operations use `read`/`write` locks which will block until the lock is acquired.
109#[derive(Clone, Debug)]
110pub struct LocalStorageLayer {
111	parent: RemoteStorageLayer,
112	first_forked_block_hash: H256,
113	first_forked_block_number: u32,
114	current_block_number: u32,
115	modifications: Arc<RwLock<Modifications>>,
116	deleted_prefixes: Arc<RwLock<DeletedPrefixes>>,
117	/// Metadata versions indexed by the block number when they became valid.
118	/// Enables looking up the correct metadata for any block in the fork.
119	metadata_versions: Arc<RwLock<MetadataVersions>>,
120}
121
122impl LocalStorageLayer {
123	/// Create a new local storage layer.
124	///
125	/// # Arguments
126	/// * `parent` - The remote storage layer to use as the base state
127	/// * `first_forked_block_number` - The initial block number where the fork started
128	/// * `first_forked_block_hash` - The hash of the first forked block
129	/// * `metadata` - The runtime metadata at the fork point
130	///
131	/// # Returns
132	/// A new `LocalStorageLayer` with no modifications, with `current_block_number` set to
133	/// `first_forked_block_number + 1` (the block being built).
134	pub fn new(
135		parent: RemoteStorageLayer,
136		first_forked_block_number: u32,
137		first_forked_block_hash: H256,
138		metadata: Metadata,
139	) -> Self {
140		let mut metadata_versions = BTreeMap::new();
141		metadata_versions.insert(first_forked_block_number, Arc::new(metadata));
142
143		Self {
144			parent,
145			first_forked_block_hash,
146			first_forked_block_number,
147			current_block_number: first_forked_block_number + 1, /* current_block_number is the
148			                                                      * one to be produced. */
149			modifications: Arc::new(RwLock::new(HashMap::new())),
150			deleted_prefixes: Arc::new(RwLock::new(Vec::new())),
151			metadata_versions: Arc::new(RwLock::new(metadata_versions)),
152		}
153	}
154
155	/// Fetch and cache a block if it's not already in the cache.
156	///
157	/// This is a helper method used by `get` and `get_batch` to ensure blocks are
158	/// available in the cache before querying them.
159	///
160	/// # Arguments
161	/// * `block_number` - The block number to fetch and cache
162	///
163	/// # Returns
164	/// * `Ok(Some(block_row))` - Block is now in cache (either was already cached or just fetched)
165	/// * `Ok(None)` - Block number doesn't exist
166	/// * `Err(_)` - RPC or cache error
167	///
168	/// # Behavior
169	/// - First checks if block is already in cache
170	/// - If not cached, fetches from remote RPC and caches it
171	/// - If block doesn't exist, returns None
172	async fn get_block(&self, block_number: u32) -> Result<Option<BlockRow>, LocalStorageError> {
173		// First check if block is already in cache
174		if let Some(cached_block) = self.parent.cache().get_block_by_number(block_number).await? {
175			return Ok(Some(cached_block));
176		}
177
178		// Not in cache, fetch from remote and cache it
179		Ok(self.parent.fetch_and_cache_block_by_number(block_number).await?)
180	}
181
182	/// Get the current block number.
183	pub fn get_current_block_number(&self) -> u32 {
184		self.current_block_number
185	}
186
187	/// Get a reference to the underlying storage cache.
188	///
189	/// This provides access to the cache for operations like clearing local storage.
190	pub fn cache(&self) -> &crate::StorageCache {
191		self.parent.cache()
192	}
193
194	/// Get the underlying remote storage layer.
195	///
196	/// This provides access to the remote layer for operations that need to
197	/// fetch data directly from the remote chain (e.g., block headers, bodies).
198	/// The remote layer maintains a persistent connection to the RPC endpoint.
199	pub fn remote(&self) -> &crate::RemoteStorageLayer {
200		&self.parent
201	}
202
203	/// Get the hash of the first forked block.
204	///
205	/// This is the block hash at which the fork was created, used for querying
206	/// storage keys on the remote chain.
207	pub fn fork_block_hash(&self) -> H256 {
208		self.first_forked_block_hash
209	}
210
211	/// Get the metadata valid at a specific block number.
212	///
213	/// For blocks at or after the fork point, returns metadata from the local version tree.
214	/// For blocks before the fork point, fetches metadata from the remote node.
215	///
216	/// # Arguments
217	/// * `block_number` - The block number to get metadata for
218	///
219	/// # Returns
220	/// * `Ok(Arc<Metadata>)` - The metadata valid at the given block
221	/// * `Err(_)` - Lock error, RPC error, or metadata decode error
222	pub async fn metadata_at(&self, block_number: u32) -> Result<Arc<Metadata>, LocalStorageError> {
223		// For blocks before the fork point, fetch from remote
224		if block_number < self.first_forked_block_number {
225			return self.fetch_remote_metadata(block_number).await;
226		}
227
228		// For blocks at or after fork point, use local version tree
229		let versions = self
230			.metadata_versions
231			.read()
232			.map_err(|e| LocalStorageError::Lock(e.to_string()))?;
233
234		versions
235			.range(..=block_number)
236			.next_back()
237			.map(|(_, metadata)| Arc::clone(metadata))
238			.ok_or_else(|| {
239				LocalStorageError::MetadataNotFound(format!(
240					"No metadata found for block {}",
241					block_number
242				))
243			})
244	}
245
246	/// Fetch metadata from the remote node, usually used for a pre-fork block.
247	async fn fetch_remote_metadata(
248		&self,
249		block_number: u32,
250	) -> Result<Arc<Metadata>, LocalStorageError> {
251		// Get block hash for this block number
252		let block_hash = self.parent.rpc().block_hash_at(block_number).await?.ok_or_else(|| {
253			LocalStorageError::MetadataNotFound(format!(
254				"Block {} not found on remote node",
255				block_number
256			))
257		})?;
258
259		// Fetch and decode metadata from remote
260		let metadata = self.parent.rpc().metadata(block_hash).await?;
261
262		Ok(Arc::new(metadata))
263	}
264
265	/// Register a new metadata version starting at the given block number.
266	///
267	/// This should be called when a runtime upgrade occurs (`:code` storage key changes)
268	/// to record that a new metadata version is now active.
269	///
270	/// # Arguments
271	/// * `block_number` - The block number where this metadata becomes valid
272	/// * `metadata` - The new runtime metadata
273	///
274	/// # Returns
275	/// * `Ok(())` - Metadata version registered successfully
276	/// * `Err(_)` - Lock error
277	pub fn register_metadata_version(
278		&self,
279		block_number: u32,
280		metadata: Metadata,
281	) -> Result<(), LocalStorageError> {
282		let mut versions = self
283			.metadata_versions
284			.write()
285			.map_err(|e| LocalStorageError::Lock(e.to_string()))?;
286
287		versions.insert(block_number, Arc::new(metadata));
288		Ok(())
289	}
290
291	/// Check if the `:code` storage key was modified at the specified block.
292	///
293	/// This is used to detect runtime upgrades. When a runtime upgrade occurs in block X,
294	/// the new runtime is used starting from block X+1. So when building X+1, we check
295	/// if code changed in X (the parent) to determine if we're now using a new runtime.
296	///
297	/// # Arguments
298	/// * `block_number` - The block number to check for code modifications
299	///
300	/// # Returns
301	/// * `Ok(true)` - The `:code` key was modified at the specified block
302	/// * `Ok(false)` - The `:code` key was not modified at the specified block
303	/// * `Err(_)` - Lock error
304	pub fn has_code_changed_at(&self, block_number: u32) -> Result<bool, LocalStorageError> {
305		let modifications =
306			self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
307
308		// The well-known `:code` storage key
309		let code_key = sp_core::storage::well_known_keys::CODE;
310
311		// Check if :code was modified at the specified block
312		Ok(modifications.get(code_key).is_some_and(|value| {
313			value.as_ref().is_some_and(|v| v.last_modification_block == block_number)
314		}))
315	}
316
317	/// Return the value of the specified key at height `block_number` if the value contained in the
318	/// local modifications is valid at that height.
319	///
320	/// # Arguments
321	/// - `key`
322	/// - `block_number`
323	fn get_local_modification(
324		&self,
325		key: &[u8],
326		block_number: u32,
327	) -> Result<Option<Option<SharedValue>>, LocalStorageError> {
328		let latest_block_number = self.get_current_block_number();
329		let modifications_lock =
330			self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
331		let deleted_prefixes_lock = self
332			.deleted_prefixes
333			.read()
334			.map_err(|e| LocalStorageError::Lock(e.to_string()))?;
335
336		match modifications_lock.get(key) {
337			local_modification @ Some(Some(shared_value))
338				if latest_block_number == block_number ||
339					shared_value.last_modification_block < block_number =>
340				Ok(local_modification.cloned()), /* <- Cheap clone as it's Option<Option<Arc<_>>> */
341			None if deleted_prefixes_lock
342				.iter()
343				.any(|prefix| key.starts_with(prefix.as_slice())) =>
344				Ok(Some(None)),
345			_ => Ok(None),
346		}
347	}
348
349	/// Get a storage value, checking local modifications first.
350	///
351	/// # Arguments
352	/// * `key` - The storage key to fetch
353	///
354	/// # Returns
355	/// * `Ok(Some(value))` - Value exists (either modified locally, in local_storage, or in parent)
356	/// * `Ok(None)` - Key doesn't exist or was deleted via prefix deletion
357	/// * `Err(_)` - Lock error or parent layer error
358	///
359	/// # Behavior
360	/// Storage lookup strategy based on block_number:
361	/// 1. If `block_number == latest_block_number` or the key in local modifications is valid for
362	///    this block: Check modifications HashMap, then remote at first_forked_block
363	/// 2. If `first_forked_block_number < block_number < latest_block_number`: Check local_storage
364	///    table
365	/// 3. Otherwise: Check remote provider directly (fetches block_hash from blocks table)
366	pub async fn get(
367		&self,
368		block_number: u32,
369		key: &[u8],
370	) -> Result<Option<SharedValue>, LocalStorageError> {
371		let latest_block_number = self.get_current_block_number();
372
373		// First check if the key has a local modification
374		if let local_modification @ Ok(Some(_)) = self.get_local_modification(key, block_number) {
375			return local_modification.map(|local_modification| local_modification.flatten());
376		}
377
378		// Case 1: Query for latest block - check modifications, then remote at first_forked_block
379		if block_number == latest_block_number {
380			// Not in modifications, query remote at first_forked_block
381			return Ok(self
382				.parent
383				.get(self.first_forked_block_hash, key)
384				.await?
385				.map(|value| LocalSharedValue {
386					last_modification_block: 0, /* <- We don't care about the validity block for
387					                             * this value as it came from the remote layer */
388					value: Some(value),
389				})
390				.map(Arc::new));
391		}
392
393		// Case 2: Historical block after fork such that the local modification is still valid -
394		// check the local modifications map, otherwise fallback to cache and finally remote layer.
395		if block_number > self.first_forked_block_number && block_number < latest_block_number {
396			// Try to get value from local_values table using validity ranges
397			// get_local_value_at_block returns Option<Option<Vec<u8>>>:
398			// - None = no row found
399			// - Some(None) = row found but value is NULL (deleted)
400			// - Some(Some(value)) = row found with data
401			let value = if let Some(local_value) =
402				self.parent.cache().get_local_value_at_block(key, block_number).await?
403			{
404				local_value
405			}
406			// Not found in local storage, try remote at first_forked_block
407			else if let Some(remote_value) =
408				self.parent.get(self.first_forked_block_hash, key).await?
409			{
410				Some(remote_value)
411			} else {
412				return Ok(None);
413			};
414
415			return Ok(Some(Arc::new(LocalSharedValue {
416				last_modification_block: 0, /* <- Value came from remote or cache layer */
417				value,
418			})));
419		}
420
421		// Case 3: Block before or at fork point
422		let block = self.get_block(block_number).await?;
423
424		if let Some(block_row) = block {
425			let block_hash = H256::from_slice(&block_row.hash);
426			Ok(self
427				.parent
428				.get(block_hash, key)
429				.await?
430				.map(|value| LocalSharedValue {
431					last_modification_block: 0, /* <- We don't care about the validity block of
432					                             * this value as it came from the remote layer */
433					value: Some(value),
434				})
435				.map(Arc::new))
436		} else {
437			// Block not found
438			Ok(None)
439		}
440	}
441
442	/// Get the next key after the given key that starts with the prefix.
443	///
444	/// # Arguments
445	/// * `prefix` - Storage key prefix to match
446	/// * `key` - The current key; returns the next key after this one
447	///
448	/// # Returns
449	/// * `Ok(Some(key))` - The next key after `key` that starts with `prefix`
450	/// * `Ok(None)` - No more keys with this prefix
451	/// * `Err(_)` - Lock error or parent layer error
452	///
453	/// # Behavior
454	/// 1. Queries the parent layer for the next key
455	/// 2. Skips keys that match deleted prefixes
456	/// 3. Does not consider locally modified keys (they are transient)
457	///
458	/// # Note
459	/// This method currently delegates directly to the parent layer.
460	/// Locally modified keys are not included in key enumeration since
461	/// they represent uncommitted changes.
462	pub async fn next_key(
463		&self,
464		prefix: &[u8],
465		key: &[u8],
466	) -> Result<Option<Vec<u8>>, LocalStorageError> {
467		// Clone deleted prefixes upfront - we can't hold the lock across await points
468		let deleted_prefixes = self
469			.deleted_prefixes
470			.read()
471			.map_err(|e| LocalStorageError::Lock(e.to_string()))?
472			.clone();
473
474		// Query parent and skip deleted keys
475		let mut current_key = key.to_vec();
476		loop {
477			let next =
478				self.parent.next_key(self.first_forked_block_hash, prefix, &current_key).await?;
479			match next {
480				Some(next_key) => {
481					// Check if this key matches any deleted prefix
482					if deleted_prefixes
483						.iter()
484						.any(|deleted| next_key.starts_with(deleted.as_slice()))
485					{
486						// Skip this key and continue searching
487						current_key = next_key;
488						continue;
489					}
490					return Ok(Some(next_key));
491				},
492				None => return Ok(None),
493			}
494		}
495	}
496
497	/// Enumerate all keys matching a prefix, merging remote and local state.
498	///
499	/// This method combines keys from the remote layer (at the fork point) with
500	/// locally modified keys, producing a sorted, deduplicated list of keys that
501	/// exist at the specified fork-local block.
502	///
503	/// For the latest block, uses the in-memory `modifications` and
504	/// `deleted_prefixes` snapshots. For historical fork-local blocks, queries
505	/// the persisted local values in the cache to reconstruct the key set that
506	/// existed at that block.
507	///
508	/// Keys that were deleted locally (either individually via `set(key, None)`
509	/// or via `delete_prefix`) are excluded.
510	pub async fn keys_by_prefix(
511		&self,
512		prefix: &[u8],
513		block_number: u32,
514	) -> Result<Vec<Vec<u8>>, LocalStorageError> {
515		// 1. Get remote keys at the fork point.
516		let remote_keys = self.parent.get_keys(self.first_forked_block_hash, prefix).await?;
517
518		let latest_block_number = self.get_current_block_number();
519
520		if block_number >= latest_block_number {
521			// Latest block: use in-memory modifications (fast path).
522			self.merge_keys_with_in_memory(remote_keys, prefix)
523		} else {
524			// Historical fork-local block: query persisted local values from cache.
525			self.merge_keys_with_cache(remote_keys, prefix, block_number).await
526		}
527	}
528
529	/// Merge remote keys with in-memory modifications for the latest block.
530	fn merge_keys_with_in_memory(
531		&self,
532		remote_keys: Vec<Vec<u8>>,
533		prefix: &[u8],
534	) -> Result<Vec<Vec<u8>>, LocalStorageError> {
535		let modifications = self
536			.modifications
537			.read()
538			.map_err(|e| LocalStorageError::Lock(e.to_string()))?
539			.clone();
540		let deleted_prefixes = self
541			.deleted_prefixes
542			.read()
543			.map_err(|e| LocalStorageError::Lock(e.to_string()))?
544			.clone();
545
546		let is_deleted = |key: &[u8]| -> bool {
547			deleted_prefixes.iter().any(|dp| key.starts_with(dp.as_slice()))
548		};
549
550		let is_locally_deleted = |key: &[u8]| -> bool {
551			modifications
552				.get::<[u8]>(key)
553				.and_then(|sv| sv.as_ref())
554				.is_some_and(|sv| sv.value.is_none())
555		};
556
557		let mut merged: std::collections::BTreeSet<Vec<u8>> = remote_keys
558			.into_iter()
559			.filter(|k| !is_deleted(k) && !is_locally_deleted(k))
560			.collect();
561
562		for (key, maybe_sv) in modifications.iter() {
563			if key.starts_with(prefix) && maybe_sv.as_ref().is_some_and(|sv| sv.value.is_some()) {
564				merged.insert(key.clone());
565			}
566		}
567
568		Ok(merged.into_iter().collect())
569	}
570
571	/// Merge remote keys with persisted cache data for a historical fork-local block.
572	async fn merge_keys_with_cache(
573		&self,
574		remote_keys: Vec<Vec<u8>>,
575		prefix: &[u8],
576		block_number: u32,
577	) -> Result<Vec<Vec<u8>>, LocalStorageError> {
578		let cache = self.parent.cache();
579
580		// Get keys that had non-NULL values at this block.
581		let local_live_keys = cache.get_local_keys_at_block(prefix, block_number).await?;
582
583		// Get keys that were explicitly deleted at this block.
584		let local_deleted_keys =
585			cache.get_local_deleted_keys_at_block(prefix, block_number).await?;
586
587		let deleted_set: std::collections::HashSet<Vec<u8>> =
588			local_deleted_keys.into_iter().collect();
589
590		// Start with remote keys, excluding those deleted locally at this block.
591		let mut merged: std::collections::BTreeSet<Vec<u8>> =
592			remote_keys.into_iter().filter(|k| !deleted_set.contains(k)).collect();
593
594		// Add locally-live keys.
595		merged.extend(local_live_keys);
596
597		Ok(merged.into_iter().collect())
598	}
599
600	/// Set a storage value locally.
601	///
602	/// # Arguments
603	/// * `key` - The storage key to set
604	/// * `value` - The value to set, or `None` to mark as deleted
605	///
606	/// # Returns
607	/// * `Ok(())` - Value was set successfully
608	/// * `Err(_)` - Lock error
609	///
610	/// # Behavior
611	/// - Does not affect the parent layer or underlying cache
612	/// - Overwrites any previous local modification for this key
613	/// - Passing `None` marks the key as explicitly deleted (different from never set)
614	pub fn set(&self, key: &[u8], value: Option<&[u8]>) -> Result<(), LocalStorageError> {
615		let mut modifications_lock =
616			self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
617
618		let latest_block_number = self.get_current_block_number();
619
620		modifications_lock.insert(
621			key.to_vec(),
622			Some(Arc::new({
623				LocalSharedValue {
624					last_modification_block: latest_block_number,
625					value: value.map(|value| value.to_vec()),
626				}
627			})),
628		);
629
630		Ok(())
631	}
632
633	/// Set a storage value visible from the fork point onwards.
634	///
635	/// Unlike [`Self::set`], which records the modification at the current working block,
636	/// this marks the entry with `last_modification_block = first_forked_block_number - 1`
637	/// (saturating at 0) so it is visible immediately at the fork head and for any later
638	/// fork-local query, but not for historical pre-fork queries.
639	/// This is used for injecting initial state (e.g., dev accounts, sudo key)
640	/// that should be readable before any block is built.
641	///
642	/// These entries are never committed to the persistent cache by [`Self::commit`]
643	/// (which only commits entries at `current_block_number`), but they remain in
644	/// the in-memory modifications map for the lifetime of the fork.
645	pub fn set_initial(&self, key: &[u8], value: Option<&[u8]>) -> Result<(), LocalStorageError> {
646		let mut modifications_lock =
647			self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
648		let initial_visibility_block = self.first_forked_block_number.saturating_sub(ONE_BLOCK);
649
650		modifications_lock.insert(
651			key.to_vec(),
652			Some(Arc::new(LocalSharedValue {
653				last_modification_block: initial_visibility_block,
654				value: value.map(|v| v.to_vec()),
655			})),
656		);
657
658		Ok(())
659	}
660
661	/// Get multiple storage values in a batch.
662	///
663	/// # Arguments
664	/// * `block_number` - The block number to query
665	/// * `keys` - Slice of storage keys to fetch (as byte slices)
666	///
667	/// # Returns
668	/// * `Ok(vec)` - Vector of optional values, in the same order as input keys
669	/// * `Err(_)` - Lock error or parent layer error
670	///
671	/// # Behavior
672	/// Storage lookup strategy based on block_number (same as `get`):
673	/// 1. If `block_number == latest_block_number`: Check modifications HashMap, then remote at
674	///    first_forked_block
675	/// 2. If `first_forked_block_number < block_number < latest_block_number` and the key is valid
676	///    for `block_number`: Check modifications HashMap table
677	/// 3. If `first_forked_block_number < block_number < latest_block_number`: Check local_storage
678	///    table
679	/// 4. Otherwise: Check remote provider directly (fetches block_hash from blocks table)
680	pub async fn get_batch(
681		&self,
682		block_number: u32,
683		keys: &[&[u8]],
684	) -> Result<Vec<Option<SharedValue>>, LocalStorageError> {
685		if keys.is_empty() {
686			return Ok(vec![]);
687		}
688
689		let latest_block_number = self.get_current_block_number();
690		let mut results: Vec<Option<SharedValue>> = Vec::with_capacity(keys.len());
691		let mut non_local_keys: Vec<&[u8]> = Vec::new();
692		let mut non_local_indices: Vec<usize> = Vec::new();
693
694		for (i, key) in keys.iter().enumerate() {
695			match self.get_local_modification(key, block_number)? {
696				Some(local_modification) => results.push(local_modification),
697				_ => {
698					results.push(None);
699					non_local_keys.push(*key);
700					non_local_indices.push(i)
701				},
702			}
703		}
704
705		// Case 1: Query for latest block - Complete non local keys with the remote layer
706		if block_number == latest_block_number {
707			if !non_local_keys.is_empty() {
708				let parent_values =
709					self.parent.get_batch(self.first_forked_block_hash, &non_local_keys).await?;
710				for (i, parent_value) in parent_values.into_iter().enumerate() {
711					let result_idx = non_local_indices[i];
712					results[result_idx] = parent_value
713						.map(|value| LocalSharedValue {
714							last_modification_block: 0, /* <- We don't care about the validity
715							                             * block for this value as it came from
716							                             * remote layer */
717							value: Some(value),
718						})
719						.map(Arc::new);
720				}
721			}
722
723			return Ok(results);
724		}
725
726		// Case 2: Historical block after fork -
727		// local_values table (using validity) for non local keys. Remote query for non found keys
728		if block_number > self.first_forked_block_number && block_number < latest_block_number {
729			if !non_local_keys.is_empty() {
730				// Use validity-based query to get values from local_values table
731				// get_local_values_at_block_batch returns Vec<Option<Option<Vec<u8>>>>:
732				// - None = no row found
733				// - Some(None) = row found but value is NULL (deleted)
734				// - Some(Some(value)) = row found with data
735				let cached_values = self
736					.parent
737					.cache()
738					.get_local_values_at_block_batch(&non_local_keys, block_number)
739					.await?;
740				for (i, cache_value) in cached_values.into_iter().enumerate() {
741					let result_idx = non_local_indices[i];
742					// cache_value is Option<Option<Vec<u8>>>, map it to Option<SharedValue>
743					results[result_idx] = cache_value.map(|value| {
744						Arc::new(LocalSharedValue {
745							last_modification_block: 0, /* <- We don't care about the validity
746							                             * block for this value as it came from
747							                             * cache */
748							value,
749						})
750					});
751				}
752			}
753
754			// For non found values, we need to query the remote storage at the first forked block
755			let mut final_results = Vec::with_capacity(keys.len());
756			for (i, value) in results.into_iter().enumerate() {
757				let final_value = if value.is_some() {
758					value
759				} else {
760					self.parent
761						.get(self.first_forked_block_hash, keys[i])
762						.await?
763						.map(|value| {
764							LocalSharedValue {
765								last_modification_block: 0, /* <- Value came from remote layer */
766								value: Some(value),
767							}
768						})
769						.map(Arc::new)
770				};
771				final_results.push(final_value);
772			}
773			return Ok(final_results);
774		}
775
776		// Case 3: Block before or at fork point - fetch and cache block if needed
777		let block = self.get_block(block_number).await?;
778
779		if let Some(block_row) = block {
780			let block_hash = H256::from_slice(&block_row.hash);
781			let parent_values = self.parent.get_batch(block_hash, keys).await?;
782			Ok(parent_values
783				.into_iter()
784				.map(|value| {
785					value.map(|value| {
786						Arc::new(LocalSharedValue {
787							last_modification_block: 0, /* <- We don't care about this value as
788							                             * it came
789							                             * from the remote layer, */
790							value: Some(value),
791						})
792					})
793				})
794				.collect())
795		} else {
796			// Block not found - return None for all keys
797			Ok(vec![None; keys.len()])
798		}
799	}
800
801	/// Set multiple storage values locally in a batch.
802	///
803	/// # Arguments
804	/// * `entries` - Slice of (key, value) pairs to set
805	///
806	/// # Returns
807	/// * `Ok(())` - All values were set successfully
808	/// * `Err(_)` - Lock error
809	///
810	/// # Behavior
811	/// - Does not affect the parent layer or underlying cache
812	/// - Overwrites any previous local modifications for the given keys
813	/// - `None` values mark keys as explicitly deleted
814	/// - More efficient than calling `set()` multiple times due to single lock acquisition
815	pub fn set_batch(&self, entries: &[(&[u8], Option<&[u8]>)]) -> Result<(), LocalStorageError> {
816		if entries.is_empty() {
817			return Ok(());
818		}
819
820		let latest_block_number = self.get_current_block_number();
821
822		let mut modifications_lock =
823			self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
824
825		for (key, value) in entries {
826			modifications_lock.insert(
827				key.to_vec(),
828				Some(Arc::new(LocalSharedValue {
829					last_modification_block: latest_block_number,
830					value: value.map(|value| value.to_vec()),
831				})),
832			);
833		}
834
835		Ok(())
836	}
837
838	/// Batch version of [`Self::set_initial`].
839	///
840	/// Sets multiple storage values visible from the fork point onwards, using
841	/// `last_modification_block = first_forked_block_number - 1` (saturating at 0).
842	/// See [`Self::set_initial`]
843	/// for details.
844	pub fn set_batch_initial(
845		&self,
846		entries: &[(&[u8], Option<&[u8]>)],
847	) -> Result<(), LocalStorageError> {
848		if entries.is_empty() {
849			return Ok(());
850		}
851
852		let mut modifications_lock =
853			self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
854		let initial_visibility_block = self.first_forked_block_number.saturating_sub(ONE_BLOCK);
855
856		for (key, value) in entries {
857			modifications_lock.insert(
858				key.to_vec(),
859				Some(Arc::new(LocalSharedValue {
860					last_modification_block: initial_visibility_block,
861					value: value.map(|v| v.to_vec()),
862				})),
863			);
864		}
865
866		Ok(())
867	}
868
869	/// Delete all keys matching a prefix.
870	///
871	/// # Arguments
872	/// * `prefix` - The prefix to match for deletion
873	///
874	/// # Returns
875	/// * `Ok(())` - Prefix was marked as deleted successfully
876	/// * `Err(_)` - Lock error
877	///
878	/// # Behavior
879	/// - Removes all locally modified keys that start with the prefix
880	/// - Marks the prefix as deleted, affecting future `get()` calls
881	/// - Keys in the parent layer matching this prefix will return `None` after this call
882	pub fn delete_prefix(&self, prefix: &[u8]) -> Result<(), LocalStorageError> {
883		let mut modifications_lock =
884			self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
885		let mut deleted_prefixes_lock = self
886			.deleted_prefixes
887			.write()
888			.map_err(|e| LocalStorageError::Lock(e.to_string()))?;
889
890		// Remove all keys starting with the prefix using retain
891		modifications_lock.retain(|key, _| !key.starts_with(prefix));
892
893		// Add prefix to deleted_prefixes
894		deleted_prefixes_lock.push(prefix.to_vec());
895
896		Ok(())
897	}
898
899	/// Check if a prefix has been deleted.
900	///
901	/// # Arguments
902	/// * `prefix` - The prefix to check
903	///
904	/// # Returns
905	/// * `Ok(true)` - Prefix has been deleted via [`delete_prefix`](Self::delete_prefix)
906	/// * `Ok(false)` - Prefix has not been deleted
907	/// * `Err(_)` - Lock error
908	pub fn is_deleted(&self, prefix: &[u8]) -> Result<bool, LocalStorageError> {
909		let deleted_prefixes_lock = self
910			.deleted_prefixes
911			.read()
912			.map_err(|e| LocalStorageError::Lock(e.to_string()))?;
913
914		Ok(deleted_prefixes_lock
915			.iter()
916			.any(|deleted_prefix| deleted_prefix.as_slice() == prefix))
917	}
918
919	/// Get all local modifications as a vector.
920	///
921	/// # Returns
922	/// * `Ok(vec)` - Vector of (key, value) pairs representing all local changes
923	/// * `Err(_)` - Lock error
924	///
925	/// # Behavior
926	/// - Returns only locally modified keys, not the full state
927	/// - `None` values indicate keys that were explicitly deleted
928	/// - Does not include keys deleted via prefix deletion
929	pub fn diff(&self) -> Result<DiffLocalStorage, LocalStorageError> {
930		let modifications_lock =
931			self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
932
933		Ok(modifications_lock
934			.iter()
935			.map(|(key, value)| (key.clone(), value.clone()))
936			.collect())
937	}
938
939	/// Commit modifications to the local storage tables, creating new validity entries.
940	///
941	/// # Returns
942	/// * `Ok(())` - All modifications were successfully committed to the cache
943	/// * `Err(_)` - Lock error or cache error
944	///
945	/// # Behavior
946	/// - Only commits modifications whose `last_modification_block == latest_block_number`
947	/// - For each key to commit:
948	///   - If key not in local_keys: insert key, then insert value with valid_from =
949	///     latest_block_number
950	///   - If key exists: close current open value (set valid_until), insert new value
951	/// - The modifications HashMap remains intact and available after commit
952	/// - Increases the latest block number
953	pub async fn commit(&mut self) -> Result<(), LocalStorageError> {
954		let current_block_number = self.get_current_block_number();
955		let new_latest_block = current_block_number
956			.checked_add(ONE_BLOCK)
957			.ok_or(LocalStorageError::Arithmetic)?;
958
959		// Collect modifications that need to be committed (only those modified at
960		// latest_block_number)
961		let diff = self.diff()?;
962
963		// Filter to only include modifications made at the current latest_block_number
964		// entries_to_commit contains (key, Option<value>) where None means deletion
965		let entries_to_commit: Vec<(&[u8], Option<&[u8]>)> = diff
966			.iter()
967			.filter_map(|(key, shared_value)| {
968				shared_value.as_ref().and_then(|sv| {
969					if sv.last_modification_block == current_block_number {
970						Some((key.as_slice(), sv.value.as_deref()))
971					} else {
972						None
973					}
974				})
975			})
976			.collect();
977
978		// Commit all changes in a single transaction
979		self.parent
980			.cache()
981			.commit_local_changes(&entries_to_commit, current_block_number)
982			.await?;
983
984		self.current_block_number = new_latest_block;
985
986		Ok(())
987	}
988
989	/// Create a child layer for nested modifications.
990	///
991	/// # Returns
992	/// A cloned `LocalStorageLayer` that shares the same parent and state.
993	///
994	/// # Behavior
995	/// - The child shares the same `modifications` and `deleted_prefixes` via `Arc`
996	/// - Changes in the child affect the parent and vice versa
997	/// - Useful for creating temporary scopes that can be discarded
998	///
999	/// # Note
1000	/// This is currently a simple clone. In the future, this may be updated to create
1001	/// true isolated child layers with proper parent-child relationships.
1002	pub fn child(&self) -> LocalStorageLayer {
1003		self.clone()
1004	}
1005}