use crate::backend::native::v3::kv_store::types::{hash_key, KvEntry, KvMetadata, KvValue};
use crate::snapshot::SnapshotId;
use parking_lot::RwLock;
use std::collections::HashMap;
#[derive(Debug, Default)]
pub struct KvStore {
entries: RwLock<HashMap<u64, Vec<KvEntry>>>,
}
impl KvStore {
pub fn new() -> Self {
Self::default()
}
pub fn get_at_snapshot(
&self,
key: &[u8],
snapshot_id: SnapshotId,
) -> Option<KvValue> {
let key_hash = hash_key(key);
let entries = self.entries.read();
let versions = entries.get(&key_hash)?;
let snapshot_lsn = snapshot_id.as_lsn();
if snapshot_lsn == 0 {
return versions.last()
.filter(|e| !e.is_expired())
.map(|e| e.value.clone());
}
let idx = versions.partition_point(|e| e.metadata.version <= snapshot_lsn);
if idx == 0 {
return None; }
let entry = &versions[idx - 1];
if entry.is_expired() {
return None;
}
if matches!(entry.value, KvValue::Null) {
return None;
}
Some(entry.value.clone())
}
pub fn set(&self, key: Vec<u8>, value: KvValue, ttl_seconds: Option<u64>, version: u64) {
let key_hash = hash_key(&key);
let mut entries = self.entries.write();
let created_at = entries
.get(&key_hash)
.and_then(|versions| versions.last().map(|e| e.metadata.created_at))
.unwrap_or_else(|| {
std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
});
let now = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let entry = KvEntry {
key: key.clone(),
value,
metadata: KvMetadata {
created_at,
updated_at: now,
ttl_seconds,
version,
},
};
entries.entry(key_hash).or_default().push(entry);
}
pub fn delete(&self, key: &[u8], version: u64) {
let key_hash = hash_key(key);
let mut entries = self.entries.write();
let now = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let entry = KvEntry {
key: key.to_vec(),
value: KvValue::Null,
metadata: KvMetadata {
created_at: now,
updated_at: now,
ttl_seconds: None,
version,
},
};
entries.entry(key_hash).or_default().push(entry);
}
pub fn prefix_scan(
&self,
prefix: &[u8],
snapshot_id: SnapshotId,
) -> Vec<(Vec<u8>, KvValue)> {
let entries = self.entries.read();
let mut results = Vec::new();
for versions in entries.values() {
if let Some(entry) = versions.last() {
if entry.key.starts_with(prefix) {
if entry.metadata.version <= snapshot_id.as_lsn() {
if !entry.is_expired() && !matches!(entry.value, KvValue::Null) {
results.push((entry.key.clone(), entry.value.clone()));
}
}
}
}
}
results.sort_by(|a, b| a.0.cmp(&b.0));
results
}
pub fn cleanup_expired(&self, _min_active_snapshot: u64) {
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_kv_store_basic_operations() {
let store = KvStore::new();
let snapshot = SnapshotId::current();
store.set(b"key1".to_vec(), KvValue::Integer(42), None, 1);
let value = store.get_at_snapshot(b"key1", snapshot);
assert_eq!(value, Some(KvValue::Integer(42)));
let value = store.get_at_snapshot(b"nonexistent", snapshot);
assert_eq!(value, None);
}
#[test]
fn test_kv_store_snapshot_isolation() {
let store = KvStore::new();
store.set(b"key".to_vec(), KvValue::Integer(1), None, 10);
store.set(b"key".to_vec(), KvValue::Integer(2), None, 20);
store.set(b"key".to_vec(), KvValue::Integer(3), None, 30);
let snapshot_15 = SnapshotId::from_lsn(15);
let snapshot_25 = SnapshotId::from_lsn(25);
let snapshot_35 = SnapshotId::from_lsn(35);
assert_eq!(store.get_at_snapshot(b"key", snapshot_15), Some(KvValue::Integer(1)));
assert_eq!(store.get_at_snapshot(b"key", snapshot_25), Some(KvValue::Integer(2)));
assert_eq!(store.get_at_snapshot(b"key", snapshot_35), Some(KvValue::Integer(3)));
}
#[test]
fn test_kv_store_delete() {
let store = KvStore::new();
store.set(b"key".to_vec(), KvValue::Integer(42), None, 10);
store.delete(b"key", 20);
let snapshot_before = SnapshotId::from_lsn(15);
let snapshot_after = SnapshotId::from_lsn(25);
assert_eq!(store.get_at_snapshot(b"key", snapshot_before), Some(KvValue::Integer(42)));
assert_eq!(store.get_at_snapshot(b"key", snapshot_after), None);
}
#[test]
fn test_kv_store_prefix_scan() {
let store = KvStore::new();
let snapshot = SnapshotId::from_lsn(100);
store.set(b"user:1".to_vec(), KvValue::String("Alice".to_string()), None, 10);
store.set(b"user:2".to_vec(), KvValue::String("Bob".to_string()), None, 10);
store.set(b"user:3".to_vec(), KvValue::String("Charlie".to_string()), None, 10);
store.set(b"other".to_vec(), KvValue::String("Other".to_string()), None, 10);
let results = store.prefix_scan(b"user:", snapshot);
assert_eq!(results.len(), 3);
assert_eq!(results[0].0, b"user:1".to_vec());
assert_eq!(results[1].0, b"user:2".to_vec());
assert_eq!(results[2].0, b"user:3".to_vec());
}
#[test]
fn test_kv_store_ttl_expiration() {
let store = KvStore::new();
store.set(b"expired".to_vec(), KvValue::Integer(1), Some(0), 10);
store.set(b"valid".to_vec(), KvValue::Integer(2), None, 10);
let snapshot = SnapshotId::from_lsn(100);
assert_eq!(store.get_at_snapshot(b"expired", snapshot), None);
assert_eq!(store.get_at_snapshot(b"valid", snapshot), Some(KvValue::Integer(2)));
}
}