use crate::backend::native::v2::kv_store::ttl;
use crate::backend::native::v2::kv_store::types::{KvEntry, KvMetadata, KvStoreError, KvValue};
use crate::snapshot::SnapshotId;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::time::SystemTime;
/// In-memory key-value store that keeps a list of entry versions per key.
///
/// The whole map is guarded by a single read-write lock; every accessor in
/// this file takes that lock for the duration of the call.
#[derive(Debug, Default)]
pub struct KvStore {
/// Version list for each key. Snapshot reads binary-search this list by
/// `metadata.version`, so they rely on it being ordered by version.
pub(crate) entries: RwLock<HashMap<Vec<u8>, Vec<KvEntry>>>,
}
impl KvStore {
pub fn new() -> Self {
Self::default()
}
/// Returns a clone of the value held by the last entry in `key`'s version list.
///
/// Yields `Ok(None)` when the key is absent, when its version list is empty,
/// or when `ttl::is_expired` reports the last entry as expired. This
/// implementation never produces an `Err`.
pub fn get(&self, key: &[u8]) -> Result<Option<KvValue>, KvStoreError> {
    let guard = self.entries.read();
    match guard.get(key).and_then(|versions| versions.last()) {
        Some(entry) if !ttl::is_expired(entry) => Ok(Some(entry.value.clone())),
        _ => Ok(None),
    }
}
/// Reads `key` as seen by the snapshot `snapshot_id`.
///
/// * If the snapshot's LSN is `0`, the last entry in the version list is used.
/// * Otherwise the last entry whose `metadata.version` is `<=` the LSN is
///   selected with a binary search, which relies on the list being ordered
///   by version.
///
/// Returns `Ok(None)` when the key is absent, when no entry is visible at the
/// snapshot, or when the selected entry is expired (older entries are not
/// consulted as a fallback). This implementation never produces an `Err`.
pub fn get_at_snapshot(
    &self,
    key: &[u8],
    snapshot_id: SnapshotId,
) -> Result<Option<KvValue>, KvStoreError> {
    let guard = self.entries.read();
    let lsn = snapshot_id.as_lsn();

    let versions = match guard.get(key) {
        Some(versions) => versions,
        None => return Ok(None),
    };

    let candidate = if lsn == 0 {
        versions.last()
    } else {
        // Everything before `visible` has version <= lsn; the newest of
        // those is the entry the snapshot should observe.
        let visible = versions.partition_point(|e| e.metadata.version <= lsn);
        versions[..visible].last()
    };

    match candidate {
        Some(entry) if !ttl::is_expired(entry) => Ok(Some(entry.value.clone())),
        _ => Ok(None),
    }
}
/// Appends a new entry for `key` with version `0`.
///
/// Timestamps are whole seconds since the Unix epoch; if the system clock is
/// before the epoch, `0` is used. `created_at` is carried over from the last
/// existing entry for the key, or set to the current time for a new key.
/// `ttl` is stored as the entry's `ttl_seconds`.
///
/// NOTE(review): the entry is pushed to the end of the version list with
/// version 0. If the key already has entries with non-zero versions, the list
/// is no longer ordered by version, which the binary-search based snapshot
/// lookups assume. Confirm that callers do not mix the two write paths.
pub fn set(
    &mut self,
    key: Vec<u8>,
    value: KvValue,
    ttl: Option<u64>,
) -> Result<(), KvStoreError> {
    let now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };

    let mut map = self.entries.write();
    let created_at = map
        .get(&key)
        .and_then(|versions| versions.last())
        .map_or(now, |latest| latest.metadata.created_at);

    let record = KvEntry {
        key: key.clone(),
        value,
        metadata: KvMetadata {
            created_at,
            updated_at: now,
            ttl_seconds: ttl,
            version: 0,
        },
    };
    map.entry(key).or_default().push(record);
    Ok(())
}
/// Removes `key` together with every version stored for it.
///
/// # Errors
///
/// Returns `KvStoreError::KeyNotFound` (carrying a copy of the key) when the
/// key is not present.
pub fn delete(&mut self, key: &[u8]) -> Result<(), KvStoreError> {
    match self.entries.write().remove(key) {
        Some(_) => Ok(()),
        None => Err(KvStoreError::KeyNotFound(key.to_vec())),
    }
}
/// Reports whether `key` has at least one entry and the last entry in its
/// version list is not expired according to `ttl::is_expired`.
pub fn exists(&self, key: &[u8]) -> bool {
    let guard = self.entries.read();
    guard
        .get(key)
        .and_then(|versions| versions.last())
        .map_or(false, |entry| !ttl::is_expired(entry))
}
/// Returns the number of keys currently held in the map.
///
/// No expiry check is performed, so keys whose entries have expired but have
/// not been cleaned up are still counted.
pub fn len(&self) -> usize {
    self.entries.read().len()
}
/// Collects every key starting with `prefix`, with its value as seen by
/// the snapshot `snapshot_id`.
///
/// Per key, the entry is chosen the same way as in a snapshot read: LSN `0`
/// selects the last entry in the version list, otherwise the last entry whose
/// `metadata.version` is `<=` the LSN. Keys with no visible entry, or whose
/// selected entry is expired, are omitted.
///
/// The result is sorted by key in ascending order. This implementation never
/// produces an `Err`.
pub fn prefix_scan(
    &self,
    snapshot_id: SnapshotId,
    prefix: &[u8],
) -> Result<Vec<(Vec<u8>, KvValue)>, KvStoreError> {
    let guard = self.entries.read();
    let lsn = snapshot_id.as_lsn();

    let mut matches: Vec<(Vec<u8>, KvValue)> = guard
        .iter()
        .filter(|(key, _)| key.starts_with(prefix))
        .filter_map(|(key, versions)| {
            let chosen = if lsn == 0 {
                versions.last()
            } else {
                let visible = versions.partition_point(|e| e.metadata.version <= lsn);
                versions[..visible].last()
            };
            match chosen {
                Some(entry) if !ttl::is_expired(entry) => {
                    Some((key.clone(), entry.value.clone()))
                }
                _ => None,
            }
        })
        .collect();

    // HashMap iteration order is arbitrary; sort so callers get a stable order.
    matches.sort_by(|(left, _), (right, _)| left.cmp(right));
    Ok(matches)
}
/// Delegates to `ttl::cleanup_expired_entries`, passing this store, and
/// returns the count that helper reports.
///
/// NOTE(review): presumably the number of expired entries removed — confirm
/// against the `ttl` module.
pub fn cleanup_expired(&mut self) -> usize {
ttl::cleanup_expired_entries(self)
}
/// Inserts a new entry for `key` tagged with an explicit `version`, keeping
/// the key's version list ordered by version.
///
/// Timestamps are whole seconds since the Unix epoch; if the system clock is
/// before the epoch, `0` is used. `created_at` is carried over from the last
/// existing entry for the key, or set to the current time for a new key.
/// `ttl` is stored as the entry's `ttl_seconds`.
///
/// When an entry with the same `version` already exists, the new entry is
/// placed *after* it, so the most recent write for a given version is the one
/// that both latest-value reads and snapshot reads observe.
pub fn set_with_version(
    &mut self,
    key: Vec<u8>,
    value: KvValue,
    ttl: Option<u64>,
    version: u64,
) -> Result<(), KvStoreError> {
    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    let mut entries = self.entries.write();
    let created_at = entries
        .get(&key)
        .and_then(|versions| versions.last())
        .map_or(now, |latest| latest.metadata.created_at);

    let entry = KvEntry {
        key: key.clone(),
        value,
        metadata: KvMetadata {
            created_at,
            updated_at: now,
            ttl_seconds: ttl,
            version,
        },
    };

    let versions = entries.entry(key).or_default();
    // `<=` (not `<`) so the insertion point lands after any entries that
    // already carry this version. With `<` the new entry would be inserted
    // in front of them and readers, which pick the last matching entry,
    // would keep returning the older write.
    let pos = versions.partition_point(|e| e.metadata.version <= version);
    versions.insert(pos, entry);
    Ok(())
}
}
/// Rebuilds a [`KvStore`] by replaying the records of the WAL at `wal_path`.
///
/// If `wal_path` does not exist an empty store is returned. Otherwise records
/// are read in sequence:
///
/// * `KvSet` records are applied through `kv_store::wal::apply_set`. A record
///   whose `version` is `0` is applied with its LSN as the version.
/// * `KvDelete` records are applied through `kv_store::wal::apply_delete`;
///   any error it returns is deliberately discarded.
/// * All other record kinds are skipped.
///
/// Replay stops at the first read that yields either end-of-log or an error,
/// and the store built so far is returned.
///
/// NOTE(review): a read error is treated exactly like end-of-log, presumably
/// to tolerate a truncated tail — confirm this is intended, since corruption
/// mid-log would silently drop later records.
///
/// # Errors
///
/// Returns `KvStoreError::RecoveryFailed` when the WAL cannot be opened, and
/// propagates any error from applying a `KvSet` record.
pub fn recover_from_wal<P: AsRef<std::path::Path>>(wal_path: P) -> Result<KvStore, KvStoreError> {
    let wal_path = wal_path.as_ref();
    if !wal_path.exists() {
        return Ok(KvStore::new());
    }

    let mut reader = crate::backend::native::v2::wal::V2WALReader::open(wal_path)
        .map_err(|e| KvStoreError::RecoveryFailed(format!("Failed to open WAL: {}", e)))?;
    let mut store = KvStore::new();

    // Any outcome other than `Ok(Some(..))` (end-of-log or a read error)
    // terminates the replay.
    while let Ok(Some((lsn, record))) = reader.read_next_record_opt(false) {
        match record {
            crate::backend::native::v2::wal::V2WALRecord::KvSet {
                key,
                value_bytes,
                value_type,
                ttl_seconds,
                version,
            } => {
                let record_version = if version == 0 { lsn } else { version };
                crate::backend::native::v2::kv_store::wal::apply_set(
                    &mut store,
                    key,
                    value_bytes,
                    value_type,
                    ttl_seconds,
                    record_version,
                )?;
            }
            crate::backend::native::v2::wal::V2WALRecord::KvDelete { key, .. } => {
                // Best effort: the result of the delete is ignored.
                let _ = crate::backend::native::v2::kv_store::wal::apply_delete(
                    &mut store,
                    key,
                    0,
                );
            }
            _ => {}
        }
    }

    Ok(store)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed store holds no keys.
    #[test]
    fn test_new_store() {
        assert_eq!(KvStore::new().len(), 0);
    }

    /// An entry written at version 100 is readable from a snapshot at LSN 150.
    #[test]
    fn test_get_at_snapshot_visible() {
        let mut kv = KvStore::new();
        kv.set_with_version(b"key".to_vec(), KvValue::Integer(42), None, 100)
            .unwrap();

        let seen = kv
            .get_at_snapshot(b"key", SnapshotId::from_lsn(150))
            .unwrap();
        assert_eq!(seen, Some(KvValue::Integer(42)));
    }

    /// An entry written at version 200 is hidden from a snapshot at LSN 150.
    #[test]
    fn test_get_at_snapshot_not_visible() {
        let mut kv = KvStore::new();
        kv.set_with_version(b"key".to_vec(), KvValue::Integer(42), None, 200)
            .unwrap();

        let seen = kv
            .get_at_snapshot(b"key", SnapshotId::from_lsn(150))
            .unwrap();
        assert_eq!(seen, None);
    }

    /// An entry with a 1-second TTL is no longer returned after 2 seconds.
    #[test]
    fn test_get_at_snapshot_expired() {
        let mut kv = KvStore::new();
        kv.set_with_version(b"key".to_vec(), KvValue::Integer(42), Some(1), 100)
            .unwrap();

        // Wait past the TTL so the entry is expired at read time.
        std::thread::sleep(std::time::Duration::from_secs(2));

        let seen = kv
            .get_at_snapshot(b"key", SnapshotId::from_lsn(150))
            .unwrap();
        assert_eq!(seen, None);
    }

    /// Reading a key that was never written yields `None`.
    #[test]
    fn test_get_at_snapshot_missing_key() {
        let kv = KvStore::new();
        let seen = kv
            .get_at_snapshot(b"missing", SnapshotId::from_lsn(100))
            .unwrap();
        assert_eq!(seen, None);
    }

    /// With versions 100 and 200 stored, each snapshot sees the newest
    /// version at or below its LSN, or nothing if it predates both.
    #[test]
    fn test_snapshot_isolation_multiple_versions() {
        let mut kv = KvStore::new();
        kv.set_with_version(b"key".to_vec(), KvValue::Integer(100), None, 100)
            .unwrap();
        kv.set_with_version(b"key".to_vec(), KvValue::Integer(200), None, 200)
            .unwrap();

        let expectations = vec![
            (250, Some(KvValue::Integer(200))),
            (150, Some(KvValue::Integer(100))),
            (50, None),
        ];
        for (lsn, expected) in expectations {
            let seen = kv
                .get_at_snapshot(b"key", SnapshotId::from_lsn(lsn))
                .unwrap();
            assert_eq!(seen, expected);
        }
    }
}