pop_fork/local.rs
1// SPDX-License-Identifier: GPL-3.0
2
3//! Local storage layer for tracking modifications to forked state.
4//!
5//! This module provides the [`LocalStorageLayer`] which sits on top of a [`RemoteStorageLayer`]
6//! and tracks local modifications without mutating the underlying cached state. This enables
7//! transactional semantics where changes can be committed, discarded, or merged.
8//!
9//! # Architecture
10//!
11//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                        LocalStorageLayer                        │
//! │                                                                 │
//! │   get(key) ─────► Modified? ──── Yes ────► Return modified value│
//! │                       │                                         │
//! │                       No                                        │
//! │                       │                                         │
//! │                       ▼                                         │
//! │                 Prefix deleted? ── Yes ───► Return None         │
//! │                       │                                         │
//! │                       No                                        │
//! │                       │                                         │
//! │                       ▼                                         │
//! │               Query parent layer                                │
//! │                       │                                         │
//! │                       ▼                                         │
//! │                  Return value                                   │
//! └─────────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! # Example
33//!
34//! ```ignore
//! use pop_fork::{LocalStorageLayer, RemoteStorageLayer};
//!
//! let remote = RemoteStorageLayer::new(rpc, cache, block_hash);
//! let local = LocalStorageLayer::new(remote, fork_block_number, block_hash, metadata);
//!
//! // Set a value locally (doesn't affect remote/cache)
//! local.set(&key, Some(&value))?;
//!
//! // Reading at the current working block returns the modified value
//! let value = local.get(local.get_current_block_number(), &key).await?;
//!
//! // Delete all keys with a prefix
//! local.delete_prefix(&prefix)?;
48//! ```
49
50use crate::{error::LocalStorageError, models::BlockRow, remote::RemoteStorageLayer};
51use std::{
52 collections::{BTreeMap, HashMap},
53 sync::{Arc, RwLock},
54};
55use subxt::{Metadata, config::substrate::H256};
56
/// Distance between two consecutive block numbers. Used when advancing the working block in
/// `commit` and when deriving the visibility block for `set_initial` / `set_batch_initial`.
const ONE_BLOCK: u32 = 1;
58
/// A value that can be shared across different local storage layer instances.
#[derive(Debug, PartialEq)]
pub struct LocalSharedValue {
    /// Block number stamped on the value when it was written locally. Values that were read
    /// from the remote or cache layer are built with `0` here.
    last_modification_block: u32,
    /// The stored bytes, or `None` when the key is marked as deleted.
    pub value: Option<Vec<u8>>,
}

/// Zero-length backing storage handed out by `as_ref` when there is no value.
static EMPTY_MEMORY: [u8; 0] = [];

impl AsRef<[u8]> for LocalSharedValue {
    /// Borrow the value bytes; yields an empty slice when `value` is `None`.
    fn as_ref(&self) -> &[u8] {
        self.value.as_deref().unwrap_or(&EMPTY_MEMORY)
    }
}
77
/// Reference-counted handle to a [`LocalSharedValue`]; cloning it does not copy the bytes.
type SharedValue = Arc<LocalSharedValue>;
/// In-memory map from storage key to its most recent local entry.
type Modifications = HashMap<Vec<u8>, Option<SharedValue>>;
/// Key prefixes that have been marked as deleted, in the order they were recorded.
type DeletedPrefixes = Vec<Vec<u8>>;
/// Flat list of `(key, entry)` pairs, as returned by `LocalStorageLayer::diff`.
type DiffLocalStorage = Vec<(Vec<u8>, Option<SharedValue>)>;
/// Maps block number (when metadata became valid) to metadata.
/// Used to track metadata versions across runtime upgrades.
type MetadataVersions = BTreeMap<u32, Arc<Metadata>>;
85
/// Local storage layer that tracks modifications on top of a remote layer.
///
/// Provides transactional semantics: modifications are tracked locally without
/// affecting the underlying remote layer or cache. Changes can be inspected via
/// [`diff`](Self::diff).
///
/// # Block-based Storage Strategy
///
/// - `current_block_number` is the working block (the one being built). Writes made through
///   `set` / `set_batch` are stamped with it and kept in the in-memory modifications map.
/// - An in-memory entry answers a query at block `b` when `b` is the working block, or when the
///   entry was last modified strictly before `b`.
/// - Other queries for blocks strictly between `first_forked_block_number` and
///   `current_block_number` are served from the persisted local values in the cache, falling
///   back to the remote state at the fork point.
/// - Queries for blocks at or before `first_forked_block_number` go to the remote provider.
///
/// # Cloning
///
/// `LocalStorageLayer` is cheap to clone. `modifications`, `deleted_prefixes` and
/// `metadata_versions` are `Arc<RwLock<_>>`, so clones share that state.
/// `current_block_number` is a plain `u32` that is copied at clone time, so a clone does not
/// observe a later `commit` performed on another instance.
///
/// # Thread Safety
///
/// All shared state is guarded by `std::sync::RwLock`; operations use `read`/`write` locks
/// which block until the lock is acquired, and a poisoned lock is reported as
/// `LocalStorageError::Lock`. The layer is intended to be `Send + Sync` and shareable across
/// async tasks (NOTE(review): this also depends on `RemoteStorageLayer` — confirm).
#[derive(Clone, Debug)]
pub struct LocalStorageLayer {
    /// Remote layer (with its cache) that provides the base state.
    parent: RemoteStorageLayer,
    /// Hash of the block the fork was created at; used for all fork-point remote reads.
    first_forked_block_hash: H256,
    /// Number of the block the fork was created at.
    first_forked_block_number: u32,
    /// Working block number: the block currently being built. Advanced by `commit`.
    current_block_number: u32,
    /// Latest local entry per key, shared between clones.
    modifications: Arc<RwLock<Modifications>>,
    /// Prefixes marked as deleted, shared between clones.
    deleted_prefixes: Arc<RwLock<DeletedPrefixes>>,
    /// Metadata versions indexed by the block number when they became valid.
    /// Enables looking up the correct metadata for any block in the fork.
    metadata_versions: Arc<RwLock<MetadataVersions>>,
}
121
122impl LocalStorageLayer {
123 /// Create a new local storage layer.
124 ///
125 /// # Arguments
126 /// * `parent` - The remote storage layer to use as the base state
127 /// * `first_forked_block_number` - The initial block number where the fork started
128 /// * `first_forked_block_hash` - The hash of the first forked block
129 /// * `metadata` - The runtime metadata at the fork point
130 ///
131 /// # Returns
132 /// A new `LocalStorageLayer` with no modifications, with `current_block_number` set to
133 /// `first_forked_block_number + 1` (the block being built).
134 pub fn new(
135 parent: RemoteStorageLayer,
136 first_forked_block_number: u32,
137 first_forked_block_hash: H256,
138 metadata: Metadata,
139 ) -> Self {
140 let mut metadata_versions = BTreeMap::new();
141 metadata_versions.insert(first_forked_block_number, Arc::new(metadata));
142
143 Self {
144 parent,
145 first_forked_block_hash,
146 first_forked_block_number,
147 current_block_number: first_forked_block_number + 1, /* current_block_number is the
148 * one to be produced. */
149 modifications: Arc::new(RwLock::new(HashMap::new())),
150 deleted_prefixes: Arc::new(RwLock::new(Vec::new())),
151 metadata_versions: Arc::new(RwLock::new(metadata_versions)),
152 }
153 }
154
155 /// Fetch and cache a block if it's not already in the cache.
156 ///
157 /// This is a helper method used by `get` and `get_batch` to ensure blocks are
158 /// available in the cache before querying them.
159 ///
160 /// # Arguments
161 /// * `block_number` - The block number to fetch and cache
162 ///
163 /// # Returns
164 /// * `Ok(Some(block_row))` - Block is now in cache (either was already cached or just fetched)
165 /// * `Ok(None)` - Block number doesn't exist
166 /// * `Err(_)` - RPC or cache error
167 ///
168 /// # Behavior
169 /// - First checks if block is already in cache
170 /// - If not cached, fetches from remote RPC and caches it
171 /// - If block doesn't exist, returns None
172 async fn get_block(&self, block_number: u32) -> Result<Option<BlockRow>, LocalStorageError> {
173 // First check if block is already in cache
174 if let Some(cached_block) = self.parent.cache().get_block_by_number(block_number).await? {
175 return Ok(Some(cached_block));
176 }
177
178 // Not in cache, fetch from remote and cache it
179 Ok(self.parent.fetch_and_cache_block_by_number(block_number).await?)
180 }
181
    /// Get the current working block number, i.e. the block being built.
    ///
    /// Starts at `first_forked_block_number + 1` and is advanced by one on every successful
    /// [`commit`](Self::commit).
    pub fn get_current_block_number(&self) -> u32 {
        self.current_block_number
    }
186
    /// Get a reference to the underlying storage cache.
    ///
    /// This is the cache owned by the parent remote layer. It provides access for operations
    /// like clearing local storage.
    pub fn cache(&self) -> &crate::StorageCache {
        self.parent.cache()
    }
193
    /// Get the underlying remote storage layer.
    ///
    /// This provides access to the remote layer for operations that need to
    /// fetch data directly from the remote chain (e.g., block headers, bodies).
    /// NOTE(review): the original documentation states that the remote layer keeps a persistent
    /// RPC connection; that is not visible from this file — confirm in `RemoteStorageLayer`.
    pub fn remote(&self) -> &crate::RemoteStorageLayer {
        &self.parent
    }
202
    /// Get the hash of the first forked block.
    ///
    /// This is the block hash at which the fork was created. The layer passes it to the
    /// remote layer whenever it reads state or keys at the fork point.
    pub fn fork_block_hash(&self) -> H256 {
        self.first_forked_block_hash
    }
210
211 /// Get the metadata valid at a specific block number.
212 ///
213 /// For blocks at or after the fork point, returns metadata from the local version tree.
214 /// For blocks before the fork point, fetches metadata from the remote node.
215 ///
216 /// # Arguments
217 /// * `block_number` - The block number to get metadata for
218 ///
219 /// # Returns
220 /// * `Ok(Arc<Metadata>)` - The metadata valid at the given block
221 /// * `Err(_)` - Lock error, RPC error, or metadata decode error
222 pub async fn metadata_at(&self, block_number: u32) -> Result<Arc<Metadata>, LocalStorageError> {
223 // For blocks before the fork point, fetch from remote
224 if block_number < self.first_forked_block_number {
225 return self.fetch_remote_metadata(block_number).await;
226 }
227
228 // For blocks at or after fork point, use local version tree
229 let versions = self
230 .metadata_versions
231 .read()
232 .map_err(|e| LocalStorageError::Lock(e.to_string()))?;
233
234 versions
235 .range(..=block_number)
236 .next_back()
237 .map(|(_, metadata)| Arc::clone(metadata))
238 .ok_or_else(|| {
239 LocalStorageError::MetadataNotFound(format!(
240 "No metadata found for block {}",
241 block_number
242 ))
243 })
244 }
245
246 /// Fetch metadata from the remote node, usually used for a pre-fork block.
247 async fn fetch_remote_metadata(
248 &self,
249 block_number: u32,
250 ) -> Result<Arc<Metadata>, LocalStorageError> {
251 // Get block hash for this block number
252 let block_hash = self.parent.rpc().block_hash_at(block_number).await?.ok_or_else(|| {
253 LocalStorageError::MetadataNotFound(format!(
254 "Block {} not found on remote node",
255 block_number
256 ))
257 })?;
258
259 // Fetch and decode metadata from remote
260 let metadata = self.parent.rpc().metadata(block_hash).await?;
261
262 Ok(Arc::new(metadata))
263 }
264
265 /// Register a new metadata version starting at the given block number.
266 ///
267 /// This should be called when a runtime upgrade occurs (`:code` storage key changes)
268 /// to record that a new metadata version is now active.
269 ///
270 /// # Arguments
271 /// * `block_number` - The block number where this metadata becomes valid
272 /// * `metadata` - The new runtime metadata
273 ///
274 /// # Returns
275 /// * `Ok(())` - Metadata version registered successfully
276 /// * `Err(_)` - Lock error
277 pub fn register_metadata_version(
278 &self,
279 block_number: u32,
280 metadata: Metadata,
281 ) -> Result<(), LocalStorageError> {
282 let mut versions = self
283 .metadata_versions
284 .write()
285 .map_err(|e| LocalStorageError::Lock(e.to_string()))?;
286
287 versions.insert(block_number, Arc::new(metadata));
288 Ok(())
289 }
290
291 /// Check if the `:code` storage key was modified at the specified block.
292 ///
293 /// This is used to detect runtime upgrades. When a runtime upgrade occurs in block X,
294 /// the new runtime is used starting from block X+1. So when building X+1, we check
295 /// if code changed in X (the parent) to determine if we're now using a new runtime.
296 ///
297 /// # Arguments
298 /// * `block_number` - The block number to check for code modifications
299 ///
300 /// # Returns
301 /// * `Ok(true)` - The `:code` key was modified at the specified block
302 /// * `Ok(false)` - The `:code` key was not modified at the specified block
303 /// * `Err(_)` - Lock error
304 pub fn has_code_changed_at(&self, block_number: u32) -> Result<bool, LocalStorageError> {
305 let modifications =
306 self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
307
308 // The well-known `:code` storage key
309 let code_key = sp_core::storage::well_known_keys::CODE;
310
311 // Check if :code was modified at the specified block
312 Ok(modifications.get(code_key).is_some_and(|value| {
313 value.as_ref().is_some_and(|v| v.last_modification_block == block_number)
314 }))
315 }
316
317 /// Return the value of the specified key at height `block_number` if the value contained in the
318 /// local modifications is valid at that height.
319 ///
320 /// # Arguments
321 /// - `key`
322 /// - `block_number`
323 fn get_local_modification(
324 &self,
325 key: &[u8],
326 block_number: u32,
327 ) -> Result<Option<Option<SharedValue>>, LocalStorageError> {
328 let latest_block_number = self.get_current_block_number();
329 let modifications_lock =
330 self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
331 let deleted_prefixes_lock = self
332 .deleted_prefixes
333 .read()
334 .map_err(|e| LocalStorageError::Lock(e.to_string()))?;
335
336 match modifications_lock.get(key) {
337 local_modification @ Some(Some(shared_value))
338 if latest_block_number == block_number ||
339 shared_value.last_modification_block < block_number =>
340 Ok(local_modification.cloned()), /* <- Cheap clone as it's Option<Option<Arc<_>>> */
341 None if deleted_prefixes_lock
342 .iter()
343 .any(|prefix| key.starts_with(prefix.as_slice())) =>
344 Ok(Some(None)),
345 _ => Ok(None),
346 }
347 }
348
349 /// Get a storage value, checking local modifications first.
350 ///
351 /// # Arguments
352 /// * `key` - The storage key to fetch
353 ///
354 /// # Returns
355 /// * `Ok(Some(value))` - Value exists (either modified locally, in local_storage, or in parent)
356 /// * `Ok(None)` - Key doesn't exist or was deleted via prefix deletion
357 /// * `Err(_)` - Lock error or parent layer error
358 ///
359 /// # Behavior
360 /// Storage lookup strategy based on block_number:
361 /// 1. If `block_number == latest_block_number` or the key in local modifications is valid for
362 /// this block: Check modifications HashMap, then remote at first_forked_block
363 /// 2. If `first_forked_block_number < block_number < latest_block_number`: Check local_storage
364 /// table
365 /// 3. Otherwise: Check remote provider directly (fetches block_hash from blocks table)
366 pub async fn get(
367 &self,
368 block_number: u32,
369 key: &[u8],
370 ) -> Result<Option<SharedValue>, LocalStorageError> {
371 let latest_block_number = self.get_current_block_number();
372
373 // First check if the key has a local modification
374 if let local_modification @ Ok(Some(_)) = self.get_local_modification(key, block_number) {
375 return local_modification.map(|local_modification| local_modification.flatten());
376 }
377
378 // Case 1: Query for latest block - check modifications, then remote at first_forked_block
379 if block_number == latest_block_number {
380 // Not in modifications, query remote at first_forked_block
381 return Ok(self
382 .parent
383 .get(self.first_forked_block_hash, key)
384 .await?
385 .map(|value| LocalSharedValue {
386 last_modification_block: 0, /* <- We don't care about the validity block for
387 * this value as it came from the remote layer */
388 value: Some(value),
389 })
390 .map(Arc::new));
391 }
392
393 // Case 2: Historical block after fork such that the local modification is still valid -
394 // check the local modifications map, otherwise fallback to cache and finally remote layer.
395 if block_number > self.first_forked_block_number && block_number < latest_block_number {
396 // Try to get value from local_values table using validity ranges
397 // get_local_value_at_block returns Option<Option<Vec<u8>>>:
398 // - None = no row found
399 // - Some(None) = row found but value is NULL (deleted)
400 // - Some(Some(value)) = row found with data
401 let value = if let Some(local_value) =
402 self.parent.cache().get_local_value_at_block(key, block_number).await?
403 {
404 local_value
405 }
406 // Not found in local storage, try remote at first_forked_block
407 else if let Some(remote_value) =
408 self.parent.get(self.first_forked_block_hash, key).await?
409 {
410 Some(remote_value)
411 } else {
412 return Ok(None);
413 };
414
415 return Ok(Some(Arc::new(LocalSharedValue {
416 last_modification_block: 0, /* <- Value came from remote or cache layer */
417 value,
418 })));
419 }
420
421 // Case 3: Block before or at fork point
422 let block = self.get_block(block_number).await?;
423
424 if let Some(block_row) = block {
425 let block_hash = H256::from_slice(&block_row.hash);
426 Ok(self
427 .parent
428 .get(block_hash, key)
429 .await?
430 .map(|value| LocalSharedValue {
431 last_modification_block: 0, /* <- We don't care about the validity block of
432 * this value as it came from the remote layer */
433 value: Some(value),
434 })
435 .map(Arc::new))
436 } else {
437 // Block not found
438 Ok(None)
439 }
440 }
441
442 /// Get the next key after the given key that starts with the prefix.
443 ///
444 /// # Arguments
445 /// * `prefix` - Storage key prefix to match
446 /// * `key` - The current key; returns the next key after this one
447 ///
448 /// # Returns
449 /// * `Ok(Some(key))` - The next key after `key` that starts with `prefix`
450 /// * `Ok(None)` - No more keys with this prefix
451 /// * `Err(_)` - Lock error or parent layer error
452 ///
453 /// # Behavior
454 /// 1. Queries the parent layer for the next key
455 /// 2. Skips keys that match deleted prefixes
456 /// 3. Does not consider locally modified keys (they are transient)
457 ///
458 /// # Note
459 /// This method currently delegates directly to the parent layer.
460 /// Locally modified keys are not included in key enumeration since
461 /// they represent uncommitted changes.
462 pub async fn next_key(
463 &self,
464 prefix: &[u8],
465 key: &[u8],
466 ) -> Result<Option<Vec<u8>>, LocalStorageError> {
467 // Clone deleted prefixes upfront - we can't hold the lock across await points
468 let deleted_prefixes = self
469 .deleted_prefixes
470 .read()
471 .map_err(|e| LocalStorageError::Lock(e.to_string()))?
472 .clone();
473
474 // Query parent and skip deleted keys
475 let mut current_key = key.to_vec();
476 loop {
477 let next =
478 self.parent.next_key(self.first_forked_block_hash, prefix, ¤t_key).await?;
479 match next {
480 Some(next_key) => {
481 // Check if this key matches any deleted prefix
482 if deleted_prefixes
483 .iter()
484 .any(|deleted| next_key.starts_with(deleted.as_slice()))
485 {
486 // Skip this key and continue searching
487 current_key = next_key;
488 continue;
489 }
490 return Ok(Some(next_key));
491 },
492 None => return Ok(None),
493 }
494 }
495 }
496
497 /// Enumerate all keys matching a prefix, merging remote and local state.
498 ///
499 /// This method combines keys from the remote layer (at the fork point) with
500 /// locally modified keys, producing a sorted, deduplicated list of keys that
501 /// exist at the specified fork-local block.
502 ///
503 /// For the latest block, uses the in-memory `modifications` and
504 /// `deleted_prefixes` snapshots. For historical fork-local blocks, queries
505 /// the persisted local values in the cache to reconstruct the key set that
506 /// existed at that block.
507 ///
508 /// Keys that were deleted locally (either individually via `set(key, None)`
509 /// or via `delete_prefix`) are excluded.
510 pub async fn keys_by_prefix(
511 &self,
512 prefix: &[u8],
513 block_number: u32,
514 ) -> Result<Vec<Vec<u8>>, LocalStorageError> {
515 // 1. Get remote keys at the fork point.
516 let remote_keys = self.parent.get_keys(self.first_forked_block_hash, prefix).await?;
517
518 let latest_block_number = self.get_current_block_number();
519
520 if block_number >= latest_block_number {
521 // Latest block: use in-memory modifications (fast path).
522 self.merge_keys_with_in_memory(remote_keys, prefix)
523 } else {
524 // Historical fork-local block: query persisted local values from cache.
525 self.merge_keys_with_cache(remote_keys, prefix, block_number).await
526 }
527 }
528
529 /// Merge remote keys with in-memory modifications for the latest block.
530 fn merge_keys_with_in_memory(
531 &self,
532 remote_keys: Vec<Vec<u8>>,
533 prefix: &[u8],
534 ) -> Result<Vec<Vec<u8>>, LocalStorageError> {
535 let modifications = self
536 .modifications
537 .read()
538 .map_err(|e| LocalStorageError::Lock(e.to_string()))?
539 .clone();
540 let deleted_prefixes = self
541 .deleted_prefixes
542 .read()
543 .map_err(|e| LocalStorageError::Lock(e.to_string()))?
544 .clone();
545
546 let is_deleted = |key: &[u8]| -> bool {
547 deleted_prefixes.iter().any(|dp| key.starts_with(dp.as_slice()))
548 };
549
550 let is_locally_deleted = |key: &[u8]| -> bool {
551 modifications
552 .get::<[u8]>(key)
553 .and_then(|sv| sv.as_ref())
554 .is_some_and(|sv| sv.value.is_none())
555 };
556
557 let mut merged: std::collections::BTreeSet<Vec<u8>> = remote_keys
558 .into_iter()
559 .filter(|k| !is_deleted(k) && !is_locally_deleted(k))
560 .collect();
561
562 for (key, maybe_sv) in modifications.iter() {
563 if key.starts_with(prefix) && maybe_sv.as_ref().is_some_and(|sv| sv.value.is_some()) {
564 merged.insert(key.clone());
565 }
566 }
567
568 Ok(merged.into_iter().collect())
569 }
570
571 /// Merge remote keys with persisted cache data for a historical fork-local block.
572 async fn merge_keys_with_cache(
573 &self,
574 remote_keys: Vec<Vec<u8>>,
575 prefix: &[u8],
576 block_number: u32,
577 ) -> Result<Vec<Vec<u8>>, LocalStorageError> {
578 let cache = self.parent.cache();
579
580 // Get keys that had non-NULL values at this block.
581 let local_live_keys = cache.get_local_keys_at_block(prefix, block_number).await?;
582
583 // Get keys that were explicitly deleted at this block.
584 let local_deleted_keys =
585 cache.get_local_deleted_keys_at_block(prefix, block_number).await?;
586
587 let deleted_set: std::collections::HashSet<Vec<u8>> =
588 local_deleted_keys.into_iter().collect();
589
590 // Start with remote keys, excluding those deleted locally at this block.
591 let mut merged: std::collections::BTreeSet<Vec<u8>> =
592 remote_keys.into_iter().filter(|k| !deleted_set.contains(k)).collect();
593
594 // Add locally-live keys.
595 merged.extend(local_live_keys);
596
597 Ok(merged.into_iter().collect())
598 }
599
600 /// Set a storage value locally.
601 ///
602 /// # Arguments
603 /// * `key` - The storage key to set
604 /// * `value` - The value to set, or `None` to mark as deleted
605 ///
606 /// # Returns
607 /// * `Ok(())` - Value was set successfully
608 /// * `Err(_)` - Lock error
609 ///
610 /// # Behavior
611 /// - Does not affect the parent layer or underlying cache
612 /// - Overwrites any previous local modification for this key
613 /// - Passing `None` marks the key as explicitly deleted (different from never set)
614 pub fn set(&self, key: &[u8], value: Option<&[u8]>) -> Result<(), LocalStorageError> {
615 let mut modifications_lock =
616 self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
617
618 let latest_block_number = self.get_current_block_number();
619
620 modifications_lock.insert(
621 key.to_vec(),
622 Some(Arc::new({
623 LocalSharedValue {
624 last_modification_block: latest_block_number,
625 value: value.map(|value| value.to_vec()),
626 }
627 })),
628 );
629
630 Ok(())
631 }
632
633 /// Set a storage value visible from the fork point onwards.
634 ///
635 /// Unlike [`Self::set`], which records the modification at the current working block,
636 /// this marks the entry with `last_modification_block = first_forked_block_number - 1`
637 /// (saturating at 0) so it is visible immediately at the fork head and for any later
638 /// fork-local query, but not for historical pre-fork queries.
639 /// This is used for injecting initial state (e.g., dev accounts, sudo key)
640 /// that should be readable before any block is built.
641 ///
642 /// These entries are never committed to the persistent cache by [`Self::commit`]
643 /// (which only commits entries at `current_block_number`), but they remain in
644 /// the in-memory modifications map for the lifetime of the fork.
645 pub fn set_initial(&self, key: &[u8], value: Option<&[u8]>) -> Result<(), LocalStorageError> {
646 let mut modifications_lock =
647 self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
648 let initial_visibility_block = self.first_forked_block_number.saturating_sub(ONE_BLOCK);
649
650 modifications_lock.insert(
651 key.to_vec(),
652 Some(Arc::new(LocalSharedValue {
653 last_modification_block: initial_visibility_block,
654 value: value.map(|v| v.to_vec()),
655 })),
656 );
657
658 Ok(())
659 }
660
661 /// Get multiple storage values in a batch.
662 ///
663 /// # Arguments
664 /// * `block_number` - The block number to query
665 /// * `keys` - Slice of storage keys to fetch (as byte slices)
666 ///
667 /// # Returns
668 /// * `Ok(vec)` - Vector of optional values, in the same order as input keys
669 /// * `Err(_)` - Lock error or parent layer error
670 ///
671 /// # Behavior
672 /// Storage lookup strategy based on block_number (same as `get`):
673 /// 1. If `block_number == latest_block_number`: Check modifications HashMap, then remote at
674 /// first_forked_block
675 /// 2. If `first_forked_block_number < block_number < latest_block_number` and the key is valid
676 /// for `block_number`: Check modifications HashMap table
677 /// 3. If `first_forked_block_number < block_number < latest_block_number`: Check local_storage
678 /// table
679 /// 4. Otherwise: Check remote provider directly (fetches block_hash from blocks table)
680 pub async fn get_batch(
681 &self,
682 block_number: u32,
683 keys: &[&[u8]],
684 ) -> Result<Vec<Option<SharedValue>>, LocalStorageError> {
685 if keys.is_empty() {
686 return Ok(vec![]);
687 }
688
689 let latest_block_number = self.get_current_block_number();
690 let mut results: Vec<Option<SharedValue>> = Vec::with_capacity(keys.len());
691 let mut non_local_keys: Vec<&[u8]> = Vec::new();
692 let mut non_local_indices: Vec<usize> = Vec::new();
693
694 for (i, key) in keys.iter().enumerate() {
695 match self.get_local_modification(key, block_number)? {
696 Some(local_modification) => results.push(local_modification),
697 _ => {
698 results.push(None);
699 non_local_keys.push(*key);
700 non_local_indices.push(i)
701 },
702 }
703 }
704
705 // Case 1: Query for latest block - Complete non local keys with the remote layer
706 if block_number == latest_block_number {
707 if !non_local_keys.is_empty() {
708 let parent_values =
709 self.parent.get_batch(self.first_forked_block_hash, &non_local_keys).await?;
710 for (i, parent_value) in parent_values.into_iter().enumerate() {
711 let result_idx = non_local_indices[i];
712 results[result_idx] = parent_value
713 .map(|value| LocalSharedValue {
714 last_modification_block: 0, /* <- We don't care about the validity
715 * block for this value as it came from
716 * remote layer */
717 value: Some(value),
718 })
719 .map(Arc::new);
720 }
721 }
722
723 return Ok(results);
724 }
725
726 // Case 2: Historical block after fork -
727 // local_values table (using validity) for non local keys. Remote query for non found keys
728 if block_number > self.first_forked_block_number && block_number < latest_block_number {
729 if !non_local_keys.is_empty() {
730 // Use validity-based query to get values from local_values table
731 // get_local_values_at_block_batch returns Vec<Option<Option<Vec<u8>>>>:
732 // - None = no row found
733 // - Some(None) = row found but value is NULL (deleted)
734 // - Some(Some(value)) = row found with data
735 let cached_values = self
736 .parent
737 .cache()
738 .get_local_values_at_block_batch(&non_local_keys, block_number)
739 .await?;
740 for (i, cache_value) in cached_values.into_iter().enumerate() {
741 let result_idx = non_local_indices[i];
742 // cache_value is Option<Option<Vec<u8>>>, map it to Option<SharedValue>
743 results[result_idx] = cache_value.map(|value| {
744 Arc::new(LocalSharedValue {
745 last_modification_block: 0, /* <- We don't care about the validity
746 * block for this value as it came from
747 * cache */
748 value,
749 })
750 });
751 }
752 }
753
754 // For non found values, we need to query the remote storage at the first forked block
755 let mut final_results = Vec::with_capacity(keys.len());
756 for (i, value) in results.into_iter().enumerate() {
757 let final_value = if value.is_some() {
758 value
759 } else {
760 self.parent
761 .get(self.first_forked_block_hash, keys[i])
762 .await?
763 .map(|value| {
764 LocalSharedValue {
765 last_modification_block: 0, /* <- Value came from remote layer */
766 value: Some(value),
767 }
768 })
769 .map(Arc::new)
770 };
771 final_results.push(final_value);
772 }
773 return Ok(final_results);
774 }
775
776 // Case 3: Block before or at fork point - fetch and cache block if needed
777 let block = self.get_block(block_number).await?;
778
779 if let Some(block_row) = block {
780 let block_hash = H256::from_slice(&block_row.hash);
781 let parent_values = self.parent.get_batch(block_hash, keys).await?;
782 Ok(parent_values
783 .into_iter()
784 .map(|value| {
785 value.map(|value| {
786 Arc::new(LocalSharedValue {
787 last_modification_block: 0, /* <- We don't care about this value as
788 * it came
789 * from the remote layer, */
790 value: Some(value),
791 })
792 })
793 })
794 .collect())
795 } else {
796 // Block not found - return None for all keys
797 Ok(vec![None; keys.len()])
798 }
799 }
800
801 /// Set multiple storage values locally in a batch.
802 ///
803 /// # Arguments
804 /// * `entries` - Slice of (key, value) pairs to set
805 ///
806 /// # Returns
807 /// * `Ok(())` - All values were set successfully
808 /// * `Err(_)` - Lock error
809 ///
810 /// # Behavior
811 /// - Does not affect the parent layer or underlying cache
812 /// - Overwrites any previous local modifications for the given keys
813 /// - `None` values mark keys as explicitly deleted
814 /// - More efficient than calling `set()` multiple times due to single lock acquisition
815 pub fn set_batch(&self, entries: &[(&[u8], Option<&[u8]>)]) -> Result<(), LocalStorageError> {
816 if entries.is_empty() {
817 return Ok(());
818 }
819
820 let latest_block_number = self.get_current_block_number();
821
822 let mut modifications_lock =
823 self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
824
825 for (key, value) in entries {
826 modifications_lock.insert(
827 key.to_vec(),
828 Some(Arc::new(LocalSharedValue {
829 last_modification_block: latest_block_number,
830 value: value.map(|value| value.to_vec()),
831 })),
832 );
833 }
834
835 Ok(())
836 }
837
838 /// Batch version of [`Self::set_initial`].
839 ///
840 /// Sets multiple storage values visible from the fork point onwards, using
841 /// `last_modification_block = first_forked_block_number - 1` (saturating at 0).
842 /// See [`Self::set_initial`]
843 /// for details.
844 pub fn set_batch_initial(
845 &self,
846 entries: &[(&[u8], Option<&[u8]>)],
847 ) -> Result<(), LocalStorageError> {
848 if entries.is_empty() {
849 return Ok(());
850 }
851
852 let mut modifications_lock =
853 self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
854 let initial_visibility_block = self.first_forked_block_number.saturating_sub(ONE_BLOCK);
855
856 for (key, value) in entries {
857 modifications_lock.insert(
858 key.to_vec(),
859 Some(Arc::new(LocalSharedValue {
860 last_modification_block: initial_visibility_block,
861 value: value.map(|v| v.to_vec()),
862 })),
863 );
864 }
865
866 Ok(())
867 }
868
869 /// Delete all keys matching a prefix.
870 ///
871 /// # Arguments
872 /// * `prefix` - The prefix to match for deletion
873 ///
874 /// # Returns
875 /// * `Ok(())` - Prefix was marked as deleted successfully
876 /// * `Err(_)` - Lock error
877 ///
878 /// # Behavior
879 /// - Removes all locally modified keys that start with the prefix
880 /// - Marks the prefix as deleted, affecting future `get()` calls
881 /// - Keys in the parent layer matching this prefix will return `None` after this call
882 pub fn delete_prefix(&self, prefix: &[u8]) -> Result<(), LocalStorageError> {
883 let mut modifications_lock =
884 self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
885 let mut deleted_prefixes_lock = self
886 .deleted_prefixes
887 .write()
888 .map_err(|e| LocalStorageError::Lock(e.to_string()))?;
889
890 // Remove all keys starting with the prefix using retain
891 modifications_lock.retain(|key, _| !key.starts_with(prefix));
892
893 // Add prefix to deleted_prefixes
894 deleted_prefixes_lock.push(prefix.to_vec());
895
896 Ok(())
897 }
898
899 /// Check if a prefix has been deleted.
900 ///
901 /// # Arguments
902 /// * `prefix` - The prefix to check
903 ///
904 /// # Returns
905 /// * `Ok(true)` - Prefix has been deleted via [`delete_prefix`](Self::delete_prefix)
906 /// * `Ok(false)` - Prefix has not been deleted
907 /// * `Err(_)` - Lock error
908 pub fn is_deleted(&self, prefix: &[u8]) -> Result<bool, LocalStorageError> {
909 let deleted_prefixes_lock = self
910 .deleted_prefixes
911 .read()
912 .map_err(|e| LocalStorageError::Lock(e.to_string()))?;
913
914 Ok(deleted_prefixes_lock
915 .iter()
916 .any(|deleted_prefix| deleted_prefix.as_slice() == prefix))
917 }
918
919 /// Get all local modifications as a vector.
920 ///
921 /// # Returns
922 /// * `Ok(vec)` - Vector of (key, value) pairs representing all local changes
923 /// * `Err(_)` - Lock error
924 ///
925 /// # Behavior
926 /// - Returns only locally modified keys, not the full state
927 /// - `None` values indicate keys that were explicitly deleted
928 /// - Does not include keys deleted via prefix deletion
929 pub fn diff(&self) -> Result<DiffLocalStorage, LocalStorageError> {
930 let modifications_lock =
931 self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
932
933 Ok(modifications_lock
934 .iter()
935 .map(|(key, value)| (key.clone(), value.clone()))
936 .collect())
937 }
938
939 /// Commit modifications to the local storage tables, creating new validity entries.
940 ///
941 /// # Returns
942 /// * `Ok(())` - All modifications were successfully committed to the cache
943 /// * `Err(_)` - Lock error or cache error
944 ///
945 /// # Behavior
946 /// - Only commits modifications whose `last_modification_block == latest_block_number`
947 /// - For each key to commit:
948 /// - If key not in local_keys: insert key, then insert value with valid_from =
949 /// latest_block_number
950 /// - If key exists: close current open value (set valid_until), insert new value
951 /// - The modifications HashMap remains intact and available after commit
952 /// - Increases the latest block number
953 pub async fn commit(&mut self) -> Result<(), LocalStorageError> {
954 let current_block_number = self.get_current_block_number();
955 let new_latest_block = current_block_number
956 .checked_add(ONE_BLOCK)
957 .ok_or(LocalStorageError::Arithmetic)?;
958
959 // Collect modifications that need to be committed (only those modified at
960 // latest_block_number)
961 let diff = self.diff()?;
962
963 // Filter to only include modifications made at the current latest_block_number
964 // entries_to_commit contains (key, Option<value>) where None means deletion
965 let entries_to_commit: Vec<(&[u8], Option<&[u8]>)> = diff
966 .iter()
967 .filter_map(|(key, shared_value)| {
968 shared_value.as_ref().and_then(|sv| {
969 if sv.last_modification_block == current_block_number {
970 Some((key.as_slice(), sv.value.as_deref()))
971 } else {
972 None
973 }
974 })
975 })
976 .collect();
977
978 // Commit all changes in a single transaction
979 self.parent
980 .cache()
981 .commit_local_changes(&entries_to_commit, current_block_number)
982 .await?;
983
984 self.current_block_number = new_latest_block;
985
986 Ok(())
987 }
988
    /// Create a child layer for nested modifications.
    ///
    /// # Returns
    /// A cloned `LocalStorageLayer` that shares the same parent and state.
    ///
    /// # Behavior
    /// - The child shares the same `modifications`, `deleted_prefixes` and `metadata_versions`
    ///   via `Arc`, so writes made through the child are visible through the original and vice
    ///   versa
    /// - `current_block_number` is copied, not shared: a `commit` on one instance does not
    ///   advance the other
    /// - Because the state is shared, dropping the child does not discard the writes made
    ///   through it
    ///
    /// # Note
    /// This is currently a simple clone. In the future, this may be updated to create
    /// true isolated child layers with proper parent-child relationships.
    pub fn child(&self) -> LocalStorageLayer {
        self.clone()
    }
1004 }
1005}