use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use super::error::{StorageError, StorageResult};
/// Execution model reported by a store through `KvStore::execution_model`.
///
/// NOTE(review): the variant meanings below are inferred from their names;
/// confirm them against the backend implementations.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutionModel {
    /// Presumably a backend whose operations are natively asynchronous.
    Async,
    /// Presumably a synchronous backend exposed through the async interface.
    SyncWrapped,
}
/// Flush semantics reported by a store through `KvStore::flush_behavior`.
///
/// NOTE(review): the variant meanings below are inferred from their names;
/// confirm them against the backend implementations.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FlushBehavior {
    /// Presumably `flush` does nothing for this backend.
    NoOp,
    /// Presumably `flush` makes pending writes durable.
    Persists,
}
/// Asynchronous, byte-oriented key-value store interface.
///
/// Keys are byte slices and values are owned byte vectors. Every operation
/// takes `&self`, and implementors must be `Send + Sync`.
///
/// NOTE(review): the per-method semantics documented below are inferred from
/// the signatures alone; confirm them against the backend implementations.
#[async_trait]
pub trait KvStore: Send + Sync {
/// Fetches the value stored under `key`.
///
/// Presumably `Ok(None)` signals that the key is absent — confirm.
async fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>>;
/// Stores `value` under `key`, taking ownership of the value bytes.
///
/// Overwrite behavior for an existing key is not visible from here.
async fn put(&self, key: &[u8], value: Vec<u8>) -> StorageResult<()>;
/// Removes `key`.
///
/// NOTE(review): the returned `bool` presumably reports whether the key
/// existed before the call — confirm.
async fn delete(&self, key: &[u8]) -> StorageResult<bool>;
/// Presumably reports whether `key` is currently present.
async fn exists(&self, key: &[u8]) -> StorageResult<bool>;
/// Returns `(key, value)` pairs for `prefix`, presumably every entry whose
/// key starts with it. Result ordering is not specified here.
async fn scan_prefix(&self, prefix: &[u8]) -> StorageResult<Vec<(Vec<u8>, Vec<u8>)>>;
/// Writes several `(key, value)` pairs in one call.
///
/// Whether the batch is applied atomically is not visible from here.
async fn batch_put(&self, items: Vec<(Vec<u8>, Vec<u8>)>) -> StorageResult<()>;
/// Deletes several keys in one call.
///
/// Whether the batch is applied atomically is not visible from here.
async fn batch_delete(&self, keys: Vec<Vec<u8>>) -> StorageResult<()>;
/// Flushes the store; what that means for a given backend is reported by
/// `flush_behavior`.
async fn flush(&self) -> StorageResult<()>;
/// Static name identifying the backend.
fn backend_name(&self) -> &'static str;
/// Reports the backend's `ExecutionModel`.
fn execution_model(&self) -> ExecutionModel;
/// Reports the backend's `FlushBehavior`.
fn flush_behavior(&self) -> FlushBehavior;
}
/// Store that hands out named namespaces, each exposed as a shared `KvStore`.
///
/// NOTE(review): the per-method semantics documented below are inferred from
/// the signatures alone; confirm them against the implementations.
#[async_trait]
pub trait NamespacedStore: Send + Sync {
/// Opens the namespace called `name` and returns a shared handle to it.
///
/// Whether a missing namespace is created on demand is not visible here.
async fn open_namespace(&self, name: &str) -> StorageResult<Arc<dyn KvStore>>;
/// Lists namespace names, presumably all that currently exist.
async fn list_namespaces(&self) -> StorageResult<Vec<String>>;
/// Deletes the namespace called `name`.
///
/// NOTE(review): the returned `bool` presumably reports whether the
/// namespace existed — confirm.
async fn delete_namespace(&self, name: &str) -> StorageResult<bool>;
}
/// Store interface for serde-compatible items addressed by string keys.
///
/// The trait does not fix an encoding; that choice belongs to the
/// implementor. NOTE(review): the per-method semantics documented below are
/// inferred from the signatures — confirm against the implementations.
#[async_trait]
pub trait TypedStore: Send + Sync {
    /// Stores `item` under `key`.
    async fn put_item<T>(&self, key: &str, item: &T) -> StorageResult<()>
    where
        T: Serialize + Send + Sync;

    /// Loads the item stored under `key`.
    ///
    /// Presumably `Ok(None)` signals that nothing is stored under `key`.
    async fn get_item<T>(&self, key: &str) -> StorageResult<Option<T>>
    where
        T: DeserializeOwned + Send + Sync;

    /// Deletes the item stored under `key`.
    ///
    /// The returned `bool` presumably reports whether an item existed.
    async fn delete_item(&self, key: &str) -> StorageResult<bool>;

    /// Lists keys for `prefix`, presumably those that start with it.
    async fn list_keys_with_prefix(&self, prefix: &str) -> StorageResult<Vec<String>>;

    /// Loads `(key, item)` pairs for `prefix`, presumably those whose key
    /// starts with it.
    async fn scan_items_with_prefix<T>(&self, prefix: &str) -> StorageResult<Vec<(String, T)>>
    where
        T: DeserializeOwned + Send + Sync;

    /// Stores every `(key, item)` pair from `items` in one call.
    async fn batch_put_items<T>(&self, items: Vec<(String, T)>) -> StorageResult<()>
    where
        T: Serialize + Send + Sync;

    /// Presumably reports whether an item is stored under `key`.
    async fn exists_item(&self, key: &str) -> StorageResult<bool>;
}
/// Wrapper around a shared [`KvStore`] handle.
///
/// `S` may be unsized, so `TypedKvStore<dyn KvStore>` is allowed.
pub struct TypedKvStore<S>
where
    S: KvStore + ?Sized,
{
    /// Shared handle to the wrapped store.
    inner: Arc<S>,
}
impl<S: KvStore + ?Sized> TypedKvStore<S> {
pub fn new(store: Arc<S>) -> Self {
Self { inner: store }
}
pub fn inner(&self) -> &Arc<S> {
&self.inner
}
}
#[async_trait]
impl<S: KvStore + ?Sized + 'static> TypedStore for TypedKvStore<S> {
async fn put_item<T: Serialize + Send + Sync>(&self, key: &str, item: &T) -> StorageResult<()> {
let bytes = serde_json::to_vec(item)
.map_err(|e| StorageError::SerializationError(e.to_string()))?;
self.inner.put(key.as_bytes(), bytes).await
}
async fn get_item<T: DeserializeOwned + Send + Sync>(
&self,
key: &str,
) -> StorageResult<Option<T>> {
match self.inner.get(key.as_bytes()).await? {
Some(bytes) => {
let item = serde_json::from_slice(&bytes)
.map_err(|e| StorageError::SerializationError(e.to_string()))?;
Ok(Some(item))
}
None => Ok(None),
}
}
async fn delete_item(&self, key: &str) -> StorageResult<bool> {
self.inner.delete(key.as_bytes()).await
}
async fn list_keys_with_prefix(&self, prefix: &str) -> StorageResult<Vec<String>> {
let results = self.inner.scan_prefix(prefix.as_bytes()).await?;
Ok(results
.into_iter()
.map(|(k, _)| String::from_utf8_lossy(&k).to_string())
.collect())
}
async fn scan_items_with_prefix<T: DeserializeOwned + Send + Sync>(
&self,
prefix: &str,
) -> StorageResult<Vec<(String, T)>> {
let results = self.inner.scan_prefix(prefix.as_bytes()).await?;
let mut items = Vec::new();
for (key_bytes, value_bytes) in results {
let key = String::from_utf8_lossy(&key_bytes).to_string();
let value = serde_json::from_slice(&value_bytes).map_err(|e| {
StorageError::SerializationError(format!("Failed to deserialize {}: {}", key, e))
})?;
items.push((key, value));
}
Ok(items)
}
async fn batch_put_items<T: Serialize + Send + Sync>(
&self,
items: Vec<(String, T)>,
) -> StorageResult<()> {
let serialized: Result<Vec<_>, StorageError> = items
.into_iter()
.map(|(k, v)| {
let bytes = serde_json::to_vec(&v)
.map_err(|e| StorageError::SerializationError(e.to_string()))?;
Ok::<(Vec<u8>, Vec<u8>), StorageError>((k.into_bytes(), bytes))
})
.collect();
self.inner.batch_put(serialized?).await
}
async fn exists_item(&self, key: &str) -> StorageResult<bool> {
self.inner.exists(key.as_bytes()).await
}
}