use std::path::Path;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use crate::background::{BackgroundTaskManager, BackgroundTaskStats};
use crate::checkpoint::{CheckpointState, CheckpointStats};
use crate::common::{Address, Config, INVALID_ADDRESS, Key, Result, RsKvError, Value};
use crate::epoch::{EpochManager, SharedEpochManager};
use crate::gc::{GcConfig, GcState, GcStats};
use crate::hlog::{FileStorageDevice, HybridLog, LogRecord};
use crate::index::{SharedMemHashIndex, new_shared_mem_hash_index_with_capacity};
/// Log-structured key-value store combining a hybrid (memory + disk) log,
/// an in-memory hash index, and epoch-based resource management.
pub struct RsKv {
/// Append-only hybrid log that stores all records.
hlog: Arc<HybridLog>,
/// Hash index mapping each key to its latest log address.
index: SharedMemHashIndex,
/// Epoch manager shared with the log and index; never read directly here —
/// presumably retained just to keep it alive. TODO confirm.
#[allow(dead_code)]
epoch: SharedEpochManager,
/// Configuration the store was created with (validated in `new`).
config: Config,
/// Held in write mode by `checkpoint` and in read mode by GC, so
/// checkpoints and garbage collection never run concurrently.
checkpoint_lock: Arc<AsyncRwLock<()>>,
/// Checkpoint creation, listing, and recovery state.
checkpoint_state: Arc<CheckpointState>,
/// Garbage-collection state.
gc_state: Arc<GcState>,
/// Manages the optional background checkpoint/GC tasks.
background_manager: Arc<BackgroundTaskManager>,
}
impl RsKv {
pub async fn new(config: Config) -> Result<Self> {
config.validate()?;
log::info!("Initializing RsKv with validated configuration");
let storage_path = Path::new(&config.storage_dir);
if !storage_path.exists() {
std::fs::create_dir_all(storage_path)?;
}
let epoch = Arc::new(EpochManager::new());
let log_file_path = storage_path.join("rskv.log");
let storage_device = Box::new(FileStorageDevice::new(log_file_path)?);
let hlog = Arc::new(HybridLog::new(
config.memory_size,
storage_device,
epoch.clone(),
)?);
let estimated_capacity = (config.memory_size / 1024) as usize; let index = new_shared_mem_hash_index_with_capacity(estimated_capacity, epoch.clone());
let checkpoint_dir = storage_path.join("checkpoints");
let checkpoint_state = Arc::new(CheckpointState::new(
checkpoint_dir,
hlog.clone(),
index.clone(),
)?);
let gc_state = Arc::new(GcState::new(hlog.clone(), index.clone()));
let checkpoint_lock = Arc::new(AsyncRwLock::new(()));
let background_manager = Arc::new(BackgroundTaskManager::new(
config.clone(),
checkpoint_state.clone(),
gc_state.clone(),
hlog.clone(),
checkpoint_lock.clone(),
));
if let Some(_metadata) = checkpoint_state.recover_from_latest_checkpoint().await? {
log::info!("Recovered from checkpoint");
}
let rskv = Self {
hlog,
index,
epoch,
config: config.clone(),
checkpoint_lock,
checkpoint_state,
gc_state,
background_manager,
};
if config.enable_checkpointing || config.enable_gc {
rskv.background_manager.start()?;
log::info!("Background tasks started");
}
Ok(rskv)
}
pub async fn upsert(&self, key: Key, value: Value) -> Result<()> {
let previous_address = self.index.find(&key).unwrap_or(INVALID_ADDRESS);
let record = LogRecord::new(key.clone(), value, previous_address);
let new_address = self.hlog.insert_record(record)?;
self.index.insert(key, new_address);
Ok(())
}
pub async fn read(&self, key: &Key) -> Result<Option<Value>> {
let address = match self.index.find(key) {
Some(addr) => addr,
None => return Ok(None), };
let record = self.hlog.read_record(address)?;
if record.header.tombstone {
return Ok(None);
}
if record.key != *key {
return Err(RsKvError::Internal {
message: "Key mismatch in log record".to_string(),
});
}
Ok(Some(record.value))
}
pub async fn delete(&self, key: &Key) -> Result<()> {
let previous_address = self.index.find(key).unwrap_or(INVALID_ADDRESS);
let tombstone = LogRecord::tombstone(key.clone(), previous_address);
let new_address = self.hlog.insert_record(tombstone)?;
self.index.insert(key.clone(), new_address);
Ok(())
}
pub async fn contains_key(&self, key: &Key) -> Result<bool> {
match self.read(key).await? {
Some(_) => Ok(true),
None => Ok(false),
}
}
pub fn len(&self) -> usize {
self.index.len()
}
pub fn is_empty(&self) -> bool {
self.index.is_empty()
}
pub fn stats(&self) -> RsKvStats {
let index_len = self.index.len();
let tail_address = self.hlog.get_tail_address();
let head_address = self.hlog.get_head_address();
let read_only_address = self.hlog.get_read_only_address();
let begin_address = self.hlog.get_begin_address();
RsKvStats {
index_entries: index_len,
log_tail_address: tail_address,
log_head_address: head_address,
log_read_only_address: read_only_address,
log_begin_address: begin_address,
mutable_region_size: tail_address.saturating_sub(read_only_address),
read_only_region_size: read_only_address.saturating_sub(head_address),
disk_region_size: head_address.saturating_sub(begin_address),
}
}
pub async fn checkpoint(&self) -> Result<()> {
let _lock = self.checkpoint_lock.write().await;
log::info!("Starting checkpoint operation");
let _metadata = self.checkpoint_state.initiate_checkpoint().await?;
log::info!("Checkpoint completed successfully");
Ok(())
}
pub async fn checkpoint_stats(&self) -> Result<CheckpointStats> {
self.checkpoint_state.get_checkpoint_stats().await
}
pub async fn list_checkpoints(&self) -> Result<Vec<u64>> {
self.checkpoint_state.list_checkpoints().await
}
pub async fn cleanup_checkpoints(&self, keep_count: usize) -> Result<()> {
self.checkpoint_state
.cleanup_old_checkpoints(keep_count)
.await
}
pub async fn garbage_collect(&self) -> Result<GcStats> {
self.garbage_collect_with_config(GcConfig::default()).await
}
pub async fn garbage_collect_with_config(&self, config: GcConfig) -> Result<GcStats> {
let _lock = self.checkpoint_lock.read().await;
log::info!("Starting garbage collection");
let stats = self.gc_state.initiate_gc(config).await?;
log::info!(
"Garbage collection completed, reclaimed {} bytes",
stats.bytes_reclaimed
);
Ok(stats)
}
pub fn should_run_gc(&self) -> Result<bool> {
self.gc_state.should_run_gc(&GcConfig::default())
}
pub fn gc_estimate(&self) -> Result<crate::gc::GcEstimate> {
self.gc_state.estimate_reclaimable_space()
}
pub fn config(&self) -> &Config {
&self.config
}
pub async fn scan_all(&self) -> Result<Vec<(Key, Value)>> {
let mut results = Vec::new();
self.index.for_each(|key, address| {
if let Ok(record) = self.hlog.read_record(address) {
if !record.header.tombstone {
results.push((key.clone(), record.value));
}
}
});
Ok(results)
}
pub async fn scan_prefix(&self, prefix: &[u8]) -> Result<Vec<(Key, Value)>> {
let mut results = Vec::new();
self.index.for_each(|key, address| {
if key.starts_with(prefix)
&& let Ok(record) = self.hlog.read_record(address)
&& !record.header.tombstone
{
results.push((key.clone(), record.value));
}
});
Ok(results)
}
pub fn background_stats(&self) -> BackgroundTaskStats {
self.background_manager.get_stats()
}
pub async fn stop_background_tasks(&self) -> Result<()> {
self.background_manager.stop().await
}
pub fn start_background_tasks(&self) -> Result<()> {
self.background_manager.start()
}
pub async fn close(&self) -> Result<()> {
log::info!("Closing rskv store");
self.background_manager.stop().await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
match self.checkpoint_state.initiate_checkpoint().await {
Ok(_) => {
log::info!("Final checkpoint completed successfully");
}
Err(e) if e.to_string().contains("already in progress") => {
log::info!("Skipping final checkpoint - one already in progress");
}
Err(e) => return Err(e),
}
if self.should_run_gc()? {
let _gc_stats = self.garbage_collect().await?;
}
self.cleanup_checkpoints(3).await?;
log::info!("Store closed successfully");
Ok(())
}
}
/// Point-in-time snapshot of store metrics, as produced by `RsKv::stats`.
#[derive(Debug, Clone)]
pub struct RsKvStats {
/// Number of entries in the hash index.
pub index_entries: usize,
/// Tail address of the hybrid log.
pub log_tail_address: Address,
/// Head address of the hybrid log.
pub log_head_address: Address,
/// Read-only boundary address of the hybrid log.
pub log_read_only_address: Address,
/// Begin address of the hybrid log.
pub log_begin_address: Address,
/// Size of the mutable region: tail minus read-only address.
pub mutable_region_size: u64,
/// Size of the read-only region: read-only minus head address.
pub read_only_region_size: u64,
/// Size of the on-disk region: head minus begin address.
pub disk_region_size: u64,
}
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;

    /// Builds a store backed by a fresh temporary directory.
    ///
    /// Returns the `TempDir` guard alongside the store; callers must keep it
    /// bound (e.g. as `_dir`, not `_`) for as long as the store is used.
    /// The previous version dropped the guard on return, which deleted the
    /// storage directory out from under the live store.
    async fn create_test_rskv() -> (RsKv, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let config = Config {
            storage_dir: temp_dir.path().to_string_lossy().to_string(),
            memory_size: 64 * 1024 * 1024,
            enable_checkpointing: false,
            enable_gc: false,
            ..Default::default()
        };
        let store = RsKv::new(config).await.unwrap();
        (store, temp_dir)
    }

    #[tokio::test]
    async fn test_basic_operations() {
        let (store, _dir) = create_test_rskv().await;
        let key = b"test_key".to_vec();
        let value = b"test_value".to_vec();

        // Insert then read back.
        store.upsert(key.clone(), value.clone()).await.unwrap();
        let result = store.read(&key).await.unwrap();
        assert_eq!(result, Some(value.clone()));
        assert!(store.contains_key(&key).await.unwrap());

        // Delete hides the key.
        store.delete(&key).await.unwrap();
        let result = store.read(&key).await.unwrap();
        assert_eq!(result, None);
        assert!(!store.contains_key(&key).await.unwrap());
    }

    #[tokio::test]
    async fn test_upsert_overwrites() {
        let (store, _dir) = create_test_rskv().await;
        let key = b"test_key".to_vec();
        let value1 = b"value1".to_vec();
        let value2 = b"value2".to_vec();

        store.upsert(key.clone(), value1.clone()).await.unwrap();
        let result = store.read(&key).await.unwrap();
        assert_eq!(result, Some(value1));

        // A second upsert for the same key must win.
        store.upsert(key.clone(), value2.clone()).await.unwrap();
        let result = store.read(&key).await.unwrap();
        assert_eq!(result, Some(value2));
    }

    #[tokio::test]
    async fn test_multiple_keys() {
        let (store, _dir) = create_test_rskv().await;
        let entries = vec![
            (b"key1".to_vec(), b"value1".to_vec()),
            (b"key2".to_vec(), b"value2".to_vec()),
            (b"key3".to_vec(), b"value3".to_vec()),
        ];
        for (key, value) in &entries {
            store.upsert(key.clone(), value.clone()).await.unwrap();
        }
        for (key, value) in &entries {
            let result = store.read(key).await.unwrap();
            assert_eq!(result, Some(value.clone()));
        }
        assert_eq!(store.len(), 3);
        assert!(!store.is_empty());
    }

    #[tokio::test]
    async fn test_scan_operations() {
        let (store, _dir) = create_test_rskv().await;
        let entries = vec![
            (b"prefix_key1".to_vec(), b"value1".to_vec()),
            (b"prefix_key2".to_vec(), b"value2".to_vec()),
            (b"other_key".to_vec(), b"value3".to_vec()),
        ];
        for (key, value) in &entries {
            store.upsert(key.clone(), value.clone()).await.unwrap();
        }

        let all_results = store.scan_all().await.unwrap();
        assert_eq!(all_results.len(), 3);

        let prefix_results = store.scan_prefix(b"prefix_").await.unwrap();
        assert_eq!(prefix_results.len(), 2);
        for (key, _) in &prefix_results {
            assert!(key.starts_with(b"prefix_"));
        }
    }

    #[tokio::test]
    async fn test_stats() {
        let (store, _dir) = create_test_rskv().await;
        let initial_stats = store.stats();
        assert_eq!(initial_stats.index_entries, 0);

        store
            .upsert(b"key1".to_vec(), b"value1".to_vec())
            .await
            .unwrap();
        store
            .upsert(b"key2".to_vec(), b"value2".to_vec())
            .await
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.index_entries, 2);
        assert!(stats.log_tail_address > stats.log_head_address);
    }

    #[tokio::test]
    async fn test_checkpoint() {
        // Manages its own TempDir because checkpointing must be enabled;
        // the guard stays alive for the whole test.
        let temp_dir = tempdir().unwrap();
        let config = Config {
            storage_dir: temp_dir.path().to_string_lossy().to_string(),
            memory_size: 64 * 1024 * 1024,
            enable_checkpointing: true,
            enable_gc: false,
            ..Default::default()
        };
        let store = RsKv::new(config).await.unwrap();
        store.stop_background_tasks().await.unwrap();

        store
            .upsert(b"key1".to_vec(), b"value1".to_vec())
            .await
            .unwrap();

        match store.checkpoint().await {
            Ok(_) => {
                let result = store.read(&b"key1".to_vec()).await.unwrap();
                assert_eq!(result, Some(b"value1".to_vec()));
            }
            Err(e) => {
                eprintln!("Checkpoint failed (expected in test setup): {}", e);
            }
        }

        store.close().await.unwrap();
    }
}