use crate::{
error::cache::CacheError,
models::{
BlockRow, LocalKeyRow, NewBlockRow, NewLocalKeyRow, NewLocalValueRow, NewPrefixScanRow,
NewStorageRow,
},
schema::{blocks, local_keys, local_values, prefix_scans, storage},
strings::cache::{errors, lock_patterns, pragmas, urls},
};
use bb8::CustomizeConnection;
use diesel::{
OptionalExtension, prelude::*, result::Error as DieselError, sqlite::SqliteConnection,
};
use diesel_async::{
AsyncConnection, AsyncMigrationHarness, RunQueryDsl,
pooled_connection::{
AsyncDieselConnectionManager, PoolError,
bb8::{Pool, PooledConnection},
},
sync_connection_wrapper::SyncConnectionWrapper,
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use std::{
collections::{HashMap, HashSet},
future::Future,
ops::{Deref, DerefMut},
path::Path,
pin::Pin,
sync::Arc,
time::Duration,
};
use subxt::config::substrate::H256;
use tokio::sync::{Mutex, MutexGuard};
/// Maximum number of pooled connections for file-backed caches.
const MAX_POOL_CONNECTIONS: u32 = 5;
/// Maximum number of retries for operations that fail with a SQLite
/// lock/busy error before the error is propagated to the caller.
const MAX_LOCK_RETRIES: u32 = 30;
/// Progress of an incremental prefix scan, as persisted in the
/// `prefix_scans` table for a given (block hash, prefix) pair.
#[derive(Debug, Clone)]
pub struct PrefixScanProgress {
    /// Last key handed back by the scan so far, if any was recorded.
    pub last_scanned_key: Option<Vec<u8>>,
    /// True once the scan has covered every key under the prefix.
    pub is_complete: bool,
}
/// SQLite-backed cache for block headers, remote storage values and
/// locally-committed values. Cheap to clone: clones share the underlying
/// pool or single connection.
#[derive(Clone, Debug)]
pub struct StorageCache {
    /// Connection strategy (pool for file DBs, single mutex-guarded
    /// connection for in-memory DBs).
    inner: StorageConn,
}
/// How the cache talks to SQLite: a bb8 pool for file-backed databases,
/// or one shared connection for in-memory databases.
/// NOTE(review): presumably a pool cannot be used in-memory because each
/// new SQLite `:memory:` connection opens its own private database —
/// confirm.
#[derive(Clone)]
enum StorageConn {
    Pool(Pool<SyncConnectionWrapper<SqliteConnection>>),
    Single(Arc<Mutex<SyncConnectionWrapper<SqliteConnection>>>),
}
impl std::fmt::Debug for StorageConn {
    /// Opaque debug output: connection internals are elided as `"..."`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            StorageConn::Pool(_) => "Pool",
            StorageConn::Single(_) => "Single",
        };
        f.debug_tuple(variant).field(&"...").finish()
    }
}
pub(crate) enum ConnectionGuard<'a> {
Pool(PooledConnection<'a, SyncConnectionWrapper<SqliteConnection>>),
Single(MutexGuard<'a, SyncConnectionWrapper<SqliteConnection>>),
}
impl<'a> Deref for ConnectionGuard<'a> {
    type Target = SyncConnectionWrapper<SqliteConnection>;
    /// Borrows the underlying connection regardless of guard flavor, so
    /// callers can run queries without caring where the connection came
    /// from.
    fn deref(&self) -> &Self::Target {
        match self {
            ConnectionGuard::Pool(conn) => conn,
            ConnectionGuard::Single(guard) => guard,
        }
    }
}
impl<'a> DerefMut for ConnectionGuard<'a> {
    /// Mutable access to the underlying connection (diesel queries take
    /// `&mut` connections).
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            ConnectionGuard::Pool(conn) => conn,
            ConnectionGuard::Single(guard) => guard,
        }
    }
}
/// Records one more retry attempt and sleeps with a linearly growing
/// backoff (10 ms per attempt made so far, saturating on overflow).
async fn retry_conn(attempts: &mut u32) {
    *attempts += 1;
    let backoff = Duration::from_millis((*attempts as u64).saturating_mul(10));
    tokio::time::sleep(backoff).await;
}
/// bb8 connection customizer that applies SQLite pragmas to every
/// connection the pool hands out.
#[derive(Debug, Clone, Copy)]
struct SqliteConnectionCustomizer;
impl CustomizeConnection<SyncConnectionWrapper<SqliteConnection>, PoolError>
    for SqliteConnectionCustomizer
{
    /// Runs whenever the pool acquires a connection: sets the busy-timeout
    /// pragma so concurrent writers wait instead of failing immediately
    /// with a lock error.
    fn on_acquire<'a>(
        &'a self,
        conn: &'a mut SyncConnectionWrapper<SqliteConnection>,
    ) -> Pin<Box<dyn Future<Output = Result<(), PoolError>> + Send + 'a>> {
        Box::pin(async move {
            diesel::sql_query(pragmas::BUSY_TIMEOUT)
                .execute(conn)
                .await
                .map_err(PoolError::QueryError)?;
            Ok(())
        })
    }
}
impl StorageCache {
/// Opens the cache.
///
/// With `Some(path)`: creates parent directories, runs migrations and
/// sets WAL journaling plus a busy timeout on a throwaway connection,
/// then builds a bb8 pool on the same file. With `None`: opens a single
/// in-memory SQLite database guarded by a mutex.
///
/// # Errors
/// Propagates filesystem, connection, pragma, migration and pool-build
/// failures as [`CacheError`].
pub async fn open(maybe_path: Option<&Path>) -> Result<Self, CacheError> {
    if let Some(path) = maybe_path {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let url = path.display().to_string();
        // Configure the database and run migrations on a dedicated
        // connection BEFORE the pool exists, so pooled connections never
        // race on schema setup.
        {
            let mut conn = SyncConnectionWrapper::<SqliteConnection>::establish(&url).await?;
            diesel::sql_query(pragmas::JOURNAL_MODE_WAL).execute(&mut conn).await?;
            diesel::sql_query(pragmas::BUSY_TIMEOUT).execute(&mut conn).await?;
            let mut harness = AsyncMigrationHarness::new(conn);
            harness.run_pending_migrations(MIGRATIONS)?;
            // Drop the setup connection; the pool opens its own.
            let _ = harness.into_inner();
        }
        let manager =
            AsyncDieselConnectionManager::<SyncConnectionWrapper<SqliteConnection>>::new(url);
        let pool = Pool::builder()
            .max_size(MAX_POOL_CONNECTIONS)
            // Re-applies BUSY_TIMEOUT to every pooled connection.
            .connection_customizer(Box::new(SqliteConnectionCustomizer))
            .build(manager)
            .await?;
        Ok(Self { inner: StorageConn::Pool(pool) })
    } else {
        let mut conn =
            SyncConnectionWrapper::<SqliteConnection>::establish(urls::IN_MEMORY).await?;
        diesel::sql_query(pragmas::BUSY_TIMEOUT).execute(&mut conn).await?;
        let mut harness = AsyncMigrationHarness::new(conn);
        harness.run_pending_migrations(MIGRATIONS)?;
        // Keep the one-and-only connection: a fresh in-memory connection
        // would not see the migrated schema.
        let conn = harness.into_inner();
        Ok(Self {
            inner: StorageConn::Single(std::sync::Arc::new(tokio::sync::Mutex::new(conn))),
        })
    }
}
/// Convenience constructor for an in-memory cache; see [`Self::open`].
pub async fn in_memory() -> Result<Self, CacheError> {
    Self::open(None).await
}
/// Acquires a connection: checks one out of the pool, or locks the
/// single shared in-memory connection.
pub(crate) async fn get_conn(&self) -> Result<ConnectionGuard<'_>, CacheError> {
    match &self.inner {
        StorageConn::Pool(pool) => pool
            .get()
            .await
            .map(ConnectionGuard::Pool)
            .map_err(|e| CacheError::Connection(ConnectionError::BadConnection(e.to_string()))),
        StorageConn::Single(mutex) => Ok(ConnectionGuard::Single(mutex.lock().await)),
    }
}
/// Looks up a cached remote-storage entry for `key` at `block_hash`.
///
/// Returns `None` on a cache miss, `Some(None)` when the key is cached
/// as absent, and `Some(Some(bytes))` for a cached value.
pub async fn get_storage(
    &self,
    block_hash: H256,
    key: &[u8],
) -> Result<Option<Option<Vec<u8>>>, CacheError> {
    use crate::schema::storage::columns as sc;
    let mut conn = self.get_conn().await?;
    let hit = storage::table
        .filter(sc::block_hash.eq(block_hash.as_bytes()))
        .filter(sc::key.eq(key))
        .select((sc::value, sc::is_empty))
        .first::<(Option<Vec<u8>>, bool)>(&mut conn)
        .await
        .optional()?;
    // `is_empty` rows represent "cached as absent".
    Ok(hit.map(|(value, is_empty)| if is_empty { None } else { value }))
}
/// Caches a remote-storage value for `key` at `block_hash`; `value ==
/// None` records the key as absent. Upserts on (block_hash, key) and
/// retries with linear backoff while SQLite reports the database as
/// locked, up to `MAX_LOCK_RETRIES` attempts.
pub async fn set_storage(
    &self,
    block_hash: H256,
    key: &[u8],
    value: Option<&[u8]>,
) -> Result<(), CacheError> {
    use crate::schema::storage::columns as sc;
    let mut attempts = 0;
    loop {
        // Fresh connection on every attempt so a retry is not pinned to
        // the connection that just observed the lock.
        let mut conn = self.get_conn().await?;
        let row = NewStorageRow {
            block_hash: block_hash.as_bytes(),
            key,
            value,
            // Distinguishes "cached as absent" from a NULL value column.
            is_empty: value.is_none(),
        };
        let res = diesel::insert_into(storage::table)
            .values(&row)
            .on_conflict((sc::block_hash, sc::key))
            .do_update()
            .set((sc::value.eq(value), sc::is_empty.eq(row.is_empty)))
            .execute(&mut conn)
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Batch variant of [`Self::get_storage`]: one result per input key, in
/// input order. `None` = cache miss, `Some(None)` = cached as absent,
/// `Some(Some(v))` = cached value. Duplicate input keys are rejected
/// with [`CacheError::DuplicatedKeys`].
pub async fn get_storage_batch(
    &self,
    block_hash: H256,
    keys: &[&[u8]],
) -> Result<Vec<Option<Option<Vec<u8>>>>, CacheError> {
    if keys.is_empty() {
        return Ok(vec![]);
    }
    // Duplicates would make the per-key `remove` below drop results.
    let mut seen = HashSet::with_capacity(keys.len());
    if keys.iter().any(|key| !seen.insert(key)) {
        return Err(CacheError::DuplicatedKeys);
    }
    use crate::schema::storage::columns as sc;
    let mut conn = self.get_conn().await?;
    let rows: Vec<(Vec<u8>, Option<Vec<u8>>, bool)> = storage::table
        .filter(sc::block_hash.eq(block_hash.as_bytes()))
        .filter(sc::key.eq_any(keys))
        .select((sc::key, sc::value, sc::is_empty))
        .load::<(Vec<u8>, Option<Vec<u8>>, bool)>(&mut conn)
        .await?;
    let mut cache_map = HashMap::new();
    for (key, value, empty) in rows {
        // Normalize "cached as absent" to None before re-wrapping below.
        let value = if empty { None } else { value };
        cache_map.insert(key, value);
    }
    // Re-align results with the caller's key order; `remove` moves each
    // value out without cloning.
    Ok(keys.iter().map(|key| cache_map.remove(*key)).collect())
}
/// Caches many remote-storage entries for `block_hash` in one
/// transaction, upserting each (block_hash, key) pair; a `None` value
/// records the key as absent. Duplicate keys in `entries` are rejected
/// with [`CacheError::DuplicatedKeys`]. The whole transaction is retried
/// with linear backoff while SQLite reports the database as locked, up
/// to `MAX_LOCK_RETRIES` attempts.
pub async fn set_storage_batch(
    &self,
    block_hash: H256,
    entries: &[(&[u8], Option<&[u8]>)],
) -> Result<(), CacheError> {
    if entries.is_empty() {
        return Ok(());
    }
    let mut seen = HashSet::with_capacity(entries.len());
    if entries.iter().any(|(key, _)| !seen.insert(key)) {
        return Err(CacheError::DuplicatedKeys);
    }
    use crate::schema::storage::columns as sc;
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        // `entries` is a shared slice reference and `block_hash` is Copy,
        // so the `move` closure simply copies them per attempt. (The
        // previous `Arc<&[_]>`/`Arc<H256>` wrappers added nothing: an
        // Arc around a reference still borrows the same data.)
        let res = conn
            .transaction::<_, DieselError, _>(move |conn| {
                Box::pin(async move {
                    for (key, value) in entries {
                        let row = NewStorageRow {
                            block_hash: block_hash.as_bytes(),
                            key,
                            value: *value,
                            // Marks "cached as absent" rows.
                            is_empty: value.is_none(),
                        };
                        diesel::insert_into(storage::table)
                            .values(&row)
                            .on_conflict((sc::block_hash, sc::key))
                            .do_update()
                            .set((sc::value.eq(row.value), sc::is_empty.eq(row.is_empty)))
                            .execute(conn)
                            .await?;
                    }
                    Ok(())
                })
            })
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Fetches the `local_keys` row for `key`, or `None` when the key has
/// never been committed locally.
pub async fn get_local_key(&self, key: &[u8]) -> Result<Option<LocalKeyRow>, CacheError> {
    use crate::schema::local_keys::columns as lkc;
    let mut conn = self.get_conn().await?;
    let maybe_row = local_keys::table
        .filter(lkc::key.eq(key))
        .select(LocalKeyRow::as_select())
        .first(&mut conn)
        .await
        .optional()?;
    Ok(maybe_row)
}
/// Ensures a `local_keys` row exists for `key` and returns its id.
/// Insertion is idempotent (`ON CONFLICT DO NOTHING`); the id is looked
/// up afterwards whether or not the insert created the row. Retries on
/// SQLite lock errors.
pub async fn insert_local_key(&self, key: &[u8]) -> Result<i32, CacheError> {
    use crate::schema::local_keys::columns as lkc;
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        let res = diesel::insert_into(local_keys::table)
            .values(NewLocalKeyRow { key })
            .on_conflict(lkc::key)
            .do_nothing()
            .execute(&mut conn)
            .await;
        match res {
            Ok(_) => {
                // The row now exists either way; fetch its surrogate id.
                let key_id: i32 = local_keys::table
                    .filter(lkc::key.eq(key))
                    .select(lkc::id)
                    .first(&mut conn)
                    .await?;
                return Ok(key_id);
            },
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Returns the locally-committed value for `key` visible at
/// `block_number`: the version whose validity interval
/// `[valid_from, valid_until)` contains the block (`valid_until = NULL`
/// means still open). `None` = key unknown or no version valid here;
/// `Some(None)` = key deleted locally; `Some(Some(v))` = live value.
pub async fn get_local_value_at_block(
    &self,
    key: &[u8],
    block_number: u32,
) -> Result<Option<Option<Vec<u8>>>, CacheError> {
    use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
    let mut conn = self.get_conn().await?;
    let block_num = block_number as i64;
    // Resolve the key to its surrogate id; unknown keys short-circuit.
    let key_id: i32 = match local_keys::table
        .filter(lkc::key.eq(key))
        .select(lkc::id)
        .first(&mut conn)
        .await
        .optional()?
    {
        Some(id) => id,
        _ => return Ok(None),
    };
    // NOTE(review): `first` assumes at most one interval matches a given
    // block — presumably guaranteed by how commits close intervals;
    // confirm.
    let value: Option<Option<Vec<u8>>> = local_values::table
        .filter(lvc::key_id.eq(key_id))
        .filter(lvc::valid_from.le(block_num))
        .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
        .select(lvc::value)
        .first(&mut conn)
        .await
        .optional()?;
    Ok(value)
}
/// Batch variant of [`Self::get_local_value_at_block`]: one result per
/// input key, in input order (`None` = unknown/no valid version,
/// `Some(None)` = deleted, `Some(Some(v))` = live value). Duplicate
/// input keys are rejected with [`CacheError::DuplicatedKeys`].
pub async fn get_local_values_at_block_batch(
    &self,
    keys: &[&[u8]],
    block_number: u32,
) -> Result<Vec<Option<Option<Vec<u8>>>>, CacheError> {
    if keys.is_empty() {
        return Ok(vec![]);
    }
    let mut seen = HashSet::with_capacity(keys.len());
    if keys.iter().any(|key| !seen.insert(key)) {
        return Err(CacheError::DuplicatedKeys);
    }
    use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
    let mut conn = self.get_conn().await?;
    let block_num = block_number as i64;
    // Map requested keys to their surrogate ids.
    let key_rows: Vec<LocalKeyRow> = local_keys::table
        .filter(lkc::key.eq_any(keys))
        .select(LocalKeyRow::as_select())
        .load(&mut conn)
        .await?;
    let key_to_id: HashMap<Vec<u8>, i32> =
        key_rows.iter().map(|r| (r.key.clone(), r.id)).collect();
    let key_ids: Vec<i32> = key_to_id.values().copied().collect();
    if key_ids.is_empty() {
        // None of the requested keys are known locally.
        return Ok(vec![None; keys.len()]);
    }
    // Versions whose [valid_from, valid_until) interval contains the block.
    let value_rows: Vec<(i32, Option<Vec<u8>>)> = local_values::table
        .filter(lvc::key_id.eq_any(&key_ids))
        .filter(lvc::valid_from.le(block_num))
        .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
        .select((lvc::key_id, lvc::value))
        .load(&mut conn)
        .await?;
    let mut id_to_value: HashMap<i32, Option<Vec<u8>>> = HashMap::new();
    for (key_id, value) in value_rows {
        // NOTE(review): assumes at most one matching interval per key;
        // if several matched, the last row loaded would win — confirm.
        id_to_value.insert(key_id, value);
    }
    // Re-align results with the caller's key order.
    Ok(keys
        .iter()
        .map(|key| key_to_id.get(*key).and_then(|key_id| id_to_value.remove(key_id)))
        .collect())
}
/// Lists keys starting with `prefix` that have a live (non-NULL)
/// locally-committed value at `block_number`, distinct and in ascending
/// key order.
pub async fn get_local_keys_at_block(
    &self,
    prefix: &[u8],
    block_number: u32,
) -> Result<Vec<Vec<u8>>, CacheError> {
    use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
    let mut conn = self.get_conn().await?;
    let block_num = block_number as i64;
    let prefix_vec = prefix.to_vec();
    // Prefix match expressed as `key >= prefix`, with an exclusive upper
    // bound added below when one exists.
    let mut query = local_keys::table
        .inner_join(local_values::table)
        .filter(lkc::key.ge(&prefix_vec))
        .filter(lvc::valid_from.le(block_num))
        .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
        // NULL values are local deletions; only live values count here.
        .filter(lvc::value.is_not_null())
        .select(lkc::key)
        .distinct()
        .order(lkc::key.asc())
        .into_boxed();
    // No bound exists for an empty or all-0xFF prefix.
    if let Some(upper) = Self::prefix_upper_bound(prefix) {
        query = query.filter(lkc::key.lt(upper));
    }
    let keys: Vec<Vec<u8>> = query.load(&mut conn).await?;
    Ok(keys)
}
/// Mirror of [`Self::get_local_keys_at_block`] for deletions: lists keys
/// starting with `prefix` whose locally-committed value at
/// `block_number` is NULL (i.e. deleted locally), distinct and in
/// ascending key order.
pub async fn get_local_deleted_keys_at_block(
    &self,
    prefix: &[u8],
    block_number: u32,
) -> Result<Vec<Vec<u8>>, CacheError> {
    use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
    let mut conn = self.get_conn().await?;
    let block_num = block_number as i64;
    let prefix_vec = prefix.to_vec();
    let mut query = local_keys::table
        .inner_join(local_values::table)
        .filter(lkc::key.ge(&prefix_vec))
        .filter(lvc::valid_from.le(block_num))
        .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
        // Only deletions (NULL values) — the sole difference from the
        // live-key query.
        .filter(lvc::value.is_null())
        .select(lkc::key)
        .distinct()
        .order(lkc::key.asc())
        .into_boxed();
    if let Some(upper) = Self::prefix_upper_bound(prefix) {
        query = query.filter(lkc::key.lt(upper));
    }
    let keys: Vec<Vec<u8>> = query.load(&mut conn).await?;
    Ok(keys)
}
/// Smallest byte string strictly greater than every key that starts with
/// `prefix`: trailing 0xFF bytes are dropped and the last remaining byte
/// is incremented. Returns `None` for an empty or all-0xFF prefix, where
/// no finite exclusive upper bound exists.
/// (Same logic as the free function `increment_prefix` in this file.)
fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    // Rightmost byte that can be incremented without overflowing.
    prefix.iter().rposition(|&b| b < 0xFF).map(|idx| {
        let mut upper = prefix[..=idx].to_vec();
        upper[idx] += 1;
        upper
    })
}
/// Inserts a new open-ended version (`valid_until = NULL`) for `key_id`
/// starting at `valid_from`; `value == None` records a local deletion.
/// Does not close earlier versions — see [`Self::close_local_value`].
/// Retries on SQLite lock errors.
pub async fn insert_local_value(
    &self,
    key_id: i32,
    value: Option<&[u8]>,
    valid_from: u32,
) -> Result<(), CacheError> {
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        let row = NewLocalValueRow {
            key_id,
            value: value.map(|v| v.to_vec()),
            valid_from: valid_from as i64,
            // Open interval: valid until closed by a later commit.
            valid_until: None,
        };
        let res =
            diesel::insert_into(local_values::table).values(&row).execute(&mut conn).await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Closes every currently-open version of `key_id` by setting its
/// exclusive end block to `valid_until`. Retries on SQLite lock errors.
pub async fn close_local_value(&self, key_id: i32, valid_until: u32) -> Result<(), CacheError> {
    use crate::schema::local_values::columns as lvc;
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        // Only open intervals (`valid_until IS NULL`) are affected.
        let res = diesel::update(
            local_values::table
                .filter(lvc::key_id.eq(key_id))
                .filter(lvc::valid_until.is_null()),
        )
        .set(lvc::valid_until.eq(Some(valid_until as i64)))
        .execute(&mut conn)
        .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Atomically commits a set of local changes at `block_number`: for each
/// entry, ensures the key row exists, closes the currently-open value
/// interval at this block, and opens a new interval starting here
/// (`value == None` records a deletion). The whole batch runs in one
/// transaction, retried with linear backoff on SQLite lock errors.
pub async fn commit_local_changes(
    &self,
    entries: &[(&[u8], Option<&[u8]>)],
    block_number: u32,
) -> Result<(), CacheError> {
    use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
    if entries.is_empty() {
        return Ok(());
    }
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        // `entries` is a shared slice reference (Copy) and `block_number`
        // is Copy, so the `move` closure just copies them. This removes
        // the previous up-front deep clone of every key/value plus the
        // extra clone on every retry attempt.
        let res = conn
            .transaction::<_, DieselError, _>(move |conn| {
                Box::pin(async move {
                    for (key, value) in entries {
                        // Idempotently ensure the key row exists.
                        diesel::insert_into(local_keys::table)
                            .values(NewLocalKeyRow { key })
                            .on_conflict(lkc::key)
                            .do_nothing()
                            .execute(conn)
                            .await?;
                        let key_id: i32 = local_keys::table
                            .filter(lkc::key.eq(*key))
                            .select(lkc::id)
                            .first(conn)
                            .await?;
                        // Close the open interval (if any) at this block…
                        diesel::update(
                            local_values::table
                                .filter(lvc::key_id.eq(key_id))
                                .filter(lvc::valid_until.is_null()),
                        )
                        .set(lvc::valid_until.eq(Some(block_number as i64)))
                        .execute(conn)
                        .await?;
                        // …and open a fresh one starting here.
                        let row = NewLocalValueRow {
                            key_id,
                            value: value.map(|v| v.to_vec()),
                            valid_from: block_number as i64,
                            valid_until: None,
                        };
                        diesel::insert_into(local_values::table)
                            .values(&row)
                            .execute(conn)
                            .await?;
                    }
                    Ok(())
                })
            })
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Deletes every locally-committed key and value in one transaction.
/// Values are removed before keys — NOTE(review): presumably to respect
/// the `key_id` foreign key; confirm against the migration schema.
/// Retries on SQLite lock errors.
pub async fn clear_local_storage(&self) -> Result<(), CacheError> {
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        let res = conn
            .transaction::<_, DieselError, _>(|conn| {
                Box::pin(async move {
                    diesel::delete(local_values::table).execute(conn).await?;
                    diesel::delete(local_keys::table).execute(conn).await?;
                    Ok(())
                })
            })
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Caches a block header under its hash, upserting so re-caching the
/// same hash refreshes number, parent hash and header bytes. Retries on
/// SQLite lock errors.
pub async fn cache_block(
    &self,
    hash: H256,
    number: u32,
    parent_hash: H256,
    header: &[u8],
) -> Result<(), CacheError> {
    use crate::schema::blocks::columns as bc;
    let mut attempts = 0;
    let parent_hash_bytes = parent_hash.as_bytes();
    loop {
        let mut conn = self.get_conn().await?;
        let block = NewBlockRow {
            hash: hash.as_bytes(),
            number: number as i64,
            parent_hash: parent_hash_bytes,
            header,
        };
        let res = diesel::insert_into(blocks::table)
            .values(&block)
            .on_conflict(bc::hash)
            .do_update()
            .set((
                bc::number.eq(number as i64),
                bc::parent_hash.eq(parent_hash_bytes),
                bc::header.eq(header),
            ))
            .execute(&mut conn)
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Fetches a cached block by hash. A stored number outside the u32 range
/// is reported as [`CacheError::DataCorruption`] instead of being
/// silently truncated later.
pub async fn get_block(&self, hash: H256) -> Result<Option<BlockRow>, CacheError> {
    use crate::schema::blocks::columns as bc;
    let mut conn = self.get_conn().await?;
    let row = blocks::table
        .filter(bc::hash.eq(hash.as_bytes()))
        .select(BlockRow::as_select())
        .first(&mut conn)
        .await
        .optional()?;
    match row {
        // Guard: the i64 column must round-trip as a u32 block number.
        Some(BlockRow { number, .. }) if number < 0 || number > u32::MAX.into() =>
            Err(CacheError::DataCorruption(errors::BLOCK_NUMBER_OUT_OF_U32_RANGE.into())),
        row @ Some(_) => Ok(row),
        None => Ok(None),
    }
}
/// Fetches a cached block by its number, with the same out-of-range
/// guard as [`Self::get_block`].
/// NOTE(review): `number` is not unique per se — if two forked blocks at
/// the same height were cached, `first` returns an arbitrary one;
/// confirm that is acceptable for callers.
pub async fn get_block_by_number(
    &self,
    block_number: u32,
) -> Result<Option<BlockRow>, CacheError> {
    use crate::schema::blocks::columns as bc;
    let mut conn = self.get_conn().await?;
    let row = blocks::table
        .filter(bc::number.eq(block_number as i64))
        .select(BlockRow::as_select())
        .first(&mut conn)
        .await
        .optional()?;
    match row {
        // Guard: the i64 column must round-trip as a u32 block number.
        Some(BlockRow { number, .. }) if number < 0 || number > u32::MAX.into() =>
            Err(CacheError::DataCorruption(errors::BLOCK_NUMBER_OUT_OF_U32_RANGE.into())),
        row @ Some(_) => Ok(row),
        None => Ok(None),
    }
}
/// Removes every cached artifact for `hash` — storage entries, the block
/// row and prefix-scan progress — in one transaction. Retries with
/// linear backoff on SQLite lock errors.
pub async fn clear_block(&self, hash: H256) -> Result<(), CacheError> {
    use crate::schema::{
        blocks::columns as bc, prefix_scans::columns as psc, storage::columns as sc,
    };
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        // `hash` is Copy, so the `move` closure simply copies it; the
        // previous `Arc<&[u8]>` wrapper added no lifetime or sharing
        // benefit.
        let res = conn
            .transaction::<_, DieselError, _>(move |conn| {
                Box::pin(async move {
                    let block_hash = hash.as_bytes();
                    diesel::delete(storage::table.filter(sc::block_hash.eq(block_hash)))
                        .execute(conn)
                        .await?;
                    diesel::delete(blocks::table.filter(bc::hash.eq(block_hash)))
                        .execute(conn)
                        .await?;
                    diesel::delete(prefix_scans::table.filter(psc::block_hash.eq(block_hash)))
                        .execute(conn)
                        .await?;
                    Ok(())
                })
            })
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Returns the recorded scan progress for (`block_hash`, `prefix`), or
/// `None` when no scan has been recorded yet.
pub async fn get_prefix_scan_progress(
    &self,
    block_hash: H256,
    prefix: &[u8],
) -> Result<Option<PrefixScanProgress>, CacheError> {
    use crate::schema::prefix_scans::columns as psc;
    let mut conn = self.get_conn().await?;
    let progress = prefix_scans::table
        .filter(psc::block_hash.eq(block_hash.as_bytes()))
        .filter(psc::prefix.eq(prefix))
        .select((psc::last_scanned_key, psc::is_complete))
        .first::<(Option<Vec<u8>>, bool)>(&mut conn)
        .await
        .optional()?
        .map(|(last_scanned_key, is_complete)| PrefixScanProgress {
            last_scanned_key,
            is_complete,
        });
    Ok(progress)
}
/// Upserts scan progress for (`block_hash`, `prefix`): the incoming
/// `last_key`/`is_complete` always overwrite the stored pair (via the
/// SQL `excluded` pseudo-table). Retries on SQLite lock errors.
pub async fn update_prefix_scan(
    &self,
    block_hash: H256,
    prefix: &[u8],
    last_key: &[u8],
    is_complete: bool,
) -> Result<(), CacheError> {
    use crate::schema::prefix_scans::columns as psc;
    use diesel::upsert::excluded;
    // Built once: the row only borrows the arguments, so it can be
    // reused across retry attempts.
    let new_row = NewPrefixScanRow {
        block_hash: block_hash.as_bytes(),
        prefix,
        last_scanned_key: Some(last_key),
        is_complete,
    };
    let mut attempts = 0;
    loop {
        let mut conn = self.get_conn().await?;
        let res = diesel::insert_into(prefix_scans::table)
            .values(&new_row)
            .on_conflict((psc::block_hash, psc::prefix))
            .do_update()
            .set((
                psc::last_scanned_key.eq(excluded(psc::last_scanned_key)),
                psc::is_complete.eq(excluded(psc::is_complete)),
            ))
            .execute(&mut conn)
            .await;
        match res {
            Ok(_) => return Ok(()),
            Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
                retry_conn(&mut attempts).await;
                continue;
            },
            Err(e) => return Err(e.into()),
        }
    }
}
/// Lists all cached storage keys under `prefix` at `block_hash`,
/// including keys cached as absent (`is_empty` rows).
pub async fn get_keys_by_prefix(
    &self,
    block_hash: H256,
    prefix: &[u8],
) -> Result<Vec<Vec<u8>>, CacheError> {
    use crate::schema::storage::columns as sc;
    let mut conn = self.get_conn().await?;
    let mut query = storage::table
        .filter(sc::block_hash.eq(block_hash.as_bytes()))
        .filter(sc::key.ge(prefix))
        .select(sc::key)
        .into_boxed();
    // Bound the range from above unless the prefix is empty/all-0xFF.
    if let Some(upper) = increment_prefix(prefix) {
        query = query.filter(sc::key.lt(upper));
    }
    Ok(query.load::<Vec<u8>>(&mut conn).await?)
}
/// Smallest cached non-empty key strictly greater than `key` and under
/// `prefix` at `block_hash`, if one exists.
pub async fn next_key_from_cache(
    &self,
    block_hash: H256,
    prefix: &[u8],
    key: &[u8],
) -> Result<Option<Vec<u8>>, CacheError> {
    use crate::schema::storage::columns as sc;
    let mut conn = self.get_conn().await?;
    let mut query = storage::table
        .filter(sc::block_hash.eq(block_hash.as_bytes()))
        .filter(sc::key.gt(key))
        .filter(sc::key.ge(prefix))
        // Rows cached as absent are not real keys; skip them.
        .filter(sc::is_empty.eq(false))
        .select(sc::key)
        .order(sc::key.asc())
        .limit(1)
        .into_boxed();
    // Bound the range from above unless the prefix is empty/all-0xFF.
    if let Some(upper) = increment_prefix(prefix) {
        query = query.filter(sc::key.lt(upper));
    }
    Ok(query.first::<Vec<u8>>(&mut conn).await.optional()?)
}
/// Counts cached storage rows under `prefix` at `block_hash` (rows
/// cached as absent are included in the count).
pub async fn count_keys_by_prefix(
    &self,
    block_hash: H256,
    prefix: &[u8],
) -> Result<usize, CacheError> {
    use crate::schema::storage::columns as sc;
    let mut conn = self.get_conn().await?;
    let mut query = storage::table
        .filter(sc::block_hash.eq(block_hash.as_bytes()))
        .filter(sc::key.ge(prefix))
        .into_boxed();
    // Bound the range from above unless the prefix is empty/all-0xFF.
    if let Some(upper) = increment_prefix(prefix) {
        query = query.filter(sc::key.lt(upper));
    }
    let total = query.count().get_result::<i64>(&mut conn).await?;
    Ok(total as usize)
}
}
/// Smallest byte string strictly greater than every string beginning
/// with `prefix`: trailing 0xFF bytes are stripped and the preceding
/// byte incremented. Returns `None` for an empty or all-0xFF prefix,
/// where no finite exclusive upper bound exists.
/// (Same logic as `StorageCache::prefix_upper_bound`.)
fn increment_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut bound = prefix.to_vec();
    while let Some(last) = bound.last_mut() {
        if *last == 0xFF {
            // 0xFF cannot be incremented; drop it and carry leftward.
            bound.pop();
        } else {
            *last += 1;
            return Some(bound);
        }
    }
    None
}
/// True when `e` is SQLite's transient lock/busy failure, which callers
/// treat as retryable (message matched case-insensitively).
fn is_locked_error(e: &DieselError) -> bool {
    if let DieselError::DatabaseError(_, info) = e {
        let msg = info.message().to_ascii_lowercase();
        msg.contains(lock_patterns::DATABASE_IS_LOCKED) || msg.contains(lock_patterns::BUSY)
    } else {
        false
    }
}
/// Diesel migrations embedded at compile time from the crate's
/// `migrations/` directory.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
#[cfg(test)]
mod tests {
use super::*;
// Basic round-trip: a value written for (block, key) reads back intact.
#[tokio::test(flavor = "multi_thread")]
async fn in_memory_cache_works() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([1u8; 32]);
    let key = b"test_key";
    let value = b"test_value";
    // Cache miss before anything is written.
    assert!(cache.get_storage(block_hash, key).await.unwrap().is_none());
    cache.set_storage(block_hash, key, Some(value)).await.unwrap();
    let cached = cache.get_storage(block_hash, key).await.unwrap();
    assert_eq!(cached, Some(Some(value.to_vec())));
}
// `Some(None)` distinguishes "cached as absent" from a plain miss.
#[tokio::test(flavor = "multi_thread")]
async fn cache_empty_value() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([2u8; 32]);
    let key = b"empty_key";
    cache.set_storage(block_hash, key, None).await.unwrap();
    let cached = cache.get_storage(block_hash, key).await.unwrap();
    assert_eq!(cached, Some(None));
}
// Batch write + batch read, covering an absent value and a missing key.
#[tokio::test(flavor = "multi_thread")]
async fn batch_operations() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([3u8; 32]);
    let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
        (b"key1", Some(b"value1")),
        (b"key2", Some(b"value2")),
        (b"key3", None),
    ];
    cache.set_storage_batch(block_hash, &entries).await.unwrap();
    let keys: Vec<&[u8]> = vec![b"key1", b"key2", b"key3", b"key4"];
    let results = cache.get_storage_batch(block_hash, &keys).await.unwrap();
    assert_eq!(results.len(), 4);
    assert_eq!(results[0], Some(Some(b"value1".to_vec())));
    assert_eq!(results[1], Some(Some(b"value2".to_vec())));
    // key3 was cached as absent; key4 was never cached at all.
    assert_eq!(results[2], Some(None));
    assert_eq!(results[3], None);
}
// Header metadata round-trips through cache_block/get_block.
#[tokio::test(flavor = "multi_thread")]
async fn block_caching() {
    let cache = StorageCache::in_memory().await.unwrap();
    let hash = H256::from([4u8; 32]);
    let parent_hash = H256::from([3u8; 32]);
    let header = b"mock_header_data";
    cache.cache_block(hash, 100, parent_hash, header).await.unwrap();
    let block = cache.get_block(hash).await.unwrap().unwrap();
    assert_eq!(block.hash, hash.as_bytes().to_vec());
    assert_eq!(block.number, 100i64);
    assert_eq!(block.parent_hash, parent_hash.as_bytes().to_vec());
    assert_eq!(block.header, header.to_vec());
}
// Unknown hashes read back as None, not as an error.
#[tokio::test(flavor = "multi_thread")]
async fn get_block_with_non_cached_block() {
    let cache = StorageCache::in_memory().await.unwrap();
    let hash = H256::from([4u8; 32]);
    let block = cache.get_block(hash).await.unwrap();
    assert!(block.is_none());
}
// Rows whose number falls outside u32 must surface as DataCorruption.
#[tokio::test(flavor = "multi_thread")]
async fn get_block_number_corrupted_block_number_fails() {
    let cache = StorageCache::in_memory().await.unwrap();
    let hash1 = H256::from([4u8; 32]);
    let hash2 = H256::from([5u8; 32]);
    let parent_hash = H256::from([3u8; 32]);
    let header = b"mock_header_data";
    let invalid_block1 = NewBlockRow {
        hash: hash1.as_bytes(),
        number: -1,
        parent_hash: parent_hash.as_bytes(),
        header,
    };
    let invalid_block2 = NewBlockRow {
        hash: hash2.as_bytes(),
        number: u32::MAX as i64 + 1,
        parent_hash: parent_hash.as_bytes(),
        header,
    };
    // Bypass the public API: write the corrupt rows straight through the
    // underlying connection.
    match &cache.inner {
        StorageConn::Single(m) => {
            let mut conn = m.lock().await;
            for block in [invalid_block1, invalid_block2] {
                diesel::insert_into(blocks::table)
                    .values(&block)
                    .execute(&mut *conn)
                    .await
                    .unwrap();
            }
        },
        _ => unreachable!("Test single connection; qed;"),
    }
    assert!(
        matches!(cache.get_block(hash1).await, Err(CacheError::DataCorruption(msg)) if msg == errors::BLOCK_NUMBER_OUT_OF_U32_RANGE)
    );
    assert!(
        matches!(cache.get_block(hash2).await, Err(CacheError::DataCorruption(msg)) if msg == errors::BLOCK_NUMBER_OUT_OF_U32_RANGE)
    );
}
// Storage is namespaced by block hash: same key, different blocks.
#[tokio::test(flavor = "multi_thread")]
async fn different_blocks_have_separate_storage() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block1 = H256::from([5u8; 32]);
    let block2 = H256::from([6u8; 32]);
    let key = b"same_key";
    cache.set_storage(block1, key, Some(b"value1")).await.unwrap();
    cache.set_storage(block2, key, Some(b"value2")).await.unwrap();
    let cached1 = cache.get_storage(block1, key).await.unwrap();
    let cached2 = cache.get_storage(block2, key).await.unwrap();
    assert_eq!(cached1, Some(Some(b"value1".to_vec())));
    assert_eq!(cached2, Some(Some(b"value2".to_vec())));
}
// clear_block drops both the storage entries and the block row.
#[tokio::test(flavor = "multi_thread")]
async fn clear_block_removes_data() {
    let cache = StorageCache::in_memory().await.unwrap();
    let hash = H256::from([7u8; 32]);
    let parent_hash = H256::from([6u8; 32]);
    let key = b"test_key";
    cache.set_storage(hash, key, Some(b"value")).await.unwrap();
    cache.cache_block(hash, 50, parent_hash, b"header").await.unwrap();
    assert!(cache.get_storage(hash, key).await.unwrap().is_some());
    assert!(cache.get_block(hash).await.unwrap().is_some());
    cache.clear_block(hash).await.unwrap();
    assert!(cache.get_storage(hash, key).await.unwrap().is_none());
    assert!(cache.get_block(hash).await.unwrap().is_none());
}
// Data written through one handle survives reopening the same file.
#[tokio::test(flavor = "multi_thread")]
async fn file_persistence() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test_cache.db");
    let block_hash = H256::from([8u8; 32]);
    let key = b"persistent_key";
    let value = b"persistent_value";
    // First handle writes, then is dropped (pool closed).
    {
        let cache = StorageCache::open(Some(&db_path)).await.unwrap();
        cache.set_storage(block_hash, key, Some(value)).await.unwrap();
    }
    // A fresh handle must read the persisted value back.
    {
        let cache = StorageCache::open(Some(&db_path)).await.unwrap();
        let cached = cache.get_storage(block_hash, key).await.unwrap();
        assert_eq!(cached, Some(Some(value.to_vec())));
    }
}
// Exercises the file-backed pool under parallel writers/readers and
// concurrent batch transactions (driving the lock-retry path).
#[tokio::test(flavor = "multi_thread")]
async fn concurrent_access() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("concurrent_test.db");
    let cache = StorageCache::open(Some(&db_path)).await.unwrap();
    let block_hash = H256::from([9u8; 32]);
    // Ten parallel single-key writers.
    let mut handles = vec![];
    for i in 0..10u8 {
        let cache = cache.clone();
        let handle = tokio::spawn(async move {
            let key = format!("key_{}", i);
            let value = format!("value_{}", i);
            cache.set_storage(block_hash, key.as_bytes(), Some(value.as_bytes())).await
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap().unwrap();
    }
    // Ten parallel readers must observe every write.
    let mut read_handles = vec![];
    for i in 0..10u8 {
        let cache = cache.clone();
        let handle = tokio::spawn(async move {
            let key = format!("key_{}", i);
            cache.get_storage(block_hash, key.as_bytes()).await
        });
        read_handles.push((i, handle));
    }
    for (i, handle) in read_handles {
        let result = handle.await.unwrap().unwrap();
        let expected_value = format!("value_{}", i);
        assert_eq!(result, Some(Some(expected_value.into_bytes())));
    }
    // Two disjoint batch transactions racing on the same block.
    let cache1 = cache.clone();
    let cache2 = cache.clone();
    let block_hash2 = H256::from([10u8; 32]);
    let batch_handle1 = tokio::spawn(async move {
        let keys: Vec<Vec<u8>> = (0..5).map(|i| format!("batch1_{}", i).into_bytes()).collect();
        let values: Vec<Vec<u8>> = (0..5).map(|i| vec![i]).collect();
        let entries: Vec<(&[u8], Option<&[u8]>)> = keys
            .iter()
            .zip(values.iter())
            .map(|(k, v)| (k.as_slice(), Some(v.as_slice())))
            .collect();
        cache1.set_storage_batch(block_hash2, &entries).await
    });
    let batch_handle2 = tokio::spawn(async move {
        let keys: Vec<Vec<u8>> =
            (5..10).map(|i| format!("batch2_{}", i).into_bytes()).collect();
        let values: Vec<Vec<u8>> = (5..10).map(|i| vec![i]).collect();
        let entries: Vec<(&[u8], Option<&[u8]>)> = keys
            .iter()
            .zip(values.iter())
            .map(|(k, v)| (k.as_slice(), Some(v.as_slice())))
            .collect();
        cache2.set_storage_batch(block_hash2, &entries).await
    });
    batch_handle1.await.unwrap().unwrap();
    batch_handle2.await.unwrap().unwrap();
    // Both batches must have landed; spot-check the first batch.
    let keys: Vec<Vec<u8>> = (0..5).map(|i| format!("batch1_{}", i).into_bytes()).collect();
    let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
    let results = cache.get_storage_batch(block_hash2, &key_refs).await.unwrap();
    for (i, result) in results.iter().enumerate() {
        assert_eq!(*result, Some(Some(vec![i as u8])));
    }
}
// Duplicate keys in a read batch are rejected up front.
#[tokio::test(flavor = "multi_thread")]
async fn get_storage_batch_with_duplicate_keys() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([11u8; 32]);
    let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
        (b"key1", Some(b"value1")),
        (b"key2", Some(b"value2")),
        (b"key3", Some(b"value3")),
    ];
    cache.set_storage_batch(block_hash, &entries).await.unwrap();
    let keys: Vec<&[u8]> = vec![b"key1", b"key2", b"key1", b"key3", b"key2", b"key2"];
    let results = cache.get_storage_batch(block_hash, &keys).await;
    assert!(matches!(results, Err(CacheError::DuplicatedKeys)));
}
// Duplicate keys in a write batch are rejected up front, too.
#[tokio::test(flavor = "multi_thread")]
async fn set_storage_batch_with_duplicate_keys() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([12u8; 32]);
    let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
        (b"key1", Some(b"first_value")),
        (b"key2", Some(b"value2")),
        (b"key1", Some(b"second_value")),
        (b"key3", Some(b"value3")),
        (b"key1", Some(b"final_value")),
    ];
    let result = cache.set_storage_batch(block_hash, &entries).await;
    assert!(matches!(result, Err(CacheError::DuplicatedKeys)));
}
// Progress rows can be created incomplete and later marked complete.
#[tokio::test(flavor = "multi_thread")]
async fn prefix_scan_progress_tracking() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([11u8; 32]);
    let prefix = b"balances:";
    // No progress recorded yet.
    let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
    assert!(progress.is_none());
    let last_key = b"balances:account123";
    cache.update_prefix_scan(block_hash, prefix, last_key, false).await.unwrap();
    let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
    assert!(progress.is_some());
    let p = progress.unwrap();
    assert_eq!(p.last_scanned_key, Some(last_key.to_vec()));
    assert!(!p.is_complete);
    // A later update overwrites both fields.
    let final_key = b"balances:zzz";
    cache.update_prefix_scan(block_hash, prefix, final_key, true).await.unwrap();
    let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
    let p = progress.unwrap();
    assert_eq!(p.last_scanned_key, Some(final_key.to_vec()));
    assert!(p.is_complete);
}
// Scan progress is keyed by (block, prefix), not by prefix alone.
#[tokio::test(flavor = "multi_thread")]
async fn prefix_scan_different_blocks_separate() {
    let cache = StorageCache::in_memory().await.unwrap();
    let block1 = H256::from([12u8; 32]);
    let block2 = H256::from([13u8; 32]);
    let prefix = b"system:";
    cache.update_prefix_scan(block1, prefix, b"system:key1", true).await.unwrap();
    let p1 = cache.get_prefix_scan_progress(block1, prefix).await.unwrap();
    assert!(p1.is_some());
    assert!(p1.unwrap().is_complete);
    let p2 = cache.get_prefix_scan_progress(block2, prefix).await.unwrap();
    assert!(p2.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_keys_by_prefix_works() {
    // Keys are filtered by byte prefix; other prefixes never match and an
    // unknown prefix yields an empty result.
    let store = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([14u8; 32]);
    let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
        (b"tokens:alice", Some(b"100")),
        (b"tokens:bob", Some(b"200")),
        (b"tokens:charlie", Some(b"300")),
        (b"balances:alice", Some(b"50")),
        (b"balances:bob", Some(b"75")),
    ];
    store.set_storage_batch(block_hash, &entries).await.unwrap();

    // Sort before comparing so the check is independent of return order.
    let mut token_keys = store.get_keys_by_prefix(block_hash, b"tokens:").await.unwrap();
    token_keys.sort();
    assert_eq!(
        token_keys,
        vec![b"tokens:alice".to_vec(), b"tokens:bob".to_vec(), b"tokens:charlie".to_vec()]
    );

    let mut balance_keys = store.get_keys_by_prefix(block_hash, b"balances:").await.unwrap();
    balance_keys.sort();
    assert_eq!(balance_keys, vec![b"balances:alice".to_vec(), b"balances:bob".to_vec()]);

    // A prefix with no matching keys returns an empty list, not an error.
    let missing = store.get_keys_by_prefix(block_hash, b"nonexistent:").await.unwrap();
    assert!(missing.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn count_keys_by_prefix_works() {
    // count_keys_by_prefix reports how many stored keys start with the
    // given byte prefix, including zero for an unused prefix.
    let store = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([15u8; 32]);
    let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
        (b"prefix_a:1", Some(b"v1")),
        (b"prefix_a:2", Some(b"v2")),
        (b"prefix_a:3", Some(b"v3")),
        (b"prefix_b:1", Some(b"v4")),
    ];
    store.set_storage_batch(block_hash, &entries).await.unwrap();

    // (prefix, expected count) pairs, including a prefix with no matches.
    let expectations = [
        (b"prefix_a:".as_slice(), 3),
        (b"prefix_b:".as_slice(), 1),
        (b"prefix_c:".as_slice(), 0),
    ];
    for &(prefix, expected) in &expectations {
        assert_eq!(store.count_keys_by_prefix(block_hash, prefix).await.unwrap(), expected);
    }
}
#[tokio::test(flavor = "multi_thread")]
async fn next_key_from_cache_works() {
    // next_key_from_cache walks the keys under a prefix in lexicographic
    // order, returning the first key strictly after the supplied one.
    let store = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([20u8; 32]);
    let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
        (b"prefix:aaa", Some(b"v1")),
        (b"prefix:bbb", Some(b"v2")),
        (b"prefix:ccc", Some(b"v3")),
        (b"other:ddd", Some(b"v4")),
    ];
    store.set_storage_batch(block_hash, &entries).await.unwrap();

    // Each (start, successor) step; the last key has no successor, and
    // starting at the bare prefix yields the first key under it.
    let steps: [(&[u8], Option<&[u8]>); 4] = [
        (b"prefix:aaa", Some(b"prefix:bbb")),
        (b"prefix:bbb", Some(b"prefix:ccc")),
        (b"prefix:ccc", None),
        (b"prefix:", Some(b"prefix:aaa")),
    ];
    for &(start, expected) in &steps {
        let next = store.next_key_from_cache(block_hash, b"prefix:", start).await.unwrap();
        assert_eq!(next, expected.map(|key| key.to_vec()));
    }
}
#[test]
fn increment_prefix_works() {
    // increment_prefix bumps the last non-0xff byte (dropping any trailing
    // 0xff run); an empty or all-0xff prefix has no successor.
    let cases: [(&[u8], Option<&[u8]>); 6] = [
        (b"abc", Some(b"abd")),
        (b"ab\xff", Some(b"ac")),
        (b"a\xff\xff", Some(b"b")),
        (b"\xff\xff\xff", None),
        (b"", None),
        (b"a", Some(b"b")),
    ];
    for &(input, expected) in &cases {
        assert_eq!(increment_prefix(input), expected.map(|bytes| bytes.to_vec()));
    }
}
#[tokio::test(flavor = "multi_thread")]
async fn clear_block_removes_prefix_scans() {
    // Clearing a block must also drop any prefix-scan progress recorded
    // for that block.
    let store = StorageCache::in_memory().await.unwrap();
    let block_hash = H256::from([16u8; 32]);
    let prefix = b"test:";

    store.update_prefix_scan(block_hash, prefix, b"test:key", true).await.unwrap();
    // Sanity: the progress row exists before the clear.
    assert!(store.get_prefix_scan_progress(block_hash, prefix).await.unwrap().is_some());

    store.clear_block(block_hash).await.unwrap();
    // After the clear, the progress row is gone.
    assert!(store.get_prefix_scan_progress(block_hash, prefix).await.unwrap().is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_key_returns_none_for_nonexistent_key() {
    // Looking up a key that was never inserted yields Ok(None), not an error.
    let store = StorageCache::in_memory().await.unwrap();
    let looked_up = store.get_local_key(b"nonexistent_key").await.unwrap();
    assert!(looked_up.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn insert_local_key_creates_new_key() {
    // The first inserted key receives id 1 and is retrievable afterwards
    // under that same id.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"new_key";

    let key_id = store.insert_local_key(key).await.unwrap();
    assert_eq!(key_id, 1);

    let row = store
        .get_local_key(key)
        .await
        .unwrap()
        .expect("key was just inserted");
    assert_eq!(row.id, key_id);
}
#[tokio::test(flavor = "multi_thread")]
async fn insert_local_key_returns_existing_id() {
    // Inserting the same key twice is idempotent: both calls hand back the
    // same id rather than creating a second row.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"duplicate_key";
    let first_id = store.insert_local_key(key).await.unwrap();
    let second_id = store.insert_local_key(key).await.unwrap();
    assert_eq!(first_id, second_id);
}
#[tokio::test(flavor = "multi_thread")]
async fn insert_and_get_local_value_at_block() {
    // A value inserted at block 100 is visible at 100 and at later blocks,
    // but not before its insertion point.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"test_key";
    let value = b"test_value";

    let key_id = store.insert_local_key(key).await.unwrap();
    store.insert_local_value(key_id, Some(value), 100).await.unwrap();

    // Visible at the insertion block and any later block.
    for &block in &[100, 150] {
        let found = store.get_local_value_at_block(key, block).await.unwrap();
        assert_eq!(found, Some(Some(value.to_vec())));
    }
    // One block before insertion: no entry at all.
    assert!(store.get_local_value_at_block(key, 99).await.unwrap().is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_value_at_block_nonexistent_key() {
    // A key with no inserted values resolves to None at any block height.
    let store = StorageCache::in_memory().await.unwrap();
    let found = store.get_local_value_at_block(b"nonexistent", 100).await.unwrap();
    assert!(found.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn close_local_value_sets_valid_until() {
    // Closing a value at block 150 ends its validity there; a value inserted
    // at 150 takes over from that block onwards.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"closing_key";
    let old_value = b"value1";
    let new_value = b"value2";

    let key_id = store.insert_local_key(key).await.unwrap();
    store.insert_local_value(key_id, Some(old_value), 100).await.unwrap();
    store.close_local_value(key_id, 150).await.unwrap();
    store.insert_local_value(key_id, Some(new_value), 150).await.unwrap();

    // Before the close boundary the original value is still live.
    assert_eq!(
        store.get_local_value_at_block(key, 120).await.unwrap(),
        Some(Some(old_value.to_vec()))
    );
    // At and after the boundary the replacement value is returned.
    for &block in &[150, 200] {
        assert_eq!(
            store.get_local_value_at_block(key, block).await.unwrap(),
            Some(Some(new_value.to_vec()))
        );
    }
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_values_at_block_batch_works() {
    // Batch lookup preserves the input order of keys and reports None for
    // keys that have no cached entry.
    let cache = StorageCache::in_memory().await.unwrap();
    let key1 = b"batch_key1";
    let key2 = b"batch_key2";
    let key3 = b"batch_key3";
    let value1 = b"batch_value1";
    let value2 = b"batch_value2";
    let key_id1 = cache.insert_local_key(key1).await.unwrap();
    let key_id2 = cache.insert_local_key(key2).await.unwrap();
    cache.insert_local_value(key_id1, Some(value1), 100).await.unwrap();
    cache.insert_local_value(key_id2, Some(value2), 100).await.unwrap();
    let keys: Vec<&[u8]> = vec![key1, key2, key3];
    let results = cache.get_local_values_at_block_batch(&keys, 100).await.unwrap();
    assert_eq!(results.len(), 3);
    assert_eq!(results[0], Some(Some(value1.to_vec())));
    assert_eq!(results[1], Some(Some(value2.to_vec())));
    // key3 was never inserted, so its slot is a cache miss.
    assert!(results[2].is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_values_at_block_batch_respects_validity() {
    // Batch lookups honour value validity ranges exactly like single
    // lookups: old value inside its window, new value at the boundary,
    // miss before any insertion.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"validity_key";
    let old_value = b"value_v1";
    let new_value = b"value_v2";

    let key_id = store.insert_local_key(key).await.unwrap();
    store.insert_local_value(key_id, Some(old_value), 100).await.unwrap();
    store.close_local_value(key_id, 200).await.unwrap();
    store.insert_local_value(key_id, Some(new_value), 200).await.unwrap();

    let keys: Vec<&[u8]> = vec![key];

    // Inside the first value's validity window.
    let at_150 = store.get_local_values_at_block_batch(&keys, 150).await.unwrap();
    assert_eq!(at_150[0], Some(Some(old_value.to_vec())));

    // At the close/insert boundary the replacement wins.
    let at_200 = store.get_local_values_at_block_batch(&keys, 200).await.unwrap();
    assert_eq!(at_200[0], Some(Some(new_value.to_vec())));

    // Before the first insertion: a miss.
    let at_99 = store.get_local_values_at_block_batch(&keys, 99).await.unwrap();
    assert!(at_99[0].is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_values_at_block_batch_with_duplicate_keys() {
    // A batch containing the same key twice is rejected with DuplicatedKeys.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"dup_key";
    let keys: Vec<&[u8]> = vec![key, key];
    let outcome = store.get_local_values_at_block_batch(&keys, 100).await;
    assert!(matches!(outcome, Err(CacheError::DuplicatedKeys)));
}
#[tokio::test(flavor = "multi_thread")]
async fn clear_local_storage_removes_all_data() {
    // clear_local_storage wipes both the local keys and their values.
    let store = StorageCache::in_memory().await.unwrap();
    let key1 = b"clear_key1";
    let key2 = b"clear_key2";
    let value = b"some_value";

    let id1 = store.insert_local_key(key1).await.unwrap();
    let id2 = store.insert_local_key(key2).await.unwrap();
    store.insert_local_value(id1, Some(value), 100).await.unwrap();
    store.insert_local_value(id2, Some(value), 100).await.unwrap();

    // Sanity: everything is present before the wipe.
    assert!(store.get_local_key(key1).await.unwrap().is_some());
    assert!(store.get_local_key(key2).await.unwrap().is_some());
    assert!(store.get_local_value_at_block(key1, 100).await.unwrap().is_some());

    store.clear_local_storage().await.unwrap();

    // And gone afterwards.
    assert!(store.get_local_key(key1).await.unwrap().is_none());
    assert!(store.get_local_key(key2).await.unwrap().is_none());
    assert!(store.get_local_value_at_block(key1, 100).await.unwrap().is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_keys_at_block_returns_live_keys() {
    // Only keys matching the prefix whose values already exist at the
    // queried block are listed.
    let store = StorageCache::in_memory().await.unwrap();
    let alice = b"pallet:alice";
    let bob = b"pallet:bob";
    let charlie = b"other:charlie";

    let alice_id = store.insert_local_key(alice).await.unwrap();
    let bob_id = store.insert_local_key(bob).await.unwrap();
    let charlie_id = store.insert_local_key(charlie).await.unwrap();
    store.insert_local_value(alice_id, Some(b"v1"), 100).await.unwrap();
    store.insert_local_value(bob_id, Some(b"v2"), 200).await.unwrap();
    store.insert_local_value(charlie_id, Some(b"v3"), 100).await.unwrap();

    // At 150 only alice's value exists under "pallet:"; charlie is filtered
    // out by prefix even though its value is live.
    assert_eq!(
        store.get_local_keys_at_block(b"pallet:", 150).await.unwrap(),
        vec![alice.to_vec()]
    );
    // At 200 bob's value has appeared as well.
    assert_eq!(
        store.get_local_keys_at_block(b"pallet:", 200).await.unwrap(),
        vec![alice.to_vec(), bob.to_vec()]
    );
    // Before any insertion nothing is live.
    assert!(store.get_local_keys_at_block(b"pallet:", 99).await.unwrap().is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_keys_at_block_excludes_deleted() {
    // A key whose live value is succeeded by a None (deletion marker) stops
    // appearing in the live-key listing from that block on.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"pallet:alice";

    let key_id = store.insert_local_key(key).await.unwrap();
    store.insert_local_value(key_id, Some(b"v1"), 100).await.unwrap();
    store.close_local_value(key_id, 200).await.unwrap();
    store.insert_local_value(key_id, None, 200).await.unwrap();

    // Still listed inside the original value's validity window.
    assert_eq!(
        store.get_local_keys_at_block(b"pallet:", 150).await.unwrap(),
        vec![key.to_vec()]
    );
    // Deleted at 200, so the listing is empty from there.
    assert!(store.get_local_keys_at_block(b"pallet:", 200).await.unwrap().is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_local_deleted_keys_at_block_works() {
    // The deleted-key listing is empty before the deletion marker's block
    // and contains the key from that block on.
    let store = StorageCache::in_memory().await.unwrap();
    let key = b"pallet:alice";

    let key_id = store.insert_local_key(key).await.unwrap();
    store.insert_local_value(key_id, Some(b"v1"), 100).await.unwrap();
    store.close_local_value(key_id, 200).await.unwrap();
    store.insert_local_value(key_id, None, 200).await.unwrap();

    // Before the marker: nothing reported as deleted.
    assert!(store.get_local_deleted_keys_at_block(b"pallet:", 150).await.unwrap().is_empty());
    // At the marker's block the key shows up as deleted.
    assert_eq!(
        store.get_local_deleted_keys_at_block(b"pallet:", 200).await.unwrap(),
        vec![key.to_vec()]
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn prefix_upper_bound_works() {
    // prefix_upper_bound increments the last non-0xff byte (dropping any
    // trailing 0xff run); empty or all-0xff prefixes have no upper bound.
    let cases: [(&[u8], Option<&[u8]>); 4] = [
        (b"abc", Some(b"abd")),
        (b"ab\xff", Some(b"ac")),
        (b"\xff\xff", None),
        (b"", None),
    ];
    for &(prefix, expected) in &cases {
        assert_eq!(
            StorageCache::prefix_upper_bound(prefix),
            expected.map(|bytes| bytes.to_vec())
        );
    }
}
}