mod storage;
pub use storage::Metadata;
use thiserror::Error;
/// Error type exposed by this module.
#[derive(Debug, Error)]
pub enum Error {
/// A failure reported by `commonware_runtime`.
///
/// The `#[from]` attribute lets `?` convert a `commonware_runtime::Error` into this
/// variant; the original error is included in the display message.
#[error("runtime error: {0}")]
Runtime(#[from] commonware_runtime::Error),
}
/// Configuration passed to `Metadata::init`.
#[derive(Clone)]
pub struct Config<C> {
/// Name of the storage partition used by the store (the tests in this file open
/// blobs named `left` and `right` inside it).
pub partition: String,
/// Codec configuration of type `C`.
///
/// NOTE(review): presumably forwarded to the value codec when persisted entries are
/// decoded — confirm in `storage`.
pub codec_config: C,
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_macros::{test_group, test_traced};
use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
use commonware_utils::{hex, sequence::U64};
use rand::{Rng, RngCore};
#[test_traced]
fn test_put_get_clear() {
    // Walks one key through miss -> put -> sync -> reopen -> clear and checks the
    // exported metrics after every step.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key = U64::new(42);
        let hello = b"hello".to_vec();

        // A freshly opened store has no keys and has never synced.
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.with_label("first"),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();
        assert!(store.get(&key).is_none());
        let encoded = context.encode();
        for metric in [
            "first_sync_rewrites_total 0",
            "first_sync_overwrites_total 0",
            "first_keys 0",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }

        // A write is visible immediately and bumps the key gauge, but leaves the
        // sync counters untouched.
        store.put(key.clone(), hello.clone());
        assert_eq!(store.get(&key), Some(&hello));
        let encoded = context.encode();
        for metric in [
            "first_sync_rewrites_total 0",
            "first_sync_overwrites_total 0",
            "first_keys 1",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }

        // The first sync is recorded as a rewrite.
        store.sync().await.unwrap();
        let encoded = context.encode();
        for metric in [
            "first_sync_rewrites_total 1",
            "first_sync_overwrites_total 0",
            "first_keys 1",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }
        drop(store);

        // Reopening under a new label restores the key; that label's sync counters
        // start from zero.
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.with_label("second"),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();
        let encoded = context.encode();
        for metric in [
            "second_sync_rewrites_total 0",
            "second_sync_overwrites_total 0",
            "second_keys 1",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }
        assert_eq!(store.get(&key), Some(&hello));

        // Clearing removes the key and resets the key gauge without a sync.
        store.clear();
        assert!(store.get(&key).is_none());
        let encoded = context.encode();
        for metric in [
            "second_sync_rewrites_total 0",
            "second_sync_overwrites_total 0",
            "second_keys 0",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_put_returns_previous_value() {
    // `put` must hand back the value it displaced, both within one session and for
    // a value that was loaded from a previous session.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key = U64::new(42);
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.with_label("first"),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();

        // Every write to the same key returns the value written just before it
        // (`None` for the very first write).
        let mut displaced: Option<Vec<u8>> = None;
        for next in [b"first".to_vec(), b"second".to_vec(), b"third".to_vec()] {
            assert_eq!(store.put(key.clone(), next.clone()), displaced);
            displaced = Some(next);
        }
        assert_eq!(store.get(&key), displaced.as_ref());

        // A key that was never written has nothing to displace.
        assert!(store.put(U64::new(99), b"other".to_vec()).is_none());

        // Persist and reopen.
        store.sync().await.unwrap();
        drop(store);
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.with_label("second"),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();

        // The value restored from storage is what the next `put` displaces.
        assert_eq!(
            store.put(key.clone(), b"fourth".to_vec()),
            Some(b"third".to_vec())
        );

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_multi_sync() {
    /// Asserts that every expected metric line appears in the encoded metrics.
    fn assert_metrics(encoded: &str, expected: [&str; 3]) {
        for metric in expected {
            assert!(encoded.contains(metric), "{metric}");
        }
    }

    // Syncs several times across three sessions and checks that updates, inserts
    // and removals all survive a reopen.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let (key, key2) = (U64::new(42), U64::new(43));
        let world = b"world".to_vec();
        let foo = b"foo".to_vec();

        // Session one: write a key and sync.
        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
        store.put(key.clone(), b"hello".to_vec());
        store.sync().await.unwrap();
        assert_metrics(
            &context.encode(),
            [
                "first_sync_rewrites_total 1",
                "first_sync_overwrites_total 0",
                "first_keys 1",
            ],
        );

        // Replace the existing key, add a second one, and sync again.
        store.put(key.clone(), world.clone());
        store.put(key2.clone(), foo.clone());
        store.sync().await.unwrap();
        assert_metrics(
            &context.encode(),
            [
                "first_sync_rewrites_total 2",
                "first_sync_overwrites_total 0",
                "first_keys 2",
            ],
        );
        drop(store);

        // Session two: both keys come back with their latest values.
        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
        assert_metrics(
            &context.encode(),
            [
                "second_sync_rewrites_total 0",
                "second_sync_overwrites_total 0",
                "second_keys 2",
            ],
        );
        assert_eq!(store.get(&key), Some(&world));
        assert_eq!(store.get(&key2), Some(&foo));

        // Remove the first key and persist the removal.
        store.remove(&key);
        store.sync().await.unwrap();
        assert_metrics(
            &context.encode(),
            [
                "second_sync_rewrites_total 1",
                "second_sync_overwrites_total 0",
                "second_keys 1",
            ],
        );
        drop(store);

        // Session three: only the second key remains.
        let store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("third"), cfg)
            .await
            .unwrap();
        assert_metrics(
            &context.encode(),
            [
                "third_sync_rewrites_total 0",
                "third_sync_overwrites_total 0",
                "third_keys 1",
            ],
        );
        assert!(store.get(&key).is_none());
        assert_eq!(store.get(&key2), Some(&foo));

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_recover_corrupted_one() {
    // Corrupts only the `left` blob after two syncs and expects the reopened store
    // to expose the state of the first sync.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let key = U64::new(42);
        let hello = b"hello".to_vec();

        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

        // First sync persists `hello`.
        store.put(key.clone(), hello.clone());
        store.sync().await.unwrap();

        // Second sync persists an updated value plus an additional key.
        store.put(key.clone(), b"world".to_vec());
        store.put(U64::new(43), b"foo".to_vec());
        store.sync().await.unwrap();
        drop(store);

        // Overwrite the start of the `left` blob with junk.
        let (left, _) = context.open("test", b"left").await.unwrap();
        left.write_at(0, b"corrupted".to_vec()).await.unwrap();
        left.sync().await.unwrap();

        // The store must still open and report the value from the first sync.
        let store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        assert_eq!(store.get(&key), Some(&hello));

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_recover_corrupted_both() {
    // Corrupts both the `left` and `right` blobs and expects the store to reopen
    // empty instead of failing.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let key = U64::new(42);

        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

        // Two syncs so that both blobs hold data.
        store.put(key.clone(), b"hello".to_vec());
        store.sync().await.unwrap();
        store.put(key.clone(), b"world".to_vec());
        store.put(U64::new(43), b"foo".to_vec());
        store.sync().await.unwrap();
        drop(store);

        // Overwrite the start of each blob with junk.
        let (left, _) = context.open("test", b"left").await.unwrap();
        left.write_at(0, b"corrupted".to_vec()).await.unwrap();
        left.sync().await.unwrap();
        let (right, _) = context.open("test", b"right").await.unwrap();
        right.write_at(0, b"corrupted".to_vec()).await.unwrap();
        right.sync().await.unwrap();

        // Nothing is recoverable: the store opens with no keys.
        let store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        assert!(store.get(&key).is_none());
        let encoded = context.encode();
        for metric in [
            "second_sync_rewrites_total 0",
            "second_sync_overwrites_total 0",
            "second_keys 0",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_recover_corrupted_truncate() {
    // Chops the last 8 bytes off the `left` blob and expects the reopened store to
    // expose the state of the first sync.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let first_cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut store = Metadata::init(context.with_label("first"), first_cfg)
            .await
            .unwrap();

        // First sync persists `hello`.
        let key = U64::new(42);
        let hello = b"hello".to_vec();
        store.put(key.clone(), hello.clone());
        store.sync().await.unwrap();

        // Second sync persists an updated value plus an additional key.
        store.put(key.clone(), b"world".to_vec());
        store.put(U64::new(43), b"foo".to_vec());
        store.sync().await.unwrap();
        drop(store);

        // Truncate the tail of the `left` blob.
        let (left, left_len) = context.open("test", b"left").await.unwrap();
        left.resize(left_len - 8).await.unwrap();
        left.sync().await.unwrap();

        // The store must still open and report the value from the first sync.
        let second_cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), second_cfg)
                .await
                .unwrap();
        assert_eq!(store.get(&key), Some(&hello));

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_recover_corrupted_short() {
    // Shrinks the `left` blob to just 5 bytes and expects the reopened store to
    // expose the state of the first sync.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let first_cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut store = Metadata::init(context.with_label("first"), first_cfg)
            .await
            .unwrap();

        // First sync persists `hello`.
        let key = U64::new(42);
        let hello = b"hello".to_vec();
        store.put(key.clone(), hello.clone());
        store.sync().await.unwrap();

        // Second sync persists an updated value plus an additional key.
        store.put(key.clone(), b"world".to_vec());
        store.put(U64::new(43), b"foo".to_vec());
        store.sync().await.unwrap();
        drop(store);

        // Leave only the first 5 bytes of the `left` blob.
        let (left, _) = context.open("test", b"left").await.unwrap();
        left.resize(5).await.unwrap();
        left.sync().await.unwrap();

        // The store must still open and report the value from the first sync.
        let second_cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), second_cfg)
                .await
                .unwrap();
        assert_eq!(store.get(&key), Some(&hello));

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_unclean_shutdown() {
    // A value that was `put` but never synced must not be visible after the store
    // is dropped and reopened.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key = U64::new(42);

        // Write without syncing, then drop the store.
        let first_cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut store = Metadata::init(context.with_label("first"), first_cfg)
            .await
            .unwrap();
        store.put(key.clone(), b"hello".to_vec());
        drop(store);

        // The reopened store is empty and has never synced.
        let second_cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), second_cfg)
                .await
                .unwrap();
        assert!(store.get(&key).is_none());
        let encoded = context.encode();
        for metric in [
            "second_sync_rewrites_total 0",
            "second_sync_overwrites_total 0",
            "second_keys 0",
        ] {
            assert!(encoded.contains(metric), "{metric}");
        }

        store.destroy().await.unwrap();
    });
}
#[test_traced]
#[should_panic(expected = "usize value is larger than u32")]
fn test_value_too_big_error() {
    // Syncing a value one byte longer than `u32::MAX` is expected to panic with the
    // message named in `should_panic`.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut store = Metadata::init(context.clone(), cfg).await.unwrap();

        // One byte past the largest length a `u32` can describe.
        let oversized_len = (u32::MAX as usize) + 1;
        store.put(U64::new(1), vec![0u8; oversized_len]);
        store.sync().await.unwrap();
    });
}
// Tracks, via `runtime_storage_write_bytes_total`, how many bytes each `sync` writes
// and whether it is counted as a rewrite or an overwrite. The byte counter is
// cumulative, so the order of the steps below matters.
#[test_traced]
fn test_delta_writes() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test".into(),
codec_config: ((0..).into(), ()),
};
let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
// Populate 100 keys, each with a 100-byte value.
for i in 0..100 {
metadata.put(U64::new(i), vec![i as u8; 100]);
}
// First sync: counted as a rewrite.
//
// 10912 = 100 * 109 + 12, i.e. 109 bytes per entry plus 12 bytes of overhead.
// NOTE(review): presumably 8-byte key + 1-byte length + 100-byte value per entry,
// and version + checksum framing for the remaining 12 bytes — confirm in `storage`.
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 10912"),
"{buffer}",
);
// Replace a single value with another of the same length.
metadata.put(U64::new(51), vec![0xff; 100]);
// Second sync: still counted as a rewrite, writing another 10912 bytes
// (10912 + 10912 = 21824).
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 21824"),
"{buffer}",
);
// Third sync with no new changes: counted as an overwrite that writes only
// 113 bytes (21937 - 21824), i.e. one 101-byte length+value plus the 12 bytes
// of overhead.
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 21937"),
"{buffer}",
);
// Fourth sync with no new changes: another overwrite, this time writing only
// the 12 bytes of overhead (21949 - 21937).
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 21949"),
"{buffer}",
);
// Removing a key forces a rewrite: 10803 bytes = 99 * 109 + 12
// (21949 + 10803 = 32752).
metadata.remove(&U64::new(51));
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 32752"),
"{buffer}"
);
// The next sync is also a rewrite of 10803 bytes even though nothing changed
// in between (32752 + 10803 = 43555).
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 43555"),
"{buffer}"
);
// Replacing an existing value with one of the same length is handled as an
// overwrite again, writing 113 bytes (43668 - 43555).
metadata.put(U64::new(50), vec![0xff; 100]);
metadata.sync().await.unwrap();
let buffer = context.encode();
assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
assert!(
buffer.contains("runtime_storage_write_bytes_total 43668"),
"{buffer}"
);
metadata.destroy().await.unwrap();
});
}
#[test_traced]
fn test_sync_with_no_changes() {
    // After two syncs have been counted as rewrites, further syncs without any
    // modification are counted as overwrites.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.clone(),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();

        // Write once, then sync twice: both syncs show up as rewrites.
        store.put(U64::new(1), b"hello".to_vec());
        store.sync().await.unwrap();
        store.sync().await.unwrap();
        let encoded = context.encode();
        assert!(encoded.contains("sync_rewrites_total 2"));
        assert!(encoded.contains("sync_overwrites_total 0"));

        // Each additional no-op sync only advances the overwrite counter.
        for overwrites in ["sync_overwrites_total 1", "sync_overwrites_total 2"] {
            store.sync().await.unwrap();
            let encoded = context.encode();
            assert!(encoded.contains("sync_rewrites_total 2"));
            assert!(encoded.contains(overwrites));
        }

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_get_mut_marks_modified() {
    // A change made through `get_mut` must be persisted by the next sync, and that
    // sync is counted as an overwrite.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let one = U64::new(1);

        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

        // Write the value and sync twice (both counted as rewrites, see below).
        store.put(one.clone(), b"hello".to_vec());
        store.sync().await.unwrap();
        store.sync().await.unwrap();

        // Patch the first byte in place through the mutable accessor.
        store.get_mut(&one).unwrap()[0] = b'H';
        store.sync().await.unwrap();

        let encoded = context.encode();
        assert!(encoded.contains("first_sync_rewrites_total 2"));
        assert!(encoded.contains("first_sync_overwrites_total 1"));
        drop(store);

        // The patched byte is what a new session reads back.
        let store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        let restored = store.get(&one).unwrap();
        assert_eq!(restored[0], b'H');

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_mixed_operation_sequences() {
    // Interleaves put / remove / get_mut on one key between syncs; only the last
    // write before each sync may be observable afterwards.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let key = U64::new(1);

        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

        // put -> remove -> put, then sync: the second put wins.
        store.put(key.clone(), b"first".to_vec());
        store.remove(&key);
        store.put(key.clone(), b"second".to_vec());
        store.sync().await.unwrap();
        assert_eq!(store.get(&key).unwrap(), b"second");

        // put -> get_mut edit -> remove -> put, then sync: the final put wins.
        store.put(key.clone(), b"third".to_vec());
        store.get_mut(&key).unwrap()[0] = b'T';
        store.remove(&key);
        store.put(key.clone(), b"fourth".to_vec());
        store.sync().await.unwrap();
        assert_eq!(store.get(&key).unwrap(), b"fourth");
        drop(store);

        // The same final value is restored by a new session.
        let store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        assert_eq!(store.get(&key).unwrap(), b"fourth");

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_overwrite_vs_rewrite() {
    /// Asserts that both sync counters appear with the expected values.
    fn assert_sync_counts(encoded: &str, rewrites: &str, overwrites: &str) {
        assert!(encoded.contains(rewrites));
        assert!(encoded.contains(overwrites));
    }

    // Pins down which kinds of modification are counted as an overwrite and which
    // as a rewrite.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.clone(),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();

        // Initial data, first sync.
        store.put(U64::new(1), vec![1; 10]);
        store.put(U64::new(2), vec![2; 10]);
        store.sync().await.unwrap();

        // Same-length change right after the first sync: still a rewrite.
        store.put(U64::new(1), vec![0xFF; 10]);
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 2",
            "sync_overwrites_total 0",
        );

        // No changes: overwrite.
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 2",
            "sync_overwrites_total 1",
        );

        // Same-length change from here on: overwrite.
        store.put(U64::new(1), vec![0xAA; 10]);
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 2",
            "sync_overwrites_total 2",
        );

        // Changing a value's length: rewrite.
        store.put(U64::new(1), vec![0xFF; 20]);
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 3",
            "sync_overwrites_total 2",
        );

        // Adding a new key: rewrite.
        store.put(U64::new(3), vec![3; 10]);
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 4",
            "sync_overwrites_total 2",
        );

        // The sync that follows is a rewrite as well, even without changes.
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 5",
            "sync_overwrites_total 2",
        );

        // After that, a same-length change is an overwrite again.
        store.put(U64::new(2), vec![0xAA; 10]);
        store.sync().await.unwrap();
        assert_sync_counts(
            &context.encode(),
            "sync_rewrites_total 5",
            "sync_overwrites_total 3",
        );

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_blob_resize() {
    // Shrinks the data set from ten entries to one and checks that only the
    // surviving entry is visible, both before and after a reopen.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let survivor = U64::new(0);

        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

        // Ten 100-byte values, synced twice (both counted as rewrites).
        for id in 0..10 {
            store.put(U64::new(id), vec![id as u8; 100]);
        }
        store.sync().await.unwrap();
        store.sync().await.unwrap();
        let encoded = context.encode();
        assert!(encoded.contains("first_sync_rewrites_total 2"));
        assert!(encoded.contains("first_sync_overwrites_total 0"));

        // Drop everything except key 0 and sync the much smaller data set.
        for id in 1..10 {
            store.remove(&U64::new(id));
        }
        store.sync().await.unwrap();

        // The surviving value is intact and the sync was a rewrite.
        let kept = store.get(&survivor).unwrap();
        assert_eq!(kept.len(), 100);
        assert_eq!(kept[0], 0);
        let encoded = context.encode();
        assert!(encoded.contains("first_sync_rewrites_total 3"));
        assert!(encoded.contains("first_sync_overwrites_total 0"));
        drop(store);

        // A new session sees the survivor and none of the removed keys.
        let store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        let kept = store.get(&survivor).unwrap();
        assert_eq!(kept.len(), 100);
        assert_eq!(kept[0], 0);
        assert!((1..10).all(|id| store.get(&U64::new(id)).is_none()));

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_clear_and_repopulate() {
    // A synced `clear` must survive a reopen, and the store must accept new keys
    // afterwards without resurrecting the cleared ones.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };

        let mut store =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

        // Persist two keys, then clear and persist the empty state.
        store.put(U64::new(1), b"first".to_vec());
        store.put(U64::new(2), b"second".to_vec());
        store.sync().await.unwrap();
        store.clear();
        store.sync().await.unwrap();
        for cleared in [1, 2] {
            assert!(store.get(&U64::new(cleared)).is_none());
        }
        drop(store);

        // The cleared keys stay gone after a reopen.
        let mut store = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        for cleared in [1, 2] {
            assert!(store.get(&U64::new(cleared)).is_none());
        }

        // New keys can be written and read back; the old ones do not reappear.
        store.put(U64::new(3), b"third".to_vec());
        store.put(U64::new(4), b"fourth".to_vec());
        store.sync().await.unwrap();
        assert_eq!(store.get(&U64::new(3)).unwrap(), b"third");
        assert_eq!(store.get(&U64::new(4)).unwrap(), b"fourth");
        for cleared in [1, 2] {
            assert!(store.get(&U64::new(cleared)).is_none());
        }

        store.destroy().await.unwrap();
    });
}
/// Runs `num_operations` randomized steps against a fresh store and returns the
/// auditor state of the deterministic runtime once the store has been destroyed.
///
/// All randomness comes from the runtime context, so the sequence of RNG calls
/// below is kept in a fixed order.
fn test_metadata_operations_and_restart(num_operations: usize) -> String {
    let runner = deterministic::Runner::default();
    runner.start(|mut context| async move {
        let cfg = Config {
            partition: "test-determinism".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut store = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
            .await
            .unwrap();

        for step in 0..num_operations {
            // Always insert a random 64-byte value under the step index.
            let mut payload = vec![0u8; 64];
            context.fill_bytes(&mut payload);
            store.put(U64::new(step as u64), payload);

            // With probability 0.1: sync.
            if context.gen_bool(0.1) {
                store.sync().await.unwrap();
            }

            // With probability 0.1: overwrite a random key in `0..=step`.
            if context.gen_bool(0.1) {
                let target = context.gen_range(0..=step);
                let mut replacement = vec![0u8; 64];
                context.fill_bytes(&mut replacement);
                store.put(U64::new(target as u64), replacement);
            }

            // With probability 0.1: remove a random key in `0..=step`.
            if context.gen_bool(0.1) {
                let target = context.gen_range(0..=step);
                store.remove(&U64::new(target as u64));
            }

            // With probability 0.1: bump the first byte of a random key in place,
            // if that key is present and its value is non-empty.
            if context.gen_bool(0.1) {
                let target = context.gen_range(0..=step);
                let target_key = U64::new(target as u64);
                if let Some(bytes) = store.get_mut(&target_key) {
                    if let Some(first) = bytes.first_mut() {
                        *first = first.wrapping_add(1);
                    }
                }
            }
        }

        store.sync().await.unwrap();
        store.destroy().await.unwrap();
        context.auditor().state()
    })
}
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
    // Two independent runs of the same randomized workload must leave the
    // deterministic runtime's auditor in the same state.
    const OPERATIONS: usize = 1_000;
    let baseline = test_metadata_operations_and_restart(OPERATIONS);
    let replay = test_metadata_operations_and_restart(OPERATIONS);
    assert_eq!(baseline, replay);
}
#[test_traced]
fn test_keys_iterator() {
    // `keys()` yields every stored key; filtering on the key bytes selects groups
    // of keys that share a 7-byte prefix.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let mut store = Metadata::<_, U64, Vec<u8>>::init(
            context.clone(),
            Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            },
        )
        .await
        .unwrap();

        // Three keys in the 0x10.. group, two in 0x20.., one in 0x30...
        store.put(U64::new(0x1000), b"value1".to_vec());
        store.put(U64::new(0x1001), b"value2".to_vec());
        store.put(U64::new(0x1002), b"value3".to_vec());
        store.put(U64::new(0x2000), b"value4".to_vec());
        store.put(U64::new(0x2001), b"value5".to_vec());
        store.put(U64::new(0x3000), b"value6".to_vec());

        // Unfiltered iteration sees all six keys.
        let every_key = store.keys().cloned().collect::<Vec<_>>();
        assert_eq!(every_key.len(), 6);
        assert!(every_key.contains(&U64::new(0x1000)));
        assert!(every_key.contains(&U64::new(0x3000)));

        // Prefix 0x..10 matches exactly the first group.
        let prefix = hex!("0x00000000000010");
        let matched = store
            .keys()
            .filter(|candidate| candidate.as_ref().starts_with(&prefix))
            .cloned()
            .collect::<Vec<_>>();
        assert_eq!(matched.len(), 3);
        for expected in [0x1000, 0x1001, 0x1002] {
            assert!(matched.contains(&U64::new(expected)));
        }
        assert!(!matched.contains(&U64::new(0x2000)));

        // Prefix 0x..20 matches exactly the second group.
        let prefix = hex!("0x00000000000020");
        let matched = store
            .keys()
            .filter(|candidate| candidate.as_ref().starts_with(&prefix))
            .cloned()
            .collect::<Vec<_>>();
        assert_eq!(matched.len(), 2);
        for expected in [0x2000, 0x2001] {
            assert!(matched.contains(&U64::new(expected)));
        }

        // A prefix that no key starts with matches nothing.
        let prefix = hex!("0x00000000000040");
        let matched = store
            .keys()
            .filter(|candidate| candidate.as_ref().starts_with(&prefix))
            .cloned()
            .collect::<Vec<_>>();
        assert_eq!(matched.len(), 0);

        store.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_retain() {
    // `retain` keeps only the entries for which the predicate returns `true`.
    // Covers three cases: a predicate that drops a subset, one that drops nothing,
    // and one that drops everything.
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "test".into(),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
            .await
            .unwrap();

        // Three keys in the 0x10.. group, two in 0x20.., one in 0x30...
        metadata.put(U64::new(0x1000), b"value1".to_vec());
        metadata.put(U64::new(0x1001), b"value2".to_vec());
        metadata.put(U64::new(0x1002), b"value3".to_vec());
        metadata.put(U64::new(0x2000), b"value4".to_vec());
        metadata.put(U64::new(0x2001), b"value5".to_vec());
        metadata.put(U64::new(0x3000), b"value6".to_vec());
        let buffer = context.encode();
        assert!(buffer.contains("first_keys 6"));

        // Drop every key in the 0x10.. group; the key gauge follows immediately.
        let prefix = hex!("0x00000000000010");
        metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
        let buffer = context.encode();
        assert!(buffer.contains("first_keys 3"));
        assert!(metadata.get(&U64::new(0x1000)).is_none());
        assert!(metadata.get(&U64::new(0x1001)).is_none());
        assert!(metadata.get(&U64::new(0x1002)).is_none());
        assert!(metadata.get(&U64::new(0x2000)).is_some());
        assert!(metadata.get(&U64::new(0x2001)).is_some());
        assert!(metadata.get(&U64::new(0x3000)).is_some());

        // Persist and reopen: the removal must survive.
        metadata.sync().await.unwrap();
        drop(metadata);
        let cfg = Config {
            partition: "test".into(),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
            .await
            .unwrap();
        assert!(metadata.get(&U64::new(0x1000)).is_none());
        assert!(metadata.get(&U64::new(0x2000)).is_some());
        assert_eq!(metadata.keys().count(), 3);

        // A predicate that rejects nothing must leave all three keys in place.
        // Without this check the `retain(|_, _| false)` below would hide a
        // regression that removes keys here.
        let prefix = hex!("0x00000000000040");
        metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
        assert_eq!(metadata.keys().count(), 3);

        // A predicate that rejects everything empties the store.
        metadata.retain(|_, _| false);
        assert_eq!(metadata.keys().count(), 0);

        metadata.destroy().await.unwrap();
    });
}
}