use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};
mod storage;
pub use storage::Archive;
/// Configuration for initializing an [`Archive`].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
/// Translator used to map keys into their index representation.
pub translator: T,
/// Name of the storage partition holding the key index.
pub key_partition: String,
/// Page cache used when reading pages from the key partition.
pub key_page_cache: CacheRef,
/// Name of the storage partition holding the values.
pub value_partition: String,
/// Optional compression setting for stored values; `None` disables
/// compression. NOTE(review): presumably a compression level — tests use
/// `Some(3)`; confirm the exact semantics against `storage::Archive`.
pub compression: Option<u8>,
/// Codec configuration used to encode/decode stored values.
pub codec_config: C,
/// Number of items stored per section; pruning operates at section
/// granularity (prune points round down to a section boundary).
pub items_per_section: NonZeroU64,
/// Write buffer size for the key partition.
pub key_write_buffer: NonZeroUsize,
/// Write buffer size for the value partition.
pub value_write_buffer: NonZeroUsize,
/// Buffer size used when replaying existing data during initialization.
pub replay_buffer: NonZeroUsize,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
archive::{Archive as _, Error, Identifier, MultiArchive as _},
journal::Error as JournalError,
translator::{FourCap, TwoCap},
};
use commonware_codec::{DecodeExt, Error as CodecError};
use commonware_macros::{test_group, test_traced};
use commonware_runtime::{deterministic, Metrics, Runner};
use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
use rand::Rng;
use std::{collections::BTreeMap, num::NonZeroU16};
/// Builds a 64-byte test key by zero-padding the UTF-8 bytes of `key`.
///
/// Panics if `key` is longer than 64 bytes.
fn test_key(key: &str) -> FixedBytes<64> {
    let raw = key.as_bytes();
    let mut padded = [0u8; 64];
    assert!(raw.len() <= padded.len());
    padded[..raw.len()].copy_from_slice(raw);
    FixedBytes::decode(&padded[..]).unwrap()
}
// Default section size for tests that don't exercise section boundaries.
const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
// Default write buffer size shared by the key and value partitions.
const DEFAULT_WRITE_BUFFER: usize = 1024;
// Default buffer size used when replaying data at startup.
const DEFAULT_REPLAY_BUFFER: usize = 4096;
// Page size for the key page cache.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
// Number of pages held by the key page cache.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
/// Data written with compression enabled must not be readable by an archive
/// opened over the same partitions with compression disabled: the stored
/// bytes fail to decode as a plain value.
#[test_traced]
fn test_archive_compression_then_none() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
// First archive: compression enabled (Some(3)).
let cfg = Config {
translator: FourCap,
key_partition: "test-index".into(),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value".into(),
codec_config: (),
compression: Some(3),
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
};
let mut archive = Archive::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize archive");
// Write a single compressed record.
let index = 1u64;
let key = test_key("testkey");
let data = 1;
archive
.put(index, key.clone(), data)
.await
.expect("Failed to put data");
// Persist and close the first archive before re-opening the partitions.
archive.sync().await.expect("Failed to sync archive");
drop(archive);
// Second archive: identical partitions, compression disabled.
let cfg = Config {
translator: FourCap,
key_partition: "test-index".into(),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value".into(),
codec_config: (),
compression: None,
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
};
let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
context.with_label("second"),
cfg.clone(),
)
.await
.unwrap();
// Reading the compressed record without decompression must fail: the raw
// bytes do not decode as an i32, surfacing as a codec `ExtraData` error
// wrapped in a journal error.
let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
assert!(matches!(
result,
Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
_
))))
));
});
}
/// Two keys whose first four bytes collide under [`FourCap`] must still
/// resolve to their own values, at the cost of one wasted index read.
#[test_traced]
fn test_archive_overlapping_key_basic() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            translator: FourCap,
            key_partition: "test-index".into(),
            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            value_partition: "test-value".into(),
            codec_config: (),
            compression: None,
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
        };
        let mut archive = Archive::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to initialize archive");

        // "keys1" and "keys2" share the translated prefix "keys".
        let entries = [(1u64, test_key("keys1"), 1), (2u64, test_key("keys2"), 2)];
        for (index, key, value) in &entries {
            archive
                .put(*index, key.clone(), *value)
                .await
                .expect("Failed to put data");
        }

        // Each key must resolve to its own value despite the collision.
        for (_, key, value) in &entries {
            let retrieved = archive
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(&retrieved, value);
        }

        // Exactly one of the two gets had to read (and discard) the
        // colliding entry before finding the right one.
        let metrics = context.encode();
        assert!(metrics.contains("items_tracked 2"));
        assert!(metrics.contains("unnecessary_reads_total 1"));
        assert!(metrics.contains("gets_total 2"));
    });
}
/// Colliding keys stored in different sections must still resolve correctly.
#[test_traced]
fn test_archive_overlapping_key_multiple_sections() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            translator: FourCap,
            key_partition: "test-index".into(),
            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            value_partition: "test-value".into(),
            codec_config: (),
            compression: None,
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
        };
        let mut archive = Archive::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to initialize archive");

        // Indices 1 and 2_000_000 land in different sections while the keys
        // still collide under FourCap (shared "keys" prefix).
        let first = (1u64, test_key("keys1"), 1);
        let second = (2_000_000u64, test_key("keys2"), 2);
        for (index, key, value) in [&first, &second] {
            archive
                .put(*index, key.clone(), *value)
                .await
                .expect("Failed to put data");
        }

        // Each key resolves to its own value even across section boundaries.
        let retrieved = archive
            .get(Identifier::Key(&first.1))
            .await
            .expect("Failed to get data")
            .expect("Data not found");
        assert_eq!(retrieved, first.2);
        let retrieved = archive
            .get(Identifier::Key(&second.1))
            .await
            .expect("Failed to get data")
            .expect("Data not found");
        assert_eq!(retrieved, second.2);
    });
}
/// Pruning removes whole sections, rejects writes below the boundary, and
/// keeps the tracked-item metrics consistent.
#[test_traced]
fn test_archive_prune_keys() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // One item per section so pruning boundaries fall on single indices.
        let cfg = Config {
            translator: FourCap,
            key_partition: "test-index".into(),
            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            value_partition: "test-value".into(),
            codec_config: (),
            compression: None,
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_section: NZU64!(1),
        };
        let mut archive = Archive::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to initialize archive");

        // Indices 3 and 4 intentionally share the FourCap prefix "key3".
        let entries = vec![
            (1u64, test_key("key1-blah"), 1),
            (2u64, test_key("key2-blah"), 2),
            (3u64, test_key("key3-blah"), 3),
            (4u64, test_key("key3-bleh"), 3),
            (5u64, test_key("key4-blah"), 4),
        ];
        for (index, key, value) in &entries {
            archive
                .put(*index, key.clone(), *value)
                .await
                .expect("Failed to put data");
        }
        let metrics = context.encode();
        assert!(metrics.contains("items_tracked 5"));

        // Prune everything below index 3.
        archive.prune(3).await.expect("Failed to prune");
        for (index, key, value) in entries {
            let retrieved = archive
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data");
            if index < 3 {
                assert!(retrieved.is_none());
            } else {
                assert_eq!(retrieved.expect("Data not found"), value);
            }
        }
        let metrics = context.encode();
        assert!(metrics.contains("items_tracked 3"));
        assert!(metrics.contains("indices_pruned_total 2"));
        assert!(metrics.contains("pruned_total 0"));

        // Pruning at or below the current boundary is a no-op.
        archive.prune(2).await.expect("Failed to prune");
        archive.prune(3).await.expect("Failed to prune");

        // Writes below the prune boundary are rejected.
        let result = archive.put(1, test_key("key1-blah"), 1).await;
        assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

        // Writes above the boundary still succeed.
        archive
            .put(6, test_key("key2-blfh"), 5)
            .await
            .expect("Failed to put data");
        let metrics = context.encode();
        assert!(metrics.contains("items_tracked 4"));
        assert!(metrics.contains("indices_pruned_total 2"));
        assert!(metrics.contains("pruned_total 1"));
    });
}
/// Fills an archive with `num_keys` random entries, verifies them, restarts
/// the archive over the same partitions (forcing a replay), verifies again,
/// prunes roughly the first half, and returns the deterministic auditor
/// state so callers (e.g. `test_determinism`) can compare runs.
fn test_archive_keys_and_restart(num_keys: usize) -> String {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
// Small sections so the prune below removes many whole sections.
let items_per_section = 256u64;
let cfg = Config {
translator: TwoCap,
key_partition: "test-index".into(),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value".into(),
codec_config: (),
compression: None,
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_section: NZU64!(items_per_section),
};
let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
.await
.expect("Failed to initialize archive");
// Insert `num_keys` random key/value pairs at sequential indices.
// NOTE(review): assumes the RNG never repeats a 64-byte key — a repeat
// would reuse an index and skew the metric assertions below; confirm
// this is acceptable for the deterministic RNG in use.
let mut keys = BTreeMap::new();
while keys.len() < num_keys {
let index = keys.len() as u64;
let mut key = [0u8; 64];
context.fill(&mut key);
let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
let mut data = [0u8; 1024];
context.fill(&mut data);
let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();
archive
.put(index, key.clone(), data.clone())
.await
.expect("Failed to put data");
keys.insert(key, (index, data));
}
// Every entry must be retrievable both by index and by key.
for (key, (index, data)) in &keys {
let retrieved = archive
.get(Identifier::Index(*index))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, data);
let retrieved = archive
.get(Identifier::Key(key))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, data);
}
let buffer = context.encode();
let tracked = format!("items_tracked {num_keys:?}");
assert!(buffer.contains(&tracked));
assert!(buffer.contains("pruned_total 0"));
// Flush to storage, drop, and re-open to force a replay on init.
archive.sync().await.expect("Failed to sync archive");
drop(archive);
let cfg = Config {
translator: TwoCap,
key_partition: "test-index".into(),
key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
value_partition: "test-value".into(),
codec_config: (),
compression: None,
key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_section: NZU64!(items_per_section),
};
let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
context.with_label("init2"),
cfg.clone(),
)
.await
.expect("Failed to initialize archive");
// All entries must survive the restart.
for (key, (index, data)) in &keys {
let retrieved = archive
.get(Identifier::Index(*index))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, data);
let retrieved = archive
.get(Identifier::Key(key))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(&retrieved, data);
}
// Prune the first half; the effective boundary rounds down to a
// multiple of items_per_section.
let min = (keys.len() / 2) as u64;
archive.prune(min).await.expect("Failed to prune");
let min = (min / items_per_section) * items_per_section;
let mut removed = 0;
for (key, (index, data)) in keys {
if index >= min {
// Retained: still readable, and the remaining range is contiguous
// through the last written index (no gap after it).
let retrieved = archive
.get(Identifier::Key(&key))
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(retrieved, data);
let (current_end, start_next) = archive.next_gap(index);
assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
assert!(start_next.is_none());
} else {
// Pruned: unreadable, and the next populated range starts at `min`.
let retrieved = archive
.get(Identifier::Key(&key))
.await
.expect("Failed to get data");
assert!(retrieved.is_none());
removed += 1;
let (current_end, start_next) = archive.next_gap(index);
assert!(current_end.is_none());
assert_eq!(start_next.unwrap(), min);
}
}
let buffer = context.encode();
let tracked = format!("items_tracked {:?}", num_keys - removed);
assert!(buffer.contains(&tracked));
let pruned = format!("indices_pruned_total {removed}");
assert!(buffer.contains(&pruned));
assert!(buffer.contains("pruned_total 0"));
// Return the auditor state for determinism comparisons across runs.
context.auditor().state()
})
}
/// Exercises insert/replay/prune with enough keys (100_000 at 256 items per
/// section) to span many sections.
#[test_group("slow")]
#[test_traced]
fn test_archive_many_keys_and_restart() {
test_archive_keys_and_restart(100_000);
}
/// Two runs with identical parameters must drive the deterministic runtime
/// through exactly the same state (as captured by the auditor).
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
    let first = test_archive_keys_and_restart(5_000);
    let second = test_archive_keys_and_restart(5_000);
    assert_eq!(first, second);
}
/// `get_all` returns `None` for a pruned index and every value for a
/// retained one.
#[test_traced]
fn test_get_all_after_prune() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // One item per section so prune(3) drops every index below 3.
        let cfg = Config {
            translator: FourCap,
            key_partition: "test-index".into(),
            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            value_partition: "test-value".into(),
            codec_config: (),
            compression: None,
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_section: NZU64!(1),
        };
        let mut archive = Archive::init(context.clone(), cfg)
            .await
            .expect("Failed to initialize archive");

        // Two values at index 1, one at index 3.
        archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
        archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
        archive.put_multi(3, test_key("ccc"), 30).await.unwrap();
        archive.prune(3).await.unwrap();

        // Pruned index yields nothing; retained index yields its values.
        assert_eq!(archive.get_all(1).await.unwrap(), None);
        assert_eq!(archive.get_all(3).await.unwrap(), Some(vec![30]));
    });
}
/// Pruning a multi-value index removes all of its keys, while writes below
/// the prune boundary are rejected.
#[test_traced]
fn test_put_multi_prune() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // One item per section so prune(3) removes every index below 3.
        let cfg = Config {
            translator: FourCap,
            key_partition: "test-index".into(),
            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            value_partition: "test-value".into(),
            codec_config: (),
            compression: None,
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_section: NZU64!(1),
        };
        let mut archive = Archive::init(context.clone(), cfg)
            .await
            .expect("Failed to initialize archive");

        // Two values under index 1, one under index 3 — two tracked indices.
        archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
        archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
        archive.put_multi(3, test_key("ccc"), 30).await.unwrap();
        let metrics = context.encode();
        assert!(metrics.contains("items_tracked 2"));

        archive.prune(3).await.unwrap();

        // Both keys at the pruned index are gone; the retained key survives.
        for (key, want) in [("aaa", None), ("bbb", None), ("ccc", Some(30))] {
            assert_eq!(
                archive.get(Identifier::Key(&test_key(key))).await.unwrap(),
                want
            );
        }
        let metrics = context.encode();
        assert!(metrics.contains("items_tracked 1"));
        assert!(metrics.contains("indices_pruned_total 1"));

        // Writes below the prune boundary are rejected.
        let result = archive.put_multi(2, test_key("ddd"), 40).await;
        assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
    });
}
}