use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};
use thiserror::Error;
mod storage;
pub use storage::Cache;
/// Errors that can be returned by cache operations.
#[derive(Debug, Error)]
pub enum Error {
    /// An error propagated from the underlying journal.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// A stored record could not be read back intact (corruption detected).
    #[error("record corrupted")]
    RecordCorrupted,
    /// The operation targeted an index below the current prune point
    /// (the inner value is that prune point).
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// The record exceeds the maximum size the cache can store.
    #[error("record too large")]
    RecordTooLarge,
}
/// Configuration for a cache instance.
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition the cache persists its data to.
    pub partition: String,
    /// Optional compression level for stored records (`None` disables
    /// compression). Reopening a partition with a different compression
    /// setting causes decoding to fail (see tests below).
    pub compression: Option<u8>,
    /// Codec configuration used to encode/decode cached values.
    pub codec_config: C,
    /// Maximum number of items stored per journal blob; also determines
    /// the granularity at which pruning takes effect.
    pub items_per_blob: NonZeroU64,
    /// Write buffer size — presumably bytes; confirm against journal config.
    pub write_buffer: NonZeroUsize,
    /// Buffer size used when replaying persisted state on init —
    /// presumably bytes; confirm against journal config.
    pub replay_buffer: NonZeroUsize,
    /// Shared page cache used for buffered reads.
    pub page_cache: CacheRef,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::journal::Error as JournalError;
use commonware_macros::{test_group, test_traced};
use commonware_runtime::{deterministic, Metrics, Runner};
use commonware_utils::{NZUsize, NZU16, NZU64};
use rand::Rng;
use std::{collections::BTreeMap, num::NonZeroU16};
    // Default sizing shared by most tests below.
    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    // Page cache geometry: 1024-byte pages, 10-page capacity.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
#[test_traced]
fn test_cache_compression_then_none() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: Some(3),
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize cache");
let index = 1u64;
let data = 1;
cache.put(index, data).await.expect("Failed to put data");
cache.sync().await.expect("Failed to sync cache");
drop(cache);
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let result = Cache::<_, i32>::init(context.with_label("second"), cfg.clone()).await;
assert!(matches!(
result,
Err(Error::Journal(JournalError::Codec(_)))
));
});
}
#[test_traced]
fn test_cache_prune() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(1), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
for (index, data) in &items {
cache.put(*index, *data).await.expect("Failed to put data");
}
assert_eq!(cache.first(), Some(1));
let buffer = context.encode();
assert!(buffer.contains("items_tracked 5"));
cache.prune(3).await.expect("Failed to prune");
for (index, data) in items {
let retrieved = cache.get(index).await.expect("Failed to get data");
if index < 3 {
assert!(retrieved.is_none());
} else {
assert_eq!(retrieved.expect("Data not found"), data);
}
}
assert_eq!(cache.first(), Some(3));
let buffer = context.encode();
assert!(buffer.contains("items_tracked 3"));
cache.prune(2).await.expect("Failed to prune");
assert_eq!(cache.first(), Some(3));
cache.prune(3).await.expect("Failed to prune");
assert_eq!(cache.first(), Some(3));
let result = cache.put(1, 1).await;
assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
});
}
    /// Shared scenario: write `num_items` random 1 KiB records, verify them,
    /// restart the cache from persisted state, verify again, prune roughly
    /// half, and return the deterministic runtime's auditor state so callers
    /// (e.g. `test_determinism`) can compare two runs byte-for-byte.
    fn test_cache_restart(num_items: usize) -> String {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let items_per_blob = 256u64;
            let cfg = Config {
                partition: "test-partition".into(),
                codec_config: (),
                compression: None,
                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_blob: NZU64!(items_per_blob),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            // First instance: fill with random records at sequential indices.
            let mut cache = Cache::init(context.with_label("init1"), cfg.clone())
                .await
                .expect("Failed to initialize cache");
            let mut items = BTreeMap::new();
            while items.len() < num_items {
                let index = items.len() as u64;
                let mut data = [0u8; 1024];
                // Randomness comes from the deterministic runtime context, so
                // identical runs generate identical data.
                context.fill(&mut data);
                items.insert(index, data);
                cache.put(index, data).await.expect("Failed to put data");
            }
            // Everything written must be readable before restart.
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }
            // Metrics should report every item as tracked.
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_items:?}");
            assert!(buffer.contains(&tracked));
            // Persist and drop so the second instance replays from storage.
            cache.sync().await.expect("Failed to sync cache");
            drop(cache);
            let cfg = Config {
                partition: "test-partition".into(),
                codec_config: (),
                compression: None,
                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_blob: NZU64!(items_per_blob),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            // Second instance: reopen the same partition and re-verify.
            let mut cache = Cache::<_, [u8; 1024]>::init(context.with_label("init2"), cfg.clone())
                .await
                .expect("Failed to initialize cache");
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }
            // Prune to the midpoint; the effective floor rounds down to a
            // blob boundary, so recompute the expected minimum retained index.
            let min = (items.len() / 2) as u64;
            cache.prune(min).await.expect("Failed to prune");
            let min = (min / items_per_blob) * items_per_blob;
            let mut removed = 0;
            for (index, data) in items {
                if index >= min {
                    let retrieved = cache
                        .get(index)
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);
                } else {
                    let retrieved = cache.get(index).await.expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;
                }
            }
            // Metrics reflect the post-prune count.
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_items - removed);
            assert!(buffer.contains(&tracked));
            // Return the recorded storage-operation state for comparison.
            context.auditor().state()
        })
    }
    #[test_group("slow")]
    #[test_traced]
    fn test_cache_many_items_and_restart() {
        // Run the restart scenario at a scale spanning many blobs
        // (100_000 items at 256 items per blob).
        test_cache_restart(100_000);
    }
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
let state1 = test_cache_restart(5_000);
let state2 = test_cache_restart(5_000);
assert_eq!(state1, state2);
}
#[test_traced]
fn test_cache_next_gap() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
assert_eq!(cache.first(), None);
cache.put(1, 1).await.unwrap();
cache.put(10, 10).await.unwrap();
cache.put(11, 11).await.unwrap();
cache.put(14, 14).await.unwrap();
let (current_end, start_next) = cache.next_gap(0);
assert!(current_end.is_none());
assert_eq!(start_next, Some(1));
assert_eq!(cache.first(), Some(1));
let (current_end, start_next) = cache.next_gap(1);
assert_eq!(current_end, Some(1));
assert_eq!(start_next, Some(10));
let (current_end, start_next) = cache.next_gap(10);
assert_eq!(current_end, Some(11));
assert_eq!(start_next, Some(14));
let (current_end, start_next) = cache.next_gap(11);
assert_eq!(current_end, Some(11));
assert_eq!(start_next, Some(14));
let (current_end, start_next) = cache.next_gap(12);
assert!(current_end.is_none());
assert_eq!(start_next, Some(14));
let (current_end, start_next) = cache.next_gap(14);
assert_eq!(current_end, Some(14));
assert!(start_next.is_none());
});
}
#[test_traced]
fn test_cache_missing_items() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
assert_eq!(cache.first(), None);
assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
cache.put(1, 1).await.unwrap();
cache.put(2, 2).await.unwrap();
cache.put(5, 5).await.unwrap();
cache.put(6, 6).await.unwrap();
cache.put(10, 10).await.unwrap();
assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
assert_eq!(cache.missing_items(4, 2), vec![4, 7]);
assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
assert_eq!(cache.missing_items(5, 2), vec![7, 8]);
assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
cache.put(1000, 1000).await.unwrap();
let items = cache.missing_items(11, 10);
assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
let items = cache.missing_items(990, 15);
assert_eq!(
items,
vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
);
cache.sync().await.unwrap();
assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();
let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
assert_eq!(
items,
vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
);
});
}
#[test_traced]
fn test_cache_intervals_after_restart() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
{
let mut cache = Cache::init(context.with_label("first"), cfg.clone())
.await
.expect("Failed to initialize cache");
cache.put(0, 0).await.expect("Failed to put data");
cache.put(100, 100).await.expect("Failed to put data");
cache.put(1000, 1000).await.expect("Failed to put data");
cache.sync().await.expect("Failed to sync cache");
}
{
let cache = Cache::<_, i32>::init(context.with_label("second"), cfg.clone())
.await
.expect("Failed to initialize cache");
let (current_end, start_next) = cache.next_gap(0);
assert_eq!(current_end, Some(0));
assert_eq!(start_next, Some(100));
let (current_end, start_next) = cache.next_gap(100);
assert_eq!(current_end, Some(100));
assert_eq!(start_next, Some(1000));
let items = cache.missing_items(1, 5);
assert_eq!(items, vec![1, 2, 3, 4, 5]);
}
});
}
#[test_traced]
fn test_cache_intervals_with_pruning() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(100), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
cache.put(50, 50).await.unwrap();
cache.put(150, 150).await.unwrap();
cache.put(250, 250).await.unwrap();
cache.put(350, 350).await.unwrap();
let (current_end, start_next) = cache.next_gap(0);
assert!(current_end.is_none());
assert_eq!(start_next, Some(50));
cache.prune(200).await.expect("Failed to prune");
assert!(!cache.has(50));
assert!(!cache.has(150));
let (current_end, start_next) = cache.next_gap(200);
assert!(current_end.is_none());
assert_eq!(start_next, Some(250));
let items = cache.missing_items(200, 5);
assert_eq!(items, vec![200, 201, 202, 203, 204]);
assert!(cache.has(250));
assert!(cache.has(350));
assert_eq!(cache.get(250).await.unwrap(), Some(250));
assert_eq!(cache.get(350).await.unwrap(), Some(350));
});
}
#[test_traced]
fn test_cache_sparse_indices() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(100), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
let indices = vec![
(0u64, 0),
(99u64, 99), (100u64, 100), (500u64, 500), ];
for (index, value) in &indices {
cache.put(*index, *value).await.expect("Failed to put data");
}
assert!(!cache.has(1));
assert!(!cache.has(50));
assert!(!cache.has(101));
assert!(!cache.has(499));
let (current_end, start_next) = cache.next_gap(50);
assert!(current_end.is_none());
assert_eq!(start_next, Some(99));
let (current_end, start_next) = cache.next_gap(99);
assert_eq!(current_end, Some(100));
assert_eq!(start_next, Some(500));
cache.sync().await.expect("Failed to sync");
for (index, value) in &indices {
let retrieved = cache
.get(*index)
.await
.expect("Failed to get data")
.expect("Data not found");
assert_eq!(retrieved, *value);
}
});
}
#[test_traced]
fn test_cache_intervals_edge_cases() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
cache.put(42, 42).await.unwrap();
let (current_end, start_next) = cache.next_gap(42);
assert_eq!(current_end, Some(42));
assert!(start_next.is_none());
let (current_end, start_next) = cache.next_gap(41);
assert!(current_end.is_none());
assert_eq!(start_next, Some(42));
let (current_end, start_next) = cache.next_gap(43);
assert!(current_end.is_none());
assert!(start_next.is_none());
cache.put(43, 43).await.unwrap();
cache.put(44, 44).await.unwrap();
let (current_end, start_next) = cache.next_gap(42);
assert_eq!(current_end, Some(44));
assert!(start_next.is_none());
cache.put(u64::MAX - 1, 999).await.unwrap();
let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
assert!(current_end.is_none());
assert_eq!(start_next, Some(u64::MAX - 1));
let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
assert_eq!(current_end, Some(u64::MAX - 1));
assert!(start_next.is_none());
});
}
#[test_traced]
fn test_cache_intervals_duplicate_inserts() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
partition: "test-partition".into(),
codec_config: (),
compression: None,
write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut cache = Cache::init(context.clone(), cfg.clone())
.await
.expect("Failed to initialize cache");
cache.put(10, 10).await.unwrap();
assert!(cache.has(10));
assert_eq!(cache.get(10).await.unwrap(), Some(10));
cache.put(10, 20).await.unwrap();
assert!(cache.has(10));
assert_eq!(cache.get(10).await.unwrap(), Some(10));
let (current_end, start_next) = cache.next_gap(10);
assert_eq!(current_end, Some(10));
assert!(start_next.is_none());
cache.put(9, 9).await.unwrap();
cache.put(11, 11).await.unwrap();
let (current_end, start_next) = cache.next_gap(9);
assert_eq!(current_end, Some(11));
assert!(start_next.is_none());
});
}
}