use crate::backend::native::v3::kv_store::types::{KvEntry, KvMetadata, KvValue, hash_key};
use crate::snapshot::SnapshotId;
use parking_lot::RwLock;
use std::collections::HashMap;
/// In-memory, multi-version key-value store.
///
/// Entries are bucketed by the `u64` produced by `hash_key`; each bucket holds
/// the list of versions that `set` and `delete` have appended for that hash.
/// All access goes through a single `RwLock`.
#[derive(Debug, Default)]
pub struct KvStore {
/// Hash bucket -> versions in append order.
/// NOTE(review): readers binary-search buckets by `metadata.version`, so
/// callers are presumably expected to append in ascending version order — confirm.
entries: RwLock<HashMap<u64, Vec<KvEntry>>>,
}
impl KvStore {
pub fn new() -> Self {
Self::default()
}
/// Returns the value of `key` as visible at `snapshot_id`.
///
/// A snapshot LSN of `0` selects the newest version of the key; any other LSN
/// selects the newest version whose `metadata.version` is `<=` that LSN.
///
/// Returns `None` when the key has no visible version, when the visible
/// version is expired, or when it is a deletion tombstone (`KvValue::Null`).
/// Tombstones are hidden for every LSN, including `0`.
pub fn get_at_snapshot(&self, key: &[u8], snapshot_id: SnapshotId) -> Option<KvValue> {
    let key_hash = hash_key(key);
    let entries = self.entries.read();
    let versions = entries.get(&key_hash)?;
    let snapshot_lsn = snapshot_id.as_lsn();

    // Narrow the bucket to the versions this snapshot is allowed to see.
    let candidates = if snapshot_lsn == 0 {
        versions.as_slice()
    } else {
        let end = versions.partition_point(|e| e.metadata.version <= snapshot_lsn);
        &versions[..end]
    };

    // Buckets are keyed by hash only, so walk back to the newest version that
    // really belongs to `key` instead of trusting the last slot blindly.
    let visible = candidates
        .iter()
        .rev()
        .find(|e| e.key.as_slice() == key)?;

    if visible.is_expired() || matches!(visible.value, KvValue::Null) {
        return None;
    }
    Some(visible.value.clone())
}
/// Appends a new version of `key` holding `value`.
///
/// * `ttl_seconds` - optional time-to-live stored in the version's metadata.
/// * `version` - version number recorded for this write.
///
/// `created_at` is inherited from the newest existing version in the key's
/// bucket; for a first write it equals `updated_at`. Timestamps are whole
/// seconds since the Unix epoch, falling back to `0` if the clock reads
/// earlier than the epoch.
pub fn set(&self, key: Vec<u8>, value: KvValue, ttl_seconds: Option<u64>, version: u64) {
    let key_hash = hash_key(&key);

    // Read the clock once, before taking the write lock, so a first write gets
    // identical created/updated stamps and the lock is held as briefly as possible.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    let mut entries = self.entries.write();
    let versions = entries.entry(key_hash).or_default();
    let created_at = versions
        .last()
        .map_or(now, |previous| previous.metadata.created_at);

    // `key` is moved into the entry; no clone is needed.
    versions.push(KvEntry {
        key,
        value,
        metadata: KvMetadata {
            created_at,
            updated_at: now,
            ttl_seconds,
            version,
        },
    });
}
/// Records the deletion of `key` at `version`.
///
/// Nothing is removed: a tombstone version (`KvValue::Null`, no TTL) is
/// appended to the key's bucket, stamped with the current time for both
/// `created_at` and `updated_at`.
pub fn delete(&self, key: &[u8], version: u64) {
    let bucket = hash_key(key);
    let mut guard = self.entries.write();

    // Seconds since the Unix epoch; 0 if the clock is before the epoch.
    let timestamp = match std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
    {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };

    let metadata = KvMetadata {
        created_at: timestamp,
        updated_at: timestamp,
        ttl_seconds: None,
        version,
    };
    let tombstone = KvEntry {
        key: key.to_vec(),
        value: KvValue::Null,
        metadata,
    };

    guard.entry(bucket).or_default().push(tombstone);
}
/// Returns every live `(key, value)` pair whose key starts with `prefix`, as
/// visible at `snapshot_id`, sorted by key.
///
/// Visibility follows the same rule as `get_at_snapshot`: a snapshot LSN of
/// `0` selects the newest version in each bucket, any other LSN selects the
/// newest version whose `metadata.version` is `<=` that LSN. A key that has
/// been overwritten after the snapshot therefore still shows its older value.
/// Expired versions and deletion tombstones (`KvValue::Null`) are omitted.
pub fn prefix_scan(&self, prefix: &[u8], snapshot_id: SnapshotId) -> Vec<(Vec<u8>, KvValue)> {
    let snapshot_lsn = snapshot_id.as_lsn();
    let entries = self.entries.read();

    let mut results: Vec<(Vec<u8>, KvValue)> = entries
        .values()
        .filter_map(|versions| {
            // Pick the version this snapshot is allowed to see, not merely the
            // newest one in the bucket.
            let visible = if snapshot_lsn == 0 {
                versions.last()?
            } else {
                let end = versions.partition_point(|e| e.metadata.version <= snapshot_lsn);
                versions.get(end.checked_sub(1)?)?
            };

            let live = visible.key.starts_with(prefix)
                && !visible.is_expired()
                && !matches!(visible.value, KvValue::Null);
            live.then(|| (visible.key.clone(), visible.value.clone()))
        })
        .collect();

    // At most one pair is emitted per bucket and equal keys share a bucket, so
    // keys are unique and an unstable sort gives the same order as a stable one.
    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    results
}
/// Currently a no-op: the body is empty and `_min_active_snapshot` is unused,
/// so no versions are ever removed from the store by this call.
///
/// NOTE(review): presumably intended to prune expired entries and versions
/// that no snapshot at or after `_min_active_snapshot` can still observe —
/// confirm the intended semantics before implementing.
pub fn cleanup_expired(&self, _min_active_snapshot: u64) {
}
/// Serializes the newest live version of every bucket into a checkpoint.
///
/// Layout (all integers little-endian):
///
/// ```text
/// u32 record_count
/// repeated record_count times:
///     u16 key_len,   key bytes
///     u16 value_len, value bytes
///     u8  value type tag
///     u64 ttl_seconds (0 = no TTL)
///     u64 version
/// ```
///
/// Tombstones (`KvValue::Null`) and expired entries are not written. Records
/// whose key or encoded value does not fit in a `u16` length prefix are
/// skipped as well: writing a saturated length followed by the full payload
/// would desynchronize every record after it. `record_count` is the number of
/// records actually written.
pub fn to_bytes(&self) -> Vec<u8> {
    let entries = self.entries.read();

    // Reserve the header slot; it is patched once the real count is known.
    let mut result = vec![0u8; 4];
    let mut written: u32 = 0;

    for entry in entries.values().filter_map(|versions| versions.last()) {
        if written == u32::MAX {
            // The header cannot describe more records than this.
            break;
        }
        if matches!(entry.value, KvValue::Null) || entry.is_expired() {
            continue;
        }

        let value_bytes = entry.value.to_bytes();
        let key_len: u16 = match entry.key.len().try_into() {
            Ok(len) => len,
            Err(_) => continue,
        };
        let value_len: u16 = match value_bytes.len().try_into() {
            Ok(len) => len,
            Err(_) => continue,
        };

        result.extend_from_slice(&key_len.to_le_bytes());
        result.extend_from_slice(&entry.key);
        result.extend_from_slice(&value_len.to_le_bytes());
        result.extend_from_slice(&value_bytes);
        result.push(entry.value.type_tag());
        let ttl = entry.metadata.ttl_seconds.unwrap_or(0);
        result.extend_from_slice(&ttl.to_le_bytes());
        result.extend_from_slice(&entry.metadata.version.to_le_bytes());
        written += 1;
    }

    result[..4].copy_from_slice(&written.to_le_bytes());
    result
}
/// Replaces the store's contents with the checkpoint encoded in `bytes`.
///
/// Expects the layout produced by `to_bytes`: a little-endian `u32` record
/// count followed by records of `u16 key_len, key, u16 value_len, value,
/// u8 type tag, u64 ttl, u64 version`. A TTL of `0` is loaded as "no TTL".
/// Every loaded entry gets `created_at`/`updated_at` set to the load time.
///
/// Reading stops early, without error, if the input ends exactly where a
/// record's key length would start, so a header count larger than the number
/// of records present is tolerated.
///
/// # Errors
///
/// Returns `Err` when `bytes` is shorter than the 4-byte header, when a record
/// is cut off after its key length, or when `KvValue::from_bytes` rejects a
/// value. The store is left untouched in that case.
pub fn from_bytes(&mut self, bytes: &[u8]) -> Result<(), String> {
    /// Reads exactly `N` bytes from the cursor.
    fn read_array<const N: usize>(
        cursor: &mut std::io::Cursor<&[u8]>,
    ) -> Result<[u8; N], String> {
        use std::io::Read;
        let mut buf = [0u8; N];
        cursor.read_exact(&mut buf).map_err(|e| e.to_string())?;
        Ok(buf)
    }

    /// Reads exactly `len` bytes from the cursor into a new vector.
    fn read_vec(cursor: &mut std::io::Cursor<&[u8]>, len: usize) -> Result<Vec<u8>, String> {
        use std::io::Read;
        let mut buf = vec![0u8; len];
        cursor.read_exact(&mut buf).map_err(|e| e.to_string())?;
        Ok(buf)
    }

    if bytes.len() < 4 {
        return Err("Checkpoint data too short".to_string());
    }
    let mut cursor = std::io::Cursor::new(bytes);
    let count = u32::from_le_bytes(read_array::<4>(&mut cursor)?) as usize;

    // One load time for the whole checkpoint; there is no need to hit the
    // clock once per record.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    let mut new_entries = HashMap::new();
    for _ in 0..count {
        // Running out of input at a record boundary ends the load quietly.
        let key_len = match read_array::<2>(&mut cursor) {
            Ok(raw) => usize::from(u16::from_le_bytes(raw)),
            Err(_) => break,
        };
        let key = read_vec(&mut cursor, key_len)?;

        let value_len = usize::from(u16::from_le_bytes(read_array::<2>(&mut cursor)?));
        let value_bytes = read_vec(&mut cursor, value_len)?;
        let [value_type] = read_array::<1>(&mut cursor)?;
        let value = KvValue::from_bytes(&value_bytes, value_type)
            .ok_or_else(|| "Failed to deserialize value".to_string())?;

        let ttl = u64::from_le_bytes(read_array::<8>(&mut cursor)?);
        let ttl_seconds = if ttl > 0 { Some(ttl) } else { None };
        let version = u64::from_le_bytes(read_array::<8>(&mut cursor)?);

        // Hash first so the key can be moved into the entry without a clone.
        let key_hash = hash_key(&key);
        let entry = KvEntry {
            key,
            value,
            metadata: KvMetadata {
                created_at: now,
                updated_at: now,
                ttl_seconds,
                version,
            },
        };
        new_entries.insert(key_hash, vec![entry]);
    }

    // Swap in the new map only after the whole checkpoint parsed cleanly.
    *self.entries.write() = new_entries;
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A written key can be read back; an unknown key yields `None`.
    #[test]
    fn test_kv_store_basic_operations() {
        let kv = KvStore::new();
        let at = SnapshotId::current();

        kv.set(b"key1".to_vec(), KvValue::Integer(42), None, 1);

        assert_eq!(kv.get_at_snapshot(b"key1", at), Some(KvValue::Integer(42)));
        assert_eq!(kv.get_at_snapshot(b"nonexistent", at), None);
    }

    /// Each snapshot sees the newest version written at or before its LSN.
    #[test]
    fn test_kv_store_snapshot_isolation() {
        let kv = KvStore::new();
        for (version, payload) in [(10, 1), (20, 2), (30, 3)] {
            kv.set(b"key".to_vec(), KvValue::Integer(payload), None, version);
        }

        for (lsn, expected) in [(15, 1), (25, 2), (35, 3)] {
            assert_eq!(
                kv.get_at_snapshot(b"key", SnapshotId::from_lsn(lsn)),
                Some(KvValue::Integer(expected))
            );
        }
    }

    /// A delete hides the key from later snapshots but not from earlier ones.
    #[test]
    fn test_kv_store_delete() {
        let kv = KvStore::new();
        kv.set(b"key".to_vec(), KvValue::Integer(42), None, 10);
        kv.delete(b"key", 20);

        let before_delete = SnapshotId::from_lsn(15);
        let after_delete = SnapshotId::from_lsn(25);

        assert_eq!(
            kv.get_at_snapshot(b"key", before_delete),
            Some(KvValue::Integer(42))
        );
        assert_eq!(kv.get_at_snapshot(b"key", after_delete), None);
    }

    /// Only keys with the requested prefix come back, ordered by key.
    #[test]
    fn test_kv_store_prefix_scan() {
        let kv = KvStore::new();
        let at = SnapshotId::from_lsn(100);

        let fixtures = [
            ("user:1", "Alice"),
            ("user:2", "Bob"),
            ("user:3", "Charlie"),
            ("other", "Other"),
        ];
        for (key, name) in fixtures {
            kv.set(
                key.as_bytes().to_vec(),
                KvValue::String(name.to_string()),
                None,
                10,
            );
        }

        let found = kv.prefix_scan(b"user:", at);
        assert_eq!(found.len(), 3);
        let expected_keys = [b"user:1", b"user:2", b"user:3"];
        for (pair, expected) in found.iter().zip(expected_keys) {
            assert_eq!(pair.0, expected.to_vec());
        }
    }

    /// A zero-second TTL makes the entry unreadable; no TTL keeps it readable.
    #[test]
    fn test_kv_store_ttl_expiration() {
        let kv = KvStore::new();
        kv.set(b"expired".to_vec(), KvValue::Integer(1), Some(0), 10);
        kv.set(b"valid".to_vec(), KvValue::Integer(2), None, 10);

        let at = SnapshotId::from_lsn(100);

        assert_eq!(kv.get_at_snapshot(b"expired", at), None);
        assert_eq!(
            kv.get_at_snapshot(b"valid", at),
            Some(KvValue::Integer(2))
        );
    }
}