use crate::block::Block;
use aegis_common::{BlockId, Result, TransactionId, AegisError};
use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
/// Abstraction over a block store with id-addressed blocks, transaction
/// bookkeeping hooks and operation counters.
///
/// Implementors must be `Send + Sync`; every operation except [`stats`]
/// is asynchronous and fallible.
///
/// [`stats`]: StorageBackend::stats
#[async_trait]
pub trait StorageBackend: Send + Sync {
/// Stores `block` and returns the id it was stored under.
///
/// The implementations in this file allocate a fresh id when
/// `block.header.block_id` is `0` and otherwise reuse the id supplied in
/// the header.
async fn write_block(&self, block: Block) -> Result<BlockId>;
/// Fetches the block stored under `id`.
async fn read_block(&self, id: BlockId) -> Result<Block>;
/// Removes the block stored under `id`.
async fn delete_block(&self, id: BlockId) -> Result<()>;
/// Reports whether a block is stored under `id`.
async fn block_exists(&self, id: BlockId) -> Result<bool>;
/// Starts a transaction and returns its id.
async fn begin_transaction(&self) -> Result<TransactionId>;
/// Commits the transaction identified by `tx_id`.
async fn commit_transaction(&self, tx_id: TransactionId) -> Result<()>;
/// Rolls back the transaction identified by `tx_id`.
async fn rollback_transaction(&self, tx_id: TransactionId) -> Result<()>;
/// Asks the backend to flush any pending state.
async fn sync(&self) -> Result<()>;
/// Returns the backend's operation counters by value.
fn stats(&self) -> StorageStats;
}
/// Counters reported by a [`StorageBackend`]; all fields start at zero via
/// `Default`.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
/// Number of blocks the backend currently counts as stored.
pub total_blocks: u64,
/// Byte total for the counted blocks. `MemoryBackend` adds the payload
/// (`data`) length, `LocalBackend` adds the serialized block length.
pub total_bytes: u64,
/// Number of successful reads.
pub read_ops: u64,
/// Number of completed writes.
pub write_ops: u64,
/// Number of successful deletes.
pub delete_ops: u64,
}
/// [`StorageBackend`] that keeps every block in a process-local hash map.
pub struct MemoryBackend {
// Stored blocks keyed by id.
blocks: RwLock<HashMap<BlockId, Block>>,
// Next id handed out by `allocate_block_id`; starts at 1 because a
// header id of 0 is treated as "no id assigned yet" by `write_block`.
next_block_id: AtomicU64,
// Next id handed out by `allocate_tx_id`.
next_tx_id: AtomicU64,
// Operation counters returned (cloned) by `stats()`.
stats: RwLock<StorageStats>,
}
impl MemoryBackend {
    /// Builds an empty backend: no blocks, zeroed counters, and both id
    /// allocators positioned at 1.
    pub fn new() -> Self {
        let blocks = RwLock::new(HashMap::new());
        let stats = RwLock::new(StorageStats::default());
        Self {
            blocks,
            next_block_id: AtomicU64::new(1),
            next_tx_id: AtomicU64::new(1),
            stats,
        }
    }

    /// Hands out the next unused block id and advances the counter.
    fn allocate_block_id(&self) -> BlockId {
        let raw = self.next_block_id.fetch_add(1, Ordering::SeqCst);
        BlockId(raw)
    }

    /// Hands out the next unused transaction id and advances the counter.
    fn allocate_tx_id(&self) -> TransactionId {
        let raw = self.next_tx_id.fetch_add(1, Ordering::SeqCst);
        TransactionId(raw)
    }
}
impl Default for MemoryBackend {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl StorageBackend for MemoryBackend {
    /// Stores `block` in memory and returns its id.
    ///
    /// A header id of `0` means "unassigned": a fresh id is allocated and
    /// written back into the header before the block is stored. Any other id
    /// is used as-is and replaces a block already stored under it. Note that
    /// an explicit id does not advance the internal allocator.
    ///
    /// `write_ops` is always bumped. `total_blocks` grows only for ids that
    /// were not present before; `total_bytes` always ends up reflecting the
    /// payload size of what is actually stored, including after an overwrite.
    async fn write_block(&self, mut block: Block) -> Result<BlockId> {
        let block_id = if block.header.block_id.0 == 0 {
            self.allocate_block_id()
        } else {
            block.header.block_id
        };
        block.header.block_id = block_id;
        let new_len = block.data.len() as u64;

        let mut blocks = self.blocks.write();
        // `insert` returns the block that was replaced, if any, which lets
        // the byte counter swap the old payload size for the new one.
        let replaced = blocks.insert(block_id, block);

        let mut stats = self.stats.write();
        stats.write_ops += 1;
        match replaced {
            Some(previous) => {
                let old_len = previous.data.len() as u64;
                stats.total_bytes = stats
                    .total_bytes
                    .saturating_sub(old_len)
                    .saturating_add(new_len);
            }
            None => {
                stats.total_blocks += 1;
                stats.total_bytes += new_len;
            }
        }
        Ok(block_id)
    }

    /// Returns a clone of the block stored under `id`.
    ///
    /// # Errors
    ///
    /// `AegisError::BlockNotFound` when no block is stored under `id`; in
    /// that case `read_ops` is left untouched.
    async fn read_block(&self, id: BlockId) -> Result<Block> {
        let blocks = self.blocks.read();
        let block = blocks
            .get(&id)
            .cloned()
            .ok_or_else(|| AegisError::BlockNotFound(id.0))?;
        self.stats.write().read_ops += 1;
        Ok(block)
    }

    /// Removes the block stored under `id` and updates the counters.
    ///
    /// # Errors
    ///
    /// `AegisError::BlockNotFound` when no block is stored under `id`.
    async fn delete_block(&self, id: BlockId) -> Result<()> {
        let mut blocks = self.blocks.write();
        match blocks.remove(&id) {
            Some(removed) => {
                let mut stats = self.stats.write();
                stats.delete_ops += 1;
                // Saturating arithmetic: a counter mismatch must never turn
                // a successful delete into a panic or a wrapped-around total.
                stats.total_blocks = stats.total_blocks.saturating_sub(1);
                stats.total_bytes = stats
                    .total_bytes
                    .saturating_sub(removed.data.len() as u64);
                Ok(())
            }
            None => Err(AegisError::BlockNotFound(id.0)),
        }
    }

    /// Reports whether a block is stored under `id`; never fails.
    async fn block_exists(&self, id: BlockId) -> Result<bool> {
        Ok(self.blocks.read().contains_key(&id))
    }

    /// Allocates and returns a new transaction id; no other state is kept.
    async fn begin_transaction(&self) -> Result<TransactionId> {
        Ok(self.allocate_tx_id())
    }

    /// No-op: this backend applies writes immediately.
    async fn commit_transaction(&self, _tx_id: TransactionId) -> Result<()> {
        Ok(())
    }

    /// No-op: this backend keeps no per-transaction state to undo.
    async fn rollback_transaction(&self, _tx_id: TransactionId) -> Result<()> {
        Ok(())
    }

    /// No-op: there is nothing to flush for an in-memory store.
    async fn sync(&self) -> Result<()> {
        Ok(())
    }

    /// Returns a copy of the current counters.
    fn stats(&self) -> StorageStats {
        self.stats.read().clone()
    }
}
/// [`StorageBackend`] that stores each block as its own file inside a
/// directory on the local filesystem.
pub struct LocalBackend {
// Directory that holds the `block_<id>.dat` files.
data_dir: PathBuf,
// Ids this instance is tracking, mapped to the file that holds each block.
blocks: RwLock<HashMap<BlockId, PathBuf>>,
// Next id handed out by `allocate_block_id`.
next_block_id: AtomicU64,
// Next id handed out by `allocate_tx_id`.
next_tx_id: AtomicU64,
// Operation counters returned (cloned) by `stats()`.
stats: RwLock<StorageStats>,
// When true, `write_block` calls `sync_all` on the file after writing it.
sync_writes: bool,
}
impl LocalBackend {
    /// Opens a block store rooted at `data_dir`, creating the directory if
    /// it does not exist yet.
    ///
    /// Block files already present in the directory (names of the exact form
    /// `block_<16 lowercase hex digits>.dat`) are registered: they are
    /// counted in `total_blocks` / `total_bytes`, and the id allocator is
    /// positioned past the highest id found. Without this, reopening a
    /// directory would restart allocation at 1 and new blocks would silently
    /// overwrite persisted ones.
    ///
    /// When `sync_writes` is true, every written block file is flushed with
    /// `sync_all` before `write_block` returns.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from creating or listing the directory and from
    /// reading metadata of a block file.
    pub fn new(data_dir: PathBuf, sync_writes: bool) -> Result<Self> {
        std::fs::create_dir_all(&data_dir)?;

        let mut known_blocks = HashMap::new();
        let mut stats = StorageStats::default();
        let mut highest_id = 0u64;

        for entry in std::fs::read_dir(&data_dir)? {
            let entry = entry?;
            let file_name = entry.file_name();
            let id = match file_name.to_str().and_then(Self::parse_block_file_name) {
                Some(id) => id,
                // Not one of our block files; leave it alone.
                None => continue,
            };
            let metadata = entry.metadata()?;
            if !metadata.is_file() {
                continue;
            }
            known_blocks.insert(BlockId(id), entry.path());
            stats.total_blocks += 1;
            stats.total_bytes += metadata.len();
            highest_id = highest_id.max(id);
        }

        Ok(Self {
            data_dir,
            blocks: RwLock::new(known_blocks),
            // With an empty directory this is 1, matching a fresh store.
            next_block_id: AtomicU64::new(highest_id.saturating_add(1)),
            next_tx_id: AtomicU64::new(1),
            stats: RwLock::new(stats),
            sync_writes,
        })
    }

    /// File name (without directory) used for the block with raw id `raw_id`.
    fn block_file_name(raw_id: u64) -> String {
        format!("block_{:016x}.dat", raw_id)
    }

    /// Inverse of [`Self::block_file_name`]: extracts the raw id from a file
    /// name, or returns `None` when the name is not exactly what
    /// `block_file_name` would produce for that id.
    fn parse_block_file_name(name: &str) -> Option<u64> {
        let hex = name.strip_prefix("block_")?.strip_suffix(".dat")?;
        let raw_id = u64::from_str_radix(hex, 16).ok()?;
        // Round-trip check rejects near-misses such as upper-case digits,
        // a leading '+', or missing zero padding.
        if Self::block_file_name(raw_id) == name {
            Some(raw_id)
        } else {
            None
        }
    }

    /// Full path of the file that holds the block with the given id.
    fn block_path(&self, id: BlockId) -> PathBuf {
        self.data_dir.join(Self::block_file_name(id.0))
    }

    /// Hands out the next unused block id and advances the counter.
    fn allocate_block_id(&self) -> BlockId {
        BlockId(self.next_block_id.fetch_add(1, Ordering::SeqCst))
    }

    /// Hands out the next unused transaction id and advances the counter.
    fn allocate_tx_id(&self) -> TransactionId {
        TransactionId(self.next_tx_id.fetch_add(1, Ordering::SeqCst))
    }
}
#[async_trait]
impl StorageBackend for LocalBackend {
async fn write_block(&self, mut block: Block) -> Result<BlockId> {
let block_id = if block.header.block_id.0 == 0 {
self.allocate_block_id()
} else {
block.header.block_id
};
block.header.block_id = block_id;
let path = self.block_path(block_id);
let data = block.to_bytes()?;
tokio::fs::write(&path, &data).await?;
if self.sync_writes {
let file = tokio::fs::File::open(&path).await?;
file.sync_all().await?;
}
let mut blocks = self.blocks.write();
let is_new = !blocks.contains_key(&block_id);
blocks.insert(block_id, path);
let mut stats = self.stats.write();
stats.write_ops += 1;
if is_new {
stats.total_blocks += 1;
stats.total_bytes += data.len() as u64;
}
Ok(block_id)
}
async fn read_block(&self, id: BlockId) -> Result<Block> {
let path = self.block_path(id);
let data = tokio::fs::read(&path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
AegisError::BlockNotFound(id.0)
} else {
AegisError::Io(e)
}
})?;
self.stats.write().read_ops += 1;
Block::from_bytes(&data)
}
async fn delete_block(&self, id: BlockId) -> Result<()> {
let path = self.block_path(id);
tokio::fs::remove_file(&path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
AegisError::BlockNotFound(id.0)
} else {
AegisError::Io(e)
}
})?;
self.blocks.write().remove(&id);
let mut stats = self.stats.write();
stats.delete_ops += 1;
stats.total_blocks = stats.total_blocks.saturating_sub(1);
Ok(())
}
async fn block_exists(&self, id: BlockId) -> Result<bool> {
let path = self.block_path(id);
Ok(tokio::fs::try_exists(&path).await.unwrap_or(false))
}
async fn begin_transaction(&self) -> Result<TransactionId> {
Ok(self.allocate_tx_id())
}
async fn commit_transaction(&self, _tx_id: TransactionId) -> Result<()> {
Ok(())
}
async fn rollback_transaction(&self, _tx_id: TransactionId) -> Result<()> {
Ok(())
}
async fn sync(&self) -> Result<()> {
Ok(())
}
fn stats(&self) -> StorageStats {
self.stats.read().clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use aegis_common::BlockType;
    use bytes::Bytes;

    /// A block written to the memory backend reads back with the same payload.
    #[tokio::test]
    async fn test_memory_backend_write_read() {
        let store = MemoryBackend::new();
        let payload = Bytes::from("test data");

        // Id 0 asks the backend to allocate an id.
        let stored_id = store
            .write_block(Block::new(BlockId(0), BlockType::TableData, payload.clone()))
            .await
            .unwrap();

        let fetched = store.read_block(stored_id).await.unwrap();
        assert_eq!(fetched.data, payload);
    }

    /// Deleting a block from the memory backend makes it disappear.
    #[tokio::test]
    async fn test_memory_backend_delete() {
        let store = MemoryBackend::new();
        let stored_id = store
            .write_block(Block::new(
                BlockId(0),
                BlockType::TableData,
                Bytes::from("test"),
            ))
            .await
            .unwrap();

        let present_before = store.block_exists(stored_id).await.unwrap();
        assert!(present_before);

        store.delete_block(stored_id).await.unwrap();

        let present_after = store.block_exists(stored_id).await.unwrap();
        assert!(!present_after);
    }

    /// A block written to the local backend reads back with the same payload.
    #[tokio::test]
    async fn test_local_backend_write_read() {
        let scratch = tempfile::tempdir().unwrap();
        let store = LocalBackend::new(scratch.path().to_path_buf(), false).unwrap();
        let payload = Bytes::from("test data");

        let stored_id = store
            .write_block(Block::new(BlockId(0), BlockType::TableData, payload.clone()))
            .await
            .unwrap();

        let fetched = store.read_block(stored_id).await.unwrap();
        assert_eq!(fetched.data, payload);
    }
}