#[cfg(test)]
mod conformance;
mod storage;
use commonware_runtime::buffer::paged::CacheRef;
use commonware_utils::Array;
use std::num::NonZeroUsize;
pub use storage::{Checkpoint, Cursor, Freezer};
use thiserror::Error;
/// Selects how an item is addressed when reading from a [`Freezer`].
///
/// An item can be named either by a previously obtained [`Cursor`] or by a
/// borrowed key of type `K`.
pub enum Identifier<'a, K: Array> {
/// Address the item through a [`Cursor`].
// NOTE(review): presumably a direct position handle produced by the store — confirm in `storage`.
Cursor(Cursor),
/// Address the item by a borrowed key.
Key(&'a K),
}
/// Errors surfaced by this module.
///
/// Each variant wraps an error from an underlying layer and is convertible
/// from it via `From` (through `#[from]`), so `?` can be used directly.
#[derive(Debug, Error)]
pub enum Error {
/// An error reported by `commonware_runtime`.
#[error("runtime error: {0}")]
Runtime(#[from] commonware_runtime::Error),
/// An error reported by this crate's `journal` module.
#[error("journal error: {0}")]
Journal(#[from] crate::journal::Error),
/// An error reported by `commonware_codec`.
#[error("codec error: {0}")]
Codec(#[from] commonware_codec::Error),
}
/// Configuration handed to `Freezer::init` / `Freezer::init_with_checkpoint`.
///
/// `C` is the type of [`Config::codec_config`].
// NOTE(review): the per-field descriptions below are inferred from field names
// and from how the tests in this file populate them; confirm against `storage`.
#[derive(Clone)]
pub struct Config<C> {
/// Name of the partition used for key data.
pub key_partition: String,
/// Write buffer size for key data (presumably in bytes — confirm).
pub key_write_buffer: NonZeroUsize,
/// Page cache handle used for key data.
pub key_page_cache: CacheRef,
/// Name of the partition used for value data.
pub value_partition: String,
/// Optional compression setting for values; `None` presumably disables
/// compression (the exact meaning of the `u8` is not visible here).
pub value_compression: Option<u8>,
/// Write buffer size for value data (presumably in bytes — confirm).
pub value_write_buffer: NonZeroUsize,
/// Target size for value storage (presumably bytes per storage unit — confirm).
pub value_target_size: u64,
/// Name of the partition holding the table; the tests open a blob named
/// `b"table"` inside it.
pub table_partition: String,
/// Initial size of the table (presumably a number of entries — confirm).
pub table_initial_size: u32,
/// Controls how readily the table resizes (exact semantics not visible here).
pub table_resize_frequency: u8,
/// Amount of resize work done per step (presumably entries per sync — confirm).
pub table_resize_chunk_size: u32,
/// Buffer size used when replaying the table (presumably in bytes — confirm).
pub table_replay_buffer: NonZeroUsize,
/// Codec configuration; the tests pass `()`.
pub codec_config: C,
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_codec::DecodeExt;
use commonware_macros::{test_group, test_traced};
use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
use rand::{Rng, RngCore};
use std::num::NonZeroU16;
/// Builds a 64-byte fixed-size key from `key`, zero-padded on the right.
///
/// Panics if `key` is longer than 64 bytes.
fn test_key(key: &str) -> FixedBytes<64> {
    let key = key.as_bytes();
    let mut buf = [0u8; 64];
    assert!(key.len() <= buf.len());
    // Fill the prefix with the key bytes; the remainder stays zeroed.
    let (prefix, _padding) = buf.split_at_mut(key.len());
    prefix.copy_from_slice(key);
    FixedBytes::decode(&buf[..]).unwrap()
}
/// Write buffer size used for both the key and value buffers in test configs.
const DEFAULT_WRITE_BUFFER: usize = 1024;

/// Default `value_target_size` for test configs (10 MiB).
const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;

/// Default `table_initial_size` for test configs.
const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;

/// Default `table_resize_frequency` for test configs.
const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;

/// Default `table_resize_chunk_size` for test configs.
const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128;

/// Default `table_replay_buffer` for test configs (64 KiB).
const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024;

/// Page size passed to `CacheRef::from_pooler` in tests.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);

/// Page cache size passed to `CacheRef::from_pooler` in tests.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
/// Shared body of the put/get tests: stores a single key, reads it back and
/// checks the exported metrics, using `compression` as the value compression
/// setting.
fn test_put_get(compression: Option<u8>) {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: compression,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };
        let mut store = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), config.clone())
            .await
            .expect("Failed to initialize freezer");

        let key = test_key("testkey");
        let data = 42;

        // Nothing has been written yet, so the lookup must miss.
        let before = store
            .get(Identifier::Key(&key))
            .await
            .expect("Failed to check key");
        assert!(before.is_none());

        store
            .put(key.clone(), data)
            .await
            .expect("Failed to put data");

        // The stored value must now be returned.
        let after = store
            .get(Identifier::Key(&key))
            .await
            .expect("Failed to get data")
            .expect("Data not found");
        assert_eq!(after, data);

        // Two gets, one put, and no unnecessary reads should be recorded.
        let buffer = context.encode();
        for expected in ["gets_total 2", "puts_total 1", "unnecessary_reads_total 0"] {
            assert!(buffer.contains(expected), "{}", buffer);
        }

        store.sync().await.expect("Failed to sync data");
    });
}
/// Runs the shared put/get scenario with `value_compression` set to `None`.
#[test_traced]
fn test_put_get_no_compression() {
test_put_get(None);
}
/// Runs the shared put/get scenario with `value_compression` set to `Some(3)`.
#[test_traced]
fn test_put_get_compression() {
test_put_get(Some(3));
}
/// Stores five distinct keys and verifies each one reads back its own value.
#[test_traced]
fn test_multiple_keys() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };
        let mut store = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), config.clone())
            .await
            .expect("Failed to initialize freezer");

        // ("key1", 1) through ("key5", 5).
        let entries: Vec<_> = (1..=5i32)
            .map(|i| (test_key(&format!("key{i}")), i))
            .collect();

        // Write everything first, then read everything back.
        for (key, data) in entries.iter() {
            store
                .put(key.clone(), *data)
                .await
                .expect("Failed to put data");
        }
        for (key, data) in entries.iter() {
            let found = store
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(found, *data);
        }
    });
}
/// Stores eight keys in a table initialised with size 4 and checks that all of
/// them are still retrievable, along with the expected read metrics.
#[test_traced]
fn test_collision_handling() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            // Deliberately small so that more keys than the initial size are stored.
            table_initial_size: 4,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };
        let mut store = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), config.clone())
            .await
            .expect("Failed to initialize freezer");

        // ("key1", 1) through ("key8", 8).
        let entries: Vec<_> = (1..=8i32)
            .map(|i| (test_key(&format!("key{i}")), i))
            .collect();

        for (key, data) in entries.iter() {
            store
                .put(key.clone(), *data)
                .await
                .expect("Failed to put data");
        }
        store.sync().await.expect("Failed to sync");

        for (key, data) in entries.iter() {
            let found = store
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(found, *data);
        }

        // Eight lookups were issued; five reads are expected to be counted as unnecessary.
        let buffer = context.encode();
        for expected in ["gets_total 8", "unnecessary_reads_total 5"] {
            assert!(buffer.contains(expected), "{}", buffer);
        }
    });
}
/// Writes data, closes the store, reopens it from the returned checkpoint and
/// verifies that every value is still readable.
#[test_traced]
fn test_restart() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };

        // The same entries are written before and checked after the restart.
        let entries = [
            (test_key("persist1"), 100),
            (test_key("persist2"), 200),
            (test_key("persist3"), 300),
        ];

        // First instance: write everything, then close to obtain a checkpoint.
        let checkpoint = {
            let mut store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");
            for (key, data) in &entries {
                store
                    .put(key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }
            store.close().await.expect("Failed to close freezer")
        };

        // Second instance: resume from the checkpoint and read everything back.
        let store = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
            context.with_label("second"),
            config.clone(),
            Some(checkpoint),
        )
        .await
        .expect("Failed to initialize freezer");
        for (key, data) in &entries {
            let found = store
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(found, *data);
        }
    });
}
/// Writes two synced keys followed by two keys that are not explicitly synced,
/// reopens from the checkpoint and checks what is readable.
///
/// Synced keys must be present. The later keys may or may not be present, but
/// if present they must carry the value that was written.
#[test_traced]
fn test_crash_consistency() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };

        let checkpoint = {
            let mut store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");

            // These writes are followed by an explicit sync.
            for (name, value) in [("committed1", 1), ("committed2", 2)] {
                store
                    .put(test_key(name), value)
                    .await
                    .expect("Failed to put data");
            }
            store.sync().await.expect("Failed to sync");

            // These writes are not followed by an explicit sync before close.
            for (name, value) in [("uncommitted1", 3), ("uncommitted2", 4)] {
                store
                    .put(test_key(name), value)
                    .await
                    .expect("Failed to put data");
            }
            store.close().await.expect("Failed to close")
        };

        let store = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
            context.with_label("second"),
            config.clone(),
            Some(checkpoint),
        )
        .await
        .expect("Failed to initialize freezer");

        // Synced data must be there.
        for (name, value) in [("committed1", 1), ("committed2", 2)] {
            assert_eq!(
                store.get(Identifier::Key(&test_key(name))).await.unwrap(),
                Some(value)
            );
        }

        // Later data is optional, but must be intact when present.
        for (name, value) in [("uncommitted1", 3), ("uncommitted2", 4)] {
            if let Some(found) = store.get(Identifier::Key(&test_key(name))).await.unwrap() {
                assert_eq!(found, value);
            }
        }
    });
}
/// Verifies that a store initialised after `destroy()` no longer returns the
/// keys written before it.
#[test_traced]
fn test_destroy() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };

        // First instance: write two keys, then destroy the store.
        {
            let mut store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");
            for (name, value) in [("destroy1", 1), ("destroy2", 2)] {
                store
                    .put(test_key(name), value)
                    .await
                    .expect("Failed to put data");
            }
            store.destroy().await.expect("Failed to destroy freezer");
        }

        // Second instance: a fresh init over the same partitions must be empty.
        {
            let store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("second"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");
            for name in ["destroy1", "destroy2"] {
                let found = store.get(Identifier::Key(&test_key(name))).await.unwrap();
                assert!(found.is_none());
            }
        }
    });
}
/// Overwrites the first 10 bytes of the table blob with `0xFF` between two
/// store lifetimes and checks that reopening still succeeds.
///
/// The lookup afterwards may miss or return the original value; it must not
/// fail or return anything else.
#[test_traced]
fn test_partial_table_entry_write() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };

        // Write one key, sync, and close to obtain a checkpoint.
        let checkpoint = {
            let mut store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");
            store.put(test_key("key1"), 42).await.unwrap();
            store.sync().await.unwrap();
            store.close().await.unwrap()
        };

        // Clobber the start of the table blob directly through the storage API.
        {
            let (table, _) = context
                .open(&config.table_partition, b"table")
                .await
                .unwrap();
            table.write_at(0, vec![0xFF; 10]).await.unwrap();
            table.sync().await.unwrap();
        }

        // Reopening must succeed despite the damaged bytes.
        {
            let store = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                config.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to initialize freezer");
            let outcome = store
                .get(Identifier::Key(&test_key("key1")))
                .await
                .unwrap();
            assert!(matches!(outcome, None | Some(42)));
        }
    });
}
/// Flips all bits of byte 20 within the first 24 bytes of the table blob
/// between two store lifetimes and checks that reopening still succeeds.
///
/// The lookup afterwards may miss or return the original value.
// NOTE(review): byte 20 presumably falls inside the entry's CRC field, per the test name — confirm against the table entry layout.
#[test_traced]
fn test_table_entry_invalid_crc() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };

        // Write one key, sync, and close to obtain a checkpoint.
        let checkpoint = {
            let mut store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");
            store.put(test_key("key1"), 42).await.unwrap();
            store.sync().await.unwrap();
            store.close().await.unwrap()
        };

        // Read the first 24 bytes, corrupt one byte, and write them back.
        {
            let (table, _) = context
                .open(&config.table_partition, b"table")
                .await
                .unwrap();
            let mut entry = table.read_at(0, 24).await.unwrap().coalesce();
            entry.as_mut()[20] ^= 0xFF;
            table.write_at(0, entry).await.unwrap();
            table.sync().await.unwrap();
        }

        // Reopening must succeed despite the corrupted byte.
        {
            let store = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                config.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to initialize freezer");
            let outcome = store
                .get(Identifier::Key(&test_key("key1")))
                .await
                .unwrap();
            assert!(matches!(outcome, None | Some(42)));
        }
    });
}
/// Appends four stray bytes to the end of the table blob between two store
/// lifetimes and checks that the reopened store still reads existing data and
/// accepts new writes.
#[test_traced]
fn test_table_extra_bytes() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };

        // Write one key, sync, and close to obtain a checkpoint.
        let checkpoint = {
            let mut store =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize freezer");
            store.put(test_key("key1"), 42).await.unwrap();
            store.sync().await.unwrap();
            store.close().await.unwrap()
        };

        // Append garbage at the current end of the table blob.
        {
            let (table, len) = context
                .open(&config.table_partition, b"table")
                .await
                .unwrap();
            table
                .write_at(len, hex!("0xdeadbeef").to_vec())
                .await
                .unwrap();
            table.sync().await.unwrap();
        }

        {
            let mut store = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                config.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to initialize freezer");

            // Existing data is unaffected by the trailing bytes.
            assert_eq!(
                store.get(Identifier::Key(&test_key("key1"))).await.unwrap(),
                Some(42)
            );

            // New writes still work and are readable.
            store.put(test_key("key2"), 43).await.unwrap();
            assert_eq!(
                store.get(Identifier::Key(&test_key("key2"))).await.unwrap(),
                Some(43)
            );
        }
    });
}
/// Inserts 1000 keys (syncing after each) into a table that starts at size 2,
/// then verifies every key both before and after a restart, and checks the
/// number of resizes recorded by the first instance.
#[test_traced]
fn test_indexing_across_resizes() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            // Small table and low resize frequency so that resizes happen repeatedly.
            table_initial_size: 2,
            table_resize_frequency: 2,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };
        let mut store =
            Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize freezer");

        // Insert one key at a time, syncing after every insert.
        let mut entries = Vec::with_capacity(1000);
        for value in 0..1000 {
            let key = test_key(&format!("key{value}"));
            entries.push((key.clone(), value));
            store.put(key, value).await.expect("Failed to put data");
            store.sync().await.expect("Failed to sync");
        }

        // Every key must be readable from the live instance.
        for (key, value) in entries.iter() {
            let found = store
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(found, *value, "Value mismatch for key after resizes");
        }

        // Restart from the checkpoint and check every key again.
        let checkpoint = store.close().await.expect("Failed to close");
        let store = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
            context.with_label("second"),
            config.clone(),
            Some(checkpoint),
        )
        .await
        .expect("Failed to reinitialize freezer");
        for (key, value) in entries.iter() {
            let found = store
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(found, *value, "Value mismatch for key after restart");
        }

        // The first instance is expected to have recorded exactly eight resizes.
        let buffer = context.encode();
        assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);
    });
}
/// Exercises inserts that happen while a table resize is in progress.
///
/// Uses a table of initial size 2 with resize frequency 1 and chunk size 1,
/// and asserts the values of `resizing()`, `resizable()` and the
/// `unnecessary_writes_total` metric at each step.
// NOTE(review): the specific key names are presumably chosen so that they map to
// particular table entries, which is what makes the exact counter values below
// hold — confirm against the table hashing in `storage`.
#[test_traced]
fn test_insert_during_resize() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
key_partition: "test-key-index".into(),
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value-journal".into(),
value_compression: None,
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_target_size: DEFAULT_VALUE_TARGET_SIZE,
table_partition: "test-table".into(),
table_initial_size: 2,
table_resize_frequency: 1,
table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
codec_config: (),
};
let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
.await
.unwrap();
// Two inserts followed by a sync are expected to start a resize.
freezer.put(test_key("key0"), 0).await.unwrap();
freezer.put(test_key("key2"), 1).await.unwrap();
freezer.sync().await.unwrap();
assert!(freezer.resizing().is_some());
// Insert while the resize is in progress: one unnecessary write is recorded
// and three entries are reported resizable.
freezer.put(test_key("key6"), 2).await.unwrap();
assert!(context.encode().contains("unnecessary_writes_total 1"));
assert_eq!(freezer.resizable(), 3);
// A second insert during the resize leaves both counters unchanged.
freezer.put(test_key("key3"), 3).await.unwrap();
assert!(context.encode().contains("unnecessary_writes_total 1"));
assert_eq!(freezer.resizable(), 3);
// The next sync is expected to finish the resize.
freezer.sync().await.unwrap();
assert!(freezer.resizing().is_none());
assert_eq!(freezer.resizable(), 2);
// Two more inserts plus a sync are expected to start another resize.
freezer.put(test_key("key4"), 4).await.unwrap();
freezer.put(test_key("key7"), 5).await.unwrap();
freezer.sync().await.unwrap();
assert!(freezer.resizing().is_some());
// All keys must be readable mid-resize; values equal insertion order.
let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
for (i, k) in keys.iter().enumerate() {
assert_eq!(
freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
Some(i as i32)
);
}
// Keep syncing until the resize completes; nothing should remain resizable.
while freezer.resizing().is_some() {
freezer.sync().await.unwrap();
}
assert_eq!(freezer.resizable(), 0);
});
}
/// Checks that a store reopened from a checkpoint taken while a resize was in
/// progress reports a resizable entry and resizes again on subsequent syncs.
///
/// Uses a table of initial size 2 with resize frequency 1 and chunk size 1.
#[test_traced]
fn test_resize_after_startup() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
key_partition: "test-key-index".into(),
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value-journal".into(),
value_compression: None,
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_target_size: DEFAULT_VALUE_TARGET_SIZE,
table_partition: "test-table".into(),
table_initial_size: 2,
table_resize_frequency: 1,
table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
codec_config: (),
};
// First instance: two inserts and one sync. The checkpoint returned by that
// sync is kept, and the instance goes out of scope without `close()` while a
// resize is still in progress.
let checkpoint = {
let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
context.with_label("first"),
cfg.clone(),
)
.await
.unwrap();
freezer.put(test_key("key0"), 0).await.unwrap();
freezer.put(test_key("key2"), 1).await.unwrap();
let checkpoint = freezer.sync().await.unwrap();
assert!(freezer.resizing().is_some());
checkpoint
};
// Second instance: resume from the mid-resize checkpoint.
let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
context.with_label("second"),
cfg.clone(),
Some(checkpoint),
)
.await
.unwrap();
// Exactly one entry is reported resizable right after startup.
assert_eq!(freezer.resizable(), 1);
// A sync without any new inserts is expected to start a resize.
freezer.sync().await.unwrap();
assert!(freezer.resizing().is_some());
// Keep syncing until the resize completes; nothing should remain resizable.
while freezer.resizing().is_some() {
freezer.sync().await.unwrap();
}
assert_eq!(freezer.resizable(), 0);
});
}
fn test_operations_and_restart(num_keys: usize) -> String {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let cfg = Config {
key_partition: "test-key-index".into(),
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value-journal".into(),
value_compression: None,
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_target_size: 128, table_partition: "test-table".into(),
table_initial_size: 8, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
codec_config: (),
};
let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
context.with_label("init1"),
cfg.clone(),
)
.await
.expect("Failed to initialize freezer");
let mut pairs = Vec::new();
for _ in 0..num_keys {
let mut key = [0u8; 96];
context.fill_bytes(&mut key);
let key = FixedBytes::<96>::new(key);
let mut value = [0u8; 256];
context.fill_bytes(&mut value);
let value = FixedBytes::<256>::new(value);
freezer
.put(key.clone(), value.clone())
.await
.expect("Failed to put data");
pairs.push((key, value));
if context.gen_bool(0.1) {
freezer.sync().await.expect("Failed to sync");
}
}
freezer.sync().await.expect("Failed to sync");
for (key, value) in &pairs {
let retrieved = freezer
.get(Identifier::Key(key))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, value);
}
for (key, _) in &pairs {
assert!(freezer
.get(Identifier::Key(key))
.await
.expect("Failed to check key")
.is_some());
}
for _ in 0..10 {
let mut key = [0u8; 96];
context.fill_bytes(&mut key);
let key = FixedBytes::<96>::new(key);
assert!(freezer
.get(Identifier::Key(&key))
.await
.expect("Failed to check key")
.is_none());
}
let checkpoint = freezer.close().await.expect("Failed to close freezer");
let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
context.with_label("init2"),
cfg.clone(),
Some(checkpoint),
)
.await
.expect("Failed to initialize freezer");
for (key, value) in &pairs {
let retrieved = freezer
.get(Identifier::Key(key))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, value);
}
for _ in 0..20 {
let mut key = [0u8; 96];
context.fill_bytes(&mut key);
let key = FixedBytes::<96>::new(key);
let mut value = [0u8; 256];
context.fill_bytes(&mut value);
let value = FixedBytes::<256>::new(value);
freezer.put(key, value).await.expect("Failed to put data");
}
for _ in 0..3 {
freezer.sync().await.expect("Failed to sync");
for _ in 0..5 {
let mut key = [0u8; 96];
context.fill_bytes(&mut key);
let key = FixedBytes::<96>::new(key);
let mut value = [0u8; 256];
context.fill_bytes(&mut value);
let value = FixedBytes::<256>::new(value);
freezer.put(key, value).await.expect("Failed to put data");
}
}
freezer.sync().await.expect("Failed to sync");
context.auditor().state()
})
}
/// Runs the randomised workload twice with 1000 keys and requires both runs to
/// produce the same auditor state.
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
    let first_run = test_operations_and_restart(1_000);
    let second_run = test_operations_and_restart(1_000);
    assert_eq!(first_run, second_run);
}
/// Writes the same key twice and checks that a lookup returns the value from
/// the second write.
#[test_traced]
fn test_put_multiple_updates() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let key_page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
        let config = Config {
            key_partition: String::from("test-key-index"),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache,
            value_partition: String::from("test-value-journal"),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: String::from("test-table"),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        };
        let mut store = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), config.clone())
            .await
            .expect("Failed to initialize freezer");

        // Two successive writes to the same key.
        let key = test_key("key1");
        for value in [1, 2] {
            store
                .put(key.clone(), value)
                .await
                .expect("Failed to put data");
        }
        store.sync().await.expect("Failed to sync");

        // The most recent write wins.
        let latest = store
            .get(Identifier::Key(&key))
            .await
            .expect("Failed to get data")
            .unwrap();
        assert_eq!(latest, 2);
    });
}
}