use crate::{error::LocalStorageError, models::BlockRow, remote::RemoteStorageLayer};
use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, RwLock},
};
use subxt::{Metadata, config::substrate::H256};
/// Block-number increment applied by `LocalStorageLayer::commit` when it advances the
/// current block.
const ONE_BLOCK: u32 = 1;
/// A storage value together with the block number recorded when it was stored.
///
/// Values written through the layer's setters are tagged with a real block number; values
/// built from remote or cached reads in this file are tagged with block `0`.
#[derive(Debug, PartialEq)]
pub struct LocalSharedValue {
    /// Block number associated with the write that produced this value.
    last_modification_block: u32,
    /// Raw value bytes; `None` is what the setters store when called with `None`.
    pub value: Option<Vec<u8>>,
}

/// Zero-length buffer handed out by `AsRef<[u8]>` when no value bytes are present.
static EMPTY_MEMORY: [u8; 0] = [];

impl AsRef<[u8]> for LocalSharedValue {
    /// Borrows the stored bytes, or an empty slice when `value` is `None`.
    fn as_ref(&self) -> &[u8] {
        self.value.as_deref().unwrap_or(&EMPTY_MEMORY)
    }
}
/// Reference-counted handle to a [`LocalSharedValue`].
type SharedValue = Arc<LocalSharedValue>;
/// In-memory map from storage key to its locally written value.
type Modifications = HashMap<Vec<u8>, Option<SharedValue>>;
/// Key prefixes that have been deleted through `delete_prefix`.
type DeletedPrefixes = Vec<Vec<u8>>;
/// Snapshot of the local modifications as `(key, value)` pairs, as returned by `diff`.
type DiffLocalStorage = Vec<(Vec<u8>, Option<SharedValue>)>;
/// Metadata versions keyed by the block number they were registered at.
type MetadataVersions = BTreeMap<u32, Arc<Metadata>>;
/// Local storage layer stacked on top of a [`RemoteStorageLayer`].
///
/// Writes are kept in an in-memory map and reads that are not resolved locally are forwarded
/// to the remote layer or its cache. The maps are wrapped in `Arc`, so cloning the layer
/// yields a handle onto the same modifications, deleted prefixes and metadata versions.
#[derive(Clone, Debug)]
pub struct LocalStorageLayer {
// Remote layer (and its cache) consulted when a key is not resolved locally.
parent: RemoteStorageLayer,
// Hash of the block the fork was created at; used for remote reads of fork-local blocks.
first_forked_block_hash: H256,
// Number of the block the fork was created at.
first_forked_block_number: u32,
// Block currently being built; starts at fork block + 1 and is advanced by `commit`.
current_block_number: u32,
// Locally written values, keyed by storage key.
modifications: Arc<RwLock<Modifications>>,
// Prefixes deleted via `delete_prefix`.
deleted_prefixes: Arc<RwLock<DeletedPrefixes>>,
// Metadata registered per block number, seeded with the fork block's metadata.
metadata_versions: Arc<RwLock<MetadataVersions>>,
}
impl LocalStorageLayer {
pub fn new(
parent: RemoteStorageLayer,
first_forked_block_number: u32,
first_forked_block_hash: H256,
metadata: Metadata,
) -> Self {
let mut metadata_versions = BTreeMap::new();
metadata_versions.insert(first_forked_block_number, Arc::new(metadata));
Self {
parent,
first_forked_block_hash,
first_forked_block_number,
current_block_number: first_forked_block_number + 1,
modifications: Arc::new(RwLock::new(HashMap::new())),
deleted_prefixes: Arc::new(RwLock::new(Vec::new())),
metadata_versions: Arc::new(RwLock::new(metadata_versions)),
}
}
/// Looks up a block row by number, preferring the parent's cache and otherwise asking the
/// parent to fetch (and cache) it.
async fn get_block(&self, block_number: u32) -> Result<Option<BlockRow>, LocalStorageError> {
    let cached = self.parent.cache().get_block_by_number(block_number).await?;
    match cached {
        Some(row) => Ok(Some(row)),
        None => Ok(self.parent.fetch_and_cache_block_by_number(block_number).await?),
    }
}
/// Returns the number of the block currently being built (the value advanced by `commit`).
pub fn get_current_block_number(&self) -> u32 {
self.current_block_number
}
/// Returns the storage cache owned by the parent remote layer.
pub fn cache(&self) -> &crate::StorageCache {
self.parent.cache()
}
/// Returns the parent remote storage layer.
pub fn remote(&self) -> &crate::RemoteStorageLayer {
&self.parent
}
/// Returns the hash of the block this layer was forked from.
pub fn fork_block_hash(&self) -> H256 {
self.first_forked_block_hash
}
/// Returns the metadata that applies at `block_number`.
///
/// Blocks before the fork point are resolved through the remote node. For the fork block
/// and later, the most recently registered version at or below `block_number` is returned.
///
/// # Errors
/// * `LocalStorageError::Lock` if the metadata lock is poisoned.
/// * `LocalStorageError::MetadataNotFound` if no registered version applies.
/// * Whatever `fetch_remote_metadata` returns for pre-fork blocks.
pub async fn metadata_at(&self, block_number: u32) -> Result<Arc<Metadata>, LocalStorageError> {
    let is_pre_fork = block_number < self.first_forked_block_number;
    if is_pre_fork {
        return self.fetch_remote_metadata(block_number).await;
    }

    let versions = self
        .metadata_versions
        .read()
        .map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    // Newest entry whose registration block does not exceed the requested block.
    let nearest = versions
        .range(..=block_number)
        .next_back()
        .map(|(_, metadata)| Arc::clone(metadata));

    match nearest {
        Some(metadata) => Ok(metadata),
        None => Err(LocalStorageError::MetadataNotFound(format!(
            "No metadata found for block {}",
            block_number
        ))),
    }
}
/// Fetches metadata for `block_number` from the remote node.
///
/// The block number is first resolved to a hash through the parent's RPC client; a missing
/// hash is reported as `LocalStorageError::MetadataNotFound`.
async fn fetch_remote_metadata(
    &self,
    block_number: u32,
) -> Result<Arc<Metadata>, LocalStorageError> {
    let maybe_hash = self.parent.rpc().block_hash_at(block_number).await?;
    let Some(block_hash) = maybe_hash else {
        return Err(LocalStorageError::MetadataNotFound(format!(
            "Block {} not found on remote node",
            block_number
        )));
    };

    Ok(Arc::new(self.parent.rpc().metadata(block_hash).await?))
}
/// Registers `metadata` as the version in effect from `block_number` onwards, replacing any
/// version previously registered for that exact block.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the metadata lock is poisoned.
pub fn register_metadata_version(
    &self,
    block_number: u32,
    metadata: Metadata,
) -> Result<(), LocalStorageError> {
    self.metadata_versions
        .write()
        .map_err(|e| LocalStorageError::Lock(e.to_string()))?
        .insert(block_number, Arc::new(metadata));
    Ok(())
}
/// Reports whether the well-known `:code` key has a local modification recorded for exactly
/// `block_number`.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the modifications lock is poisoned.
pub fn has_code_changed_at(&self, block_number: u32) -> Result<bool, LocalStorageError> {
    let guard = self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    let changed = matches!(
        guard.get(sp_core::storage::well_known_keys::CODE),
        Some(Some(entry)) if entry.last_modification_block == block_number
    );
    Ok(changed)
}
fn get_local_modification(
&self,
key: &[u8],
block_number: u32,
) -> Result<Option<Option<SharedValue>>, LocalStorageError> {
let latest_block_number = self.get_current_block_number();
let modifications_lock =
self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
let deleted_prefixes_lock = self
.deleted_prefixes
.read()
.map_err(|e| LocalStorageError::Lock(e.to_string()))?;
match modifications_lock.get(key) {
local_modification @ Some(Some(shared_value))
if latest_block_number == block_number ||
shared_value.last_modification_block < block_number =>
Ok(local_modification.cloned()),
None if deleted_prefixes_lock
.iter()
.any(|prefix| key.starts_with(prefix.as_slice())) =>
Ok(Some(None)),
_ => Ok(None),
}
}
/// Reads `key` as of `block_number`.
///
/// Resolution order:
/// 1. The in-memory state (`get_local_modification`); a hit, including a deleted-prefix hit
///    that yields `None`, is returned as is.
/// 2. `block_number` equal to the current block: the parent layer at the fork block hash.
/// 3. `block_number` strictly between the fork block and the current block: the value
///    persisted in the cache for that block, falling back to the parent at the fork hash.
/// 4. Any other block: the block is resolved to a hash via `get_block` and read from the
///    parent; an unknown block yields `Ok(None)`.
///
/// Values that do not come from the in-memory map carry `last_modification_block == 0`.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if a lock is poisoned, plus any error surfaced by the
/// parent layer or its cache.
pub async fn get(
    &self,
    block_number: u32,
    key: &[u8],
) -> Result<Option<SharedValue>, LocalStorageError> {
    // Wraps bytes read from the parent layer.
    fn from_remote(value: Vec<u8>) -> SharedValue {
        Arc::new(LocalSharedValue { last_modification_block: 0, value: Some(value) })
    }

    let latest_block_number = self.get_current_block_number();

    // Propagate lock errors instead of silently falling through to the remote lookups.
    if let Some(local_modification) = self.get_local_modification(key, block_number)? {
        return Ok(local_modification);
    }

    if block_number == latest_block_number {
        let remote = self.parent.get(self.first_forked_block_hash, key).await?;
        return Ok(remote.map(from_remote));
    }

    if block_number > self.first_forked_block_number && block_number < latest_block_number {
        let cached = self.parent.cache().get_local_value_at_block(key, block_number).await?;
        let value = match cached {
            Some(local_value) => local_value,
            None => match self.parent.get(self.first_forked_block_hash, key).await? {
                Some(remote_value) => Some(remote_value),
                None => return Ok(None),
            },
        };
        return Ok(Some(Arc::new(LocalSharedValue { last_modification_block: 0, value })));
    }

    let block = self.get_block(block_number).await?;
    match block {
        Some(block_row) => {
            let block_hash = H256::from_slice(&block_row.hash);
            Ok(self.parent.get(block_hash, key).await?.map(from_remote))
        },
        None => Ok(None),
    }
}
/// Returns the next remote key after `key` under `prefix` that is not covered by a deleted
/// prefix, or `None` when the remote has no further key.
///
/// Keys are enumerated from the parent layer at the fork block hash. Only the deleted
/// prefixes are consulted here; the in-memory modifications map is not read.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the deleted-prefix lock is poisoned, plus any error
/// surfaced by the parent layer.
pub async fn next_key(
    &self,
    prefix: &[u8],
    key: &[u8],
) -> Result<Option<Vec<u8>>, LocalStorageError> {
    // Snapshot the prefixes so no lock guard is held across an `.await`.
    let deleted_prefixes = self
        .deleted_prefixes
        .read()
        .map_err(|e| LocalStorageError::Lock(e.to_string()))?
        .clone();

    let mut current_key = key.to_vec();
    loop {
        let candidate = self
            .parent
            .next_key(self.first_forked_block_hash, prefix, &current_key)
            .await?;
        let Some(next_key) = candidate else {
            return Ok(None);
        };

        let is_deleted =
            deleted_prefixes.iter().any(|deleted| next_key.starts_with(deleted.as_slice()));
        if !is_deleted {
            return Ok(Some(next_key));
        }
        // Skip the deleted key and keep scanning from it.
        current_key = next_key;
    }
}
/// Lists the keys under `prefix` as of `block_number`.
///
/// Remote keys are always fetched at the fork block hash. They are merged with the in-memory
/// state when `block_number` is at or beyond the current block, and with the cached
/// per-block state otherwise.
pub async fn keys_by_prefix(
    &self,
    prefix: &[u8],
    block_number: u32,
) -> Result<Vec<Vec<u8>>, LocalStorageError> {
    let remote_keys = self.parent.get_keys(self.first_forked_block_hash, prefix).await?;

    if block_number < self.get_current_block_number() {
        return self.merge_keys_with_cache(remote_keys, prefix, block_number).await;
    }
    self.merge_keys_with_in_memory(remote_keys, prefix)
}
/// Merges `remote_keys` with the in-memory state and returns the result in sorted order.
///
/// Remote keys are dropped when they start with a deleted prefix or when their local entry
/// has no value. Local entries under `prefix` that do have a value are then added.
///
/// The read guards are held for the duration of the merge instead of cloning the whole map
/// and prefix list; this function is synchronous, so no guard is held across an `.await`.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if either lock is poisoned.
fn merge_keys_with_in_memory(
    &self,
    remote_keys: Vec<Vec<u8>>,
    prefix: &[u8],
) -> Result<Vec<Vec<u8>>, LocalStorageError> {
    // Lock order (modifications, then deleted prefixes) matches the other methods.
    let modifications =
        self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    let deleted_prefixes = self
        .deleted_prefixes
        .read()
        .map_err(|e| LocalStorageError::Lock(e.to_string()))?;

    let is_deleted = |key: &[u8]| -> bool {
        deleted_prefixes.iter().any(|dp| key.starts_with(dp.as_slice()))
    };
    let is_locally_deleted = |key: &[u8]| -> bool {
        modifications
            .get::<[u8]>(key)
            .and_then(|sv| sv.as_ref())
            .is_some_and(|sv| sv.value.is_none())
    };

    let mut merged: std::collections::BTreeSet<Vec<u8>> = remote_keys
        .into_iter()
        .filter(|k| !is_deleted(k) && !is_locally_deleted(k))
        .collect();

    // Local writes win over prefix deletion: a key re-set after the deletion is listed again.
    for (key, maybe_sv) in modifications.iter() {
        if key.starts_with(prefix) && maybe_sv.as_ref().is_some_and(|sv| sv.value.is_some()) {
            merged.insert(key.clone());
        }
    }

    Ok(merged.into_iter().collect())
}
/// Merges `remote_keys` with the keys the cache reports for `block_number` and returns the
/// result in sorted order.
///
/// Remote keys that the cache reports as deleted at that block are dropped; keys the cache
/// reports as live are added.
async fn merge_keys_with_cache(
    &self,
    remote_keys: Vec<Vec<u8>>,
    prefix: &[u8],
    block_number: u32,
) -> Result<Vec<Vec<u8>>, LocalStorageError> {
    let cache = self.parent.cache();
    let live_keys = cache.get_local_keys_at_block(prefix, block_number).await?;
    let deleted_keys = cache.get_local_deleted_keys_at_block(prefix, block_number).await?;
    let tombstones: std::collections::HashSet<Vec<u8>> = deleted_keys.into_iter().collect();

    let mut merged: std::collections::BTreeSet<Vec<u8>> = std::collections::BTreeSet::new();
    for key in remote_keys {
        if !tombstones.contains(&key) {
            merged.insert(key);
        }
    }
    merged.extend(live_keys);

    Ok(merged.into_iter().collect())
}
/// Stores `value` for `key` in the in-memory map, tagged with the current block number.
///
/// Passing `None` stores an entry without value bytes rather than removing the key.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the modifications lock is poisoned.
pub fn set(&self, key: &[u8], value: Option<&[u8]>) -> Result<(), LocalStorageError> {
    let mut guard =
        self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    let entry = LocalSharedValue {
        last_modification_block: self.get_current_block_number(),
        value: value.map(<[u8]>::to_vec),
    };
    guard.insert(key.to_vec(), Some(Arc::new(entry)));
    Ok(())
}
/// Stores `value` for `key` in the in-memory map, tagged with the fork block number instead
/// of the current block.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the modifications lock is poisoned.
pub fn set_initial(&self, key: &[u8], value: Option<&[u8]>) -> Result<(), LocalStorageError> {
    let mut guard =
        self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    let entry = LocalSharedValue {
        last_modification_block: self.first_forked_block_number,
        value: value.map(<[u8]>::to_vec),
    };
    guard.insert(key.to_vec(), Some(Arc::new(entry)));
    Ok(())
}
/// Reads several keys as of `block_number`; the result has one entry per key, in input order.
///
/// Each key is first resolved against the in-memory state. The remaining keys are resolved
/// according to the block being queried:
/// * current block: one batched read from the parent at the fork block hash;
/// * strictly between the fork block and the current block: the cache's per-block values,
///   then one batched read from the parent at the fork hash for keys the cache does not know;
/// * any other block: all keys are read from the parent at that block's hash, and an unknown
///   block yields `None` for every key.
///
/// Keys the in-memory state resolves as deleted (covered by a deleted prefix) stay `None` in
/// the fork-history case and are not re-read from the remote, matching `get`.
///
/// Values that do not come from the in-memory map carry `last_modification_block == 0`.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if a lock is poisoned, plus any error surfaced by the
/// parent layer or its cache.
pub async fn get_batch(
    &self,
    block_number: u32,
    keys: &[&[u8]],
) -> Result<Vec<Option<SharedValue>>, LocalStorageError> {
    // Wraps a value that was not served from the in-memory map.
    fn wrap(value: Option<Vec<u8>>) -> SharedValue {
        Arc::new(LocalSharedValue { last_modification_block: 0, value })
    }

    if keys.is_empty() {
        return Ok(vec![]);
    }

    let latest_block_number = self.get_current_block_number();
    let mut results: Vec<Option<SharedValue>> = Vec::with_capacity(keys.len());
    // Keys the in-memory state could not answer, with their positions in `results`.
    let mut non_local_keys: Vec<&[u8]> = Vec::new();
    let mut non_local_indices: Vec<usize> = Vec::new();

    for (i, key) in keys.iter().enumerate() {
        match self.get_local_modification(key, block_number)? {
            Some(local_modification) => results.push(local_modification),
            None => {
                results.push(None);
                non_local_keys.push(*key);
                non_local_indices.push(i);
            },
        }
    }

    if block_number == latest_block_number {
        if !non_local_keys.is_empty() {
            let parent_values =
                self.parent.get_batch(self.first_forked_block_hash, &non_local_keys).await?;
            for (result_idx, parent_value) in
                non_local_indices.iter().copied().zip(parent_values)
            {
                results[result_idx] = parent_value.map(|bytes| wrap(Some(bytes)));
            }
        }
        return Ok(results);
    }

    if block_number > self.first_forked_block_number && block_number < latest_block_number {
        if !non_local_keys.is_empty() {
            let cached_values = self
                .parent
                .cache()
                .get_local_values_at_block_batch(&non_local_keys, block_number)
                .await?;

            // Keys unknown to the cache fall back to the remote state at the fork point.
            let mut remote_keys: Vec<&[u8]> = Vec::new();
            let mut remote_indices: Vec<usize> = Vec::new();
            for (i, cache_value) in cached_values.into_iter().enumerate() {
                let result_idx = non_local_indices[i];
                match cache_value {
                    Some(value) => results[result_idx] = Some(wrap(value)),
                    None => {
                        remote_keys.push(non_local_keys[i]);
                        remote_indices.push(result_idx);
                    },
                }
            }

            if !remote_keys.is_empty() {
                let remote_values =
                    self.parent.get_batch(self.first_forked_block_hash, &remote_keys).await?;
                for (result_idx, remote_value) in remote_indices.into_iter().zip(remote_values)
                {
                    results[result_idx] = remote_value.map(|bytes| wrap(Some(bytes)));
                }
            }
        }
        return Ok(results);
    }

    let block = self.get_block(block_number).await?;
    match block {
        Some(block_row) => {
            let block_hash = H256::from_slice(&block_row.hash);
            let parent_values = self.parent.get_batch(block_hash, keys).await?;
            Ok(parent_values
                .into_iter()
                .map(|value| value.map(|bytes| wrap(Some(bytes))))
                .collect())
        },
        None => Ok(vec![None; keys.len()]),
    }
}
/// Stores every `(key, value)` pair in the in-memory map, tagged with the current block
/// number. Entries are inserted in order, so a later duplicate key overwrites an earlier one.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the modifications lock is poisoned.
pub fn set_batch(&self, entries: &[(&[u8], Option<&[u8]>)]) -> Result<(), LocalStorageError> {
    if entries.is_empty() {
        return Ok(());
    }

    let written_at = self.get_current_block_number();
    let mut guard =
        self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    guard.extend(entries.iter().map(|(key, value)| {
        let stored = LocalSharedValue {
            last_modification_block: written_at,
            value: value.map(|bytes| bytes.to_vec()),
        };
        (key.to_vec(), Some(Arc::new(stored)))
    }));
    Ok(())
}
/// Stores every `(key, value)` pair in the in-memory map, tagged with the fork block number
/// instead of the current block. Entries are inserted in order.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the modifications lock is poisoned.
pub fn set_batch_initial(
    &self,
    entries: &[(&[u8], Option<&[u8]>)],
) -> Result<(), LocalStorageError> {
    if entries.is_empty() {
        return Ok(());
    }

    let fork_block = self.first_forked_block_number;
    let mut guard =
        self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    guard.extend(entries.iter().map(|(key, value)| {
        let stored = LocalSharedValue {
            last_modification_block: fork_block,
            value: value.map(|bytes| bytes.to_vec()),
        };
        (key.to_vec(), Some(Arc::new(stored)))
    }));
    Ok(())
}
/// Deletes every key under `prefix`.
///
/// Local modifications whose key starts with `prefix` are dropped, and the prefix is
/// recorded so that reads of keys without a local entry resolve as deleted. A prefix that
/// is already recorded is not recorded again, so repeated deletions do not grow the list
/// that every lookup scans.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if either lock is poisoned.
pub fn delete_prefix(&self, prefix: &[u8]) -> Result<(), LocalStorageError> {
    let mut modifications =
        self.modifications.write().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    let mut deleted_prefixes = self
        .deleted_prefixes
        .write()
        .map_err(|e| LocalStorageError::Lock(e.to_string()))?;

    modifications.retain(|key, _| !key.starts_with(prefix));

    // Only exact duplicates are skipped, so `is_deleted(prefix)` keeps answering `true`.
    let already_recorded = deleted_prefixes.iter().any(|existing| existing.as_slice() == prefix);
    if !already_recorded {
        deleted_prefixes.push(prefix.to_vec());
    }
    Ok(())
}
/// Reports whether exactly `prefix` has been recorded by `delete_prefix`.
///
/// This is an equality check; a key that merely starts with a recorded prefix does not
/// count.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the deleted-prefix lock is poisoned.
pub fn is_deleted(&self, prefix: &[u8]) -> Result<bool, LocalStorageError> {
    let recorded = self
        .deleted_prefixes
        .read()
        .map_err(|e| LocalStorageError::Lock(e.to_string()))?
        .iter()
        .any(|candidate| candidate[..] == *prefix);
    Ok(recorded)
}
/// Returns a snapshot of every entry in the in-memory modifications map.
///
/// The order of the returned pairs follows the map's iteration order.
///
/// # Errors
/// Returns `LocalStorageError::Lock` if the modifications lock is poisoned.
pub fn diff(&self) -> Result<DiffLocalStorage, LocalStorageError> {
    let guard = self.modifications.read().map_err(|e| LocalStorageError::Lock(e.to_string()))?;
    let mut snapshot = Vec::with_capacity(guard.len());
    for (key, value) in guard.iter() {
        snapshot.push((key.clone(), value.clone()));
    }
    Ok(snapshot)
}
/// Persists the current block's writes to the cache and advances to the next block.
///
/// Only entries whose `last_modification_block` equals the current block are handed to
/// `commit_local_changes`. The in-memory map itself is left untouched, and deleted prefixes
/// are not part of what is passed to the cache here.
///
/// # Errors
/// * `LocalStorageError::Arithmetic` if the next block number would overflow `u32`.
/// * `LocalStorageError::Lock` if the modifications lock is poisoned.
/// * Any error surfaced by the cache.
pub async fn commit(&mut self) -> Result<(), LocalStorageError> {
    let committing_block = self.get_current_block_number();
    let Some(next_block) = committing_block.checked_add(ONE_BLOCK) else {
        return Err(LocalStorageError::Arithmetic);
    };

    let snapshot = self.diff()?;
    let entries_to_commit: Vec<(&[u8], Option<&[u8]>)> = snapshot
        .iter()
        .filter_map(|(key, entry)| match entry {
            Some(sv) if sv.last_modification_block == committing_block =>
                Some((key.as_slice(), sv.value.as_deref())),
            _ => None,
        })
        .collect();

    self.parent
        .cache()
        .commit_local_changes(&entries_to_commit, committing_block)
        .await?;

    self.current_block_number = next_block;
    Ok(())
}
/// Returns a clone of this layer.
///
/// The modification, deleted-prefix and metadata stores are held in `Arc`s, so the clone
/// refers to the same underlying maps; the block numbers are copied by value.
pub fn child(&self) -> LocalStorageLayer {
self.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::{
TestContext,
constants::{SYSTEM_NUMBER_KEY, SYSTEM_PALLET_PREFIX, SYSTEM_PARENT_HASH_KEY},
};
use std::time::Duration;
use subxt::ext::codec::Decode;
/// Builds a layer forked at the test context's block, using its remote, hash and metadata.
fn create_layer(ctx: &TestContext) -> LocalStorageLayer {
    let remote = ctx.remote().clone();
    let metadata = ctx.metadata().clone();
    LocalStorageLayer::new(remote, ctx.block_number(), ctx.block_hash(), metadata)
}
/// A fresh layer has no modifications and starts one block after the fork block.
#[tokio::test(flavor = "multi_thread")]
async fn new_creates_empty_layer() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let fork_block = ctx.block_number();

    let modifications = layer.diff().unwrap();
    assert_eq!(modifications.len(), 0, "New layer should have no modifications");
    assert_eq!(layer.first_forked_block_number, fork_block);
    assert_eq!(layer.current_block_number, fork_block + 1);
}
/// A value written with `set` is returned by `get` at the current block, and is still
/// returned at the new current block after two commits, tagged with the block that wrote it.
#[tokio::test(flavor = "multi_thread")]
async fn get_returns_local_modification() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let written_at = layer.get_current_block_number();
    let key = b"test_key";
    let value = b"test_value";
    let expected = Some(Arc::new(LocalSharedValue {
        last_modification_block: written_at,
        value: Some(value.to_vec()),
    }));

    layer.set(key, Some(value)).unwrap();
    let at_write_block = layer.get(written_at, key).await.unwrap();
    assert_eq!(at_write_block, expected);

    layer.commit().await.unwrap();
    layer.commit().await.unwrap();

    let head = layer.get_current_block_number();
    let at_head = layer.get(head, key).await.unwrap();
    assert_eq!(at_head, expected);
}
/// Querying a block number that does not exist (`u32::MAX`) yields `None`.
#[tokio::test(flavor = "multi_thread")]
async fn get_non_existent_block_returns_none() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    let lookup = layer.get(u32::MAX, b"some_key").await.unwrap();

    assert!(lookup.is_none(), "Non-existent block should return None");
}
/// Deleting a prefix hides a key that was previously set under it.
#[tokio::test(flavor = "multi_thread")]
async fn get_returns_none_for_deleted_prefix_if_exact_key_not_found() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let key = b"key";

    layer.set(key, Some(b"value")).unwrap();
    layer.delete_prefix(b"ke").unwrap();

    let lookup = layer.get(head, key).await.unwrap();
    assert!(lookup.is_none());
}
/// A key re-set after its prefix was deleted is readable again, while the prefix itself is
/// still reported as deleted.
#[tokio::test(flavor = "multi_thread")]
async fn get_returns_some_for_deleted_prefix_if_exact_key_found_after_deletion() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let key = b"key";
    let prefix = b"ke";
    let value = b"value";

    layer.set(key, Some(value)).unwrap();
    layer.delete_prefix(prefix).unwrap();
    let after_delete = layer.get(head, key).await.unwrap();
    assert!(after_delete.is_none(), "get() should return None for deleted key");

    layer.set(key, Some(value)).unwrap();
    let after_reset = layer.get(head, key).await.unwrap().unwrap();
    assert_eq!(after_reset.value.as_deref(), Some(value.as_slice()));
    assert!(layer.is_deleted(prefix).unwrap());
}
/// With no local write, `get` at the current block serves the remote value; the system
/// block-number key decodes to the fork block number.
#[tokio::test(flavor = "multi_thread")]
async fn get_falls_back_to_parent() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();

    let stored = layer.get(head, &number_key).await.unwrap().unwrap();
    let bytes = stored.value.clone().unwrap();

    assert_eq!(u32::decode(&mut &bytes[..]).unwrap(), ctx.block_number());
}
/// A local write to a key that exists remotely takes precedence over the remote value.
#[tokio::test(flavor = "multi_thread")]
async fn get_local_overrides_parent() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
    let override_value = b"local_override";

    // Before the write the remote value is visible.
    let remote = layer.get(head, &number_key).await.unwrap().unwrap();
    let remote_bytes = remote.value.clone().unwrap();
    assert_eq!(u32::decode(&mut &remote_bytes[..]).unwrap(), ctx.block_number());

    layer.set(&number_key, Some(override_value)).unwrap();

    let expected = Some(Arc::new(LocalSharedValue {
        last_modification_block: head,
        value: Some(override_value.to_vec()),
    }));
    let overridden = layer.get(head, &number_key).await.unwrap();
    assert_eq!(overridden, expected);
}
/// A key that exists neither locally nor remotely yields `None`.
#[tokio::test(flavor = "multi_thread")]
async fn get_returns_none_for_nonexistent_key() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();

    let lookup = layer.get(head, b"nonexistent_key_12345").await.unwrap();

    assert!(lookup.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn get_retrieves_modified_value_from_fork_history() {
let ctx = TestContext::for_local().await;
let mut layer = create_layer(&ctx);
let key = b"modified_key";
let value_block_1 = b"value_at_block_1";
let value_block_2 = b"value_at_block_2";
layer.commit().await.unwrap();
layer.set(key, Some(value_block_1)).unwrap();
layer.commit().await.unwrap();
let block_1 = layer.get_current_block_number() - 1;
layer.set(key, Some(value_block_2)).unwrap();
layer.commit().await.unwrap();
let block_2 = layer.get_current_block_number() - 1;
let result_block_1 = layer.get(block_1, key).await.unwrap();
assert_eq!(
result_block_1,
Some(Arc::new(LocalSharedValue {
last_modification_block: 0, value: Some(value_block_1.to_vec())
}))
);
let result_block_2 = layer.get(block_2, key).await.unwrap();
assert_eq!(
result_block_2,
Some(Arc::new(LocalSharedValue {
last_modification_block: 0, value: Some(value_block_2.to_vec())
}))
);
let result_latest = layer.get(layer.get_current_block_number(), key).await.unwrap();
assert_eq!(
result_latest,
Some(Arc::new(LocalSharedValue {
last_modification_block: block_2,
value: Some(value_block_2.to_vec())
}))
);
}
/// A key never written locally reads the same at a committed fork-local block as it does at
/// the fork block itself.
#[tokio::test(flavor = "multi_thread")]
async fn get_retrieves_unmodified_value_from_remote_at_past_forked_block() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();

    layer.commit().await.unwrap();
    layer.commit().await.unwrap();
    let committed_block = layer.get_current_block_number() - 1;

    let at_committed_block = layer.get(committed_block, &number_key).await.unwrap();
    assert!(at_committed_block.is_some());

    let at_fork_block = layer.get(ctx.block_number(), &number_key).await.unwrap();
    assert_eq!(at_committed_block, at_fork_block);
}
/// Reading at the fork block resolves the block through the parent and leaves it cached.
#[tokio::test(flavor = "multi_thread")]
async fn get_historical_block() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let fork_block = ctx.block_number();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();

    let cached_before = ctx.remote().cache().get_block_by_number(fork_block).await.unwrap();
    assert!(cached_before.is_none());

    let stored = layer.get(fork_block, &number_key).await.unwrap().unwrap();
    let bytes = stored.value.clone().unwrap();
    assert_eq!(u32::decode(&mut &bytes[..]).unwrap(), ctx.block_number());

    let cached_after = ctx.remote().cache().get_block_by_number(fork_block).await.unwrap();
    assert!(cached_after.is_some());
}
/// `set` stores the value tagged with the current block number.
#[tokio::test(flavor = "multi_thread")]
async fn set_stores_value() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let key = b"key";
    let value = b"value";

    layer.set(key, Some(value)).unwrap();

    let expected = Some(Arc::new(LocalSharedValue {
        last_modification_block: head,
        value: Some(value.to_vec()),
    }));
    let stored = layer.get(head, key).await.unwrap();
    assert_eq!(stored, expected);
}
/// A second `set` on the same key replaces the first value.
#[tokio::test(flavor = "multi_thread")]
async fn set_overwrites_previous_value() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let key = b"key";
    let replacement = b"value2";

    layer.set(key, Some(b"value1")).unwrap();
    layer.set(key, Some(replacement)).unwrap();

    let stored = layer.get(head, key).await.unwrap();
    let stored_bytes = stored.as_ref().and_then(|v| v.value.as_deref());
    assert_eq!(stored_bytes, Some(replacement.as_slice()));
}
/// An empty key list yields an empty result list.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_empty_keys() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    let fetched = layer.get_batch(ctx.block_number(), &[]).await.unwrap();

    assert_eq!(fetched.len(), 0);
}
/// Values written with `set_batch` come back from `get_batch` at the current block.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_returns_local_modifications() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let key1 = b"key1";
    let key2 = b"key2";
    let value1 = b"value1";
    let value2 = b"value2";

    layer.set_batch(&[(key1, Some(value1)), (key2, Some(value2))]).unwrap();
    let fetched = layer.get_batch(head, &[key1, key2]).await.unwrap();

    assert_eq!(fetched.len(), 2);
    let first = fetched[0].as_ref().and_then(|v| v.value.as_deref());
    let second = fetched[1].as_ref().and_then(|v| v.value.as_deref());
    assert_eq!(first, Some(value1.as_slice()));
    assert_eq!(second, Some(value2.as_slice()));
}
/// After deleting the prefix `key2`, a batch read still returns `key1` but not `key2`.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_returns_none_for_deleted_prefix() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let kept_key = b"key1";
    let deleted_key = b"key2";

    layer.set_batch(&[(kept_key, Some(b"val")), (deleted_key, Some(b"val"))]).unwrap();
    layer.delete_prefix(deleted_key).unwrap();

    let fetched = layer.get_batch(head, &[kept_key, deleted_key]).await.unwrap();
    assert!(fetched[0].is_some());
    assert!(fetched[1].is_none());
}
/// Keys without local writes are served from the parent in a batch read.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_falls_back_to_parent() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
    let parent_hash_key = hex::decode(SYSTEM_PARENT_HASH_KEY).unwrap();

    let fetched = layer
        .get_batch(head, &[number_key.as_slice(), parent_hash_key.as_slice()])
        .await
        .unwrap();

    assert!(fetched[0].is_some());
    assert!(fetched[1].is_some());
}
/// In a batch read, a locally written key returns the local value while an untouched key is
/// still served.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_local_overrides_parent() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
    let parent_hash_key = hex::decode(SYSTEM_PARENT_HASH_KEY).unwrap();
    let override_value = b"local_override";

    layer.set(&number_key, Some(override_value)).unwrap();
    let fetched = layer
        .get_batch(head, &[number_key.as_slice(), parent_hash_key.as_slice()])
        .await
        .unwrap();

    let overridden = fetched[0].as_ref().and_then(|v| v.value.as_deref());
    assert_eq!(overridden, Some(override_value.as_slice()));
    assert!(fetched[1].is_some());
}
/// A single batch can mix a local value, a remote value, a key set to `None`, and a key
/// that exists nowhere.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_mixed_sources() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let local_key = b"local_key";
    let remote_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
    let deleted_key = b"deleted_key";
    let nonexistent_key = b"nonexistent_key";

    layer.set(local_key, Some(b"local_value")).unwrap();
    layer.set(deleted_key, None).unwrap();

    let fetched = layer
        .get_batch(head, &[local_key, remote_key.as_slice(), deleted_key, nonexistent_key])
        .await
        .unwrap();
    assert_eq!(fetched.len(), 4);

    // Local write.
    assert_eq!(
        fetched[0].as_ref().and_then(|v| v.value.as_deref()),
        Some(b"local_value".as_slice())
    );
    // Remote value: the system block number decodes to the fork block.
    assert_eq!(
        u32::decode(&mut &fetched[1].as_ref().unwrap().value.as_ref().unwrap()[..]).unwrap(),
        ctx.block_number()
    );
    // Key set to `None`: an entry is present but carries no bytes.
    assert!(fetched[2].as_ref().is_some_and(|v| v.value.is_none()));
    // Unknown key.
    assert!(fetched[3].is_none());
}
/// Results follow the order of the requested keys, not the order they were written in.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_maintains_order() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let head = layer.get_current_block_number();
    let key1 = b"key1";
    let key2 = b"key2";
    let key3 = b"key3";
    let value1 = b"value1";
    let value2 = b"value2";
    let value3 = b"value3";

    layer
        .set_batch(&[(key1, Some(value1)), (key2, Some(value2)), (key3, Some(value3))])
        .unwrap();
    let fetched = layer.get_batch(head, &[key3, key1, key2]).await.unwrap();

    let expected_in_request_order = [value3.as_slice(), value1.as_slice(), value2.as_slice()];
    for (slot, expected) in expected_in_request_order.iter().enumerate() {
        assert_eq!(fetched[slot].as_ref().and_then(|v| v.value.as_deref()), Some(*expected));
    }
}
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_retrieves_modified_value_from_fork_history() {
let ctx = TestContext::for_local().await;
let mut layer = create_layer(&ctx);
let key1 = b"modified_key1";
let key2 = b"modified_key2";
let value1_block_1 = b"value1_at_block_1";
let value2_block_1 = b"value2_at_block_1";
let value1_block_2 = b"value1_at_block_2";
let value2_block_2 = b"value2_at_block_2";
layer.commit().await.unwrap();
layer
.set_batch(&[(key1, Some(value1_block_1)), (key2, Some(value2_block_1))])
.unwrap();
layer.commit().await.unwrap();
let block_1 = layer.get_current_block_number() - 1;
layer
.set_batch(&[(key1, Some(value1_block_2)), (key2, Some(value2_block_2))])
.unwrap();
layer.commit().await.unwrap();
let block_2 = layer.get_current_block_number() - 1;
let results_block_1 = layer.get_batch(block_1, &[key1, key2]).await.unwrap();
assert_eq!(
results_block_1[0],
Some(Arc::new(LocalSharedValue {
last_modification_block: 0, value: Some(value1_block_1.to_vec())
}))
);
assert_eq!(
results_block_1[1],
Some(Arc::new(LocalSharedValue {
last_modification_block: 0,
value: Some(value2_block_1.to_vec())
}))
);
let results_block_2 = layer.get_batch(block_2, &[key1, key2]).await.unwrap();
assert_eq!(
results_block_2[0],
Some(Arc::new(LocalSharedValue {
last_modification_block: 0, value: Some(value1_block_2.to_vec())
}))
);
assert_eq!(
results_block_2[1],
Some(Arc::new(LocalSharedValue {
last_modification_block: 0,
value: Some(value2_block_2.to_vec())
}))
);
let results_latest =
layer.get_batch(layer.get_current_block_number(), &[key1, key2]).await.unwrap();
assert_eq!(
results_latest[0],
Some(Arc::new(LocalSharedValue {
last_modification_block: block_2,
value: Some(value1_block_2.to_vec())
}))
);
assert_eq!(
results_latest[1],
Some(Arc::new(LocalSharedValue {
last_modification_block: block_2,
value: Some(value2_block_2.to_vec())
}))
);
}
/// Keys never written locally read the same at a committed fork-local block as they do at
/// the fork block itself (batch variant).
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_retrieves_unmodified_value_from_remote_at_past_forked_block() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
    let parent_hash_key = hex::decode(SYSTEM_PARENT_HASH_KEY).unwrap();

    layer.commit().await.unwrap();
    layer.commit().await.unwrap();
    let committed_block = layer.get_current_block_number() - 1;

    let at_committed_block = layer
        .get_batch(committed_block, &[number_key.as_slice(), parent_hash_key.as_slice()])
        .await
        .unwrap();
    assert!(at_committed_block[0].is_some());
    assert!(at_committed_block[1].is_some());

    let at_fork_block = layer
        .get_batch(ctx.block_number(), &[number_key.as_slice(), parent_hash_key.as_slice()])
        .await
        .unwrap();
    assert_eq!(at_committed_block[0], at_fork_block[0]);
    assert_eq!(at_committed_block[1], at_fork_block[1]);
}
/// A batch read at the fork block returns the remote values for known keys and `None` for
/// an unknown key.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_historical_block() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    // NOTE(review): this blocks a runtime worker thread for 30 s; presumably it waits for the
    // test node to progress — confirm it is still required before changing it.
    std::thread::sleep(Duration::from_secs(30));

    let fork_block = ctx.block_number();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();
    let parent_hash_key = hex::decode(SYSTEM_PARENT_HASH_KEY).unwrap();
    let missing_key = b"non_existent_key";

    let fetched = layer
        .get_batch(fork_block, &[number_key.as_slice(), parent_hash_key.as_slice(), missing_key])
        .await
        .unwrap();

    assert_eq!(fetched.len(), 3);
    assert_eq!(
        u32::decode(&mut &fetched[0].as_ref().unwrap().value.as_ref().unwrap()[..]).unwrap(),
        fork_block
    );
    assert!(fetched[2].is_none());
}
/// A batch read at a block number that does not exist yields `None` for every key.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_non_existent_block_returns_none() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let keys: Vec<&[u8]> = vec![b"key1", b"key2"];

    let fetched = layer.get_batch(u32::MAX, &keys).await.unwrap();

    assert_eq!(fetched.len(), 2);
    assert!(fetched[0].is_none(), "Non-existent block should return None");
    assert!(fetched[1].is_none(), "Non-existent block should return None");
}
/// Exercises reads at the current block, at the fork block, and at a committed fork-local
/// block, checking that each sees the value that applied at that block.
#[tokio::test(flavor = "multi_thread")]
async fn get_batch_mixed_block_scenarios() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    layer.commit().await.unwrap();
    layer.commit().await.unwrap();

    let first_head = layer.get_current_block_number();
    let local_key = b"local_key";
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();

    // Current block: the local write is visible immediately.
    layer.set(local_key, Some(b"local_value")).unwrap();
    let at_first_head = layer.get(first_head, local_key).await.unwrap();
    assert_eq!(
        at_first_head.as_ref().and_then(|v| v.value.as_deref()),
        Some(b"local_value".as_slice())
    );

    // Fork block: the remote block number is served.
    let fork_block = ctx.block_number();
    let fork_value = layer.get(fork_block, number_key.as_slice()).await.unwrap().unwrap();
    let fork_bytes = fork_value.value.clone().unwrap();
    assert_eq!(u32::decode(&mut &fork_bytes[..]).unwrap(), fork_block);

    // After a commit and a new write, both blocks keep their own value.
    layer.commit().await.unwrap();
    let second_head = layer.get_current_block_number();
    layer.set(local_key, Some(b"local_value_2")).unwrap();

    let at_previous_block = layer.get(first_head, local_key).await.unwrap().unwrap();
    let at_second_head = layer.get(second_head, local_key).await.unwrap().unwrap();
    assert_eq!(
        *at_previous_block,
        LocalSharedValue { last_modification_block: 0, value: Some(b"local_value".to_vec()) }
    );
    assert_eq!(
        *at_second_head,
        LocalSharedValue {
            last_modification_block: second_head,
            value: Some(b"local_value_2".to_vec())
        }
    );
}
/// `set_batch` with no entries leaves the layer without modifications.
#[tokio::test(flavor = "multi_thread")]
async fn set_batch_empty_entries() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    layer.set_batch(&[]).unwrap();

    let modifications = layer.diff().unwrap();
    assert_eq!(modifications.len(), 0);
}
/// Three distinct keys written in one batch produce three modifications.
#[tokio::test(flavor = "multi_thread")]
async fn set_batch_stores_multiple_values() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let key1 = b"key1";
    let key2 = b"key2";
    let key3 = b"key3";
    let value1 = b"value1";
    let value2 = b"value2";
    let value3 = b"value3";

    layer
        .set_batch(&[(key1, Some(value1)), (key2, Some(value2)), (key3, Some(value3))])
        .unwrap();

    let modifications = layer.diff().unwrap();
    assert_eq!(modifications.len(), 3);
}
/// A batch may mix writes and deletions: the written key reads back as present, the deleted
/// key reads back as an entry whose `value` is `None`.
#[tokio::test(flavor = "multi_thread")]
async fn set_batch_with_deletions() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let current_block = layer.get_current_block_number();
    let (written_key, removed_key) = (b"key1", b"key2");

    layer.set_batch(&[(written_key, Some(b"value1")), (removed_key, None)]).unwrap();

    let fetched = layer.get_batch(current_block, &[written_key, removed_key]).await.unwrap();
    assert!(fetched[0].is_some());
    assert!(fetched[1].as_ref().map_or(false, |shared| shared.value.is_none()));
}
/// A value stored with `set` is replaced by a later `set_batch` entry for the same key.
#[tokio::test(flavor = "multi_thread")]
async fn set_batch_overwrites_previous_values() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let current_block = layer.get_current_block_number();
    let storage_key = b"key";
    let replacement = b"value2";

    layer.set(storage_key, Some(b"value1")).unwrap();
    layer.set_batch(&[(storage_key, Some(replacement))]).unwrap();

    let fetched = layer.get(current_block, storage_key).await.unwrap();
    let fetched_bytes = fetched.as_ref().and_then(|shared| shared.value.as_deref());
    assert_eq!(fetched_bytes, Some(replacement.as_slice()));
}
/// When one batch names the same key twice, the later entry is the one read back.
#[tokio::test(flavor = "multi_thread")]
async fn set_batch_duplicate_keys_last_wins() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let current_block = layer.get_current_block_number();
    let storage_key = b"key";
    let last_value = b"value2";

    layer.set_batch(&[(storage_key, Some(b"value1")), (storage_key, Some(last_value))]).unwrap();

    let fetched = layer.get(current_block, storage_key).await.unwrap();
    let fetched_bytes = fetched.as_ref().and_then(|shared| shared.value.as_deref());
    assert_eq!(fetched_bytes, Some(last_value.as_slice()));
}
/// After `delete_prefix`, only the key that does not start with the prefix stays in `diff()`.
#[tokio::test(flavor = "multi_thread")]
async fn delete_prefix_removes_matching_keys() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let unrelated_key = b"other_key";

    // Two keys under the prefix, one outside of it.
    layer.set(b"prefix_key1", Some(b"val1")).unwrap();
    layer.set(b"prefix_key2", Some(b"val2")).unwrap();
    layer.set(unrelated_key, Some(b"val3")).unwrap();

    layer.delete_prefix(b"prefix_").unwrap();

    let remaining = layer.diff().unwrap();
    assert_eq!(remaining.len(), 1, "Only non-matching key should remain");
    assert_eq!(remaining[0].0, unrelated_key);
}
/// A key that is readable before any local write stops being readable once a prefix
/// covering it has been deleted.
#[tokio::test(flavor = "multi_thread")]
async fn delete_prefix_blocks_parent_reads() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let current_block = layer.get_current_block_number();
    let pallet_prefix = hex::decode(SYSTEM_PALLET_PREFIX).unwrap();
    let number_key = hex::decode(SYSTEM_NUMBER_KEY).unwrap();

    let value_before = layer.get(current_block, &number_key).await.unwrap();
    assert!(value_before.is_some());

    layer.delete_prefix(&pallet_prefix).unwrap();

    let value_after = layer.get(current_block, &number_key).await.unwrap();
    assert!(value_after.is_none());
}
/// `is_deleted` reports a prefix as deleted right after `delete_prefix` was called with it.
#[tokio::test(flavor = "multi_thread")]
async fn delete_prefix_adds_to_deleted_prefixes() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let removed_prefix = b"prefix_";

    layer.delete_prefix(removed_prefix).unwrap();

    let reported_deleted = layer.is_deleted(removed_prefix).unwrap();
    assert!(reported_deleted);
}
/// Deleting the empty prefix clears every pending modification from `diff()`.
#[tokio::test(flavor = "multi_thread")]
async fn delete_prefix_with_empty_prefix() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    layer.set(b"key1", Some(b"val1")).unwrap();
    layer.set(b"key2", Some(b"val2")).unwrap();

    layer.delete_prefix(b"").unwrap();

    let remaining = layer.diff().unwrap();
    assert_eq!(remaining.len(), 0, "Empty prefix should delete all modifications");
}
/// On a fresh layer no prefix is reported as deleted.
#[tokio::test(flavor = "multi_thread")]
async fn is_deleted_returns_false_initially() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    let reported_deleted = layer.is_deleted(b"prefix_").unwrap();
    assert!(!reported_deleted);
}
/// Once a prefix has been passed to `delete_prefix`, `is_deleted` returns `true` for it.
#[tokio::test(flavor = "multi_thread")]
async fn is_deleted_returns_true_after_delete() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let target = b"prefix_";

    layer.delete_prefix(target).unwrap();

    let reported_deleted = layer.is_deleted(target).unwrap();
    assert!(reported_deleted);
}
/// `is_deleted` answers for the exact prefix only: a longer byte string that merely starts
/// with a deleted prefix is not reported as deleted.
#[tokio::test(flavor = "multi_thread")]
async fn is_deleted_exact_match_only() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let deleted_prefix = b"prefix_";
    let longer_prefix = b"prefix_other";

    layer.delete_prefix(deleted_prefix).unwrap();

    let exact_hit = layer.is_deleted(deleted_prefix).unwrap();
    assert!(exact_hit);
    let extended_hit = layer.is_deleted(longer_prefix).unwrap();
    assert!(!extended_hit);
}
/// A freshly created layer has nothing in `diff()`.
#[tokio::test(flavor = "multi_thread")]
async fn diff_returns_empty_initially() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    let pending = layer.diff().unwrap();

    assert_eq!(pending.len(), 0);
}
/// Every key written with `set` appears in `diff()` together with the bytes that were stored.
#[tokio::test(flavor = "multi_thread")]
async fn diff_returns_all_modifications() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let (key1, value1) = (b"key1", b"value1");
    let (key2, value2) = (b"key2", b"value2");

    layer.set(key1, Some(value1)).unwrap();
    layer.set(key2, Some(value2)).unwrap();

    let diff = layer.diff().unwrap();
    assert_eq!(diff.len(), 2);

    // True when the diff holds `wanted_key` mapped to exactly `wanted_value`.
    let has_entry = |wanted_key: &[u8; 4], wanted_value: &[u8; 6]| {
        diff.iter().any(|(stored_key, stored)| {
            stored_key == wanted_key &&
                stored.as_ref().and_then(|shared| shared.value.as_deref()) ==
                    Some(wanted_value.as_slice())
        })
    };
    assert!(has_entry(key1, value1));
    assert!(has_entry(key2, value2));
}
/// Setting a key to `None` is itself a modification: it appears in `diff()` as an entry
/// whose `value` is `None`.
#[tokio::test(flavor = "multi_thread")]
async fn diff_includes_deletions() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let removed_key = b"deleted";

    layer.set(removed_key, None).unwrap();

    let pending = layer.diff().unwrap();
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[0].0, removed_key);
    assert!(pending[0].1.as_ref().map_or(false, |shared| shared.value.is_none()));
}
/// A key written and then covered by `delete_prefix` is no longer listed by `diff()`.
#[tokio::test(flavor = "multi_thread")]
async fn diff_excludes_prefix_deleted_keys() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    layer.set(b"prefix_key", Some(b"value")).unwrap();
    layer.delete_prefix(b"prefix_").unwrap();

    let remaining = layer.diff().unwrap();
    assert_eq!(remaining.len(), 0, "diff() should not include prefix-deleted keys");
}
/// Values set on the layer are absent from the cache until `commit` runs, and present at the
/// committed block afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn commit_writes_to_cache() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let target_block = layer.get_current_block_number();
    let (first_key, first_value) = (b"commit_key1", b"commit_value1");
    let (second_key, second_value) = (b"commit_key2", b"commit_value2");

    layer.set(first_key, Some(first_value)).unwrap();
    layer.set(second_key, Some(second_value)).unwrap();

    // Before the commit the cache has no entry for either key.
    for pending_key in [first_key, second_key] {
        assert!(
            ctx.remote()
                .cache()
                .get_local_value_at_block(pending_key, target_block)
                .await
                .unwrap()
                .is_none()
        );
    }

    layer.commit().await.unwrap();

    // After the commit both values are served by the cache for the committed block.
    let first_cached =
        ctx.remote().cache().get_local_value_at_block(first_key, target_block).await.unwrap();
    let second_cached =
        ctx.remote().cache().get_local_value_at_block(second_key, target_block).await.unwrap();
    assert_eq!(first_cached, Some(Some(first_value.to_vec())));
    assert_eq!(second_cached, Some(Some(second_value.to_vec())));
}
/// After a commit the written key is still readable at the following block number and is
/// still listed by `diff()`.
#[tokio::test(flavor = "multi_thread")]
async fn commit_preserves_modifications() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let committed_block = layer.get_current_block_number();
    let storage_key = b"preserve_key";
    let stored_bytes = b"preserve_value";

    layer.set(storage_key, Some(stored_bytes)).unwrap();
    layer.commit().await.unwrap();

    let fetched = layer.get(committed_block + 1, storage_key).await.unwrap();
    let fetched_bytes = fetched.as_ref().and_then(|shared| shared.value.as_deref());
    assert_eq!(fetched_bytes, Some(stored_bytes.as_slice()));

    let pending = layer.diff().unwrap();
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[0].0, storage_key);
}
/// A commit persists both a stored value (`Some(Some(bytes))`) and a deletion (`Some(None)`)
/// in the cache.
#[tokio::test(flavor = "multi_thread")]
async fn commit_with_deletions() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let committed_block = layer.get_current_block_number();
    let written_key = b"delete_key1";
    let removed_key = b"delete_key2";
    let stored_bytes = b"value";

    layer.set(written_key, Some(stored_bytes)).unwrap();
    layer.set(removed_key, None).unwrap();
    layer.commit().await.unwrap();

    let written_cached =
        ctx.remote().cache().get_local_value_at_block(written_key, committed_block).await.unwrap();
    let removed_cached =
        ctx.remote().cache().get_local_value_at_block(removed_key, committed_block).await.unwrap();
    assert_eq!(written_cached, Some(Some(stored_bytes.to_vec())));
    assert_eq!(removed_cached, Some(None));
}
/// Committing a layer that has no pending modifications succeeds.
#[tokio::test(flavor = "multi_thread")]
async fn commit_empty_modifications() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);

    let outcome = layer.commit().await;

    assert!(outcome.is_ok());
}
/// A value written once and followed by two commits is served by the cache both at the block
/// it was written in and at the next block.
#[tokio::test(flavor = "multi_thread")]
async fn commit_multiple_times() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let written_block = layer.get_current_block_number();
    let storage_key = b"multi_block_key";
    let stored_bytes = b"multi_block_value";

    layer.set(storage_key, Some(stored_bytes)).unwrap();
    for _ in 0..2 {
        layer.commit().await.unwrap();
    }

    let at_written_block =
        ctx.remote().cache().get_local_value_at_block(storage_key, written_block).await.unwrap();
    let at_next_block = ctx
        .remote()
        .cache()
        .get_local_value_at_block(storage_key, written_block + 1)
        .await
        .unwrap();
    assert_eq!(at_written_block, Some(Some(stored_bytes.to_vec())));
    assert_eq!(at_next_block, Some(Some(stored_bytes.to_vec())));
}
/// Walks one key through write -> idle commits -> delete -> rewrite, and after each step checks
/// what `get_local_value_at_block` returns for earlier and later block numbers.
///
/// In the assertions below `Some(Some(bytes))` is the expected result for a stored value and
/// `Some(None)` for a stored deletion. Statement order matters: each expectation depends on
/// how many commits have happened so far.
#[tokio::test(flavor = "multi_thread")]
async fn commit_validity_ranges_work_properly() {
let ctx = TestContext::for_local().await;
let mut layer = create_layer(&ctx);
let key = b"validity_test_key";
let value1 = b"value_version_1";
let value2 = b"value_version_2";
// Block number in which the first value is written; all later checks are offsets from it.
let block_n = layer.get_current_block_number();
// Step 1: write value1 and commit it.
layer.set(key, Some(value1)).unwrap();
layer.commit().await.unwrap();
// Remember the key row id so the end of the test can check it was reused.
let key_row = ctx.remote().cache().get_local_key(key).await.unwrap();
assert!(key_row.is_some());
let key_id = key_row.unwrap().id;
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n).await.unwrap(),
Some(Some(value1.to_vec()))
);
// Step 2: two commits without touching the key; value1 must still be returned for the
// following block numbers.
layer.commit().await.unwrap();
layer.commit().await.unwrap();
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n + 1).await.unwrap(),
Some(Some(value1.to_vec()))
);
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n + 2).await.unwrap(),
Some(Some(value1.to_vec()))
);
// Step 3: delete the key and commit. The test expects the deletion to be visible from
// block_n + 3 onwards while earlier blocks keep returning value1.
layer.set(key, None).unwrap();
layer.commit().await.unwrap();
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n).await.unwrap(),
Some(Some(value1.to_vec())),
);
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n + 2).await.unwrap(),
Some(Some(value1.to_vec())),
);
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n + 3).await.unwrap(),
Some(None),
);
// A block number well past the latest commit is expected to still report the deletion.
assert_eq!(
ctx.remote()
.cache()
.get_local_value_at_block(key, block_n + 3 + 10)
.await
.unwrap(),
Some(None),
);
// Step 4: write value2 and commit. Older blocks must be unaffected and block_n + 4 is
// expected to return the new value.
layer.set(key, Some(value2)).unwrap();
layer.commit().await.unwrap();
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n).await.unwrap(),
Some(Some(value1.to_vec()))
);
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n + 3).await.unwrap(),
Some(None)
);
assert_eq!(
ctx.remote().cache().get_local_value_at_block(key, block_n + 4).await.unwrap(),
Some(Some(value2.to_vec()))
);
// The key row must be the same one created by the first commit.
let key_row_after = ctx.remote().cache().get_local_key(key).await.unwrap();
assert_eq!(key_row_after.unwrap().id, key_id);
}
/// Inspects the `local_values` table directly to check which rows `commit` writes.
///
/// Expectations encoded below:
/// * a key gets a new row only in the commit following a `set` on it; commits that do not
///   touch the key add nothing for it;
/// * when a key is overwritten, the previous row gets its `valid_until` set to the block of
///   the new write and a new row with `valid_until = NULL` is added.
///
/// Every query orders by `valid_from`, so the positional assertions do not depend on the
/// order in which the database happens to return rows.
#[tokio::test(flavor = "multi_thread")]
async fn commit_only_commits_the_minimal_information_needed() {
    use crate::schema::local_values::{self, columns as lvc};
    use diesel::prelude::*;
    use diesel_async::RunQueryDsl;

    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let cache_clone = layer.parent.cache().clone();

    let key1 = b"key_block_n";
    let key2 = b"key_block_n_plus_1";
    let value1 = b"value1";
    let value2 = b"value2";
    let value3 = b"value3";

    // The table stores block numbers as i64; widening from u32 is lossless.
    let block_n = i64::from(layer.get_current_block_number());

    // key1 is written in block_n, key2 in block_n + 1.
    layer.set(key1, Some(value1)).unwrap();
    layer.commit().await.unwrap();
    layer.set(key2, Some(value2)).unwrap();
    layer.commit().await.unwrap();

    // NOTE(review): assumes key ids are handed out as 1, 2, ... in insertion order on a fresh
    // test database — confirm against the cache schema.
    let key_1_id = 1;
    let key_2_id = 2;

    {
        let mut conn = cache_clone.get_conn().await.unwrap();
        let key_1_entries: Vec<(i64, Option<i64>, Option<Vec<u8>>)> = local_values::table
            .filter(lvc::key_id.eq(key_1_id))
            .select((lvc::valid_from, lvc::valid_until, lvc::value))
            .order(lvc::valid_from.asc())
            .load(&mut conn)
            .await
            .unwrap();
        let key_2_entries: Vec<(i64, Option<i64>, Option<Vec<u8>>)> = local_values::table
            .filter(lvc::key_id.eq(key_2_id))
            .select((lvc::valid_from, lvc::valid_until, lvc::value))
            .order(lvc::valid_from.asc())
            .load(&mut conn)
            .await
            .unwrap();

        // One open-ended row per key, starting at the block where it was written.
        assert_eq!(key_1_entries.len(), 1);
        assert_eq!(key_1_entries[0], (block_n, None, Some(value1.to_vec())));
        assert_eq!(key_2_entries.len(), 1);
        assert_eq!(key_2_entries[0], (block_n + 1, None, Some(value2.to_vec())));
    }

    // Overwrite key1 in block_n + 2; key2 is left alone.
    layer.set(key1, Some(value3)).unwrap();
    layer.commit().await.unwrap();

    {
        let mut conn = cache_clone.get_conn().await.unwrap();
        let key_1_entries: Vec<(i64, Option<i64>, Option<Vec<u8>>)> = local_values::table
            .filter(lvc::key_id.eq(key_1_id))
            .select((lvc::valid_from, lvc::valid_until, lvc::value))
            .order(lvc::valid_from.asc())
            .load(&mut conn)
            .await
            .unwrap();
        let key_2_entries: Vec<(i64, Option<i64>, Option<Vec<u8>>)> = local_values::table
            .filter(lvc::key_id.eq(key_2_id))
            .select((lvc::valid_from, lvc::valid_until, lvc::value))
            .order(lvc::valid_from.asc())
            .load(&mut conn)
            .await
            .unwrap();

        // key1: the old row is closed at block_n + 2 and a new open-ended row follows it.
        assert_eq!(key_1_entries.len(), 2);
        assert_eq!(key_1_entries[0], (block_n, Some(block_n + 2), Some(value1.to_vec())));
        assert_eq!(key_1_entries[1], (block_n + 2, None, Some(value3.to_vec())));
        // key2: unchanged by the extra commit.
        assert_eq!(key_2_entries.len(), 1);
        assert_eq!(key_2_entries[0], (block_n + 1, None, Some(value2.to_vec())));
    }
}
/// Iterating with `next_key` under the system pallet prefix yields two keys that both carry
/// the prefix, the second one strictly greater than the first.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_returns_next_key_from_parent() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let pallet_prefix = hex::decode(SYSTEM_PALLET_PREFIX).unwrap();

    // Start from the empty key to obtain the first key under the prefix.
    let initial = layer.next_key(&pallet_prefix, &[]).await.unwrap();
    assert!(initial.is_some(), "System pallet should have at least one key");
    let initial = initial.unwrap();
    assert!(initial.starts_with(&pallet_prefix), "Returned key should start with the prefix");

    // Continue from the first key to obtain its successor.
    let successor = layer.next_key(&pallet_prefix, &initial).await.unwrap();
    assert!(successor.is_some(), "System pallet should have more than one key");
    let successor = successor.unwrap();
    assert!(
        successor.starts_with(&pallet_prefix),
        "Second key should also start with the prefix"
    );
    assert!(successor > initial, "Second key should be greater than first key");
}
/// `next_key` yields `None` for a prefix that is not expected to match any stored key.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_returns_none_when_no_more_keys() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let unknown_prefix = b"nonexistent_prefix_12345";

    let found = layer.next_key(unknown_prefix, &[]).await.unwrap();

    assert!(found.is_none(), "Should return None for nonexistent prefix");
}
/// After the first key under the prefix is passed to `delete_prefix`, iteration from the
/// start yields the former second key.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_skips_deleted_prefix() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let pallet_prefix = hex::decode(SYSTEM_PALLET_PREFIX).unwrap();

    // Record the first two keys before deleting anything.
    let initial = layer.next_key(&pallet_prefix, &[]).await.unwrap().unwrap();
    let successor = layer.next_key(&pallet_prefix, &initial).await.unwrap().unwrap();

    layer.delete_prefix(&initial).unwrap();

    let found = layer.next_key(&pallet_prefix, &[]).await.unwrap();
    assert!(found.is_some(), "Should find a key after skipping deleted one");
    assert_eq!(
        found.unwrap(),
        successor,
        "Should return second key after skipping deleted first key"
    );
}
/// With the first two keys under the prefix deleted, iteration from the start yields the
/// former third key.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_skips_multiple_deleted_keys() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let pallet_prefix = hex::decode(SYSTEM_PALLET_PREFIX).unwrap();

    // Record the first three keys before deleting anything.
    let key_a = layer.next_key(&pallet_prefix, &[]).await.unwrap().unwrap();
    let key_b = layer.next_key(&pallet_prefix, &key_a).await.unwrap().unwrap();
    let key_c = layer.next_key(&pallet_prefix, &key_b).await.unwrap().unwrap();

    for doomed in [&key_a, &key_b] {
        layer.delete_prefix(doomed).unwrap();
    }

    let found = layer.next_key(&pallet_prefix, &[]).await.unwrap();
    assert!(found.is_some(), "Should find a key after skipping deleted ones");
    assert_eq!(
        found.unwrap(),
        key_c,
        "Should return third key after skipping first two deleted keys"
    );
}
/// Once the whole prefix has been deleted, iterating under it yields `None`.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_returns_none_when_all_remaining_deleted() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let pallet_prefix = hex::decode(SYSTEM_PALLET_PREFIX).unwrap();

    layer.delete_prefix(&pallet_prefix).unwrap();

    let found = layer.next_key(&pallet_prefix, &[]).await.unwrap();
    assert!(found.is_none(), "Should return None when all keys match deleted prefix");
}
/// With an empty prefix and an empty start key, `next_key` is expected to return some key.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_with_empty_prefix() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    let found = layer.next_key(&[], &[]).await.unwrap();

    assert!(found.is_some(), "Empty prefix should return some key from storage");
}
/// A prefix that is not expected to match any stored key makes `next_key` return `None`.
#[tokio::test(flavor = "multi_thread")]
async fn next_key_with_nonexistent_prefix() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let unknown_prefix = b"this_prefix_definitely_does_not_exist_xyz";

    let found = layer.next_key(unknown_prefix, &[]).await.unwrap();

    assert!(found.is_none(), "Nonexistent prefix should return None");
}
/// `metadata_at` succeeds for a block number 100 past the context's block and returns
/// metadata with at least one pallet.
#[tokio::test(flavor = "multi_thread")]
async fn metadata_at_returns_metadata_for_future_blocks() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let far_ahead = ctx.block_number() + 100;

    let metadata = layer.metadata_at(far_ahead).await.unwrap();

    let pallet_count = metadata.pallets().count();
    assert!(pallet_count > 0, "Metadata should be valid for future blocks");
}
/// `metadata_at` succeeds for the block just before the context's block and returns metadata
/// with at least one pallet. Nothing is checked when the context's block number is 0.
#[tokio::test(flavor = "multi_thread")]
async fn metadata_at_fetches_from_remote_for_pre_fork_blocks() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    // `checked_sub` is `None` exactly when the block number is 0, i.e. there is no earlier block.
    if let Some(previous_block) = ctx.block_number().checked_sub(1) {
        let metadata = layer.metadata_at(previous_block).await.unwrap();
        assert!(metadata.pallets().count() > 0, "Should fetch metadata from remote");
    }
}
/// After registering metadata for a later block, `metadata_at` still succeeds both for the
/// original block and for the newly registered one.
#[tokio::test(flavor = "multi_thread")]
async fn register_metadata_version_adds_new_version() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let base_block = ctx.block_number();
    let registered_block = base_block + 10;

    layer.register_metadata_version(registered_block, ctx.metadata().clone()).unwrap();

    let base_metadata = layer.metadata_at(base_block).await.unwrap();
    let registered_metadata = layer.metadata_at(registered_block).await.unwrap();
    assert!(base_metadata.pallets().count() > 0);
    assert!(registered_metadata.pallets().count() > 0);
}
/// With metadata registered at an upgrade block, `metadata_at` succeeds just before the
/// upgrade, at the upgrade, and ten blocks after it.
#[tokio::test(flavor = "multi_thread")]
async fn register_metadata_version_respects_block_boundaries() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let upgrade_at = ctx.block_number() + 5;

    layer.register_metadata_version(upgrade_at, ctx.metadata().clone()).unwrap();

    let just_before = layer.metadata_at(upgrade_at - 1).await.unwrap();
    let exactly_at = layer.metadata_at(upgrade_at).await.unwrap();
    let well_after = layer.metadata_at(upgrade_at + 10).await.unwrap();

    assert!(just_before.pallets().count() > 0);
    assert!(exactly_at.pallets().count() > 0);
    assert!(well_after.pallets().count() > 0);
}
/// On a fresh layer `has_code_changed_at` is `false` for the context's block.
#[tokio::test(flavor = "multi_thread")]
async fn has_code_changed_at_returns_false_when_no_code_modified() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    let code_changed = layer.has_code_changed_at(ctx.block_number()).unwrap();

    assert!(!code_changed, "Should return false when no code was modified");
}
/// Writing an arbitrary key does not make `has_code_changed_at` report a change for the
/// current block.
#[tokio::test(flavor = "multi_thread")]
async fn has_code_changed_at_returns_false_for_non_code_modifications() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);

    layer.set(b"some_random_key", Some(b"some_value")).unwrap();

    let current_block = layer.get_current_block_number();
    let code_changed = layer.has_code_changed_at(current_block).unwrap();
    assert!(!code_changed, "Should return false when only non-code keys modified");
}
/// Writing the well-known `CODE` key makes `has_code_changed_at` return `true` for the
/// current block.
#[tokio::test(flavor = "multi_thread")]
async fn has_code_changed_at_returns_true_when_code_modified() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let runtime_code_key = sp_core::storage::well_known_keys::CODE;

    layer.set(runtime_code_key, Some(b"new_runtime_code")).unwrap();

    let current_block = layer.get_current_block_number();
    let code_changed = layer.has_code_changed_at(current_block).unwrap();
    assert!(code_changed, "Should return true when :code was modified at the specified block");
}
/// A `CODE` write in the current block is not reported for the block after it or the block
/// before it.
#[tokio::test(flavor = "multi_thread")]
async fn has_code_changed_at_returns_false_for_different_block() {
    let ctx = TestContext::for_local().await;
    let layer = create_layer(&ctx);
    let runtime_code_key = sp_core::storage::well_known_keys::CODE;

    layer.set(runtime_code_key, Some(b"new_runtime_code")).unwrap();
    let written_block = layer.get_current_block_number();

    let changed_after = layer.has_code_changed_at(written_block + 1).unwrap();
    assert!(!changed_after, "Should return false when checking different block than modification");

    let changed_before = layer.has_code_changed_at(written_block - 1).unwrap();
    assert!(!changed_before, "Should return false when checking block before modification");
}
/// Follows `has_code_changed_at` across a commit: the change stays attributed to the block it
/// was written in, and the next block reports a change only after its own `CODE` write.
#[tokio::test(flavor = "multi_thread")]
async fn has_code_changed_at_tracks_modification_block_correctly() {
    let ctx = TestContext::for_local().await;
    let mut layer = create_layer(&ctx);
    let runtime_code_key = sp_core::storage::well_known_keys::CODE;

    // Write the code key in the initial block.
    let initial_block = layer.get_current_block_number();
    layer.set(runtime_code_key, Some(b"runtime_v1")).unwrap();
    assert!(
        layer.has_code_changed_at(initial_block).unwrap(),
        "Code should be marked as changed at first block"
    );

    // Move on to the next block without touching the code key.
    layer.commit().await.unwrap();
    let following_block = layer.get_current_block_number();
    assert!(
        layer.has_code_changed_at(initial_block).unwrap(),
        "Code change should still be recorded at first block"
    );
    assert!(
        !layer.has_code_changed_at(following_block).unwrap(),
        "Code should not be marked as changed at second block (no new modification)"
    );

    // A fresh write in the new block flips the answer for that block.
    layer.set(runtime_code_key, Some(b"runtime_v2")).unwrap();
    assert!(
        layer.has_code_changed_at(following_block).unwrap(),
        "Code should be marked as changed at second block after new modification"
    );
}
}