use std::collections::HashMap;
use std::sync::{Arc, PoisonError, RwLock};
use crate::error::Result;
use crate::timestamp::Timestamp;
/// One buffered write: a key paired with the value to store for it.
///
/// A `None` value records a deletion; `MemoryStore::get` reports a version
/// written this way as absent (`Ok(None)`).
pub type WriteEntry = (Arc<[u8]>, Option<Arc<[u8]>>);
/// Storage interface for timestamped versions of byte-string keys.
///
/// Implementors must be `Send + Sync`. The per-method notes describe what the
/// `MemoryStore` implementation in this file does; other implementors are
/// presumably expected to behave the same way — confirm against callers.
pub trait VersionStore: Send + Sync {
/// Looks up `key` as of the snapshot `read_ts`.
///
/// `MemoryStore` returns the value of the newest version whose commit
/// timestamp is `<= read_ts`, and `Ok(None)` when the key is unknown, when no
/// version is that old, or when that version is a deletion.
fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>>;
/// Reports a commit timestamp for `key`.
///
/// `MemoryStore` returns the timestamp of the last version in the key's
/// chain, and `Ok(None)` when the key has never been written.
fn latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>>;
/// Records every `(key, value)` pair in `writes` as a new version stamped
/// with `commit_ts`. A `None` value records a deletion.
fn apply(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) -> Result<()>;
}
/// A single committed version of one key.
#[derive(Debug, Clone)]
struct Version {
/// Timestamp the version was committed at.
commit_ts: Timestamp,
/// Value written by the commit; `None` marks a deletion.
value: Option<Arc<[u8]>>,
}
/// In-memory `VersionStore` that keeps every version of every key in a map
/// guarded by a single `RwLock`.
#[derive(Debug, Default)]
pub struct MemoryStore {
/// Version chain per key. `get` binary-searches a chain with
/// `partition_point` on `commit_ts`, so chains are expected to be in
/// ascending commit-timestamp order.
chains: RwLock<HashMap<Arc<[u8]>, Vec<Version>>>,
}
impl MemoryStore {
#[inline]
#[must_use]
pub fn new() -> Self {
MemoryStore {
chains: RwLock::new(HashMap::new()),
}
}
#[must_use]
pub fn key_count(&self) -> usize {
read_guard(&self.chains).len()
}
}
impl VersionStore for MemoryStore {
    /// Returns the value of `key` visible at snapshot `read_ts`.
    ///
    /// The visible version is the newest one with `commit_ts <= read_ts`.
    /// Yields `Ok(None)` when the key is unknown, when every version is newer
    /// than the snapshot, or when the visible version is a deletion. This
    /// implementation never returns `Err`.
    fn get(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Arc<[u8]>>> {
        let chains = read_guard(&self.chains);
        let Some(versions) = chains.get(key) else {
            return Ok(None);
        };
        // `apply` keeps each chain sorted by `commit_ts`, so everything before
        // the partition point is visible and the last of those is the newest.
        let visible = versions.partition_point(|v| v.commit_ts <= read_ts);
        Ok(versions[..visible].last().and_then(|v| v.value.clone()))
    }

    /// Returns the largest commit timestamp recorded for `key`, or `Ok(None)`
    /// when the key has never been written. This implementation never returns
    /// `Err`.
    fn latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>> {
        let chains = read_guard(&self.chains);
        // Chains are sorted ascending, so the tail holds the newest commit.
        Ok(chains.get(key).and_then(|v| v.last()).map(|v| v.commit_ts))
    }

    /// Records every entry of `writes` as a new version stamped `commit_ts`.
    ///
    /// All entries are applied under one write lock. Each version is inserted
    /// at its sorted position in the key's chain, so a commit that arrives
    /// with a timestamp older than the chain's newest entry cannot break the
    /// ordering that `get` and `latest_commit_ts` rely on. Among versions that
    /// share a timestamp, the one applied last wins. This implementation never
    /// returns `Err`.
    fn apply(&self, commit_ts: Timestamp, writes: Vec<WriteEntry>) -> Result<()> {
        let mut chains = write_guard(&self.chains);
        for (key, value) in writes {
            let chain = chains.entry(key).or_default();
            // Upper bound: lands after any version with an equal timestamp
            // (preserving last-write-wins) and degenerates to a plain append
            // for the usual in-order commit.
            let slot = chain.partition_point(|v| v.commit_ts <= commit_ts);
            chain.insert(slot, Version { commit_ts, value });
        }
        Ok(())
    }
}
/// Acquires a shared read guard on `lock`.
///
/// If the lock is poisoned, the guard is recovered from the poison error
/// instead of panicking, so the data stays readable.
#[inline]
fn read_guard<T>(lock: &RwLock<T>) -> std::sync::RwLockReadGuard<'_, T> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Acquires an exclusive write guard on `lock`.
///
/// If the lock is poisoned, the guard is recovered from the poison error
/// instead of panicking, so the data stays writable.
#[inline]
fn write_guard<T>(lock: &RwLock<T>) -> std::sync::RwLockWriteGuard<'_, T> {
    match lock.write() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds an owned key from a byte slice.
    fn key_of(bytes: &[u8]) -> Arc<[u8]> {
        bytes.into()
    }

    /// Builds a present (non-deletion) value from a byte slice.
    fn value_of(bytes: &[u8]) -> Option<Arc<[u8]>> {
        Some(bytes.into())
    }

    /// Reads `key` at snapshot `at` and copies the bytes out for comparison.
    fn read(store: &MemoryStore, key: &[u8], at: Timestamp) -> Option<Vec<u8>> {
        store.get(key, at).unwrap().map(|bytes| bytes.to_vec())
    }

    #[test]
    fn test_get_on_missing_key_returns_none() {
        let store = MemoryStore::new();
        assert_eq!(read(&store, b"absent", Timestamp::from_raw(10)), None);
    }

    #[test]
    fn test_read_sees_only_versions_at_or_before_snapshot() {
        let store = MemoryStore::new();
        let first = vec![(key_of(b"x"), value_of(b"a"))];
        store.apply(Timestamp::from_raw(2), first).unwrap();
        let second = vec![(key_of(b"x"), value_of(b"b"))];
        store.apply(Timestamp::from_raw(4), second).unwrap();

        // Snapshot timestamp paired with the value expected to be visible there.
        let expected = [
            (Timestamp::from_raw(1), None),
            (Timestamp::from_raw(2), Some(b"a".to_vec())),
            (Timestamp::from_raw(3), Some(b"a".to_vec())),
            (Timestamp::from_raw(4), Some(b"b".to_vec())),
            (Timestamp::from_raw(99), Some(b"b".to_vec())),
        ];
        for (at, want) in expected {
            assert_eq!(read(&store, b"x", at), want, "snapshot at {at:?}");
        }
    }

    #[test]
    fn test_tombstone_reads_as_absent() {
        let store = MemoryStore::new();
        let write = vec![(key_of(b"x"), value_of(b"a"))];
        store.apply(Timestamp::from_raw(1), write).unwrap();
        let delete = vec![(key_of(b"x"), None)];
        store.apply(Timestamp::from_raw(2), delete).unwrap();

        assert_eq!(
            read(&store, b"x", Timestamp::from_raw(1)),
            Some(b"a".to_vec())
        );
        assert_eq!(read(&store, b"x", Timestamp::from_raw(2)), None);
    }

    #[test]
    fn test_latest_commit_ts_tracks_newest_write() {
        let store = MemoryStore::new();
        assert_eq!(store.latest_commit_ts(b"x").unwrap(), None);

        let write = vec![(key_of(b"x"), value_of(b"a"))];
        store.apply(Timestamp::from_raw(3), write).unwrap();
        let delete = vec![(key_of(b"x"), None)];
        store.apply(Timestamp::from_raw(7), delete).unwrap();

        let newest = store.latest_commit_ts(b"x").unwrap();
        assert_eq!(newest, Some(Timestamp::from_raw(7)));
    }

    #[test]
    fn test_key_count_counts_distinct_keys() {
        let store = MemoryStore::new();
        let two_keys = vec![
            (key_of(b"a"), value_of(b"1")),
            (key_of(b"b"), value_of(b"2")),
        ];
        store.apply(Timestamp::from_raw(1), two_keys).unwrap();
        // A second version of an existing key must not add to the count.
        let rewrite = vec![(key_of(b"a"), value_of(b"3"))];
        store.apply(Timestamp::from_raw(2), rewrite).unwrap();

        assert_eq!(store.key_count(), 2);
    }
}