use async_trait::async_trait;
use std::fmt::Debug;
#[derive(Debug, thiserror::Error)]
pub enum KvError {
#[error("Key not found: {0}")]
NotFound(String),
#[error("Key already exists: {0}")]
AlreadyExists(String),
#[error("Storage error: {0}")]
Storage(String),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Conflict: {0}")]
Conflict(String),
#[error("Unsupported operation: {0}")]
Unsupported(String),
#[error("Configuration error: {0}")]
Config(String),
}
pub type KvResult<T> = std::result::Result<T, KvError>;
#[derive(Debug, Clone)]
pub struct KeyValue {
pub key: Vec<u8>,
pub value: Vec<u8>,
}
#[derive(Debug, Clone, Default)]
pub struct WriteOptions {
pub sync: bool,
}
#[derive(Debug, Default)]
pub struct WriteBatch {
pub operations: Vec<BatchOperation>,
}
#[derive(Debug, Clone)]
pub enum BatchOperation {
Put { key: Vec<u8>, value: Vec<u8> },
Delete { key: Vec<u8> },
}
impl WriteBatch {
pub fn new() -> Self {
Self::default()
}
pub fn put(&mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) {
self.operations.push(BatchOperation::Put {
key: key.into(),
value: value.into(),
});
}
pub fn delete(&mut self, key: impl Into<Vec<u8>>) {
self.operations
.push(BatchOperation::Delete { key: key.into() });
}
pub fn is_empty(&self) -> bool {
self.operations.is_empty()
}
pub fn len(&self) -> usize {
self.operations.len()
}
}
#[async_trait]
pub trait KvStore: Send + Sync + Debug {
async fn get(&self, key: &[u8]) -> KvResult<Option<Vec<u8>>>;
async fn put(&self, key: &[u8], value: &[u8]) -> KvResult<()>;
async fn put_with_options(&self, key: &[u8], value: &[u8], opts: WriteOptions) -> KvResult<()> {
let _ = opts;
self.put(key, value).await
}
async fn delete(&self, key: &[u8]) -> KvResult<()>;
async fn exists(&self, key: &[u8]) -> KvResult<bool> {
Ok(self.get(key).await?.is_some())
}
async fn scan_prefix(&self, prefix: &[u8]) -> KvResult<Vec<KeyValue>>;
async fn scan_range(&self, start: &[u8], end: &[u8]) -> KvResult<Vec<KeyValue>>;
async fn write_batch(&self, batch: WriteBatch) -> KvResult<()>;
async fn write_batch_with_options(
&self,
batch: WriteBatch,
opts: WriteOptions,
) -> KvResult<()> {
let _ = opts;
self.write_batch(batch).await
}
async fn close(&self) -> KvResult<()>;
fn backend_name(&self) -> &'static str;
fn supports_horizontal_scaling(&self) -> bool {
false
}
}
#[derive(Debug, Clone, Default)]
pub enum KvStoreConfig {
#[default]
Memory,
#[cfg(feature = "slatedb-storage")]
SlateDb {
object_store_url: String,
aws_region: Option<String>,
cache_dir: Option<String>,
},
}
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::sync::Arc;
#[derive(Debug, Default)]
pub struct MemoryKvStore {
data: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
}
impl MemoryKvStore {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl KvStore for MemoryKvStore {
async fn get(&self, key: &[u8]) -> KvResult<Option<Vec<u8>>> {
Ok(self.data.read().get(key).cloned())
}
async fn put(&self, key: &[u8], value: &[u8]) -> KvResult<()> {
self.data.write().insert(key.to_vec(), value.to_vec());
Ok(())
}
async fn delete(&self, key: &[u8]) -> KvResult<()> {
self.data.write().remove(key);
Ok(())
}
async fn scan_prefix(&self, prefix: &[u8]) -> KvResult<Vec<KeyValue>> {
let data = self.data.read();
let results: Vec<KeyValue> = data
.range(prefix.to_vec()..)
.take_while(|(k, _)| k.starts_with(prefix))
.map(|(k, v)| KeyValue {
key: k.clone(),
value: v.clone(),
})
.collect();
Ok(results)
}
async fn scan_range(&self, start: &[u8], end: &[u8]) -> KvResult<Vec<KeyValue>> {
let data = self.data.read();
let results: Vec<KeyValue> = data
.range(start.to_vec()..end.to_vec())
.map(|(k, v)| KeyValue {
key: k.clone(),
value: v.clone(),
})
.collect();
Ok(results)
}
async fn write_batch(&self, batch: WriteBatch) -> KvResult<()> {
let mut data = self.data.write();
for op in batch.operations {
match op {
BatchOperation::Put { key, value } => {
data.insert(key, value);
}
BatchOperation::Delete { key } => {
data.remove(&key);
}
}
}
Ok(())
}
async fn close(&self) -> KvResult<()> {
Ok(())
}
fn backend_name(&self) -> &'static str {
"memory"
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_memory_store_basic_operations() {
let store = MemoryKvStore::new();
store.put(b"key1", b"value1").await.unwrap();
assert_eq!(store.get(b"key1").await.unwrap(), Some(b"value1".to_vec()));
store.put(b"key1", b"value2").await.unwrap();
assert_eq!(store.get(b"key1").await.unwrap(), Some(b"value2".to_vec()));
store.delete(b"key1").await.unwrap();
assert_eq!(store.get(b"key1").await.unwrap(), None);
store.delete(b"key1").await.unwrap();
}
#[tokio::test]
async fn test_memory_store_scan_prefix() {
let store = MemoryKvStore::new();
store.put(b"user:1", b"alice").await.unwrap();
store.put(b"user:2", b"bob").await.unwrap();
store.put(b"user:3", b"charlie").await.unwrap();
store.put(b"tenant:1", b"acme").await.unwrap();
let results = store.scan_prefix(b"user:").await.unwrap();
assert_eq!(results.len(), 3);
assert_eq!(results[0].key, b"user:1");
assert_eq!(results[1].key, b"user:2");
assert_eq!(results[2].key, b"user:3");
let results = store.scan_prefix(b"tenant:").await.unwrap();
assert_eq!(results.len(), 1);
}
#[tokio::test]
async fn test_memory_store_write_batch() {
let store = MemoryKvStore::new();
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key2", b"value2");
batch.put(b"key3", b"value3");
store.write_batch(batch).await.unwrap();
assert_eq!(store.get(b"key1").await.unwrap(), Some(b"value1".to_vec()));
assert_eq!(store.get(b"key2").await.unwrap(), Some(b"value2".to_vec()));
assert_eq!(store.get(b"key3").await.unwrap(), Some(b"value3".to_vec()));
let mut batch = WriteBatch::new();
batch.delete(b"key2");
batch.put(b"key4", b"value4");
store.write_batch(batch).await.unwrap();
assert_eq!(store.get(b"key2").await.unwrap(), None);
assert_eq!(store.get(b"key4").await.unwrap(), Some(b"value4".to_vec()));
}
#[tokio::test]
async fn test_memory_store_exists() {
let store = MemoryKvStore::new();
assert!(!store.exists(b"key1").await.unwrap());
store.put(b"key1", b"value1").await.unwrap();
assert!(store.exists(b"key1").await.unwrap());
}
}