use crate::{Error, Result, StorageEngine};
use crate::storage::Transaction;
use std::sync::Arc;
pub trait StorageAdapter: Send + Sync {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
fn delete(&self, key: &[u8]) -> Result<()>;
fn scan(&self, prefix: &[u8], limit: Option<usize>) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
fn begin_transaction(&self) -> Result<Box<dyn TransactionAdapter>>;
fn flush(&self) -> Result<()>;
}
pub trait TransactionAdapter: Send + Sync {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
fn delete(&mut self, key: &[u8]) -> Result<()>;
fn commit(self: Box<Self>) -> Result<()>;
fn rollback(self: Box<Self>) -> Result<()>;
}
pub struct LiteStorageAdapter {
engine: Arc<StorageEngine>,
}
impl LiteStorageAdapter {
pub fn new(engine: Arc<StorageEngine>) -> Self {
Self { engine }
}
pub fn engine(&self) -> &Arc<StorageEngine> {
&self.engine
}
}
impl StorageAdapter for LiteStorageAdapter {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let key_vec = key.to_vec();
self.engine.get(&key_vec)
}
fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let key_vec = key.to_vec();
let value_vec = value.to_vec();
self.engine.put(&key_vec, &value_vec)
}
fn delete(&self, key: &[u8]) -> Result<()> {
let key_vec = key.to_vec();
self.engine.delete(&key_vec)
}
fn scan(&self, prefix: &[u8], limit: Option<usize>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
use rocksdb::IteratorMode;
let mut results = Vec::new();
let iter = self.engine.db.iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));
for item in iter {
let (key, value) = item.map_err(|e| Error::storage(format!("Scan error: {}", e)))?;
if key.starts_with(prefix) {
results.push((key.to_vec(), value.to_vec()));
if let Some(limit) = limit {
if results.len() >= limit {
break;
}
}
} else {
break;
}
}
Ok(results)
}
fn begin_transaction(&self) -> Result<Box<dyn TransactionAdapter>> {
let txn = self.engine.begin_transaction()?;
Ok(Box::new(LiteTransactionAdapter { txn: Some(txn) }))
}
fn flush(&self) -> Result<()> {
self.engine.flush()
}
}
struct LiteTransactionAdapter {
txn: Option<Transaction>,
}
impl TransactionAdapter for LiteTransactionAdapter {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.txn.as_ref()
.ok_or_else(|| Error::transaction("Transaction already consumed"))?
.get(&key.to_vec())
}
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
self.txn.as_mut()
.ok_or_else(|| Error::transaction("Transaction already consumed"))?
.put(key.to_vec(), value.to_vec())
}
fn delete(&mut self, key: &[u8]) -> Result<()> {
self.txn.as_mut()
.ok_or_else(|| Error::transaction("Transaction already consumed"))?
.delete(key.to_vec())
}
fn commit(mut self: Box<Self>) -> Result<()> {
let txn = self.txn.take()
.ok_or_else(|| Error::transaction("Transaction already consumed"))?;
txn.commit()
}
fn rollback(mut self: Box<Self>) -> Result<()> {
let txn = self.txn.take()
.ok_or_else(|| Error::transaction("Transaction already consumed"))?;
txn.rollback()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::Config;
#[test]
fn test_storage_adapter_get_put() -> Result<()> {
let config = Config::in_memory();
let engine = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteStorageAdapter::new(engine);
let key = b"test_key";
let value = b"test_value";
adapter.put(key, value)?;
let result = adapter.get(key)?;
assert_eq!(result, Some(value.to_vec()));
Ok(())
}
#[test]
fn test_storage_adapter_delete() -> Result<()> {
let config = Config::in_memory();
let engine = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteStorageAdapter::new(engine);
let key = b"test_key";
let value = b"test_value";
adapter.put(key, value)?;
adapter.delete(key)?;
let result = adapter.get(key)?;
assert_eq!(result, None);
Ok(())
}
#[test]
fn test_storage_adapter_scan() -> Result<()> {
let config = Config::in_memory();
let engine = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteStorageAdapter::new(engine);
adapter.put(b"prefix:key1", b"value1")?;
adapter.put(b"prefix:key2", b"value2")?;
adapter.put(b"prefix:key3", b"value3")?;
adapter.put(b"other:key", b"other")?;
let results = adapter.scan(b"prefix:", None)?;
assert_eq!(results.len(), 3);
let results = adapter.scan(b"prefix:", Some(2))?;
assert_eq!(results.len(), 2);
Ok(())
}
#[test]
fn test_transaction_adapter() -> Result<()> {
let config = Config::in_memory();
let engine = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteStorageAdapter::new(engine);
let key = b"txn_key";
let value = b"txn_value";
let mut txn = adapter.begin_transaction()?;
txn.put(key, value)?;
txn.commit()?;
let result = adapter.get(key)?;
assert_eq!(result, Some(value.to_vec()));
Ok(())
}
#[test]
fn test_transaction_rollback() -> Result<()> {
let config = Config::in_memory();
let engine = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteStorageAdapter::new(engine);
let key = b"rollback_key";
let value = b"rollback_value";
let mut txn = adapter.begin_transaction()?;
txn.put(key, value)?;
txn.rollback()?;
let result = adapter.get(key)?;
assert_eq!(result, None);
Ok(())
}
}