Skip to main content

pop_fork/
cache.rs

1// SPDX-License-Identifier: GPL-3.0
2
3//! SQLite-based storage cache for fork operations.
4//!
5//! Provides persistent caching of storage values fetched from live chains,
6//! enabling fast restarts and reducing RPC calls.
7
8use crate::{
9	error::cache::CacheError,
10	models::{
11		BlockRow, LocalKeyRow, NewBlockRow, NewLocalKeyRow, NewLocalValueRow, NewPrefixScanRow,
12		NewStorageRow,
13	},
14	schema::{blocks, local_keys, local_values, prefix_scans, storage},
15	strings::cache::{errors, lock_patterns, pragmas, urls},
16};
17use bb8::CustomizeConnection;
18use diesel::{
19	OptionalExtension, prelude::*, result::Error as DieselError, sqlite::SqliteConnection,
20};
21use diesel_async::{
22	AsyncConnection, AsyncMigrationHarness, RunQueryDsl,
23	pooled_connection::{
24		AsyncDieselConnectionManager, PoolError,
25		bb8::{Pool, PooledConnection},
26	},
27	sync_connection_wrapper::SyncConnectionWrapper,
28};
29use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
30use std::{
31	collections::{HashMap, HashSet},
32	future::Future,
33	ops::{Deref, DerefMut},
34	path::Path,
35	pin::Pin,
36	sync::Arc,
37	time::Duration,
38};
39use subxt::config::substrate::H256;
40use tokio::sync::{Mutex, MutexGuard};
41
/// Maximum number of connections in the SQLite connection pool.
///
/// Since Pop is the only process accessing the database, this is for internal
/// async task concurrency. 5 provides comfortable headroom for parallel operations
/// while remaining lightweight on end-user devices.
const MAX_POOL_CONNECTIONS: u32 = 5;
/// Maximum retries for transient SQLite lock/busy errors on write paths.
///
/// Combined with the linear backoff in `retry_conn` (10ms * attempt number),
/// the 30th retry waits 300ms, bounding total backoff to roughly 4.65s.
const MAX_LOCK_RETRIES: u32 = 30;
50
/// Progress information for a prefix scan operation.
///
/// Tracks the state of an incremental prefix scan, enabling resumable
/// operations that can be interrupted and continued later.
///
/// # Lifecycle
///
/// 1. **Not started**: `get_prefix_scan_progress()` returns `None`
/// 2. **In progress**: `is_complete = false`, `last_scanned_key` holds the resume point
/// 3. **Completed**: `is_complete = true`, all keys for the prefix have been scanned
#[derive(Debug, Clone)]
pub struct PrefixScanProgress {
	/// The last key that was successfully scanned.
	/// Used as the starting point when resuming an interrupted scan.
	/// `None` when no key has been recorded yet for this scan.
	pub last_scanned_key: Option<Vec<u8>>,
	/// Whether the scan has processed all keys matching the prefix.
	pub is_complete: bool,
}
69
/// SQLite-backed persistent cache for storage values.
///
/// Enables fast restarts without re-fetching all data from live chains
/// and reduces load on public RPC endpoints.
///
/// Cloning is cheap: clones share the same underlying connection source
/// (a pool handle for file databases, an `Arc`-wrapped connection in memory).
#[derive(Clone, Debug)]
pub struct StorageCache {
	// Dispatches every operation to either a connection pool (file-backed DB)
	// or a mutex-guarded single connection (in-memory DB).
	inner: StorageConn,
}
78
/// Internal connection wrapper for the storage cache.
///
/// The variant is chosen once in [`StorageCache::open`] based on whether a
/// file path was supplied.
#[derive(Clone)]
enum StorageConn {
	/// For file-based databases, uses a connection pool to enable concurrent access
	/// from multiple async tasks. This is more efficient for persistent storage where multiple
	/// operations may run in parallel.
	Pool(Pool<SyncConnectionWrapper<SqliteConnection>>),
	/// For in-memory databases, uses a single shared connection protected by a mutex.
	/// In-memory databases don't benefit from connection pools since all connections share the
	/// same memory state.
	Single(Arc<Mutex<SyncConnectionWrapper<SqliteConnection>>>),
}
91
92impl std::fmt::Debug for StorageConn {
93	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94		match self {
95			StorageConn::Pool(_) => f.debug_tuple("Pool").field(&"...").finish(),
96			StorageConn::Single(_) => f.debug_tuple("Single").field(&"...").finish(),
97		}
98	}
99}
100
/// Connection guard that handles both pool and single connection types.
///
/// Automatically returns the connection to the pool or unlocks the mutex when dropped.
pub(crate) enum ConnectionGuard<'a> {
	/// A connection checked out of the bb8 pool (file-backed database).
	Pool(PooledConnection<'a, SyncConnectionWrapper<SqliteConnection>>),
	/// The mutex guard over the single in-memory connection.
	Single(MutexGuard<'a, SyncConnectionWrapper<SqliteConnection>>),
}
108
109impl<'a> Deref for ConnectionGuard<'a> {
110	type Target = SyncConnectionWrapper<SqliteConnection>;
111
112	fn deref(&self) -> &Self::Target {
113		match self {
114			ConnectionGuard::Pool(conn) => conn,
115			ConnectionGuard::Single(guard) => guard,
116		}
117	}
118}
119
120impl<'a> DerefMut for ConnectionGuard<'a> {
121	fn deref_mut(&mut self) -> &mut Self::Target {
122		match self {
123			ConnectionGuard::Pool(conn) => conn,
124			ConnectionGuard::Single(guard) => guard,
125		}
126	}
127}
128
129/// Increments the attempt counter and sleeps with linear backoff.
130///
131/// Uses a simple linear backoff strategy: delay = 10ms * attempt_number.
132/// This gives the database time to release locks while avoiding excessive delays.
133async fn retry_conn(attempts: &mut u32) {
134	*attempts += 1;
135	let delay_ms = 10u64.saturating_mul(*attempts as u64);
136	tokio::time::sleep(Duration::from_millis(delay_ms)).await;
137}
138
/// Connection customizer that sets SQLite pragmas on each pooled connection.
///
/// Currently applies only the busy-timeout pragma (see `on_acquire`).
#[derive(Debug, Clone, Copy)]
struct SqliteConnectionCustomizer;
142
impl CustomizeConnection<SyncConnectionWrapper<SqliteConnection>, PoolError>
	for SqliteConnectionCustomizer
{
	// Called by bb8 when it acquires a connection for the pool; the trait
	// requires a boxed future rather than an `async fn`.
	fn on_acquire<'a>(
		&'a self,
		conn: &'a mut SyncConnectionWrapper<SqliteConnection>,
	) -> Pin<Box<dyn Future<Output = Result<(), PoolError>> + Send + 'a>> {
		Box::pin(async move {
			// Set busy timeout to reduce lock errors under contention
			diesel::sql_query(pragmas::BUSY_TIMEOUT)
				.execute(conn)
				.await
				.map_err(PoolError::QueryError)?;
			Ok(())
		})
	}
}
160
161impl StorageCache {
	/// Open or create a cache database at the specified path.
	///
	/// Creates the parent directory if it doesn't exist.
	///
	/// # Arguments
	/// * `maybe_path` - `Some(path)` for a persistent file-backed cache (pooled
	///   connections), `None` for an in-memory cache (single mutex-guarded connection).
	///
	/// # Errors
	/// Fails if the directory cannot be created, a connection cannot be
	/// established, migrations fail, or the pool cannot be built.
	pub async fn open(maybe_path: Option<&Path>) -> Result<Self, CacheError> {
		// For in-memory open a single dedicated connection; for file path use a pool.
		if let Some(path) = maybe_path {
			// Ensure parent directory exists
			if let Some(parent) = path.parent() {
				std::fs::create_dir_all(parent)?;
			}
			let url = path.display().to_string();

			// Run migrations on a temporary async connection first
			{
				let mut conn = SyncConnectionWrapper::<SqliteConnection>::establish(&url).await?;
				// Apply pragmas for better concurrency on file databases
				// WAL mode: Persists to the database file itself
				diesel::sql_query(pragmas::JOURNAL_MODE_WAL).execute(&mut conn).await?;
				// Busy timeout: For this migration connection
				diesel::sql_query(pragmas::BUSY_TIMEOUT).execute(&mut conn).await?;
				let mut harness = AsyncMigrationHarness::new(conn);
				harness.run_pending_migrations(MIGRATIONS)?;
				// Recover and drop the temporary connection at end of scope.
				let _ = harness.into_inner();
			}

			// Build the pool with connection customizer
			// (the customizer re-applies the busy timeout on each pooled connection).
			let manager =
				AsyncDieselConnectionManager::<SyncConnectionWrapper<SqliteConnection>>::new(url);
			let pool = Pool::builder()
				.max_size(MAX_POOL_CONNECTIONS)
				.connection_customizer(Box::new(SqliteConnectionCustomizer))
				.build(manager)
				.await?;
			Ok(Self { inner: StorageConn::Pool(pool) })
		} else {
			// Single in-memory connection
			let mut conn =
				SyncConnectionWrapper::<SqliteConnection>::establish(urls::IN_MEMORY).await?;
			// Run migrations on this single connection
			// Set busy timeout to reduce lock errors under contention
			diesel::sql_query(pragmas::BUSY_TIMEOUT).execute(&mut conn).await?;
			let mut harness = AsyncMigrationHarness::new(conn);
			harness.run_pending_migrations(MIGRATIONS)?;
			// Keep this exact connection: an in-memory DB lives and dies with it.
			let conn = harness.into_inner();
			Ok(Self {
				inner: StorageConn::Single(std::sync::Arc::new(tokio::sync::Mutex::new(conn))),
			})
		}
	}
211
212	/// Open an in-memory cache.
213	///
214	/// Creates a fresh in-memory SQLite database and runs all migrations
215	/// to set up the storage and blocks tables.
216	pub async fn in_memory() -> Result<Self, CacheError> {
217		Self::open(None).await
218	}
219
220	/// Get a database connection.
221	///
222	/// Handles acquiring the connection from either the pool or single mutex.
223	/// The connection is automatically returned to the pool or unlocks the mutex when dropped.
224	pub(crate) async fn get_conn(&self) -> Result<ConnectionGuard<'_>, CacheError> {
225		match &self.inner {
226			StorageConn::Pool(pool) => {
227				let conn = pool.get().await.map_err(|e| {
228					CacheError::Connection(ConnectionError::BadConnection(e.to_string()))
229				})?;
230				Ok(ConnectionGuard::Pool(conn))
231			},
232			StorageConn::Single(m) => {
233				let conn = m.lock().await;
234				Ok(ConnectionGuard::Single(conn))
235			},
236		}
237	}
238
239	/// Get a cached storage value.
240	///
241	/// # Returns
242	/// * `Ok(Some(Some(value)))` - Cached with a value.
243	/// * `Ok(Some(None))` - Cached as empty (storage key exists but has no value).
244	/// * `Ok(None)` - Not in cache (unknown).
245	pub async fn get_storage(
246		&self,
247		block_hash: H256,
248		key: &[u8],
249	) -> Result<Option<Option<Vec<u8>>>, CacheError> {
250		// Retrieve the cached value and its empty flag for the given block and key.
251		// We need both `value` and `is_empty` to distinguish between:
252		// - Key not in cache (no row returned)
253		// - Key cached as empty (row exists, is_empty = true)
254		// - Key cached with value (row exists, is_empty = false)
255		use crate::schema::storage::columns as sc;
256
257		let mut conn = self.get_conn().await?;
258
259		let row: Option<(Option<Vec<u8>>, bool)> = storage::table
260			.filter(sc::block_hash.eq(block_hash.as_bytes()))
261			.filter(sc::key.eq(key))
262			.select((sc::value, sc::is_empty))
263			.first::<(Option<Vec<u8>>, bool)>(&mut conn)
264			.await
265			.optional()?;
266
267		Ok(row.map(|(val, empty)| if empty { None } else { val }))
268	}
269
	/// Cache a storage value.
	///
	/// Inserts the entry, or overwrites it if `(block_hash, key)` is already cached.
	///
	/// # Arguments
	/// * `block_hash` - The block hash this storage is from
	/// * `key` - The storage key
	/// * `value` - The storage value, or None if the key has no value (empty)
	pub async fn set_storage(
		&self,
		block_hash: H256,
		key: &[u8],
		value: Option<&[u8]>,
	) -> Result<(), CacheError> {
		// Insert or update the cached storage entry with simple retry on lock contention.
		use crate::schema::storage::columns as sc;

		// Retry loop for transient SQLite lock/busy errors.
		// SQLite may return SQLITE_BUSY when another connection holds a lock.
		// We retry up to MAX_LOCK_RETRIES times with increasing backoff delays.
		let mut attempts = 0;
		loop {
			// Re-acquire a connection each attempt rather than pinning a retry
			// to the connection that just failed.
			let mut conn = self.get_conn().await?;

			// `is_empty` lets the read path distinguish "cached as having no
			// value" from "not cached at all".
			let row = NewStorageRow {
				block_hash: block_hash.as_bytes(),
				key,
				value,
				is_empty: value.is_none(),
			};

			// Upsert keyed on the (block_hash, key) pair.
			let res = diesel::insert_into(storage::table)
				.values(&row)
				.on_conflict((sc::block_hash, sc::key))
				.do_update()
				.set((sc::value.eq(value), sc::is_empty.eq(row.is_empty)))
				.execute(&mut conn)
				.await;

			match res {
				Ok(_) => return Ok(()),
				// Only lock/busy errors are retried; anything else is fatal.
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
317
318	/// Get multiple cached storage values in a batch.
319	///
320	/// Returns results in the same order as the input keys.
321	pub async fn get_storage_batch(
322		&self,
323		block_hash: H256,
324		keys: &[&[u8]],
325	) -> Result<Vec<Option<Option<Vec<u8>>>>, CacheError> {
326		if keys.is_empty() {
327			return Ok(vec![]);
328		}
329
330		let mut seen = HashSet::with_capacity(keys.len());
331		if keys.iter().any(|key| !seen.insert(key)) {
332			return Err(CacheError::DuplicatedKeys);
333		}
334
335		use crate::schema::storage::columns as sc;
336		let mut conn = self.get_conn().await?;
337
338		let rows: Vec<(Vec<u8>, Option<Vec<u8>>, bool)> = storage::table
339			.filter(sc::block_hash.eq(block_hash.as_bytes()))
340			.filter(sc::key.eq_any(keys))
341			.select((sc::key, sc::value, sc::is_empty))
342			.load::<(Vec<u8>, Option<Vec<u8>>, bool)>(&mut conn)
343			.await?;
344
345		// Build a map from the results. SQLite doesn't guarantee result order matches
346		// the IN clause order, so we use a HashMap to look up values by key.
347		let mut cache_map = HashMap::new();
348		for (key, value, empty) in rows {
349			let value = if empty { None } else { value };
350			cache_map.insert(key, value);
351		}
352
353		// Return values in the same order as input keys.
354		// Keys not found in cache_map (not in DB) return None (not cached).
355		// Keys found return Some(value) where value is None for empty or Some(bytes) for data.
356		Ok(keys.iter().map(|key| cache_map.remove(*key)).collect())
357	}
358
	/// Cache multiple storage values in a batch.
	///
	/// Uses a transaction for efficiency.
	///
	/// # Errors
	/// Returns [`CacheError::DuplicatedKeys`] if the same key appears twice in `entries`.
	pub async fn set_storage_batch(
		&self,
		block_hash: H256,
		entries: &[(&[u8], Option<&[u8]>)],
	) -> Result<(), CacheError> {
		if entries.is_empty() {
			return Ok(());
		}

		// Duplicate keys in one batch would make the upsert result order-dependent.
		let mut seen = HashSet::with_capacity(entries.len());
		if entries.iter().any(|(key, _)| !seen.insert(key)) {
			return Err(CacheError::DuplicatedKeys);
		}

		// Use a transaction to batch all inserts together.
		// This is significantly faster than individual inserts because:
		// 1. SQLite commits are expensive (fsync to disk)
		// 2. A transaction groups all inserts into a single commit
		// 3. If any insert fails, the entire batch is rolled back
		use crate::schema::storage::columns as sc;
		// Arc lets each retry attempt hand the data to a `move` closure without
		// copying the entries themselves.
		let entries = Arc::new(entries);
		let block_hash = Arc::new(block_hash);

		// Retry loop for transient SQLite lock/busy errors.
		// SQLite may return SQLITE_BUSY when another connection holds a lock.
		// We retry up to MAX_LOCK_RETRIES (30) times with increasing backoff delays.
		let mut attempts = 0;
		loop {
			// Fresh Arc clones per attempt; the previous ones moved into the closure.
			let entries = Arc::clone(&entries);
			let block_hash = Arc::clone(&block_hash);
			let mut conn = self.get_conn().await?;
			let res = conn
				.transaction::<_, DieselError, _>(move |conn| {
					Box::pin(async move {
						let new_rows: Vec<NewStorageRow> = entries
							.iter()
							.map(|(key, value)| NewStorageRow {
								block_hash: block_hash.as_bytes(),
								key,
								value: *value,
								// Marks "cached as having no value" for the read path.
								is_empty: value.is_none(),
							})
							.collect();
						for row in new_rows {
							// Upsert keyed on (block_hash, key).
							diesel::insert_into(storage::table)
								.values(&row)
								.on_conflict((sc::block_hash, sc::key))
								.do_update()
								.set((sc::value.eq(row.value), sc::is_empty.eq(row.is_empty)))
								.execute(conn)
								.await?;
						}
						Ok(())
					})
				})
				.await;

			match res {
				Ok(_) => return Ok(()),
				// Retry only lock contention; the failed transaction was rolled back.
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
429
430	/// Get a local key's ID from the local_keys table.
431	///
432	/// # Returns
433	/// * `Ok(Some(key_row))` - Key exists with its ID
434	/// * `Ok(None)` - Key not in local_keys table
435	pub async fn get_local_key(&self, key: &[u8]) -> Result<Option<LocalKeyRow>, CacheError> {
436		use crate::schema::local_keys::columns as lkc;
437
438		let mut conn = self.get_conn().await?;
439
440		let row = local_keys::table
441			.filter(lkc::key.eq(key))
442			.select(LocalKeyRow::as_select())
443			.first(&mut conn)
444			.await
445			.optional()?;
446
447		Ok(row)
448	}
449
	/// Insert a new key into local_keys and return its ID.
	///
	/// If the key already exists, returns the existing ID.
	///
	/// # Errors
	/// Propagates database errors after exhausting lock retries.
	pub async fn insert_local_key(&self, key: &[u8]) -> Result<i32, CacheError> {
		use crate::schema::local_keys::columns as lkc;

		// Retry loop for transient SQLite lock/busy errors (SQLITE_BUSY).
		let mut attempts = 0;
		loop {
			let mut conn = self.get_conn().await?;

			// Try to insert, ignore conflict (key already exists)
			let res = diesel::insert_into(local_keys::table)
				.values(NewLocalKeyRow { key })
				.on_conflict(lkc::key)
				.do_nothing()
				.execute(&mut conn)
				.await;

			match res {
				Ok(_) => {
					// Fetch the ID (either just inserted or already existed)
					let key_id: i32 = local_keys::table
						.filter(lkc::key.eq(key))
						.select(lkc::id)
						.first(&mut conn)
						.await?;
					return Ok(key_id);
				},
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
486
487	/// Get a local storage value valid at a specific block number.
488	///
489	/// Queries the local_values table for a value that is valid at the given block:
490	/// valid_from <= block_number AND (valid_until IS NULL OR valid_until > block_number)
491	///
492	/// # Returns
493	/// * `Ok(Some(Some(value)))` - Value found with data
494	/// * `Ok(Some(None))` - Value found but explicitly deleted (NULL in DB)
495	/// * `Ok(None)` - No value valid at this block (no row found)
496	pub async fn get_local_value_at_block(
497		&self,
498		key: &[u8],
499		block_number: u32,
500	) -> Result<Option<Option<Vec<u8>>>, CacheError> {
501		use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
502
503		let mut conn = self.get_conn().await?;
504		let block_num = block_number as i64;
505
506		// First get the key_id
507		let key_id: i32 = match local_keys::table
508			.filter(lkc::key.eq(key))
509			.select(lkc::id)
510			.first(&mut conn)
511			.await
512			.optional()?
513		{
514			Some(id) => id,
515			_ => return Ok(None),
516		};
517
518		// Query for value valid at block_number (value can be NULL for deletions)
519		let value: Option<Option<Vec<u8>>> = local_values::table
520			.filter(lvc::key_id.eq(key_id))
521			.filter(lvc::valid_from.le(block_num))
522			.filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
523			.select(lvc::value)
524			.first(&mut conn)
525			.await
526			.optional()?;
527
528		Ok(value)
529	}
530
531	/// Get multiple local storage values valid at a specific block number.
532	///
533	/// Returns results in the same order as the input keys.
534	/// * `Some(Some(value))` - Value found with data
535	/// * `Some(None)` - Value found but explicitly deleted (NULL in DB)
536	/// * `None` - No value valid at this block (no row found)
537	pub async fn get_local_values_at_block_batch(
538		&self,
539		keys: &[&[u8]],
540		block_number: u32,
541	) -> Result<Vec<Option<Option<Vec<u8>>>>, CacheError> {
542		if keys.is_empty() {
543			return Ok(vec![]);
544		}
545
546		let mut seen = HashSet::with_capacity(keys.len());
547		if keys.iter().any(|key| !seen.insert(key)) {
548			return Err(CacheError::DuplicatedKeys);
549		}
550
551		use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
552
553		let mut conn = self.get_conn().await?;
554		let block_num = block_number as i64;
555
556		// Get all key_ids for the requested keys
557		let key_rows: Vec<LocalKeyRow> = local_keys::table
558			.filter(lkc::key.eq_any(keys))
559			.select(LocalKeyRow::as_select())
560			.load(&mut conn)
561			.await?;
562
563		// Build a map from key bytes to key_id
564		let key_to_id: HashMap<Vec<u8>, i32> =
565			key_rows.iter().map(|r| (r.key.clone(), r.id)).collect();
566
567		// Get all key_ids that exist
568		let key_ids: Vec<i32> = key_to_id.values().copied().collect();
569
570		if key_ids.is_empty() {
571			return Ok(vec![None; keys.len()]);
572		}
573
574		// Query for values valid at block_number for all key_ids (value can be NULL for deletions)
575		let value_rows: Vec<(i32, Option<Vec<u8>>)> = local_values::table
576			.filter(lvc::key_id.eq_any(&key_ids))
577			.filter(lvc::valid_from.le(block_num))
578			.filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
579			.select((lvc::key_id, lvc::value))
580			.load(&mut conn)
581			.await?;
582
583		// Build a map from key_id to value (Option<Vec<u8>> to track deletions)
584		let mut id_to_value: HashMap<i32, Option<Vec<u8>>> = HashMap::new();
585		for (key_id, value) in value_rows {
586			id_to_value.insert(key_id, value);
587		}
588
589		// Build result in same order as input keys
590		// None = key not found in local storage
591		// Some(None) = key was deleted
592		// Some(Some(value)) = key has a value
593		Ok(keys
594			.iter()
595			.map(|key| key_to_id.get(*key).and_then(|key_id| id_to_value.remove(key_id)))
596			.collect())
597	}
598
599	/// Get all locally-modified keys matching a prefix that existed at a specific block.
600	///
601	/// Joins `local_keys` with `local_values` to find keys where:
602	/// - The key starts with `prefix`
603	/// - A value entry is valid at `block_number` (valid_from <= block AND (valid_until IS NULL OR
604	///   valid_until > block))
605	/// - The value is not NULL (i.e., the key was not deleted at that block)
606	///
607	/// Returns a sorted list of matching keys.
608	pub async fn get_local_keys_at_block(
609		&self,
610		prefix: &[u8],
611		block_number: u32,
612	) -> Result<Vec<Vec<u8>>, CacheError> {
613		use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
614
615		let mut conn = self.get_conn().await?;
616		let block_num = block_number as i64;
617		let prefix_vec = prefix.to_vec();
618
619		let mut query = local_keys::table
620			.inner_join(local_values::table)
621			.filter(lkc::key.ge(&prefix_vec))
622			.filter(lvc::valid_from.le(block_num))
623			.filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
624			.filter(lvc::value.is_not_null())
625			.select(lkc::key)
626			.distinct()
627			.order(lkc::key.asc())
628			.into_boxed();
629
630		if let Some(upper) = Self::prefix_upper_bound(prefix) {
631			query = query.filter(lkc::key.lt(upper));
632		}
633
634		let keys: Vec<Vec<u8>> = query.load(&mut conn).await?;
635		Ok(keys)
636	}
637
638	/// Get all locally-deleted keys matching a prefix at a specific block.
639	///
640	/// Returns keys where the value entry valid at `block_number` has a NULL value
641	/// (explicitly deleted). This is needed to exclude deleted keys from merged
642	/// remote + local key enumeration.
643	pub async fn get_local_deleted_keys_at_block(
644		&self,
645		prefix: &[u8],
646		block_number: u32,
647	) -> Result<Vec<Vec<u8>>, CacheError> {
648		use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
649
650		let mut conn = self.get_conn().await?;
651		let block_num = block_number as i64;
652		let prefix_vec = prefix.to_vec();
653
654		let mut query = local_keys::table
655			.inner_join(local_values::table)
656			.filter(lkc::key.ge(&prefix_vec))
657			.filter(lvc::valid_from.le(block_num))
658			.filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
659			.filter(lvc::value.is_null())
660			.select(lkc::key)
661			.distinct()
662			.order(lkc::key.asc())
663			.into_boxed();
664
665		if let Some(upper) = Self::prefix_upper_bound(prefix) {
666			query = query.filter(lkc::key.lt(upper));
667		}
668
669		let keys: Vec<Vec<u8>> = query.load(&mut conn).await?;
670		Ok(keys)
671	}
672
673	/// Compute the exclusive upper bound for a binary prefix range query.
674	///
675	/// Increments the prefix to produce the first byte sequence that does NOT
676	/// start with `prefix`. Returns `None` if the prefix is all `0xFF` bytes
677	/// (no upper bound needed, `>=` is sufficient).
678	fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
679		let mut upper = prefix.to_vec();
680		// Walk backwards, incrementing the last non-0xFF byte.
681		while let Some(last) = upper.last_mut() {
682			if *last < 0xFF {
683				*last += 1;
684				return Some(upper);
685			}
686			upper.pop();
687		}
688		// All bytes were 0xFF, no upper bound possible.
689		None
690	}
691
	/// Insert a new local value entry.
	///
	/// The new row is open-ended (`valid_until = None`); callers are expected to
	/// close any previous entry separately (see `close_local_value`).
	///
	/// # Arguments
	/// * `key_id` - The key ID from local_keys table
	/// * `value` - The value bytes, or None to record a deletion
	/// * `valid_from` - Block number when this value becomes valid
	pub async fn insert_local_value(
		&self,
		key_id: i32,
		value: Option<&[u8]>,
		valid_from: u32,
	) -> Result<(), CacheError> {
		// Retry loop for transient SQLite lock/busy errors (SQLITE_BUSY).
		let mut attempts = 0;
		loop {
			let mut conn = self.get_conn().await?;

			// NULL value = deletion marker; valid_until stays open until closed.
			let row = NewLocalValueRow {
				key_id,
				value: value.map(|v| v.to_vec()),
				valid_from: valid_from as i64,
				valid_until: None,
			};

			let res =
				diesel::insert_into(local_values::table).values(&row).execute(&mut conn).await;

			match res {
				Ok(_) => return Ok(()),
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
728
	/// Close the currently open local value entry (set valid_until).
	///
	/// Finds the entry for this key_id where valid_until IS NULL and sets it.
	/// Succeeds (affecting zero rows) if no open entry exists.
	pub async fn close_local_value(&self, key_id: i32, valid_until: u32) -> Result<(), CacheError> {
		use crate::schema::local_values::columns as lvc;

		// Retry loop for transient SQLite lock/busy errors (SQLITE_BUSY).
		let mut attempts = 0;
		loop {
			let mut conn = self.get_conn().await?;

			// Only open entries (valid_until IS NULL) are updated.
			let res = diesel::update(
				local_values::table
					.filter(lvc::key_id.eq(key_id))
					.filter(lvc::valid_until.is_null()),
			)
			.set(lvc::valid_until.eq(Some(valid_until as i64)))
			.execute(&mut conn)
			.await;

			match res {
				Ok(_) => return Ok(()),
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
758
	/// Commit a batch of local storage changes in a single transaction.
	///
	/// For each entry: upserts the key into `local_keys`, closes the previous value
	/// (sets `valid_until`), and inserts the new value. Wrapping everything in one
	/// transaction avoids per-operation fsync overhead, reducing commit time from
	/// tens of seconds to sub-second.
	///
	/// # Arguments
	/// * `entries` - `(key, value)` pairs; a `None` value records a deletion.
	/// * `block_number` - Block at which the new values become valid (and the
	///   previous ones stop being valid).
	pub async fn commit_local_changes(
		&self,
		entries: &[(&[u8], Option<&[u8]>)],
		block_number: u32,
	) -> Result<(), CacheError> {
		use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};

		if entries.is_empty() {
			return Ok(());
		}

		// Clone entries into owned data so they can move into the async closure.
		let owned: Vec<(Vec<u8>, Option<Vec<u8>>)> =
			entries.iter().map(|(k, v)| (k.to_vec(), v.map(|val| val.to_vec()))).collect();

		// Retry loop for transient SQLite lock/busy errors (SQLITE_BUSY).
		let mut attempts = 0;
		loop {
			// Each attempt needs a fresh copy; the previous one moved into the closure.
			let owned = owned.clone();
			let mut conn = self.get_conn().await?;

			let res = conn
				.transaction::<_, DieselError, _>(move |conn| {
					Box::pin(async move {
						for (key, value) in &owned {
							// Upsert key
							diesel::insert_into(local_keys::table)
								.values(NewLocalKeyRow { key })
								.on_conflict(lkc::key)
								.do_nothing()
								.execute(conn)
								.await?;

							// Look up the id (freshly inserted or pre-existing).
							let key_id: i32 = local_keys::table
								.filter(lkc::key.eq(key.as_slice()))
								.select(lkc::id)
								.first(conn)
								.await?;

							// Close previous value
							diesel::update(
								local_values::table
									.filter(lvc::key_id.eq(key_id))
									.filter(lvc::valid_until.is_null()),
							)
							.set(lvc::valid_until.eq(Some(block_number as i64)))
							.execute(conn)
							.await?;

							// Insert new value
							let row = NewLocalValueRow {
								key_id,
								value: value.clone(),
								valid_from: block_number as i64,
								valid_until: None,
							};
							diesel::insert_into(local_values::table)
								.values(&row)
								.execute(conn)
								.await?;
						}
						Ok(())
					})
				})
				.await;

			match res {
				Ok(_) => return Ok(()),
				// Retry only lock contention; the failed transaction was rolled back.
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
840
	/// Clear all local storage data (both local_keys and local_values tables).
	///
	/// This removes all locally tracked key-value pairs and their validity history.
	/// Uses a transaction to ensure both tables are cleared atomically.
	pub async fn clear_local_storage(&self) -> Result<(), CacheError> {
		// Retry loop for transient SQLite lock/busy errors (SQLITE_BUSY).
		let mut attempts = 0;
		loop {
			let mut conn = self.get_conn().await?;

			let res = conn
				.transaction::<_, DieselError, _>(|conn| {
					Box::pin(async move {
						// Delete local_values first (has foreign key to local_keys)
						diesel::delete(local_values::table).execute(conn).await?;
						// Then delete local_keys
						diesel::delete(local_keys::table).execute(conn).await?;
						Ok(())
					})
				})
				.await;

			match res {
				Ok(_) => return Ok(()),
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
872
	/// Cache block metadata.
	///
	/// Upserts on the block hash: re-caching an existing block overwrites its
	/// number, parent hash, and header.
	///
	/// # Arguments
	/// * `hash` - The block's hash (primary key)
	/// * `number` - The block number
	/// * `parent_hash` - Hash of the parent block
	/// * `header` - Encoded block header bytes
	pub async fn cache_block(
		&self,
		hash: H256,
		number: u32,
		parent_hash: H256,
		header: &[u8],
	) -> Result<(), CacheError> {
		// Store block metadata for quick lookup without hitting the remote RPC.
		use crate::schema::blocks::columns as bc;

		// Retry loop for transient SQLite lock/busy errors.
		// SQLite may return SQLITE_BUSY when another connection holds a lock.
		// We retry up to MAX_LOCK_RETRIES (30) times with increasing backoff delays.
		let mut attempts = 0;
		let parent_hash_bytes = parent_hash.as_bytes();
		loop {
			let mut conn = self.get_conn().await?;

			// u32 -> i64 widening is lossless; the read path validates the range.
			let block = NewBlockRow {
				hash: hash.as_bytes(),
				number: number as i64,
				parent_hash: parent_hash_bytes,
				header,
			};

			let res = diesel::insert_into(blocks::table)
				.values(&block)
				.on_conflict(bc::hash)
				.do_update()
				.set((
					bc::number.eq(number as i64),
					bc::parent_hash.eq(parent_hash_bytes),
					bc::header.eq(header),
				))
				.execute(&mut conn)
				.await;

			match res {
				Ok(_) => return Ok(()),
				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
					retry_conn(&mut attempts).await;
					continue;
				},
				Err(e) => return Err(e.into()),
			}
		}
	}
921
922	/// Get cached block metadata.
923	pub async fn get_block(&self, hash: H256) -> Result<Option<BlockRow>, CacheError> {
924		// Retrieve all block metadata fields by the block's hash (primary key).
925		// Returns None if the block hasn't been cached yet.
926		use crate::schema::blocks::columns as bc;
927
928		let mut conn = self.get_conn().await?;
929
930		let row = blocks::table
931			.filter(bc::hash.eq(hash.as_bytes()))
932			.select(BlockRow::as_select())
933			.first(&mut conn)
934			.await
935			.optional()?;
936
937		match row {
938			// Sanity check on the block number, as we use i64 to represent them in SQLite but
939			// Substrate blocks are u32
940			Some(BlockRow { number, .. }) if number < 0 || number > u32::MAX.into() =>
941				Err(CacheError::DataCorruption(errors::BLOCK_NUMBER_OUT_OF_U32_RANGE.into())),
942			row @ Some(_) => Ok(row),
943			None => Ok(None),
944		}
945	}
946
947	/// Get cached block metadata by block number.
948	pub async fn get_block_by_number(
949		&self,
950		block_number: u32,
951	) -> Result<Option<BlockRow>, CacheError> {
952		// Retrieve block metadata by block number.
953		// Returns None if the block hasn't been cached yet.
954		use crate::schema::blocks::columns as bc;
955
956		let mut conn = self.get_conn().await?;
957
958		let row = blocks::table
959			.filter(bc::number.eq(block_number as i64))
960			.select(BlockRow::as_select())
961			.first(&mut conn)
962			.await
963			.optional()?;
964
965		match row {
966			// Sanity check on the block number
967			Some(BlockRow { number, .. }) if number < 0 || number > u32::MAX.into() =>
968				Err(CacheError::DataCorruption(errors::BLOCK_NUMBER_OUT_OF_U32_RANGE.into())),
969			row @ Some(_) => Ok(row),
970			None => Ok(None),
971		}
972	}
973
974	/// Clear all cached data for a specific block.
975	pub async fn clear_block(&self, hash: H256) -> Result<(), CacheError> {
976		// Use a transaction to ensure both deletes succeed or fail together.
977		// This maintains consistency: we never have orphaned storage entries
978		// without their parent block, or vice versa.
979		use crate::schema::{
980			blocks::columns as bc, prefix_scans::columns as psc, storage::columns as sc,
981		};
982		let block_hash = Arc::new(hash.as_bytes());
983
984		// Retry loop for transient SQLite lock/busy errors.
985		// SQLite may return SQLITE_BUSY when another connection holds a lock.
986		// We retry up to MAX_LOCK_RETRIES (30) times with increasing backoff delays.
987		let mut attempts = 0;
988		loop {
989			let block_hash = Arc::clone(&block_hash);
990			let mut conn = self.get_conn().await?;
991
992			let res = conn
993				.transaction::<_, DieselError, _>(move |conn| {
994					Box::pin(async move {
995						diesel::delete(storage::table.filter(sc::block_hash.eq(*block_hash)))
996							.execute(conn)
997							.await?;
998						diesel::delete(blocks::table.filter(bc::hash.eq(*block_hash)))
999							.execute(conn)
1000							.await?;
1001						diesel::delete(prefix_scans::table.filter(psc::block_hash.eq(*block_hash)))
1002							.execute(conn)
1003							.await?;
1004						Ok(())
1005					})
1006				})
1007				.await;
1008
1009			match res {
1010				Ok(_) => return Ok(()),
1011				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
1012					retry_conn(&mut attempts).await;
1013					continue;
1014				},
1015				Err(e) => return Err(e.into()),
1016			}
1017		}
1018	}
1019
1020	/// Get the progress of a prefix scan operation.
1021	///
1022	/// # Returns
1023	/// * `Ok(Some(progress))` - Scan has been started, returns progress info
1024	/// * `Ok(None)` - No scan has been started for this prefix
1025	pub async fn get_prefix_scan_progress(
1026		&self,
1027		block_hash: H256,
1028		prefix: &[u8],
1029	) -> Result<Option<PrefixScanProgress>, CacheError> {
1030		use crate::schema::prefix_scans::columns as psc;
1031
1032		let mut conn = self.get_conn().await?;
1033
1034		let row: Option<(Option<Vec<u8>>, bool)> = prefix_scans::table
1035			.filter(psc::block_hash.eq(block_hash.as_bytes()))
1036			.filter(psc::prefix.eq(prefix))
1037			.select((psc::last_scanned_key, psc::is_complete))
1038			.first::<(Option<Vec<u8>>, bool)>(&mut conn)
1039			.await
1040			.optional()?;
1041
1042		Ok(row.map(|(last_key, complete)| PrefixScanProgress {
1043			last_scanned_key: last_key,
1044			is_complete: complete,
1045		}))
1046	}
1047
1048	/// Update the progress of a prefix scan operation (upsert).
1049	///
1050	/// Creates a new progress record or updates an existing one. Uses SQLite's
1051	/// `ON CONFLICT DO UPDATE` for atomic upsert semantics.
1052	///
1053	/// # Arguments
1054	/// * `block_hash` - The block hash being scanned
1055	/// * `prefix` - The storage prefix being scanned
1056	/// * `last_key` - The last key that was processed
1057	/// * `is_complete` - Whether the scan has finished
1058	pub async fn update_prefix_scan(
1059		&self,
1060		block_hash: H256,
1061		prefix: &[u8],
1062		last_key: &[u8],
1063		is_complete: bool,
1064	) -> Result<(), CacheError> {
1065		use crate::schema::prefix_scans::columns as psc;
1066		use diesel::upsert::excluded;
1067
1068		let new_row = NewPrefixScanRow {
1069			block_hash: block_hash.as_bytes(),
1070			prefix,
1071			last_scanned_key: Some(last_key),
1072			is_complete,
1073		};
1074
1075		let mut attempts = 0;
1076		loop {
1077			let mut conn = self.get_conn().await?;
1078			let res = diesel::insert_into(prefix_scans::table)
1079				.values(&new_row)
1080				.on_conflict((psc::block_hash, psc::prefix))
1081				.do_update()
1082				.set((
1083					psc::last_scanned_key.eq(excluded(psc::last_scanned_key)),
1084					psc::is_complete.eq(excluded(psc::is_complete)),
1085				))
1086				.execute(&mut conn)
1087				.await;
1088
1089			match res {
1090				Ok(_) => return Ok(()),
1091				Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
1092					retry_conn(&mut attempts).await;
1093					continue;
1094				},
1095				Err(e) => return Err(e.into()),
1096			}
1097		}
1098	}
1099
1100	/// Get all cached keys matching a prefix.
1101	///
1102	/// Uses a range query (`key >= prefix AND key < prefix+1`) for efficient
1103	/// prefix matching on SQLite's B-tree index. This is more performant than
1104	/// `LIKE` or `GLOB` patterns for binary key prefixes.
1105	pub async fn get_keys_by_prefix(
1106		&self,
1107		block_hash: H256,
1108		prefix: &[u8],
1109	) -> Result<Vec<Vec<u8>>, CacheError> {
1110		use crate::schema::storage::columns as sc;
1111
1112		let mut conn = self.get_conn().await?;
1113
1114		// SQLite BLOB comparison with >= and < for prefix range
1115		let prefix_end = increment_prefix(prefix);
1116
1117		let mut query = storage::table
1118			.filter(sc::block_hash.eq(block_hash.as_bytes()))
1119			.filter(sc::key.ge(prefix))
1120			.select(sc::key)
1121			.into_boxed();
1122
1123		if let Some(ref end) = prefix_end {
1124			query = query.filter(sc::key.lt(end));
1125		}
1126
1127		Ok(query.load::<Vec<u8>>(&mut conn).await?)
1128	}
1129
1130	/// Find the next cached key after `key` that matches `prefix`.
1131	///
1132	/// Uses a range query (`key > current AND key >= prefix AND key < prefix+1`)
1133	/// for efficient lookup on SQLite's B-tree index.
1134	///
1135	/// # Returns
1136	/// * `Ok(Some(next_key))` - The next key after `key` matching the prefix
1137	/// * `Ok(None)` - No more keys with this prefix after `key`
1138	pub async fn next_key_from_cache(
1139		&self,
1140		block_hash: H256,
1141		prefix: &[u8],
1142		key: &[u8],
1143	) -> Result<Option<Vec<u8>>, CacheError> {
1144		use crate::schema::storage::columns as sc;
1145
1146		let mut conn = self.get_conn().await?;
1147		let prefix_end = increment_prefix(prefix);
1148
1149		let mut query = storage::table
1150			.filter(sc::block_hash.eq(block_hash.as_bytes()))
1151			.filter(sc::key.gt(key))
1152			.filter(sc::key.ge(prefix))
1153			.filter(sc::is_empty.eq(false))
1154			.select(sc::key)
1155			.order(sc::key.asc())
1156			.limit(1)
1157			.into_boxed();
1158
1159		if let Some(ref end) = prefix_end {
1160			query = query.filter(sc::key.lt(end));
1161		}
1162
1163		Ok(query.first::<Vec<u8>>(&mut conn).await.optional()?)
1164	}
1165
1166	/// Count cached keys matching a prefix.
1167	///
1168	/// Uses the same range query strategy as [`Self::get_keys_by_prefix`] for
1169	/// efficient counting without loading key data.
1170	pub async fn count_keys_by_prefix(
1171		&self,
1172		block_hash: H256,
1173		prefix: &[u8],
1174	) -> Result<usize, CacheError> {
1175		use crate::schema::storage::columns as sc;
1176
1177		let mut conn = self.get_conn().await?;
1178		let prefix_end = increment_prefix(prefix);
1179
1180		let mut query = storage::table
1181			.filter(sc::block_hash.eq(block_hash.as_bytes()))
1182			.filter(sc::key.ge(prefix))
1183			.into_boxed();
1184
1185		if let Some(ref end) = prefix_end {
1186			query = query.filter(sc::key.lt(end));
1187		}
1188
1189		let count: i64 = query.count().get_result(&mut conn).await?;
1190
1191		Ok(count as usize)
1192	}
1193}
1194
/// Increment a byte slice to get the exclusive upper bound for prefix queries.
/// Returns None if the prefix is all 0xFF bytes (no upper bound needed).
fn increment_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
	// The bound is the prefix truncated at the rightmost non-0xFF byte, with
	// that byte incremented. An empty or all-0xFF prefix has no finite bound.
	let idx = prefix.iter().rposition(|&b| b != 0xFF)?;
	let mut bound = prefix[..=idx].to_vec();
	bound[idx] += 1;
	Some(bound)
}
1210
1211fn is_locked_error(e: &DieselError) -> bool {
1212	match e {
1213		DieselError::DatabaseError(_, info) => {
1214			let msg = info.message().to_ascii_lowercase();
1215			msg.contains(lock_patterns::DATABASE_IS_LOCKED) || msg.contains(lock_patterns::BUSY)
1216		},
1217		_ => false,
1218	}
1219}
1220
// Embed Diesel migrations located at `crates/pop-fork/migrations`.
// `embed_migrations!` compiles the migration SQL into the binary, so applying
// them at runtime needs no migration files on disk.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
1223
1224#[cfg(test)]
1225mod tests {
1226	use super::*;
1227
1228	#[tokio::test(flavor = "multi_thread")]
1229	async fn in_memory_cache_works() {
1230		let cache = StorageCache::in_memory().await.unwrap();
1231
1232		let block_hash = H256::from([1u8; 32]);
1233		let key = b"test_key";
1234		let value = b"test_value";
1235
1236		// Initially not cached
1237		assert!(cache.get_storage(block_hash, key).await.unwrap().is_none());
1238
1239		// Set a value
1240		cache.set_storage(block_hash, key, Some(value)).await.unwrap();
1241
1242		// Now cached with value
1243		let cached = cache.get_storage(block_hash, key).await.unwrap();
1244		assert_eq!(cached, Some(Some(value.to_vec())));
1245	}
1246
1247	#[tokio::test(flavor = "multi_thread")]
1248	async fn cache_empty_value() {
1249		let cache = StorageCache::in_memory().await.unwrap();
1250
1251		let block_hash = H256::from([2u8; 32]);
1252		let key = b"empty_key";
1253
1254		// Set as empty (key exists but no value)
1255		cache.set_storage(block_hash, key, None).await.unwrap();
1256
1257		// Cached as empty
1258		let cached = cache.get_storage(block_hash, key).await.unwrap();
1259		assert_eq!(cached, Some(None));
1260	}
1261
	#[tokio::test(flavor = "multi_thread")]
	async fn batch_operations() {
		// Batch write then batch read, covering all three read outcomes:
		// cached value, cached-as-empty, and never cached.
		let cache = StorageCache::in_memory().await.unwrap();

		let block_hash = H256::from([3u8; 32]);
		let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
			(b"key1", Some(b"value1")),
			(b"key2", Some(b"value2")),
			(b"key3", None), // empty
		];

		// Batch set
		cache.set_storage_batch(block_hash, &entries).await.unwrap();

		// Batch get — key4 was never written.
		let keys: Vec<&[u8]> = vec![b"key1", b"key2", b"key3", b"key4"];
		let results = cache.get_storage_batch(block_hash, &keys).await.unwrap();

		assert_eq!(results.len(), 4);
		assert_eq!(results[0], Some(Some(b"value1".to_vec())));
		assert_eq!(results[1], Some(Some(b"value2".to_vec())));
		assert_eq!(results[2], Some(None)); // empty
		assert_eq!(results[3], None); // not cached
	}
1286
1287	#[tokio::test(flavor = "multi_thread")]
1288	async fn block_caching() {
1289		let cache = StorageCache::in_memory().await.unwrap();
1290
1291		let hash = H256::from([4u8; 32]);
1292		let parent_hash = H256::from([3u8; 32]);
1293		let header = b"mock_header_data";
1294
1295		// Cache block
1296		cache.cache_block(hash, 100, parent_hash, header).await.unwrap();
1297
1298		// Get block
1299		let block = cache.get_block(hash).await.unwrap().unwrap();
1300		assert_eq!(block.hash, hash.as_bytes().to_vec());
1301		assert_eq!(block.number, 100i64);
1302		assert_eq!(block.parent_hash, parent_hash.as_bytes().to_vec());
1303		assert_eq!(block.header, header.to_vec());
1304	}
1305
1306	#[tokio::test(flavor = "multi_thread")]
1307	async fn get_block_with_non_cached_block() {
1308		let cache = StorageCache::in_memory().await.unwrap();
1309
1310		let hash = H256::from([4u8; 32]);
1311
1312		// Get block
1313		let block = cache.get_block(hash).await.unwrap();
1314
1315		assert!(block.is_none());
1316	}
1317
	#[tokio::test(flavor = "multi_thread")]
	async fn get_block_number_corrupted_block_number_fails() {
		// Rows whose stored i64 block number falls outside the u32 range must
		// surface as a DataCorruption error from get_block.
		let cache = StorageCache::in_memory().await.unwrap();

		let hash1 = H256::from([4u8; 32]);
		let hash2 = H256::from([5u8; 32]);
		let parent_hash = H256::from([3u8; 32]);
		let header = b"mock_header_data";

		// Manually insert invalid block with negative number directly into database
		let invalid_block1 = NewBlockRow {
			hash: hash1.as_bytes(),
			number: -1, // Invalid: below 0
			parent_hash: parent_hash.as_bytes(),
			header,
		};

		// Manually insert invalid block with number above the u32 maximum directly into database
		let invalid_block2 = NewBlockRow {
			hash: hash2.as_bytes(),
			number: u32::MAX as i64 + 1,
			parent_hash: parent_hash.as_bytes(),
			header,
		};

		// Insert directly into the database bypassing validation; reaches into
		// the cache's single-connection internals, so an in-memory cache is
		// expected to always be StorageConn::Single here.
		match &cache.inner {
			StorageConn::Single(m) => {
				let mut conn = m.lock().await;
				for block in [invalid_block1, invalid_block2] {
					diesel::insert_into(blocks::table)
						.values(&block)
						.execute(&mut *conn)
						.await
						.unwrap();
				}
			},
			_ => unreachable!("Test single connection; qed;"),
		}

		// Get block should fail with DataCorruption error
		assert!(
			matches!(cache.get_block(hash1).await, Err(CacheError::DataCorruption(msg)) if msg == errors::BLOCK_NUMBER_OUT_OF_U32_RANGE)
		);
		assert!(
			matches!(cache.get_block(hash2).await, Err(CacheError::DataCorruption(msg)) if msg == errors::BLOCK_NUMBER_OUT_OF_U32_RANGE)
		);
	}
1366
1367	#[tokio::test(flavor = "multi_thread")]
1368	async fn different_blocks_have_separate_storage() {
1369		let cache = StorageCache::in_memory().await.unwrap();
1370
1371		let block1 = H256::from([5u8; 32]);
1372		let block2 = H256::from([6u8; 32]);
1373		let key = b"same_key";
1374
1375		cache.set_storage(block1, key, Some(b"value1")).await.unwrap();
1376		cache.set_storage(block2, key, Some(b"value2")).await.unwrap();
1377
1378		let cached1 = cache.get_storage(block1, key).await.unwrap();
1379		let cached2 = cache.get_storage(block2, key).await.unwrap();
1380
1381		assert_eq!(cached1, Some(Some(b"value1".to_vec())));
1382		assert_eq!(cached2, Some(Some(b"value2".to_vec())));
1383	}
1384
1385	#[tokio::test(flavor = "multi_thread")]
1386	async fn clear_block_removes_data() {
1387		let cache = StorageCache::in_memory().await.unwrap();
1388
1389		let hash = H256::from([7u8; 32]);
1390		let parent_hash = H256::from([6u8; 32]);
1391		let key = b"test_key";
1392
1393		cache.set_storage(hash, key, Some(b"value")).await.unwrap();
1394		cache.cache_block(hash, 50, parent_hash, b"header").await.unwrap();
1395
1396		// Data exists
1397		assert!(cache.get_storage(hash, key).await.unwrap().is_some());
1398		assert!(cache.get_block(hash).await.unwrap().is_some());
1399
1400		// Clear
1401		cache.clear_block(hash).await.unwrap();
1402
1403		// Data removed
1404		assert!(cache.get_storage(hash, key).await.unwrap().is_none());
1405		assert!(cache.get_block(hash).await.unwrap().is_none());
1406	}
1407
1408	#[tokio::test(flavor = "multi_thread")]
1409	async fn file_persistence() {
1410		let temp_dir = tempfile::tempdir().unwrap();
1411		let db_path = temp_dir.path().join("test_cache.db");
1412
1413		let block_hash = H256::from([8u8; 32]);
1414		let key = b"persistent_key";
1415		let value = b"persistent_value";
1416
1417		// Write and close
1418		{
1419			let cache = StorageCache::open(Some(&db_path)).await.unwrap();
1420			cache.set_storage(block_hash, key, Some(value)).await.unwrap();
1421		}
1422
1423		// Reopen and verify
1424		{
1425			let cache = StorageCache::open(Some(&db_path)).await.unwrap();
1426			let cached = cache.get_storage(block_hash, key).await.unwrap();
1427			assert_eq!(cached, Some(Some(value.to_vec())));
1428		}
1429	}
1430
	#[tokio::test(flavor = "multi_thread")]
	async fn concurrent_access() {
		// Exercises the pool/retry machinery: parallel single writes, parallel
		// reads, and two overlapping batch writes against one on-disk database.
		let temp_dir = tempfile::tempdir().unwrap();
		let db_path = temp_dir.path().join("concurrent_test.db");
		let cache = StorageCache::open(Some(&db_path)).await.unwrap();

		let block_hash = H256::from([9u8; 32]);

		// Spawn multiple concurrent write tasks
		// StorageCache is cheap to clone (just increments pool's reference count)
		let mut handles = vec![];
		for i in 0..10u8 {
			let cache = cache.clone();
			let handle = tokio::spawn(async move {
				let key = format!("key_{}", i);
				let value = format!("value_{}", i);
				cache.set_storage(block_hash, key.as_bytes(), Some(value.as_bytes())).await
			});
			handles.push(handle);
		}

		// Wait for all writes to complete
		for handle in handles {
			handle.await.unwrap().unwrap();
		}

		// Spawn concurrent read tasks
		let mut read_handles = vec![];
		for i in 0..10u8 {
			let cache = cache.clone();
			let handle = tokio::spawn(async move {
				let key = format!("key_{}", i);
				cache.get_storage(block_hash, key.as_bytes()).await
			});
			read_handles.push((i, handle));
		}

		// Verify all reads return correct values
		for (i, handle) in read_handles {
			let result = handle.await.unwrap().unwrap();
			let expected_value = format!("value_{}", i);
			assert_eq!(result, Some(Some(expected_value.into_bytes())));
		}

		// Test concurrent batch operations: two tasks write disjoint key sets
		// to the same block at the same time.
		let cache1 = cache.clone();
		let cache2 = cache.clone();
		let block_hash2 = H256::from([10u8; 32]);

		let batch_handle1 = tokio::spawn(async move {
			let keys: Vec<Vec<u8>> = (0..5).map(|i| format!("batch1_{}", i).into_bytes()).collect();
			let values: Vec<Vec<u8>> = (0..5).map(|i| vec![i]).collect();
			let entries: Vec<(&[u8], Option<&[u8]>)> = keys
				.iter()
				.zip(values.iter())
				.map(|(k, v)| (k.as_slice(), Some(v.as_slice())))
				.collect();
			cache1.set_storage_batch(block_hash2, &entries).await
		});

		let batch_handle2 = tokio::spawn(async move {
			let keys: Vec<Vec<u8>> =
				(5..10).map(|i| format!("batch2_{}", i).into_bytes()).collect();
			let values: Vec<Vec<u8>> = (5..10).map(|i| vec![i]).collect();
			let entries: Vec<(&[u8], Option<&[u8]>)> = keys
				.iter()
				.zip(values.iter())
				.map(|(k, v)| (k.as_slice(), Some(v.as_slice())))
				.collect();
			cache2.set_storage_batch(block_hash2, &entries).await
		});

		batch_handle1.await.unwrap().unwrap();
		batch_handle2.await.unwrap().unwrap();

		// Verify batch results (only the first task's keys are checked;
		// batch1_{i} was written with the single-byte value [i]).
		let keys: Vec<Vec<u8>> = (0..5).map(|i| format!("batch1_{}", i).into_bytes()).collect();
		let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
		let results = cache.get_storage_batch(block_hash2, &key_refs).await.unwrap();
		for (i, result) in results.iter().enumerate() {
			assert_eq!(*result, Some(Some(vec![i as u8])));
		}
	}
1514
1515	#[tokio::test(flavor = "multi_thread")]
1516	async fn get_storage_batch_with_duplicate_keys() {
1517		let cache = StorageCache::in_memory().await.unwrap();
1518
1519		let block_hash = H256::from([11u8; 32]);
1520		let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1521			(b"key1", Some(b"value1")),
1522			(b"key2", Some(b"value2")),
1523			(b"key3", Some(b"value3")),
1524		];
1525
1526		// Set up some values
1527		cache.set_storage_batch(block_hash, &entries).await.unwrap();
1528
1529		// Query with duplicate keys - key1 appears twice, key2 appears three times
1530		let keys: Vec<&[u8]> = vec![b"key1", b"key2", b"key1", b"key3", b"key2", b"key2"];
1531		let results = cache.get_storage_batch(block_hash, &keys).await;
1532
1533		assert!(matches!(results, Err(CacheError::DuplicatedKeys)));
1534	}
1535
1536	#[tokio::test(flavor = "multi_thread")]
1537	async fn set_storage_batch_with_duplicate_keys() {
1538		let cache = StorageCache::in_memory().await.unwrap();
1539
1540		let block_hash = H256::from([12u8; 32]);
1541
1542		// Set batch with duplicate keys - last value should win
1543		let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1544			(b"key1", Some(b"first_value")),
1545			(b"key2", Some(b"value2")),
1546			(b"key1", Some(b"second_value")), // duplicate key1
1547			(b"key3", Some(b"value3")),
1548			(b"key1", Some(b"final_value")), // another duplicate key1
1549		];
1550
1551		let result = cache.set_storage_batch(block_hash, &entries).await;
1552		assert!(matches!(result, Err(CacheError::DuplicatedKeys)));
1553	}
1554
	#[tokio::test(flavor = "multi_thread")]
	async fn prefix_scan_progress_tracking() {
		// Walks the scan lifecycle: not started -> partial -> complete.
		let cache = StorageCache::in_memory().await.unwrap();
		let block_hash = H256::from([11u8; 32]);
		let prefix = b"balances:";

		// Initially no progress
		let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
		assert!(progress.is_none());

		// Update progress with a partial scan
		let last_key = b"balances:account123";
		cache.update_prefix_scan(block_hash, prefix, last_key, false).await.unwrap();

		// Progress should now exist
		let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
		assert!(progress.is_some());
		let p = progress.unwrap();
		assert_eq!(p.last_scanned_key, Some(last_key.to_vec()));
		assert!(!p.is_complete);

		// Update to complete — the upsert overwrites the previous record.
		let final_key = b"balances:zzz";
		cache.update_prefix_scan(block_hash, prefix, final_key, true).await.unwrap();

		let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
		let p = progress.unwrap();
		assert_eq!(p.last_scanned_key, Some(final_key.to_vec()));
		assert!(p.is_complete);
	}
1585
1586	#[tokio::test(flavor = "multi_thread")]
1587	async fn prefix_scan_different_blocks_separate() {
1588		let cache = StorageCache::in_memory().await.unwrap();
1589		let block1 = H256::from([12u8; 32]);
1590		let block2 = H256::from([13u8; 32]);
1591		let prefix = b"system:";
1592
1593		// Set progress on block1 only
1594		cache.update_prefix_scan(block1, prefix, b"system:key1", true).await.unwrap();
1595
1596		// Block1 has progress
1597		let p1 = cache.get_prefix_scan_progress(block1, prefix).await.unwrap();
1598		assert!(p1.is_some());
1599		assert!(p1.unwrap().is_complete);
1600
1601		// Block2 has no progress
1602		let p2 = cache.get_prefix_scan_progress(block2, prefix).await.unwrap();
1603		assert!(p2.is_none());
1604	}
1605
	#[tokio::test(flavor = "multi_thread")]
	async fn get_keys_by_prefix_works() {
		// Prefix queries return exactly the keys under each prefix and nothing else.
		let cache = StorageCache::in_memory().await.unwrap();
		let block_hash = H256::from([14u8; 32]);

		// Insert keys with different prefixes
		let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
			(b"tokens:alice", Some(b"100")),
			(b"tokens:bob", Some(b"200")),
			(b"tokens:charlie", Some(b"300")),
			(b"balances:alice", Some(b"50")),
			(b"balances:bob", Some(b"75")),
		];
		cache.set_storage_batch(block_hash, &entries).await.unwrap();

		// Get keys with "tokens:" prefix
		let token_keys = cache.get_keys_by_prefix(block_hash, b"tokens:").await.unwrap();
		assert_eq!(token_keys.len(), 3);
		assert!(token_keys.contains(&b"tokens:alice".to_vec()));
		assert!(token_keys.contains(&b"tokens:bob".to_vec()));
		assert!(token_keys.contains(&b"tokens:charlie".to_vec()));

		// Get keys with "balances:" prefix
		let balance_keys = cache.get_keys_by_prefix(block_hash, b"balances:").await.unwrap();
		assert_eq!(balance_keys.len(), 2);
		assert!(balance_keys.contains(&b"balances:alice".to_vec()));
		assert!(balance_keys.contains(&b"balances:bob".to_vec()));

		// Get keys with non-existent prefix
		let empty_keys = cache.get_keys_by_prefix(block_hash, b"nonexistent:").await.unwrap();
		assert!(empty_keys.is_empty());
	}
1638
1639	#[tokio::test(flavor = "multi_thread")]
1640	async fn count_keys_by_prefix_works() {
1641		let cache = StorageCache::in_memory().await.unwrap();
1642		let block_hash = H256::from([15u8; 32]);
1643
1644		// Insert keys with different prefixes
1645		let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1646			(b"prefix_a:1", Some(b"v1")),
1647			(b"prefix_a:2", Some(b"v2")),
1648			(b"prefix_a:3", Some(b"v3")),
1649			(b"prefix_b:1", Some(b"v4")),
1650		];
1651		cache.set_storage_batch(block_hash, &entries).await.unwrap();
1652
1653		assert_eq!(cache.count_keys_by_prefix(block_hash, b"prefix_a:").await.unwrap(), 3);
1654		assert_eq!(cache.count_keys_by_prefix(block_hash, b"prefix_b:").await.unwrap(), 1);
1655		assert_eq!(cache.count_keys_by_prefix(block_hash, b"prefix_c:").await.unwrap(), 0);
1656	}
1657
	#[tokio::test(flavor = "multi_thread")]
	async fn next_key_from_cache_works() {
		// Iterating keys in order via next_key_from_cache stays within the prefix.
		let cache = StorageCache::in_memory().await.unwrap();
		let block_hash = H256::from([20u8; 32]);

		// Insert keys with a prefix (plus one outside the prefix as a decoy)
		let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
			(b"prefix:aaa", Some(b"v1")),
			(b"prefix:bbb", Some(b"v2")),
			(b"prefix:ccc", Some(b"v3")),
			(b"other:ddd", Some(b"v4")),
		];
		cache.set_storage_batch(block_hash, &entries).await.unwrap();

		// Next key after "prefix:aaa" with prefix "prefix:" should be "prefix:bbb"
		let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:aaa").await.unwrap();
		assert_eq!(next, Some(b"prefix:bbb".to_vec()));

		// Next key after "prefix:bbb" should be "prefix:ccc"
		let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:bbb").await.unwrap();
		assert_eq!(next, Some(b"prefix:ccc".to_vec()));

		// Next key after "prefix:ccc" should be None (no more keys)
		let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:ccc").await.unwrap();
		assert!(next.is_none());

		// Next key from the very start with prefix "prefix:" should be "prefix:aaa"
		let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:").await.unwrap();
		assert_eq!(next, Some(b"prefix:aaa".to_vec()));
	}
1688
1689	#[test]
1690	fn increment_prefix_works() {
1691		// Normal case
1692		assert_eq!(increment_prefix(b"abc"), Some(b"abd".to_vec()));
1693
1694		// Increment last byte
1695		assert_eq!(increment_prefix(b"ab\xff"), Some(b"ac".to_vec()));
1696
1697		// Multiple 0xff bytes
1698		assert_eq!(increment_prefix(b"a\xff\xff"), Some(b"b".to_vec()));
1699
1700		// All 0xff - no valid increment possible
1701		assert_eq!(increment_prefix(b"\xff\xff\xff"), None);
1702
1703		// Empty prefix - no increment possible
1704		assert_eq!(increment_prefix(b""), None);
1705
1706		// Single byte
1707		assert_eq!(increment_prefix(b"a"), Some(b"b".to_vec()));
1708	}
1709
1710	#[tokio::test(flavor = "multi_thread")]
1711	async fn clear_block_removes_prefix_scans() {
1712		let cache = StorageCache::in_memory().await.unwrap();
1713		let hash = H256::from([16u8; 32]);
1714		let prefix = b"test:";
1715
1716		// Set up prefix scan progress
1717		cache.update_prefix_scan(hash, prefix, b"test:key", true).await.unwrap();
1718		assert!(cache.get_prefix_scan_progress(hash, prefix).await.unwrap().is_some());
1719
1720		// Clear block
1721		cache.clear_block(hash).await.unwrap();
1722
1723		// Prefix scan progress should be removed
1724		assert!(cache.get_prefix_scan_progress(hash, prefix).await.unwrap().is_none());
1725	}
1726
1727	// Tests for local storage with validity
1728
1729	#[tokio::test(flavor = "multi_thread")]
1730	async fn get_local_key_returns_none_for_nonexistent_key() {
1731		let cache = StorageCache::in_memory().await.unwrap();
1732
1733		let result = cache.get_local_key(b"nonexistent_key").await.unwrap();
1734		assert!(result.is_none());
1735	}
1736
1737	#[tokio::test(flavor = "multi_thread")]
1738	async fn insert_local_key_creates_new_key() {
1739		let cache = StorageCache::in_memory().await.unwrap();
1740		let key = b"new_key";
1741
1742		// Insert new key
1743		let key_id = cache.insert_local_key(key).await.unwrap();
1744		assert_eq!(key_id, 1);
1745
1746		// Verify it exists
1747		let result = cache.get_local_key(key).await.unwrap();
1748		assert!(result.is_some());
1749		assert_eq!(result.unwrap().id, key_id);
1750	}
1751
1752	#[tokio::test(flavor = "multi_thread")]
1753	async fn insert_local_key_returns_existing_id() {
1754		let cache = StorageCache::in_memory().await.unwrap();
1755		let key = b"duplicate_key";
1756
1757		// Insert key twice
1758		let key_id1 = cache.insert_local_key(key).await.unwrap();
1759		let key_id2 = cache.insert_local_key(key).await.unwrap();
1760
1761		// Should return the same ID
1762		assert_eq!(key_id1, key_id2);
1763	}
1764
	#[tokio::test(flavor = "multi_thread")]
	async fn insert_and_get_local_value_at_block() {
		// A value inserted at block N is visible at N and later, not before.
		let cache = StorageCache::in_memory().await.unwrap();
		let key = b"test_key";
		let value = b"test_value";

		// Insert key and value
		let key_id = cache.insert_local_key(key).await.unwrap();
		cache.insert_local_value(key_id, Some(value), 100).await.unwrap();

		// Query at block 100 - should find it (valid_from = 100, valid_until = NULL)
		let result = cache.get_local_value_at_block(key, 100).await.unwrap();
		assert_eq!(result, Some(Some(value.to_vec())));

		// Query at block 150 - should still find it (valid_until = NULL means still valid)
		let result = cache.get_local_value_at_block(key, 150).await.unwrap();
		assert_eq!(result, Some(Some(value.to_vec())));

		// Query at block 99 - should not find it (before valid_from)
		let result = cache.get_local_value_at_block(key, 99).await.unwrap();
		assert!(result.is_none());
	}
1787
1788	#[tokio::test(flavor = "multi_thread")]
1789	async fn get_local_value_at_block_nonexistent_key() {
1790		let cache = StorageCache::in_memory().await.unwrap();
1791
1792		let result = cache.get_local_value_at_block(b"nonexistent", 100).await.unwrap();
1793		assert!(result.is_none());
1794	}
1795
	#[tokio::test(flavor = "multi_thread")]
	async fn close_local_value_sets_valid_until() {
		// Closing a value at block N and inserting a replacement at N makes
		// queries before N see the old value and queries at/after N see the new.
		let cache = StorageCache::in_memory().await.unwrap();
		let key = b"closing_key";
		let value1 = b"value1";
		let value2 = b"value2";

		// Insert key and first value at block 100
		let key_id = cache.insert_local_key(key).await.unwrap();
		cache.insert_local_value(key_id, Some(value1), 100).await.unwrap();

		// Close at block 150 and insert new value
		cache.close_local_value(key_id, 150).await.unwrap();
		cache.insert_local_value(key_id, Some(value2), 150).await.unwrap();

		// Query at block 120 - should get value1
		let result = cache.get_local_value_at_block(key, 120).await.unwrap();
		assert_eq!(result, Some(Some(value1.to_vec())));

		// Query at block 150 - should get value2
		let result = cache.get_local_value_at_block(key, 150).await.unwrap();
		assert_eq!(result, Some(Some(value2.to_vec())));

		// Query at block 200 - should get value2 (still valid)
		let result = cache.get_local_value_at_block(key, 200).await.unwrap();
		assert_eq!(result, Some(Some(value2.to_vec())));
	}
1823
	#[tokio::test(flavor = "multi_thread")]
	async fn get_local_values_at_block_batch_works() {
		// Batch lookups preserve request order and report missing keys as None.
		let cache = StorageCache::in_memory().await.unwrap();

		let key1 = b"batch_key1";
		let key2 = b"batch_key2";
		let key3 = b"batch_key3";
		let value1 = b"batch_value1";
		let value2 = b"batch_value2";

		// Insert keys and values (key3 is deliberately never inserted)
		let key_id1 = cache.insert_local_key(key1).await.unwrap();
		let key_id2 = cache.insert_local_key(key2).await.unwrap();
		cache.insert_local_value(key_id1, Some(value1), 100).await.unwrap();
		cache.insert_local_value(key_id2, Some(value2), 100).await.unwrap();

		// Batch query
		let keys: Vec<&[u8]> = vec![key1, key2, key3];
		let results = cache.get_local_values_at_block_batch(&keys, 100).await.unwrap();

		assert_eq!(results.len(), 3);
		assert_eq!(results[0], Some(Some(value1.to_vec())));
		assert_eq!(results[1], Some(Some(value2.to_vec())));
		assert!(results[2].is_none()); // key3 doesn't exist
	}
1849
1850	#[tokio::test(flavor = "multi_thread")]
1851	async fn get_local_values_at_block_batch_respects_validity() {
1852		let cache = StorageCache::in_memory().await.unwrap();
1853
1854		let key = b"validity_key";
1855		let value1 = b"value_v1";
1856		let value2 = b"value_v2";
1857
1858		// Insert key and values with validity ranges
1859		let key_id = cache.insert_local_key(key).await.unwrap();
1860		cache.insert_local_value(key_id, Some(value1), 100).await.unwrap();
1861		cache.close_local_value(key_id, 200).await.unwrap();
1862		cache.insert_local_value(key_id, Some(value2), 200).await.unwrap();
1863
1864		// Query at different blocks
1865		let keys: Vec<&[u8]> = vec![key];
1866
1867		let results = cache.get_local_values_at_block_batch(&keys, 150).await.unwrap();
1868		assert_eq!(results[0], Some(Some(value1.to_vec())));
1869
1870		let results = cache.get_local_values_at_block_batch(&keys, 200).await.unwrap();
1871		assert_eq!(results[0], Some(Some(value2.to_vec())));
1872
1873		let results = cache.get_local_values_at_block_batch(&keys, 99).await.unwrap();
1874		assert!(results[0].is_none());
1875	}
1876
1877	#[tokio::test(flavor = "multi_thread")]
1878	async fn get_local_values_at_block_batch_with_duplicate_keys() {
1879		let cache = StorageCache::in_memory().await.unwrap();
1880
1881		let key = b"dup_key";
1882		let keys: Vec<&[u8]> = vec![key, key]; // duplicate
1883
1884		let result = cache.get_local_values_at_block_batch(&keys, 100).await;
1885		assert!(matches!(result, Err(CacheError::DuplicatedKeys)));
1886	}
1887
1888	#[tokio::test(flavor = "multi_thread")]
1889	async fn clear_local_storage_removes_all_data() {
1890		let cache = StorageCache::in_memory().await.unwrap();
1891
1892		let key1 = b"clear_key1";
1893		let key2 = b"clear_key2";
1894		let value = b"some_value";
1895
1896		// Insert some data
1897		let key_id1 = cache.insert_local_key(key1).await.unwrap();
1898		let key_id2 = cache.insert_local_key(key2).await.unwrap();
1899		cache.insert_local_value(key_id1, Some(value), 100).await.unwrap();
1900		cache.insert_local_value(key_id2, Some(value), 100).await.unwrap();
1901
1902		// Verify data exists
1903		assert!(cache.get_local_key(key1).await.unwrap().is_some());
1904		assert!(cache.get_local_key(key2).await.unwrap().is_some());
1905		assert!(cache.get_local_value_at_block(key1, 100).await.unwrap().is_some());
1906
1907		// Clear all local storage
1908		cache.clear_local_storage().await.unwrap();
1909
1910		// Verify data is gone
1911		assert!(cache.get_local_key(key1).await.unwrap().is_none());
1912		assert!(cache.get_local_key(key2).await.unwrap().is_none());
1913		assert!(cache.get_local_value_at_block(key1, 100).await.unwrap().is_none());
1914	}
1915
1916	#[tokio::test(flavor = "multi_thread")]
1917	async fn get_local_keys_at_block_returns_live_keys() {
1918		let cache = StorageCache::in_memory().await.unwrap();
1919
1920		// Insert keys with prefix "pallet:" at different blocks.
1921		let k1 = b"pallet:alice";
1922		let k2 = b"pallet:bob";
1923		let k3 = b"other:charlie";
1924
1925		let id1 = cache.insert_local_key(k1).await.unwrap();
1926		let id2 = cache.insert_local_key(k2).await.unwrap();
1927		let id3 = cache.insert_local_key(k3).await.unwrap();
1928
1929		// k1: valid from block 100
1930		cache.insert_local_value(id1, Some(b"v1"), 100).await.unwrap();
1931		// k2: valid from block 200
1932		cache.insert_local_value(id2, Some(b"v2"), 200).await.unwrap();
1933		// k3: valid from block 100 (different prefix)
1934		cache.insert_local_value(id3, Some(b"v3"), 100).await.unwrap();
1935
1936		// At block 150: only k1 matches "pallet:" prefix
1937		let keys = cache.get_local_keys_at_block(b"pallet:", 150).await.unwrap();
1938		assert_eq!(keys, vec![k1.to_vec()]);
1939
1940		// At block 200: both k1 and k2 match
1941		let keys = cache.get_local_keys_at_block(b"pallet:", 200).await.unwrap();
1942		assert_eq!(keys, vec![k1.to_vec(), k2.to_vec()]);
1943
1944		// At block 99: nothing matches (before any inserts)
1945		let keys = cache.get_local_keys_at_block(b"pallet:", 99).await.unwrap();
1946		assert!(keys.is_empty());
1947	}
1948
1949	#[tokio::test(flavor = "multi_thread")]
1950	async fn get_local_keys_at_block_excludes_deleted() {
1951		let cache = StorageCache::in_memory().await.unwrap();
1952
1953		let k1 = b"pallet:alice";
1954		let id1 = cache.insert_local_key(k1).await.unwrap();
1955
1956		// k1: value at block 100, deleted at block 200
1957		cache.insert_local_value(id1, Some(b"v1"), 100).await.unwrap();
1958		cache.close_local_value(id1, 200).await.unwrap();
1959		cache.insert_local_value(id1, None, 200).await.unwrap();
1960
1961		// At block 150: key exists (has value)
1962		let keys = cache.get_local_keys_at_block(b"pallet:", 150).await.unwrap();
1963		assert_eq!(keys, vec![k1.to_vec()]);
1964
1965		// At block 200: key was deleted (value is NULL)
1966		let keys = cache.get_local_keys_at_block(b"pallet:", 200).await.unwrap();
1967		assert!(keys.is_empty());
1968	}
1969
1970	#[tokio::test(flavor = "multi_thread")]
1971	async fn get_local_deleted_keys_at_block_works() {
1972		let cache = StorageCache::in_memory().await.unwrap();
1973
1974		let k1 = b"pallet:alice";
1975		let id1 = cache.insert_local_key(k1).await.unwrap();
1976
1977		// k1: value at block 100, deleted at block 200
1978		cache.insert_local_value(id1, Some(b"v1"), 100).await.unwrap();
1979		cache.close_local_value(id1, 200).await.unwrap();
1980		cache.insert_local_value(id1, None, 200).await.unwrap();
1981
1982		// At block 150: no deleted keys
1983		let deleted = cache.get_local_deleted_keys_at_block(b"pallet:", 150).await.unwrap();
1984		assert!(deleted.is_empty());
1985
1986		// At block 200: k1 is deleted
1987		let deleted = cache.get_local_deleted_keys_at_block(b"pallet:", 200).await.unwrap();
1988		assert_eq!(deleted, vec![k1.to_vec()]);
1989	}
1990
1991	#[tokio::test(flavor = "multi_thread")]
1992	async fn prefix_upper_bound_works() {
1993		// Normal case
1994		assert_eq!(StorageCache::prefix_upper_bound(b"abc"), Some(b"abd".to_vec()));
1995		// Trailing 0xFF
1996		assert_eq!(StorageCache::prefix_upper_bound(b"ab\xff"), Some(b"ac".to_vec()));
1997		// All 0xFF
1998		assert_eq!(StorageCache::prefix_upper_bound(b"\xff\xff"), None);
1999		// Empty prefix
2000		assert_eq!(StorageCache::prefix_upper_bound(b""), None);
2001	}
2002}