#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "serde-typed")]
pub mod typed;
#[cfg(feature = "serde-typed")]
pub use typed::{JsonCodec, TypedCodec, TypedKvError, TypedKvStore};
/// Error type shared by every store operation in this crate.
///
/// The type is `Clone`; the I/O variant keeps its `std::io::Error` behind an
/// [`Arc`] because `std::io::Error` itself is not `Clone`.
#[derive(Debug, Clone)]
pub enum StoreError {
    /// An underlying I/O error. The wrapped error is also reachable through
    /// [`std::error::Error::source`].
    Io(Arc<std::io::Error>),
    /// Corruption, with a free-form description.
    Corruption(String),
    /// The requested item was not found.
    NotFound,
    /// The item already exists.
    AlreadyExists,
    /// A transaction conflict.
    TxnConflict,
    /// The store is read-only.
    ReadOnly,
    /// The operation timed out.
    Timeout,
    /// A capacity limit was exceeded.
    CapacityExceeded,
    /// A compare-and-swap mismatch.
    CasMismatch,
    /// The key was not found.
    KeyNotFound,
    /// The operation is not supported; the payload says what is unsupported.
    Unsupported(String),
    /// Any other error, carried as a message.
    Other(String),
}

impl std::fmt::Display for StoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StoreError::Io(e) => write!(f, "I/O error: {e}"),
            StoreError::Corruption(s) => write!(f, "corruption: {s}"),
            StoreError::NotFound => write!(f, "not found"),
            StoreError::AlreadyExists => write!(f, "already exists"),
            StoreError::TxnConflict => write!(f, "transaction conflict"),
            StoreError::ReadOnly => write!(f, "store is read-only"),
            StoreError::Timeout => write!(f, "operation timed out"),
            StoreError::CapacityExceeded => write!(f, "capacity exceeded"),
            StoreError::CasMismatch => write!(f, "compare-and-swap mismatch"),
            StoreError::KeyNotFound => write!(f, "key not found"),
            StoreError::Unsupported(s) => write!(f, "unsupported: {s}"),
            StoreError::Other(s) => write!(f, "error: {s}"),
        }
    }
}

impl std::error::Error for StoreError {
    /// Returns the wrapped `std::io::Error` for [`StoreError::Io`] and `None`
    /// for every other variant, so callers can walk or downcast the cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            StoreError::Io(e) => {
                // Deref through the `Arc` to borrow the inner error itself.
                let inner: &std::io::Error = e;
                Some(inner)
            }
            // Listed explicitly so adding a variant forces a decision here.
            StoreError::Corruption(_)
            | StoreError::NotFound
            | StoreError::AlreadyExists
            | StoreError::TxnConflict
            | StoreError::ReadOnly
            | StoreError::Timeout
            | StoreError::CapacityExceeded
            | StoreError::CasMismatch
            | StoreError::KeyNotFound
            | StoreError::Unsupported(_)
            | StoreError::Other(_) => None,
        }
    }
}

/// Wraps an I/O error as [`StoreError::Io`].
impl From<std::io::Error> for StoreError {
    fn from(e: std::io::Error) -> Self {
        StoreError::Io(Arc::new(e))
    }
}

/// Wraps a message as [`StoreError::Other`].
impl From<String> for StoreError {
    fn from(s: String) -> Self {
        StoreError::Other(s)
    }
}
/// Computes an exclusive upper bound for a scan over all keys starting with `prefix`.
///
/// Trailing `0xFF` bytes are dropped and the last remaining byte is incremented,
/// e.g. `b"foo"` becomes `b"fop"` and `b"ab\xff"` becomes `b"ac"`.
///
/// Returns `None` when no such bound exists: the prefix is empty or consists
/// solely of `0xFF` bytes.
pub fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the right-most byte that can still be incremented.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut bound = prefix[..=pivot].to_vec();
    // Cannot overflow: the byte at `pivot` is not 0xFF.
    bound[pivot] += 1;
    Some(bound)
}
/// Returns the absolute expiry time, in milliseconds since the Unix epoch,
/// for an entry that should live for `ttl` starting now.
///
/// The result saturates at `u64::MAX` when `now + ttl` cannot be represented,
/// either as a `Duration` or as a `u64` millisecond count. A huge TTL therefore
/// yields an expiry far in the future instead of one that has already passed
/// or a truncated value.
///
/// # Errors
///
/// Returns [`StoreError::Other`] if the system clock reads earlier than the
/// Unix epoch.
pub fn expiry_epoch_millis(ttl: Duration) -> Result<u64, StoreError> {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| StoreError::Other(e.to_string()))?;
    let expiry = now.checked_add(ttl).map_or(u64::MAX, |deadline| {
        // `as_millis` yields a u128; clamp rather than truncate.
        u64::try_from(deadline.as_millis()).unwrap_or(u64::MAX)
    });
    Ok(expiry)
}
/// Reports whether `expiry_millis` (milliseconds since the Unix epoch) is at or
/// before the current system time.
///
/// If the system clock reads earlier than the Unix epoch, the current time is
/// taken to be `0`.
#[must_use]
pub fn is_expired(expiry_millis: u64) -> bool {
    use std::time::{SystemTime, UNIX_EPOCH};

    let now_millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };
    now_millis >= expiry_millis
}
/// Byte-oriented key-value store interface.
///
/// Implementors provide `get`, `put`, `delete`, `range`, `iter`, `transaction`,
/// `snapshot` and `flush`. Every other method has a default implementation
/// built on those, which a backend may override.
pub trait KvStore: Send + Sync {
    /// Looks up `key`, returning `Ok(None)` when it is absent.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError>;

    /// Stores `value` under `key`.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<(), StoreError>;

    /// Removes `key`.
    fn delete(&self, key: &[u8]) -> Result<(), StoreError>;

    /// Looks up every key in `keys`, returning one slot per key in input order.
    ///
    /// The default calls [`KvStore::get`] once per key and stops at the first error.
    fn get_many(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
        let mut found = Vec::with_capacity(keys.len());
        for key in keys {
            found.push(self.get(key)?);
        }
        Ok(found)
    }

    /// Looks up `key`, allowing a backend to hand out a borrowed value.
    ///
    /// The default always returns an owned copy obtained from [`KvStore::get`].
    fn get_ref<'a>(&'a self, key: &[u8]) -> Result<Option<std::borrow::Cow<'a, [u8]>>, StoreError> {
        let owned = self.get(key)?;
        Ok(owned.map(std::borrow::Cow::Owned))
    }

    /// Reports whether `key` is present. The default fetches the value via
    /// [`KvStore::get`] and discards it.
    fn contains(&self, key: &[u8]) -> Result<bool, StoreError> {
        self.get(key).map(|value| value.is_some())
    }

    /// Iterates over the entries between `lo` and `hi`.
    ///
    /// The default [`KvStore::prefix_scan`] passes the prefix as `lo` and the
    /// prefix's upper bound as `hi`, so implementations are expected to treat
    /// the range as half-open: `lo` inclusive, `hi` exclusive.
    fn range<'a>(&'a self, lo: &[u8], hi: &[u8]) -> Result<RangeIter<'a>, StoreError>;

    /// Same entries as [`KvStore::range`], in reverse order.
    ///
    /// The default buffers the whole forward range in memory before reversing it.
    fn range_rev<'a>(&'a self, lo: &[u8], hi: &[u8]) -> Result<RangeIter<'a>, StoreError> {
        let mut buffered: Vec<RangeItem> = self.range(lo, hi)?.collect();
        buffered.reverse();
        Ok(Box::new(buffered.into_iter()))
    }

    /// Iterates over every entry whose key starts with `prefix`.
    ///
    /// An empty prefix yields everything from [`KvStore::iter`]. Otherwise the
    /// scan is a [`KvStore::range`] from `prefix` up to [`prefix_upper_bound`].
    /// When no upper bound exists (the prefix is all `0xFF`), the full iterator
    /// is filtered by prefix and buffered. Iterator errors are passed through,
    /// not dropped.
    fn prefix_scan<'a>(&'a self, prefix: &[u8]) -> Result<RangeIter<'a>, StoreError> {
        if prefix.is_empty() {
            return self.iter();
        }
        if let Some(hi) = prefix_upper_bound(prefix) {
            return self.range(prefix, &hi);
        }
        let mut kept: Vec<RangeItem> = Vec::new();
        for entry in self.iter()? {
            let keep = match &entry {
                Ok((key, _)) => key.starts_with(prefix),
                // Keep errors so the caller still sees them.
                Err(_) => true,
            };
            if keep {
                kept.push(entry);
            }
        }
        Ok(Box::new(kept.into_iter()))
    }

    /// Writes all `pairs` inside one transaction and commits it.
    fn batch_write(&self, pairs: &[(&[u8], &[u8])]) -> Result<(), StoreError> {
        let mut txn = self.transaction()?;
        pairs
            .iter()
            .try_for_each(|&(key, value)| txn.put(key, value))?;
        txn.commit()
    }

    /// Deletes all `keys` inside one transaction and commits it.
    fn batch_delete(&self, keys: &[&[u8]]) -> Result<(), StoreError> {
        let mut txn = self.transaction()?;
        keys.iter().try_for_each(|key| txn.delete(key))?;
        txn.commit()
    }

    /// Counts the entries by walking [`KvStore::iter`]; the first iterator
    /// error aborts the count.
    fn count(&self) -> Result<u64, StoreError> {
        self.iter()?
            .try_fold(0u64, |seen, entry| entry.map(|_| seen + 1))
    }

    /// Size of the store on disk. The default returns `Ok(0)`.
    fn size_on_disk(&self) -> Result<u64, StoreError> {
        Ok(0)
    }

    /// Iterates over every entry in the store.
    fn iter<'a>(&'a self) -> Result<RangeIter<'a>, StoreError>;

    /// Iterates over every key. The default maps [`KvStore::iter`] and drops
    /// the values.
    fn keys<'a>(&'a self) -> Result<KeysIter<'a>, StoreError> {
        let entries = self.iter()?;
        let only_keys = entries.map(|entry| entry.map(|(key, _value)| key));
        Ok(Box::new(only_keys))
    }

    /// Sets `key` to `new_value` only if its current value equals `expected`
    /// (`None` meaning "absent").
    ///
    /// Returns `Ok(true)` when the swap was committed and `Ok(false)` when the
    /// current value did not match, in which case the transaction is rolled back.
    fn compare_and_swap(
        &self,
        key: &[u8],
        expected: Option<&[u8]>,
        new_value: &[u8],
    ) -> Result<bool, StoreError> {
        let mut txn = self.transaction()?;
        let current = txn.get(key)?;
        if current.as_deref() != expected {
            txn.rollback()?;
            return Ok(false);
        }
        txn.put(key, new_value)?;
        txn.commit()?;
        Ok(true)
    }

    /// Stores `value` under `key` with a time-to-live.
    ///
    /// The default returns [`StoreError::Unsupported`].
    fn put_with_ttl(&self, _key: &[u8], _value: &[u8], _ttl: Duration) -> Result<(), StoreError> {
        Err(StoreError::Unsupported(String::from("TTL not supported")))
    }

    /// Sets a time-to-live on `key`.
    ///
    /// The default returns [`StoreError::Unsupported`].
    fn expire(&self, _key: &[u8], _ttl: Duration) -> Result<(), StoreError> {
        Err(StoreError::Unsupported(String::from("TTL not supported")))
    }

    /// Queries the time-to-live of `key`.
    ///
    /// The default returns [`StoreError::Unsupported`].
    fn ttl(&self, _key: &[u8]) -> Result<Option<Duration>, StoreError> {
        Err(StoreError::Unsupported(String::from("TTL not supported")))
    }

    /// Removes the time-to-live from `key`.
    ///
    /// The default returns [`StoreError::Unsupported`].
    fn persist(&self, _key: &[u8]) -> Result<bool, StoreError> {
        Err(StoreError::Unsupported(String::from("TTL not supported")))
    }

    /// Purges expired entries, returning a count. The default does nothing and
    /// returns `Ok(0)`.
    fn purge_expired(&self) -> Result<u64, StoreError> {
        Ok(0)
    }

    /// Compacts the store. The default is a no-op.
    fn compact(&self) -> Result<(), StoreError> {
        Ok(())
    }

    /// Backs the store up to `_path`.
    ///
    /// The default returns [`StoreError::Other`] saying backup is not supported.
    fn backup(&self, _path: &Path) -> Result<(), StoreError> {
        Err(StoreError::Other(String::from(
            "backup not supported for this backend",
        )))
    }

    /// Restores the store from `_path`.
    ///
    /// The default returns [`StoreError::Other`] saying restore is not supported.
    fn restore(&self, _path: &Path) -> Result<(), StoreError> {
        Err(StoreError::Other(String::from(
            "restore not supported for this backend",
        )))
    }

    /// Opens a transaction borrowing this store.
    fn transaction(&self) -> Result<Box<dyn KvTxn + '_>, StoreError>;

    /// Opens a snapshot borrowing this store.
    fn snapshot(&self) -> Result<Box<dyn KvSnapshot + '_>, StoreError>;

    /// Flushes the store.
    fn flush(&self) -> Result<(), StoreError>;
}
/// A transaction over a key-value store.
///
/// Reads take `&self`, writes take `&mut self`, and the transaction is ended by
/// consuming the box with [`KvTxn::commit`] or [`KvTxn::rollback`].
pub trait KvTxn {
    /// Looks up `key` within the transaction, returning `Ok(None)` when absent.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError>;

    /// Stores `value` under `key` within the transaction.
    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StoreError>;

    /// Removes `key` within the transaction.
    fn delete(&mut self, key: &[u8]) -> Result<(), StoreError>;

    /// Reports whether `key` is present. The default fetches the value via
    /// [`KvTxn::get`] and discards it.
    fn contains(&self, key: &[u8]) -> Result<bool, StoreError> {
        self.get(key).map(|value| value.is_some())
    }

    /// Iterates over the entries between `_lo` and `_hi`.
    ///
    /// The default returns [`StoreError::Other`]; transaction types that can
    /// scan override it.
    fn range<'a>(&'a self, _lo: &[u8], _hi: &[u8]) -> Result<RangeIter<'a>, StoreError> {
        let reason = String::from("range not supported within this transaction type");
        Err(StoreError::Other(reason))
    }

    /// Commits the transaction, consuming it.
    fn commit(self: Box<Self>) -> Result<(), StoreError>;

    /// Rolls the transaction back, consuming it.
    fn rollback(self: Box<Self>) -> Result<(), StoreError>;
}
/// A read-only view of a key-value store.
pub trait KvSnapshot {
    /// Looks up `key` in the snapshot, returning `Ok(None)` when absent.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError>;

    /// Iterates over the snapshot's entries between `lo` and `hi`.
    ///
    /// The default [`KvSnapshot::prefix_scan`] passes the prefix as `lo` and the
    /// prefix's upper bound as `hi`, so implementations are expected to treat
    /// the range as half-open: `lo` inclusive, `hi` exclusive.
    fn range<'a>(&'a self, lo: &[u8], hi: &[u8]) -> Result<RangeIter<'a>, StoreError>;

    /// Iterates over every entry whose key starts with `prefix`.
    ///
    /// When [`prefix_upper_bound`] yields a bound, this is
    /// `range(prefix, bound)`. When it does not (empty or all-`0xFF` prefix),
    /// the scan falls back to `range(&[], &[])`. For a non-empty prefix the
    /// fallback is filtered, so only keys that really start with `prefix` are
    /// returned. Iterator errors are passed through, not dropped.
    ///
    /// NOTE(review): the fallback relies on the backend treating
    /// `range(&[], &[])` as "everything"; a strictly half-open backend returns
    /// nothing for it. Confirm against the concrete snapshot implementations.
    fn prefix_scan<'a>(&'a self, prefix: &[u8]) -> Result<RangeIter<'a>, StoreError> {
        match prefix_upper_bound(prefix) {
            Some(hi) => self.range(prefix, &hi),
            None => {
                let fallback = self.range(&[], &[])?;
                if prefix.is_empty() {
                    // Every key matches the empty prefix; nothing to filter.
                    return Ok(fallback);
                }
                let wanted = prefix.to_vec();
                Ok(Box::new(fallback.filter(move |entry| match entry {
                    Ok((key, _)) => key.starts_with(&wanted),
                    // Keep errors so the caller still sees them.
                    Err(_) => true,
                })))
            }
        }
    }

    /// Reports whether `key` is present. The default fetches the value via
    /// [`KvSnapshot::get`] and discards it.
    fn contains(&self, key: &[u8]) -> Result<bool, StoreError> {
        Ok(self.get(key)?.is_some())
    }
}
/// Marker trait for columnar stores.
///
/// It declares no methods; implementors only have to be `Send + Sync`.
// NOTE(review): presumably a placeholder for a future columnar API — confirm.
pub trait ColumnarStore: Send + Sync {}
/// Marker trait for blob stores.
///
/// It declares no methods; implementors only have to be `Send + Sync`.
// NOTE(review): presumably a placeholder for a future blob API — confirm.
pub trait BlobStore: Send + Sync {}
/// An owned, dynamically dispatched [`KvStore`].
pub type BoxKvStore = Box<dyn KvStore>;
/// One item yielded by a scan: a `(key, value)` pair, or the error that
/// occurred while producing it.
pub type RangeItem = Result<(Vec<u8>, Vec<u8>), StoreError>;
/// Boxed iterator over [`RangeItem`]s that may borrow for `'a`.
pub type RangeIter<'a> = Box<dyn Iterator<Item = RangeItem> + 'a>;
/// Boxed iterator over keys (or the error hit while producing one) that may
/// borrow for `'a`.
pub type KeysIter<'a> = Box<dyn Iterator<Item = Result<Vec<u8>, StoreError>> + 'a>;
/// Options for opening a store.
///
/// [`StoreConfig::default`] gives no explicit cache size, synchronous writes
/// enabled, and read-write access.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Cache size in bytes; `None` by default.
    // NOTE(review): presumably `None` lets the backend pick its own size — confirm.
    pub cache_size_bytes: Option<u64>,
    /// Whether writes are synchronous; `true` by default.
    pub sync_writes: bool,
    /// Whether the store is opened read-only; `false` by default.
    pub read_only: bool,
}

impl Default for StoreConfig {
    /// Builds the default configuration described on [`StoreConfig`].
    fn default() -> Self {
        let cache_size_bytes = None;
        let sync_writes = true;
        let read_only = false;
        Self {
            cache_size_bytes,
            sync_writes,
            read_only,
        }
    }
}
/// Counters describing store activity. All counters start at zero.
#[derive(Debug, Clone, Default)]
pub struct StoreMetrics {
    /// Number of reads.
    pub reads: u64,
    /// Number of writes.
    pub writes: u64,
    /// Number of deletes.
    pub deletes: u64,
    /// Bytes read.
    pub bytes_read: u64,
    /// Bytes written.
    pub bytes_written: u64,
    /// Number of cache hits.
    pub cache_hits: u64,
    /// Number of cache misses.
    pub cache_misses: u64,
}

impl StoreMetrics {
    /// Fraction of cache lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when there have been no lookups at all.
    #[must_use]
    pub fn cache_hit_rate(&self) -> f64 {
        // Widen before adding: two u64 counters can sum past u64::MAX, which
        // would panic in debug builds and wrap in release builds.
        let lookups = u128::from(self.cache_hits) + u128::from(self.cache_misses);
        if lookups == 0 {
            return 0.0;
        }
        self.cache_hits as f64 / lookups as f64
    }
}
/// Creates the parent directory of `path`, including any missing ancestors.
///
/// Does nothing when `path` has no parent component or the parent is empty
/// (for example a bare file name such as `some_file.db`).
///
/// # Errors
///
/// Returns the I/O error from `std::fs::create_dir_all`, converted into a
/// [`StoreError`].
pub fn ensure_parent_dir(path: &Path) -> Result<(), StoreError> {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => {
            std::fs::create_dir_all(dir)?;
            Ok(())
        }
        // No parent, or an empty one: there is nothing to create.
        _ => Ok(()),
    }
}
// Unit tests for the helpers, error type, config/metrics structs and the
// default `KvStore::range_rev` implementation defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// --- prefix_upper_bound ---
// Last byte is simply incremented.
#[test]
fn prefix_upper_bound_basic() {
assert_eq!(prefix_upper_bound(b"foo"), Some(b"fop".to_vec()));
}
// A trailing 0xFF is dropped and the preceding byte is incremented.
#[test]
fn prefix_upper_bound_trailing_ff() {
assert_eq!(prefix_upper_bound(b"ab\xff"), Some(b"ac".to_vec()));
}
// A prefix made only of 0xFF bytes has no upper bound.
#[test]
fn prefix_upper_bound_all_ff() {
assert_eq!(prefix_upper_bound(b"\xff\xff"), None);
}
// The empty prefix has no upper bound.
#[test]
fn prefix_upper_bound_empty() {
assert_eq!(prefix_upper_bound(b""), None);
}
#[test]
fn prefix_upper_bound_single_byte() {
assert_eq!(prefix_upper_bound(b"a"), Some(b"b".to_vec()));
}
// --- StoreError ---
// Pins the exact Display text of several unit variants.
#[test]
fn store_error_display() {
assert_eq!(format!("{}", StoreError::NotFound), "not found");
assert_eq!(format!("{}", StoreError::ReadOnly), "store is read-only");
assert_eq!(format!("{}", StoreError::Timeout), "operation timed out");
assert_eq!(
format!("{}", StoreError::CapacityExceeded),
"capacity exceeded"
);
assert_eq!(
format!("{}", StoreError::CasMismatch),
"compare-and-swap mismatch"
);
}
// `From<String>` produces the `Other` variant.
#[test]
fn store_error_from_string() {
let err: StoreError = "test error".to_string().into();
assert_eq!(format!("{err}"), "error: test error");
}
// --- StoreConfig / StoreMetrics ---
#[test]
fn store_config_default() {
let cfg = StoreConfig::default();
assert!(cfg.cache_size_bytes.is_none());
assert!(cfg.sync_writes);
assert!(!cfg.read_only);
}
// 80 hits out of 100 lookups is a 0.8 hit rate.
#[test]
fn store_metrics_hit_rate() {
let m = StoreMetrics {
cache_hits: 80,
cache_misses: 20,
..StoreMetrics::default()
};
assert!((m.cache_hit_rate() - 0.8).abs() < f64::EPSILON);
}
// With no lookups at all the hit rate is 0.0 rather than a division by zero.
#[test]
fn store_metrics_hit_rate_zero() {
let m = StoreMetrics::default();
assert!((m.cache_hit_rate()).abs() < f64::EPSILON);
}
// Cloning an `Io` error keeps the variant and the wrapped error kind.
#[test]
fn store_error_clone_io() {
let original = StoreError::from(std::io::Error::new(std::io::ErrorKind::NotFound, "test"));
let cloned = original.clone();
if let StoreError::Io(arc) = cloned {
assert_eq!(arc.kind(), std::io::ErrorKind::NotFound);
} else {
panic!("expected StoreError::Io after clone");
}
}
// Smoke test: every non-I/O variant can be cloned.
#[test]
fn store_error_clone_non_io_variants() {
let variants = [
StoreError::NotFound,
StoreError::AlreadyExists,
StoreError::TxnConflict,
StoreError::ReadOnly,
StoreError::Timeout,
StoreError::CapacityExceeded,
StoreError::CasMismatch,
StoreError::KeyNotFound,
StoreError::Corruption("bad".to_string()),
StoreError::Unsupported("nope".to_string()),
StoreError::Other("misc".to_string()),
];
for v in &variants {
let _ = v.clone(); }
}
// Minimal in-memory `KvStore` backed by a mutex-guarded `BTreeMap`, used to
// exercise the trait's default methods. Transactions and snapshots are not
// implemented and return `Unsupported`.
struct MemKv(std::sync::Mutex<std::collections::BTreeMap<Vec<u8>, Vec<u8>>>);
impl MemKv {
fn new() -> Self {
MemKv(std::sync::Mutex::new(std::collections::BTreeMap::new()))
}
}
impl KvStore for MemKv {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
Ok(self.0.lock().unwrap().get(key).cloned())
}
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), StoreError> {
self.0.lock().unwrap().insert(key.to_vec(), value.to_vec());
Ok(())
}
fn delete(&self, key: &[u8]) -> Result<(), StoreError> {
self.0.lock().unwrap().remove(key);
Ok(())
}
// Half-open range: `lo` inclusive, `hi` exclusive. Results are copied out
// so the returned iterator does not hold the lock.
fn range<'a>(&'a self, lo: &[u8], hi: &[u8]) -> Result<RangeIter<'a>, StoreError> {
use std::ops::Bound;
let map = self.0.lock().unwrap();
let pairs: Vec<RangeItem> = map
.range((Bound::Included(lo.to_vec()), Bound::Excluded(hi.to_vec())))
.map(|(k, v)| Ok((k.clone(), v.clone())))
.collect();
Ok(Box::new(pairs.into_iter()))
}
fn iter<'a>(&'a self) -> Result<RangeIter<'a>, StoreError> {
let map = self.0.lock().unwrap();
let pairs: Vec<RangeItem> = map
.iter()
.map(|(k, v)| Ok((k.clone(), v.clone())))
.collect();
Ok(Box::new(pairs.into_iter()))
}
fn transaction(&self) -> Result<Box<dyn KvTxn + '_>, StoreError> {
Err(StoreError::Unsupported("no txn in MemKv".to_string()))
}
fn snapshot(&self) -> Result<Box<dyn KvSnapshot + '_>, StoreError> {
Err(StoreError::Unsupported("no snapshot in MemKv".to_string()))
}
fn flush(&self) -> Result<(), StoreError> {
Ok(())
}
}
// --- KvStore::range_rev (default implementation) ---
// The default `range_rev` yields the forward range in descending key order.
#[test]
fn range_rev_descending_order() {
let store = MemKv::new();
store.put(b"a", b"1").unwrap();
store.put(b"b", b"2").unwrap();
store.put(b"c", b"3").unwrap();
store.put(b"d", b"4").unwrap();
let items: Vec<(Vec<u8>, Vec<u8>)> = store
.range_rev(b"a", b"e")
.unwrap()
.map(|r| r.unwrap())
.collect();
let keys: Vec<&[u8]> = items.iter().map(|(k, _)| k.as_slice()).collect();
assert_eq!(keys, vec![b"d", b"c", b"b", b"a"]);
}
// An empty forward range stays empty when reversed.
#[test]
fn range_rev_empty_range() {
let store = MemKv::new();
store.put(b"x", b"v").unwrap();
let items: Vec<_> = store.range_rev(b"z", b"z").unwrap().collect();
assert!(items.is_empty());
}
// --- ensure_parent_dir ---
// A bare file name has an empty parent, so nothing needs to be created.
#[test]
fn ensure_parent_dir_empty_path() {
let result = ensure_parent_dir(std::path::Path::new("some_file.db"));
assert!(result.is_ok());
}
// Missing nested parents are created under a per-process temp directory,
// which is removed again at the end (best effort).
#[test]
fn ensure_parent_dir_nested() {
use std::process;
let tmp = std::env::temp_dir().join(format!("oxistore_ensure_parent_{}", process::id()));
let deep = tmp.join("a").join("b").join("file.db");
let result = ensure_parent_dir(&deep);
assert!(result.is_ok());
assert!(deep.parent().expect("has parent").exists());
let _ = std::fs::remove_dir_all(&tmp);
}
// The parent here is the system temp directory itself.
#[test]
fn ensure_parent_dir_already_exists() {
let tmp = std::env::temp_dir();
let path = tmp.join("existing_check.db");
let result = ensure_parent_dir(&path);
assert!(result.is_ok());
}
// `From<io::Error>` always yields `Io` and keeps the original error kind.
#[test]
fn store_error_from_io_error_variants() {
use std::io;
let kinds = [
io::ErrorKind::NotFound,
io::ErrorKind::PermissionDenied,
io::ErrorKind::AlreadyExists,
io::ErrorKind::WouldBlock,
io::ErrorKind::TimedOut,
];
for kind in kinds {
let io_err = io::Error::new(kind, "test error");
let store_err: StoreError = io_err.into();
match &store_err {
StoreError::Io(arc) => assert_eq!(arc.kind(), kind),
other => panic!("expected StoreError::Io, got {other:?}"),
}
}
}
}