mod storage;
use std::num::{NonZeroU64, NonZeroUsize};
pub use storage::Ordinal;
use thiserror::Error;
/// Errors surfaced by this module.
#[derive(Debug, Error)]
pub enum Error {
/// A failure propagated from `commonware_runtime` (converted via `From`).
#[error("runtime error: {0}")]
Runtime(#[from] commonware_runtime::Error),
/// A failure propagated from `commonware_codec` (converted via `From`).
#[error("codec error: {0}")]
Codec(#[from] commonware_codec::Error),
/// A blob name was rejected; the payload is rendered in the error message.
// NOTE(review): presumably the payload is the offending name in printable form — confirm in `storage`.
#[error("invalid blob name: {0}")]
InvalidBlobName(String),
/// A record was found to be invalid.
// NOTE(review): the `u64` is presumably the record's index — confirm in `storage`.
#[error("invalid record: {0}")]
InvalidRecord(u64),
/// A record expected at the given position was missing.
#[error("missing record at {0}")]
MissingRecord(u64),
}
/// Configuration passed to [`Ordinal`] at initialization.
///
/// `Debug` is derived so that configurations can be logged and used in
/// assertion messages; every field already implements it.
#[derive(Clone, Debug)]
pub struct Config {
    /// Name of the storage partition that holds the store's blobs.
    pub partition: String,
    /// Number of items stored in each blob (never zero).
    pub items_per_blob: NonZeroU64,
    /// Size of the write buffer (never zero).
    // NOTE(review): presumably in bytes and applied per blob — confirm in `storage`.
    pub write_buffer: NonZeroUsize,
    /// Size of the buffer used while replaying existing blobs (never zero).
    // NOTE(review): presumably in bytes — confirm in `storage`.
    pub replay_buffer: NonZeroUsize,
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_codec::{FixedSize, Read, ReadExt, Write};
use commonware_cryptography::Crc32;
use commonware_macros::{test_group, test_traced};
use commonware_runtime::{deterministic, Blob, Buf, BufMut, Metrics, Runner, Storage};
use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
use rand::RngCore;
use std::collections::BTreeMap;
// Default `Config::items_per_blob` used by tests that do not care about blob boundaries.
const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
// Default `Config::write_buffer` used by the tests.
const DEFAULT_WRITE_BUFFER: usize = 4096;
// Default `Config::replay_buffer` used by the tests (1 MiB).
const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
// Stores a single value, reads it back, and checks the metrics recorded on the way.
#[test_traced]
fn test_put_get() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");
        let expected = FixedBytes::new([42u8; 32]);

        // The index is absent before the put and present right after it.
        assert!(!ordinal.has(0));
        ordinal
            .put(0, expected.clone())
            .await
            .expect("Failed to put data");
        assert!(ordinal.has(0));

        // The value is readable before it has been synced.
        let before_sync = ordinal
            .get(0)
            .await
            .expect("Failed to get data")
            .expect("Data not found");
        assert_eq!(before_sync, expected);

        ordinal.sync().await.expect("Failed to sync data");

        // Every operation performed so far must be reflected in the metrics.
        let buffer = context.encode();
        for metric in [
            "gets_total 1",
            "puts_total 1",
            "has_total 2",
            "syncs_total 1",
            "pruned_total 0",
        ] {
            assert!(buffer.contains(metric), "{}", buffer);
        }

        // The value is still readable after the sync.
        let after_sync = ordinal
            .get(0)
            .await
            .expect("Failed to get data")
            .expect("Data not found");
        assert_eq!(after_sync, expected);
    });
}
// Two syncs polled together must both report an error once the blob holding
// the pending write has been removed from storage.
#[test_traced]
fn test_concurrent_sync_does_not_report_success_while_flush_fails() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to initialize store");

        // Leave one unsynced write pending.
        store
            .put(0, FixedBytes::new([42u8; 32]))
            .await
            .expect("Failed to put data");

        // Remove the blob named by the big-endian encoding of 0 from the
        // partition, so that the syncs below are expected to fail.
        let section = 0u64.to_be_bytes();
        context
            .remove(&cfg.partition, Some(&section))
            .await
            .expect("Failed to remove blob");

        // Neither of the two joined syncs may report success.
        let (first, second) = futures::future::join(store.sync(), store.sync()).await;
        assert!(first.is_err(), "first sync unexpectedly succeeded");
        assert!(second.is_err(), "second sync unexpectedly succeeded");
    });
}
// Writes several indices (including one past the first 1000 items) and reads each back.
#[test_traced]
fn test_multiple_indices() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        let entries = [
            (0u64, FixedBytes::new([0u8; 32])),
            (5u64, FixedBytes::new([5u8; 32])),
            (10u64, FixedBytes::new([10u8; 32])),
            (100u64, FixedBytes::new([100u8; 32])),
            (1000u64, FixedBytes::new([200u8; 32])),
        ];

        // Write everything, then persist.
        for (index, value) in &entries {
            ordinal
                .put(*index, value.clone())
                .await
                .expect("Failed to put data");
        }
        ordinal.sync().await.expect("Failed to sync");

        // Every entry must round-trip.
        for (index, expected) in &entries {
            let found = ordinal
                .get(*index)
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(&found, expected);
        }
    });
}
// Writes widely spaced indices with 100 items per blob and checks that the
// untouched indices in between are reported as absent.
#[test_traced]
fn test_sparse_indices() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        let entries = [
            (0u64, FixedBytes::new([0u8; 32])),
            (99u64, FixedBytes::new([99u8; 32])),
            (100u64, FixedBytes::new([100u8; 32])),
            (500u64, FixedBytes::new([200u8; 32])),
        ];
        for (index, value) in &entries {
            ordinal
                .put(*index, value.clone())
                .await
                .expect("Failed to put data");
        }

        // Indices that were never written stay absent.
        for absent in [1, 50, 101, 499] {
            assert!(!ordinal.has(absent));
        }

        ordinal.sync().await.expect("Failed to sync");

        // Written entries are all retrievable after the sync.
        for (index, expected) in &entries {
            let found = ordinal
                .get(*index)
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(&found, expected);
        }
    });
}
// Checks `next_gap` against a store holding indices 1, 10, 11 and 14.
#[test_traced]
fn test_next_gap() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        // Each value is filled with the byte equal to its index.
        for byte in [1u8, 10, 11, 14] {
            ordinal
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }

        // (query, expected first element, expected second element)
        let cases = [
            (0, None, Some(1)),
            (1, Some(1), Some(10)),
            (10, Some(11), Some(14)),
            (11, Some(11), Some(14)),
            (12, None, Some(14)),
            (14, Some(14), None),
        ];
        for (query, expected_end, expected_next) in cases {
            let (current_end, start_next) = ordinal.next_gap(query);
            assert_eq!(current_end, expected_end);
            assert_eq!(start_next, expected_next);
        }
    });
}
// Exercises `missing_items` on an empty store, a store with small gaps, a
// store with a large gap, after a sync, and around indices 9999/10001.
#[test_traced]
fn test_missing_items() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        // An empty store reports nothing as missing.
        assert!(ordinal.missing_items(0, 5).is_empty());
        assert!(ordinal.missing_items(100, 10).is_empty());

        // Populate 1, 2, 5, 6 and 10; each value is filled with its index byte.
        for byte in [1u8, 2, 5, 6, 10] {
            ordinal
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }

        // (start, limit, expected result)
        let small_gap_cases = [
            (0, 5, vec![0u64, 3, 4, 7, 8]),
            (0, 6, vec![0, 3, 4, 7, 8, 9]),
            (0, 7, vec![0, 3, 4, 7, 8, 9]),
            (3, 3, vec![3, 4, 7]),
            (4, 2, vec![4, 7]),
            (1, 3, vec![3, 4, 7]),
            (2, 4, vec![3, 4, 7, 8]),
            (5, 2, vec![7, 8]),
            (11, 5, vec![]),
            (100, 10, vec![]),
        ];
        for (start, limit, expected) in small_gap_cases {
            assert_eq!(ordinal.missing_items(start, limit), expected);
        }

        // Adding a far-away item opens a large gap after index 10.
        ordinal
            .put(1000, FixedBytes::new([100u8; 32]))
            .await
            .unwrap();
        let after_ten = ordinal.missing_items(11, 10);
        assert_eq!(after_ten, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
        let before_thousand = ordinal.missing_items(990, 15);
        assert_eq!(
            before_thousand,
            vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
        );

        // Results are unchanged by a sync.
        ordinal.sync().await.unwrap();
        assert_eq!(ordinal.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
        assert_eq!(ordinal.missing_items(3, 3), vec![3, 4, 7]);

        // Items on either side of index 10000.
        ordinal
            .put(9999, FixedBytes::new([99u8; 32]))
            .await
            .unwrap();
        ordinal
            .put(10001, FixedBytes::new([101u8; 32]))
            .await
            .unwrap();
        let around = ordinal.missing_items(9998, 5);
        assert_eq!(around, vec![9998, 10000]);
    });
}
// Data written and synced by one store instance must be visible to a second
// instance opened on the same partition.
#[test_traced]
fn test_restart() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let entries = [
            (0u64, FixedBytes::new([0u8; 32])),
            (100u64, FixedBytes::new([100u8; 32])),
            (1000u64, FixedBytes::new([200u8; 32])),
        ];

        // First instance: write, sync, drop.
        let mut first =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        for (index, value) in &entries {
            first
                .put(*index, value.clone())
                .await
                .expect("Failed to put data");
        }
        first.sync().await.expect("Failed to sync store");
        drop(first);

        // Second instance: everything is still there.
        let second =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");
        for (index, expected) in &entries {
            let found = second
                .get(*index)
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(&found, expected);
        }

        // Gap tracking was rebuilt as well.
        let (current_end, start_next) = second.next_gap(0);
        assert_eq!(current_end, Some(0));
        assert_eq!(start_next, Some(100));
    });
}
// Overwrites one byte at offset 32 of the blob (just past the 32-byte value)
// and checks that the record is not loaded on restart.
#[test_traced]
fn test_invalid_record() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Write and persist a single record.
        let mut writer =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        writer
            .put(0, FixedBytes::new([42u8; 32]))
            .await
            .expect("Failed to put data");
        writer.sync().await.expect("Failed to sync store");
        drop(writer);

        // Damage the blob directly.
        let (blob, _) = context
            .open("test-ordinal", &0u64.to_be_bytes())
            .await
            .unwrap();
        blob.write_at(32, vec![0xFF]).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        // The damaged record is gone after reopening.
        let reader =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");
        let lookup = reader.get(0).await.unwrap();
        assert!(lookup.is_none());
        assert!(!reader.has(0));
    });
}
// Looking up an index that was never written yields `None` and `has` is false.
#[test_traced]
fn test_get_nonexistent() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        let lookup = ordinal.get(999).await.expect("Failed to get data");
        assert!(lookup.is_none());
        assert!(!ordinal.has(999));
    });
}
// After `destroy`, a store reopened on the same partition holds nothing.
#[test_traced]
fn test_destroy() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Populate two indices, then destroy the store.
        {
            let mut doomed =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            doomed
                .put(0, FixedBytes::new([0u8; 32]))
                .await
                .expect("Failed to put data");
            doomed
                .put(1000, FixedBytes::new([100u8; 32]))
                .await
                .expect("Failed to put data");
            doomed.destroy().await.expect("Failed to destroy store");
        }

        // A fresh instance sees neither index.
        {
            let fresh =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            assert!(fresh.get(0).await.unwrap().is_none());
            assert!(fresh.get(1000).await.unwrap().is_none());
            assert!(!fresh.has(0));
            assert!(!fresh.has(1000));
        }
    });
}
// Overwrites 32 bytes starting at offset 36 of the blob, then checks that
// index 0 survives, index 1 is dropped on restart, and index 1 can be rewritten.
#[test_traced]
fn test_partial_record_write() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Persist two adjacent records.
        let mut writer =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        writer
            .put(0, FixedBytes::new([42u8; 32]))
            .await
            .expect("Failed to put data");
        writer
            .put(1, FixedBytes::new([43u8; 32]))
            .await
            .expect("Failed to put data");
        writer.sync().await.expect("Failed to sync store");
        drop(writer);

        // Clobber bytes 36..68 of the blob.
        let (blob, _) = context
            .open("test-ordinal", &0u64.to_be_bytes())
            .await
            .unwrap();
        blob.write_at(36, vec![0xFF; 32]).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        let mut reopened =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");

        // The first record is intact; the second is gone.
        assert_eq!(
            reopened.get(0).await.unwrap().unwrap(),
            FixedBytes::new([42u8; 32])
        );
        assert!(!reopened.has(1));
        assert!(reopened.get(1).await.unwrap().is_none());

        // The lost index accepts a new value.
        reopened
            .put(1, FixedBytes::new([44u8; 32]))
            .await
            .unwrap();
        assert_eq!(
            reopened.get(1).await.unwrap().unwrap(),
            FixedBytes::new([44u8; 32])
        );
    });
}
// Overwrites four bytes at offset 10 of the blob (inside the first value)
// and checks that only index 0 is lost on restart.
#[test_traced]
fn test_corrupted_value() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Persist two adjacent records.
        let mut writer =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        writer
            .put(0, FixedBytes::new([42u8; 32]))
            .await
            .expect("Failed to put data");
        writer
            .put(1, FixedBytes::new([43u8; 32]))
            .await
            .expect("Failed to put data");
        writer.sync().await.expect("Failed to sync store");
        drop(writer);

        // Damage the first record's value bytes.
        let (blob, _) = context
            .open("test-ordinal", &0u64.to_be_bytes())
            .await
            .unwrap();
        blob.write_at(10, hex!("0xFFFFFFFF").to_vec())
            .await
            .unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        // Only the untouched record is loaded.
        let reader =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");
        assert!(!reader.has(0));
        assert!(reader.has(1));
        assert_eq!(
            reader.get(1).await.unwrap().unwrap(),
            FixedBytes::new([43u8; 32])
        );
    });
}
// With 10 items per blob, damages one record in blob 0 and one in blob 1 and
// checks that exactly those two indices (0 and 10) are lost on restart.
#[test_traced]
fn test_crc_corruptions() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(10),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Persist two records in each of the first two blobs.
        let mut writer =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        for byte in [0u8, 5, 10, 15] {
            writer
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        writer.sync().await.expect("Failed to sync store");
        drop(writer);

        // Blob 0: flip the byte at offset 32 (just past the first value).
        let (first_blob, _) = context
            .open("test-ordinal", &0u64.to_be_bytes())
            .await
            .unwrap();
        first_blob.write_at(32, vec![0xFF]).await.unwrap();
        first_blob.sync().await.unwrap();

        // Blob 1: overwrite four bytes at offset 5 (inside the first value).
        let (second_blob, _) = context
            .open("test-ordinal", &1u64.to_be_bytes())
            .await
            .unwrap();
        second_blob.write_at(5, vec![0xFF; 4]).await.unwrap();
        second_blob.sync().await.unwrap();
        drop(first_blob);
        drop(second_blob);

        let reader =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");

        // Damaged records are dropped; their neighbours survive.
        assert!(!reader.has(0));
        assert!(!reader.has(10));
        assert!(reader.has(5));
        assert!(reader.has(15));
        for byte in [5u8, 15] {
            assert_eq!(
                reader.get(u64::from(byte)).await.unwrap().unwrap(),
                FixedBytes::new([byte; 32])
            );
        }
    });
}
// Appends a 36-byte garbage record (32 bytes of 0xFF plus a bogus checksum)
// to the blob and checks that existing records still load and new puts work.
#[test_traced]
fn test_extra_bytes_in_blob() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Persist two adjacent records.
        let mut writer =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        writer
            .put(0, FixedBytes::new([42u8; 32]))
            .await
            .expect("Failed to put data");
        writer
            .put(1, FixedBytes::new([43u8; 32]))
            .await
            .expect("Failed to put data");
        writer.sync().await.expect("Failed to sync store");
        drop(writer);

        // Append garbage right at the current end of the blob.
        let (blob, end) = context
            .open("test-ordinal", &0u64.to_be_bytes())
            .await
            .unwrap();
        let invalid_crc = 0xDEADBEEFu32;
        let garbage = [&[0xFFu8; 32][..], &invalid_crc.to_be_bytes()[..]].concat();
        assert_eq!(garbage.len(), 36);
        blob.write_at(end, garbage).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        let mut reopened =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");

        // Both original records are still present and intact.
        assert!(reopened.has(0));
        assert!(reopened.has(1));
        assert_eq!(
            reopened.get(0).await.unwrap().unwrap(),
            FixedBytes::new([42u8; 32])
        );
        assert_eq!(
            reopened.get(1).await.unwrap().unwrap(),
            FixedBytes::new([43u8; 32])
        );

        // The slot occupied by the garbage can be written normally.
        reopened
            .put(2, FixedBytes::new([44u8; 32]))
            .await
            .unwrap();
        assert_eq!(
            reopened.get(2).await.unwrap().unwrap(),
            FixedBytes::new([44u8; 32])
        );
    });
}
// Hand-builds a blob with five all-zero 36-byte slots followed by one valid
// record, then checks that only index 5 is loaded.
#[test_traced]
fn test_zero_filled_records() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Write the blob contents directly, bypassing the store.
        let (blob, _) = context
            .open("test-ordinal", &0u64.to_be_bytes())
            .await
            .unwrap();
        blob.write_at(0, vec![0u8; 36 * 5]).await.unwrap();

        // A valid record is the value followed by its big-endian CRC32.
        let mut record = vec![44u8; 32];
        let checksum = Crc32::checksum(&record);
        record.extend_from_slice(&checksum.to_be_bytes());
        blob.write_at(36 * 5, record).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        let ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        // Zeroed slots are not treated as records.
        for index in 0..5 {
            assert!(!ordinal.has(index));
        }

        // The valid record is.
        assert!(ordinal.has(5));
        assert_eq!(
            ordinal.get(5).await.unwrap().unwrap(),
            FixedBytes::new([44u8; 32])
        );
    });
}
// Runs a fixed workload (random puts, sync, reads, restart, more puts) on a
// fresh deterministic runner and returns the runtime auditor's final state.
// `num_values` is the number of items written before the restart.
// The statement order matters: callers compare the returned state across runs.
fn test_operations_and_restart(num_values: usize) -> String {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let cfg = Config {
partition: "test-ordinal".into(),
items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
};
let mut store =
Ordinal::<_, FixedBytes<128>>::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize store");
// Insert `num_values` random 128-byte values at strictly increasing indices.
let mut values = Vec::new();
let mut rng_index = 0u64;
for _ in 0..num_values {
// Advance by a random offset in [0, 1000) from the slot after the previous index.
let mut index_bytes = [0u8; 8];
context.fill_bytes(&mut index_bytes);
let index_offset = u64::from_be_bytes(index_bytes) % 1000;
let index = rng_index + index_offset;
rng_index = index + 1;
let mut value = [0u8; 128];
context.fill_bytes(&mut value);
let value = FixedBytes::<128>::new(value);
store
.put(index, value.clone())
.await
.expect("Failed to put data");
values.push((index, value));
}
store.sync().await.expect("Failed to sync");
// Every inserted value must read back unchanged.
for (index, value) in &values {
let retrieved = store
.get(*index)
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, value);
}
// Query gaps at 0, 100, ..., 900; the results are intentionally discarded.
for i in 0..10 {
let _ = store.next_gap(i * 100);
}
store.sync().await.expect("Failed to sync store");
// Restart: drop the first instance and reopen the same partition.
drop(store);
let mut store = Ordinal::<_, FixedBytes<128>>::init(context.with_label("second"), cfg)
.await
.expect("Failed to initialize store");
// All values written before the restart must still be readable.
for (index, value) in &values {
let retrieved = store
.get(*index)
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, value);
}
// Ten more puts at random indices in [0, 10000) after the restart.
for _ in 0..10 {
let mut index_bytes = [0u8; 8];
context.fill_bytes(&mut index_bytes);
let index = u64::from_be_bytes(index_bytes) % 10000;
let mut value = [0u8; 128];
context.fill_bytes(&mut value);
let value = FixedBytes::<128>::new(value);
store.put(index, value).await.expect("Failed to put data");
}
store.sync().await.expect("Failed to sync");
// The auditor state is the value returned to the caller.
context.auditor().state()
})
}
// Two identical runs of the workload must leave the auditor in the same state.
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
    let first_run = test_operations_and_restart(100);
    let second_run = test_operations_and_restart(100);
    assert_eq!(first_run, second_run);
}
// With 100 items per blob, prunes to 150 and then 250 and checks which
// indices remain plus the `pruned_total` metric after each step.
#[test_traced]
fn test_prune_basic() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        let entries = [
            (0u64, FixedBytes::new([0u8; 32])),
            (50u64, FixedBytes::new([50u8; 32])),
            (100u64, FixedBytes::new([100u8; 32])),
            (150u64, FixedBytes::new([150u8; 32])),
            (200u64, FixedBytes::new([200u8; 32])),
            (300u64, FixedBytes::new([44u8; 32])),
        ];
        for (index, value) in &entries {
            ordinal
                .put(*index, value.clone())
                .await
                .expect("Failed to put data");
        }
        ordinal.sync().await.unwrap();
        for (index, value) in &entries {
            assert_eq!(ordinal.get(*index).await.unwrap().unwrap(), *value);
        }

        // First prune: indices 0 and 50 disappear.
        ordinal.prune(150).await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 1"));
        for (index, _) in &entries[..2] {
            assert!(!ordinal.has(*index));
        }
        for (index, _) in &entries[..2] {
            assert!(ordinal.get(*index).await.unwrap().is_none());
        }
        for (index, _) in &entries[2..] {
            assert!(ordinal.has(*index));
        }
        for (index, value) in &entries[2..] {
            assert_eq!(ordinal.get(*index).await.unwrap().unwrap(), *value);
        }

        // Second prune: indices 100 and 150 disappear too.
        ordinal.prune(250).await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 2"));
        for (index, _) in &entries[2..4] {
            assert!(!ordinal.has(*index));
        }
        for (index, _) in &entries[2..4] {
            assert!(ordinal.get(*index).await.unwrap().is_none());
        }
        for (index, _) in &entries[4..] {
            assert!(ordinal.has(*index));
        }
        for (index, value) in &entries[4..] {
            assert_eq!(ordinal.get(*index).await.unwrap().unwrap(), *value);
        }
    });
}
// Checks that `next_gap` reflects a prune when the stored indices are sparse.
#[test_traced]
fn test_prune_with_gaps() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        for (index, byte) in [(5, 5u8), (105, 105), (305, 49)] {
            ordinal
                .put(index, FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        ordinal.sync().await.unwrap();

        // Before pruning: (query, expected first element, expected second element).
        for (query, expected_end, expected_next) in
            [(0, None, Some(5)), (5, Some(5), Some(105))]
        {
            let (current_end, next_start) = ordinal.next_gap(query);
            assert_eq!(current_end, expected_end);
            assert_eq!(next_start, expected_next);
        }

        ordinal.prune(150).await.unwrap();

        // Index 5 is gone; the others remain.
        assert!(!ordinal.has(5));
        assert!(ordinal.get(5).await.unwrap().is_none());
        assert!(ordinal.has(105));
        assert!(ordinal.has(305));

        // After pruning the first stored index is 105.
        for (query, expected_end, expected_next) in
            [(0, None, Some(105)), (105, Some(105), Some(305))]
        {
            let (current_end, next_start) = ordinal.next_gap(query);
            assert_eq!(current_end, expected_end);
            assert_eq!(next_start, expected_next);
        }
    });
}
// Pruning to a point at or before the first stored index removes nothing.
#[test_traced]
fn test_prune_no_op() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        for byte in [100u8, 200] {
            ordinal
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        ordinal.sync().await.unwrap();

        // Neither prune point should remove anything or bump the metric.
        for prune_to in [50, 100] {
            ordinal.prune(prune_to).await.unwrap();
            assert!(ordinal.has(100));
            assert!(ordinal.has(200));
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 0"));
        }
    });
}
// Pruning an empty store succeeds and does not prevent later writes.
#[test_traced]
fn test_prune_empty_store() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        ordinal.prune(1000).await.unwrap();

        // A put at index 0 still works after the prune.
        ordinal
            .put(0, FixedBytes::new([0u8; 32]))
            .await
            .unwrap();
        assert!(ordinal.has(0));
    });
}
// A prune performed by a reopened store must still hold after another restart.
#[test_traced]
fn test_prune_after_restart() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Stage 1: write one item into each of three blobs.
        let mut first =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        for byte in [0u8, 100, 200] {
            first
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        first.sync().await.unwrap();
        drop(first);

        // Stage 2: reopen, confirm, prune, sync.
        let mut second =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), config.clone())
                .await
                .expect("Failed to initialize store");
        assert!(second.has(0));
        assert!(second.has(100));
        assert!(second.has(200));
        second.prune(150).await.unwrap();
        assert!(!second.has(0));
        assert!(second.has(100));
        assert!(second.has(200));
        second.sync().await.unwrap();
        drop(second);

        // Stage 3: reopen again; the pruned item stays gone.
        let third =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("third"), config.clone())
                .await
                .expect("Failed to initialize store");
        assert!(!third.has(0));
        assert!(third.has(100));
        assert!(third.has(200));
        let (current_end, next_start) = third.next_gap(0);
        assert_eq!(current_end, None);
        assert_eq!(next_start, Some(100));
    });
}
// With 50 items per blob and one item per blob, prunes four times at
// increasing points and checks presence after each prune.
#[test_traced]
fn test_prune_multiple_operations() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(50),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        // Item `i` lives at index 25 + 50 * i and is filled with byte `i`.
        let mut entries = Vec::new();
        for i in 0..10 {
            let index = i * 50 + 25;
            let value = FixedBytes::new([i as u8; 32]);
            ordinal.put(index, value.clone()).await.unwrap();
            entries.push((index, value));
        }
        ordinal.sync().await.unwrap();

        // Prune at 60, 110, 160 and 210.
        for step in 1..5 {
            let prune_index = step * 50 + 10;
            ordinal.prune(prune_index).await.unwrap();
            for (index, _) in &entries {
                if *index >= prune_index {
                    assert!(ordinal.has(*index), "Index {index} should not be pruned");
                } else {
                    assert!(!ordinal.has(*index), "Index {index} should be pruned");
                }
            }
        }

        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 4"));

        // Items 4..10 survive with their original values.
        for i in 4..10 {
            let index = i * 50 + 25;
            assert!(ordinal.has(index));
            assert_eq!(
                ordinal.get(index).await.unwrap().unwrap(),
                entries[i as usize].1
            );
        }
    });
}
// Prunes exactly at, just before, and at the next blob boundary
// (100 items per blob) and checks which indices remain.
#[test_traced]
fn test_prune_blob_boundaries() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        // First and last slot of blobs 0 and 1, plus the first slot of blob 2.
        for byte in [0u8, 99, 100, 199, 200] {
            ordinal
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        ordinal.sync().await.unwrap();

        // Pruning to 100 drops indices 0 and 99.
        ordinal.prune(100).await.unwrap();
        assert!(!ordinal.has(0));
        assert!(!ordinal.has(99));
        assert!(ordinal.has(100));
        assert!(ordinal.has(199));
        assert!(ordinal.has(200));

        // Pruning to 199 drops nothing further.
        ordinal.prune(199).await.unwrap();
        assert!(ordinal.has(100));
        assert!(ordinal.has(199));
        assert!(ordinal.has(200));

        // Pruning to 200 drops indices 100 and 199.
        ordinal.prune(200).await.unwrap();
        assert!(!ordinal.has(100));
        assert!(!ordinal.has(199));
        assert!(ordinal.has(200));

        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 2"));
    });
}
// Prunes a store whose items are spread over non-adjacent blobs and checks
// the `pruned_total` metric after each step.
#[test_traced]
fn test_prune_non_contiguous_sections() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");

        // (index, fill byte)
        for (index, byte) in [(0, 0u8), (250, 50), (500, 44), (750, 45)] {
            ordinal
                .put(index, FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        ordinal.sync().await.unwrap();
        for index in [0, 250, 500, 750] {
            assert!(ordinal.has(index));
        }

        // Pruning to 300 removes the items at 0 and 250.
        ordinal.prune(300).await.unwrap();
        assert!(!ordinal.has(0));
        assert!(!ordinal.has(250));
        assert!(ordinal.has(500));
        assert!(ordinal.has(750));
        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 2"));

        // Pruning to 600 removes the item at 500.
        ordinal.prune(600).await.unwrap();
        assert!(!ordinal.has(500));
        assert!(ordinal.has(750));
        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 3"));

        // Pruning to 1000 removes the last item.
        ordinal.prune(1000).await.unwrap();
        assert!(!ordinal.has(750));
        let buffer = context.encode();
        assert!(buffer.contains("pruned_total 4"));
    });
}
// Pruning with unsynced writes outstanding must drop only the writes below
// the prune point and keep the rest readable before and after a sync.
#[test_traced]
fn test_prune_removes_correct_pending() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(100),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };
        let mut ordinal = Ordinal::<_, FixedBytes<32>>::init(context.clone(), config)
            .await
            .expect("Failed to initialize store");
        let survivor = FixedBytes::new([110u8; 32]);

        // One synced write...
        ordinal
            .put(5, FixedBytes::new([5u8; 32]))
            .await
            .unwrap();
        ordinal.sync().await.unwrap();

        // ...followed by two unsynced writes in different blobs.
        ordinal
            .put(10, FixedBytes::new([10u8; 32]))
            .await
            .unwrap();
        ordinal.put(110, survivor.clone()).await.unwrap();
        assert!(ordinal.has(5));
        assert!(ordinal.has(10));
        assert!(ordinal.has(110));

        // Pruning to 150 drops indices 5 and 10 but keeps 110.
        ordinal.prune(150).await.unwrap();
        assert!(!ordinal.has(5));
        assert!(!ordinal.has(10));
        assert!(ordinal.has(110));
        assert_eq!(ordinal.get(110).await.unwrap().unwrap(), survivor);

        // The surviving write is still intact once synced.
        ordinal.sync().await.unwrap();
        assert!(ordinal.has(110));
        assert_eq!(ordinal.get(110).await.unwrap().unwrap(), survivor);
    });
}
// Reopening with `init_with_bits(.., None)` must load every stored record.
#[test_traced]
fn test_init_with_bits_none() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(10),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // Populate three blobs (10 items per blob) and persist.
        let mut writer =
            Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                .await
                .expect("Failed to initialize store");
        for byte in [0u8, 5, 9, 10, 15, 25] {
            writer
                .put(u64::from(byte), FixedBytes::new([byte; 32]))
                .await
                .unwrap();
        }
        writer.sync().await.unwrap();
        drop(writer);

        // Reopen without supplying any bits.
        let reader = Ordinal::<_, FixedBytes<32>>::init_with_bits(
            context.with_label("second"),
            config.clone(),
            None,
        )
        .await
        .expect("Failed to initialize store with bits");

        // Everything written is present; everything else is not.
        for present in [0, 5, 9, 10, 15, 25] {
            assert!(reader.has(present));
        }
        for absent in [1, 11, 20] {
            assert!(!reader.has(absent));
        }

        // Spot-check two values.
        for byte in [0u8, 15] {
            assert_eq!(
                reader.get(u64::from(byte)).await.unwrap().unwrap(),
                FixedBytes::new([byte; 32])
            );
        }
    });
}
// Verifies that reopening a store through `init_with_bits` with an empty map (a `BTreeMap`,
// despite the test name) leaves none of the previously synced records visible.
#[test_traced]
fn test_init_with_bits_empty_hashmap() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(10),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: one record at the start of each of three consecutive 10-item ranges.
        {
            let mut writer =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            for index in [0, 10, 20] {
                writer
                    .put(index, FixedBytes::new([index as u8; 32]))
                    .await
                    .unwrap();
            }
            writer.sync().await.unwrap();
        }

        // Second session: supply a map that names no sections at all.
        {
            let no_sections: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
            let reopened = Ordinal::<_, FixedBytes<32>>::init_with_bits(
                context.with_label("second"),
                config.clone(),
                Some(no_sections),
            )
            .await
            .expect("Failed to initialize store with bits");
            for index in [0, 10, 20] {
                assert!(!reopened.has(index));
            }
        }
    });
}
// Verifies that `init_with_bits` only surfaces the records selected by a per-section bitmap:
// with 10 items per blob, bits 2, 5 and 8 of section 1 are expected to map to indices 12, 15
// and 18, and every other written index must stay hidden.
#[test_traced]
fn test_init_with_bits_selective_sections() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(10),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: fill indices 0..30 (three full 10-item ranges), each value being its
        // index repeated, then sync.
        {
            let mut writer =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            for i in 0..30 {
                writer.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
            }
            writer.sync().await.unwrap();
        }

        // Second session: request only three bits of section 1.
        {
            let mut bitmap = BitMap::zeroes(10);
            for bit in [2, 5, 8] {
                bitmap.set(bit, true);
            }
            let selected = Some(bitmap);
            let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
            bits_map.insert(1, &selected);

            let reopened = Ordinal::<_, FixedBytes<32>>::init_with_bits(
                context.with_label("second"),
                config.clone(),
                Some(bits_map),
            )
            .await
            .expect("Failed to initialize store with bits");

            // Selected indices are present; the rest of section 1 is not.
            for index in [12, 15, 18] {
                assert!(reopened.has(index));
            }
            for index in [10, 11, 13, 14, 16, 17, 19] {
                assert!(!reopened.has(index));
            }

            // Sections absent from the map contribute nothing.
            for i in (0..10).chain(20..30) {
                assert!(!reopened.has(i));
            }

            // Selected records read back with the values written in the first session.
            for index in [12, 15, 18] {
                assert_eq!(
                    reopened.get(index).await.unwrap().unwrap(),
                    FixedBytes::new([index as u8; 32])
                );
            }
        }
    });
}
// Verifies that mapping a section to `None` in `init_with_bits` succeeds and exposes every
// record of that section when the section was written completely (indices 5..10 with 5 items
// per blob).
#[test_traced]
fn test_init_with_bits_none_option_all_records_exist() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(5),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: write all five indices 5..10 and sync.
        {
            let mut writer =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            for i in 5..10 {
                writer.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
            }
            writer.sync().await.unwrap();
        }

        // Second session: section 1 is listed with no bitmap.
        {
            let whole_section: Option<BitMap> = None;
            let bits_map: BTreeMap<u64, &Option<BitMap>> =
                BTreeMap::from([(1, &whole_section)]);
            let reopened = Ordinal::<_, FixedBytes<32>>::init_with_bits(
                context.with_label("second"),
                config.clone(),
                Some(bits_map),
            )
            .await
            .expect("Failed to initialize store with bits");

            for i in 5..10 {
                assert!(reopened.has(i));
                let stored = reopened.get(i).await.unwrap().unwrap();
                assert_eq!(stored, FixedBytes::new([i as u8; 32]));
            }
        }
    });
}
// Verifies that mapping a section to `None` in `init_with_bits` fails with `MissingRecord(6)`
// when index 6 of that section was never written; the `expect` below turns the error into the
// panic this test requires.
#[test_traced]
#[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
fn test_init_with_bits_none_option_missing_record_panics() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(5),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: write indices 5, 7, 8 and 9, deliberately leaving 6 out.
        {
            let mut writer =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            for index in [5, 7, 8, 9] {
                writer
                    .put(index, FixedBytes::new([index as u8; 32]))
                    .await
                    .unwrap();
            }
            writer.sync().await.unwrap();
        }

        // Second session: section 1 is listed with no bitmap; initialization must fail.
        {
            let whole_section: Option<BitMap> = None;
            let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
            bits_map.insert(1, &whole_section);
            let _reopened = Ordinal::<_, FixedBytes<32>>::init_with_bits(
                context.with_label("second"),
                config.clone(),
                Some(bits_map),
            )
            .await
            .expect("Failed to initialize store with bits");
        }
    });
}
// Verifies `init_with_bits` with a map that mixes entry kinds: section 0 mapped to `None`,
// section 1 mapped to a bitmap selecting bits 0 and 2, and section 2 left out of the map.
#[test_traced]
fn test_init_with_bits_mixed_sections() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(5),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: indices 0..5 and 10..15 are written in full, while the range between
        // them only receives 5, 7 and 9. Each value is its index repeated.
        {
            let mut writer =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            for i in (0..5).chain([5, 7, 9]).chain(10..15) {
                writer.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
            }
            writer.sync().await.unwrap();
        }

        // Second session: reopen with the mixed map.
        {
            let whole_section: Option<BitMap> = None;
            let mut partial = BitMap::zeroes(5);
            for bit in [0, 2] {
                partial.set(bit, true);
            }
            let partial_section = Some(partial);
            let bits_map: BTreeMap<u64, &Option<BitMap>> =
                BTreeMap::from([(0, &whole_section), (1, &partial_section)]);

            let reopened = Ordinal::<_, FixedBytes<32>>::init_with_bits(
                context.with_label("second"),
                config.clone(),
                Some(bits_map),
            )
            .await
            .expect("Failed to initialize store with bits");

            // Section 0 (`None`): every record is present and intact.
            for i in 0..5 {
                assert!(reopened.has(i));
                let stored = reopened.get(i).await.unwrap().unwrap();
                assert_eq!(stored, FixedBytes::new([i as u8; 32]));
            }

            // Section 1 (bitmap): only the selected bits are visible; 9 was written but not
            // selected, so it must stay hidden too.
            assert!(reopened.has(5));
            assert!(reopened.has(7));
            for hidden in [6, 8, 9] {
                assert!(!reopened.has(hidden));
            }

            // Section 2 (absent from the map): nothing is visible.
            for i in 10..15 {
                assert!(!reopened.has(i));
            }
        }
    });
}
// Verifies that `init_with_bits` fails with `MissingRecord(2)` when a record selected by the
// bitmap has been tampered with on disk; the `expect` below turns the error into the panic this
// test requires.
#[test_traced]
#[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
fn test_init_with_bits_corrupted_records() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(5),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: write indices 0..5 and sync.
        {
            let mut writer =
                Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), config.clone())
                    .await
                    .expect("Failed to initialize store");
            for i in 0..5 {
                writer.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
            }
            writer.sync().await.unwrap();
        }

        // Tamper with the blob directly, bypassing the store.
        // NOTE(review): presumably each record occupies 36 bytes (32-byte value followed by a
        // 4-byte CRC32), which would make this the first checksum byte of record 2 — confirm
        // against the storage layout.
        {
            let (blob, _) = context
                .open("test-ordinal", &0u64.to_be_bytes())
                .await
                .unwrap();
            let corrupt_at = 2 * 36 + 32;
            blob.write_at(corrupt_at, vec![0xFF]).await.unwrap();
            blob.sync().await.unwrap();
        }

        // Second session: select records 0, 2 and 4 of section 0; initialization must fail.
        {
            let mut bitmap = BitMap::zeroes(5);
            for bit in [0, 2, 4] {
                bitmap.set(bit, true);
            }
            let selected = Some(bitmap);
            let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
            bits_map.insert(0, &selected);
            let _reopened = Ordinal::<_, FixedBytes<32>>::init_with_bits(
                context.with_label("second"),
                config.clone(),
                Some(bits_map),
            )
            .await
            .expect("Failed to initialize store with bits");
        }
    });
}
#[derive(Debug, PartialEq, Eq)]
pub struct DummyValue {
pub value: u64,
}
impl Write for DummyValue {
fn write(&self, buf: &mut impl BufMut) {
self.value.write(buf);
}
}
impl Read for DummyValue {
type Cfg = ();
fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
let value = u64::read(buf)?;
if value == 0 {
return Err(commonware_codec::Error::Invalid(
"DummyValue",
"value must be non-zero",
));
}
Ok(Self { value })
}
}
impl FixedSize for DummyValue {
const SIZE: usize = u64::SIZE;
}
// Verifies that `init` skips a stored record whose value cannot be decoded and keeps going
// rather than stopping at it: records written after the unparseable one must still be available.
#[test_traced]
fn test_init_skip_unparseable_record() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "test-ordinal".into(),
            items_per_blob: NZU64!(1),
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
        };

        // First session: indices 1 and 4 hold decodable values; index 2 holds a zero
        // `DummyValue`, which encodes fine but is rejected by `DummyValue::read_cfg`.
        {
            let mut store =
                Ordinal::<_, DummyValue>::init(context.with_label("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize store");
            store.put(1, DummyValue { value: 1 }).await.unwrap();
            store.put(2, DummyValue { value: 0 }).await.unwrap();
            store.put(4, DummyValue { value: 4 }).await.unwrap();
            store.sync().await.unwrap();
        }

        // Second session: reopen and check which records survived initialization.
        {
            let store =
                Ordinal::<_, DummyValue>::init(context.with_label("second"), cfg.clone())
                    .await
                    .expect("Failed to initialize store");

            assert!(store.has(1), "Record 1 should be available");
            // The message previously said "Record 0" although the record checked is index 1.
            assert_eq!(
                store.get(1).await.unwrap().unwrap(),
                DummyValue { value: 1 },
                "Record 1 should have correct value"
            );

            assert!(
                !store.has(2),
                "Record 2 should not be available (unparseable)"
            );

            assert!(
                store.has(4),
                "Record 4 should be available - we should not exit early on unparseable record"
            );
            assert_eq!(
                store.get(4).await.unwrap().unwrap(),
                DummyValue { value: 4 },
                "Record 4 should have correct value"
            );
        }
    });
}
}