use crate::backend::StorageBackend;
use crate::error::StorageError;
/// Boxed, dynamically dispatched iterator returned by
/// [`BaseStorageTier::list_by_prefix_bytes`].
///
/// Each item is either a `(String, Vec<u8>)` pair (a key and its raw bytes) or
/// the [`StorageError`] hit while producing it. The `'a` bound lets the
/// iterator hold borrows that live for `'a`.
pub type ListByPrefixIter<'a> =
Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a>;
/// Operations shared by every storage tier. The snapshot, append-log and
/// key-value tier traits in this file all have it as a supertrait.
///
/// Implementors must be `Send + Sync`.
pub trait BaseStorageTier: Send + Sync {
/// Name of this tier.
fn name(&self) -> &str;
/// Optional debounce setting (milliseconds, going by the name). Defaults to `None`.
///
/// NOTE(review): how callers interpret `None` is not visible here — confirm.
fn debounce_ms(&self) -> Option<u32> {
None
}
/// Optional compaction cadence. Defaults to `None`.
///
/// NOTE(review): the unit of the count (writes? flushes?) is not visible
/// here — confirm against the code that consumes it.
fn compact_every(&self) -> Option<u32> {
None
}
/// Flushes the tier, reporting failure as a [`StorageError`].
///
/// NOTE(review): presumably persists buffered writes — confirm against implementors.
fn flush(&self) -> Result<(), StorageError>;
/// Rolls the tier back, reporting failure as a [`StorageError`].
///
/// NOTE(review): presumably discards writes that were not yet flushed — confirm
/// against implementors.
fn rollback(&self) -> Result<(), StorageError>;
/// Returns an iterator of `(key, bytes)` results for `prefix`.
///
/// The returned iterator may borrow `self` for `'a`; it does not borrow `prefix`.
fn list_by_prefix_bytes<'a>(&'a self, prefix: &str) -> ListByPrefixIter<'a>;
/// Compacts the tier. The default implementation just calls [`flush`](Self::flush).
fn compact(&self) -> Result<(), StorageError> {
self.flush()
}
}
/// Storage tier that saves and loads a single snapshot value of type `T`.
///
/// `T` must be `Send + Sync + 'static`.
pub trait SnapshotStorageTier<T>: BaseStorageTier
where
T: Send + Sync + 'static,
{
/// Saves `snapshot`, taking ownership of it.
fn save(&self, snapshot: T) -> Result<(), StorageError>;
/// Loads the snapshot.
///
/// NOTE(review): `Ok(None)` presumably means nothing has been saved yet — confirm
/// against implementors.
fn load(&self) -> Result<Option<T>, StorageError>;
}
/// Mode reported by [`AppendLogStorageTier::mode`].
///
/// NOTE(review): the exact write semantics of each variant are defined by the
/// tier implementations, which are not visible here — confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AppendLogMode {
/// The default mode (also what the trait's default `mode()` returns).
#[default]
Append,
/// Alternative mode; presumably new entries replace existing ones — confirm.
Overwrite,
}
/// Cursor carried between paged loads of an append log.
///
/// NOTE(review): how `position` and `tag` are interpreted is up to the tier
/// that issued the cursor; this type only stores them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct AppendCursor {
    /// Numeric position component of the cursor.
    pub position: u64,
    /// Optional extra tag; `None` when the cursor carries a position only.
    pub tag: Option<u64>,
}

impl AppendCursor {
    /// Builds a cursor that carries only `position`, leaving `tag` unset.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn from_position(position: u64) -> Self {
        let tag = None;
        Self { position, tag }
    }
}
/// Options passed to [`AppendLogStorageTier::load_entries`].
///
/// Every field is optional; `Default` yields all-`None`.
#[derive(Debug, Clone, Default)]
pub struct LoadEntriesOpts<'a> {
/// Cursor to load from; presumably the one returned by a previous load — confirm.
pub cursor: Option<AppendCursor>,
/// Requested page size; `None` leaves the choice to the implementation.
pub page_size: Option<u32>,
/// Optional key filter, borrowed for `'a`.
/// NOTE(review): matching rule (exact vs. prefix) is implementation-defined — confirm.
pub key_filter: Option<&'a str>,
}
/// One page of results from [`AppendLogStorageTier::load_entries`].
///
/// Marked `#[must_use]`: per the attribute message, `cursor` is meant to be
/// passed to the next `load_entries` call to continue pagination.
#[must_use = "AppendLoadResult.cursor must be threaded into the next load_entries call; ignoring it voids the pagination contract"]
#[derive(Debug, Clone)]
pub struct AppendLoadResult<T> {
/// Entries loaded by this call.
pub entries: Vec<T>,
/// Cursor for the next call; presumably `None` once nothing is left — confirm.
pub cursor: Option<AppendCursor>,
}
/// Storage tier that stores a log of `T` entries and loads them back in pages.
///
/// `T` must be `Send + Sync + 'static`.
pub trait AppendLogStorageTier<T>: BaseStorageTier
where
    T: Send + Sync + 'static,
{
    /// Writes `entries` to the log.
    fn append_entries(&self, entries: &[T]) -> Result<(), StorageError>;

    /// Mode this tier operates in; [`AppendLogMode::Append`] unless overridden.
    fn mode(&self) -> AppendLogMode {
        AppendLogMode::Append
    }

    /// Loads entries according to `opts`, returning them together with an
    /// optional cursor for a follow-up call.
    fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError>;

    /// Convenience wrapper: issues a single [`load_entries`](Self::load_entries)
    /// call with no cursor and no page size, and returns just the entries.
    ///
    /// NOTE(review): the cursor returned by that call is discarded, so this
    /// relies on implementations returning everything when `page_size` is
    /// `None` — confirm against implementors.
    fn load_entries_all(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError> {
        let unpaged = LoadEntriesOpts {
            key_filter,
            page_size: None,
            cursor: None,
        };
        let page = self.load_entries(unpaged)?;
        Ok(page.entries)
    }
}
/// Storage tier that stores values of type `T` under string keys.
///
/// `T` must be `Send + Sync + 'static`.
pub trait KvStorageTier<T>: BaseStorageTier
where
T: Send + Sync + 'static,
{
/// Stores `value` under `key`, taking ownership of the value.
fn save(&self, key: &str, value: T) -> Result<(), StorageError>;
/// Loads the value stored under `key`.
///
/// NOTE(review): `Ok(None)` presumably means the key is absent — confirm
/// against implementors.
fn load(&self, key: &str) -> Result<Option<T>, StorageError>;
/// Deletes the entry stored under `key`.
fn delete(&self, key: &str) -> Result<(), StorageError>;
/// Lists keys for `prefix`.
///
/// NOTE(review): ordering of the returned keys is not specified here — confirm.
fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError>;
}
/// Iterator over the `(key, bytes)` entries of a [`StorageBackend`] whose keys
/// start with a given prefix.
///
/// The key set is listed, filtered and sorted once in [`PrefixIter::new`];
/// each value is read from the backend only when the iterator is advanced.
pub(crate) struct PrefixIter<'a, B: StorageBackend + ?Sized> {
    /// Backend the values are read from.
    backend: &'a B,
    /// Keys still to be visited, in ascending order.
    keys: std::vec::IntoIter<String>,
    /// Failure from the initial listing, held back until the first `next` call.
    pending_error: Option<StorageError>,
}

impl<'a, B: StorageBackend + ?Sized> PrefixIter<'a, B> {
    /// Creates an iterator over `backend`'s entries under `prefix`.
    ///
    /// This never fails directly: if `backend.list` returns an error, the
    /// iterator is created with no keys and yields that error as its first
    /// (and only) item.
    pub(crate) fn new(backend: &'a B, prefix: &str) -> Self {
        let (keys, pending_error) = match backend.list(prefix) {
            Ok(mut listed) => {
                // Re-check the prefix locally instead of trusting the backend's
                // filtering, then fix the visiting order.
                listed.retain(|key| key.starts_with(prefix));
                listed.sort();
                (listed, None)
            }
            Err(err) => (Vec::new(), Some(err)),
        };
        Self {
            backend,
            keys: keys.into_iter(),
            pending_error,
        }
    }
}

impl<B: StorageBackend + ?Sized> Iterator for PrefixIter<'_, B> {
    type Item = Result<(String, Vec<u8>), StorageError>;

    /// Yields the deferred listing error first (if any), then the next key
    /// whose read either fails or returns non-empty bytes.
    ///
    /// Keys whose read returns `None` or an empty buffer are skipped. A read
    /// error is yielded as an item and does not end the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(err) = self.pending_error.take() {
            return Some(Err(err));
        }
        // Copy the shared reference out so the closure does not capture `self`
        // while `self.keys` is mutably borrowed.
        let backend = self.backend;
        self.keys.by_ref().find_map(|key| match backend.read(&key) {
            Err(err) => Some(Err(err)),
            Ok(Some(bytes)) if !bytes.is_empty() => Some(Ok((key, bytes))),
            Ok(_) => None,
        })
    }
}
// Unit tests for the tier traits' object safety and for `PrefixIter`.
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::MemoryBackend;
// Compile-time check: each tier trait can be named as a `dyn` type.
#[test]
fn tier_traits_are_dyn_safe() {
fn assert_dyn<T: ?Sized>() {}
assert_dyn::<dyn BaseStorageTier>();
assert_dyn::<dyn SnapshotStorageTier<u64>>();
assert_dyn::<dyn AppendLogStorageTier<u64>>();
assert_dyn::<dyn KvStorageTier<u64>>();
}
// Keys are written out of order, plus one outside the prefix; the iterator
// must yield only the "g/" keys, in ascending lexicographic order.
#[test]
fn prefix_iter_yields_lex_asc() {
let b = MemoryBackend::new();
b.write("g/02", b"two").unwrap();
b.write("g/01", b"one").unwrap();
b.write("g/10", b"ten").unwrap();
b.write("other", b"x").unwrap();
let iter = PrefixIter::new(&b, "g/");
let collected: Vec<_> = iter.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(
collected,
vec![
("g/01".to_string(), b"one".to_vec()),
("g/02".to_string(), b"two".to_vec()),
("g/10".to_string(), b"ten".to_vec()),
],
);
}
// A backend that does not override `list` makes construction succeed but the
// first `next()` yield `BackendNoListSupport`; after that the iterator is done.
#[test]
fn prefix_iter_surfaces_backend_no_list_support_lazily() {
// Backend that implements only `name`, `read` and `write`.
struct NoList;
impl StorageBackend for NoList {
fn name(&self) -> &'static str {
"no-list"
}
fn read(&self, _key: &str) -> Result<Option<Vec<u8>>, StorageError> {
Ok(None)
}
fn write(&self, _k: &str, _b: &[u8]) -> Result<(), StorageError> {
Ok(())
}
}
let b = NoList;
let mut iter = PrefixIter::new(&b, "g/");
let first = iter.next();
assert!(matches!(
first,
Some(Err(StorageError::BackendNoListSupport { .. }))
));
assert!(iter.next().is_none());
}
}