use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
/// Default length of one epoch in milliseconds; this is what `EpochManager::new` passes to
/// `EpochManager::with_duration_ms`.
pub const DEFAULT_EPOCH_DURATION_MS: u64 = 10;
/// NOTE(review): not referenced anywhere in the visible part of this file. Presumably an upper
/// bound on how many epochs are retained in memory — confirm against callers.
pub const MAX_EPOCHS_IN_MEMORY: usize = 100;
/// NOTE(review): not referenced anywhere in the visible part of this file. Presumably a minimum
/// number of entries before an epoch is rolled over — confirm against callers.
pub const MIN_ENTRIES_PER_EPOCH: usize = 1000;
/// Tracks the current epoch, when it started, and which epochs still have registered readers.
pub struct EpochManager {
/// Epoch counter; starts at 1 and is only ever incremented.
current_epoch: AtomicU64,
/// Start of the current epoch, in nanoseconds since the Unix epoch (wall clock).
epoch_start_time: AtomicU64,
/// Length of one epoch in nanoseconds.
epoch_duration_ns: u64,
/// Reader epoch -> number of readers currently registered at that epoch.
active_readers: RwLock<BTreeMap<u64, u64>>,
/// Watermark consumed by garbage collection as its cutoff; starts at 1.
min_safe_epoch: AtomicU64,
}
impl EpochManager {
    /// Creates a manager whose epochs last [`DEFAULT_EPOCH_DURATION_MS`] milliseconds.
    pub fn new() -> Self {
        Self::with_duration_ms(DEFAULT_EPOCH_DURATION_MS)
    }

    /// Creates a manager whose epochs last `duration_ms` milliseconds.
    ///
    /// The epoch counter and the GC watermark both start at 1. The duration is converted to
    /// nanoseconds with saturating arithmetic, so a huge `duration_ms` produces an epoch that
    /// effectively never expires instead of overflowing.
    pub fn with_duration_ms(duration_ms: u64) -> Self {
        Self {
            current_epoch: AtomicU64::new(1),
            epoch_start_time: AtomicU64::new(Self::current_time_ns()),
            epoch_duration_ns: duration_ms.saturating_mul(1_000_000),
            active_readers: RwLock::new(BTreeMap::new()),
            min_safe_epoch: AtomicU64::new(1),
        }
    }

    /// Returns the current epoch number.
    #[inline]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch.load(Ordering::Acquire)
    }

    /// Returns the GC watermark: versions needed by readers at or after this epoch must be kept.
    #[inline]
    pub fn min_safe_epoch(&self) -> u64 {
        self.min_safe_epoch.load(Ordering::Acquire)
    }

    /// Returns `true` once at least one epoch duration has elapsed since the current epoch
    /// started.
    ///
    /// If the wall clock moved backwards the elapsed time is treated as zero.
    pub fn should_advance(&self) -> bool {
        let started_at = self.epoch_start_time.load(Ordering::Relaxed);
        let elapsed = Self::current_time_ns().saturating_sub(started_at);
        elapsed >= self.epoch_duration_ns
    }

    /// Unconditionally moves to the next epoch, restarts the epoch timer and returns the new
    /// epoch number.
    pub fn advance_epoch(&self) -> u64 {
        let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
        self.epoch_start_time
            .store(Self::current_time_ns(), Ordering::Relaxed);
        new_epoch
    }

    /// Records one more reader at `epoch`.
    ///
    /// The watermark is lowered to `epoch` if it was above it, so it never exceeds the oldest
    /// registered reader. Without this, a reader registered below the watermark (an explicit
    /// older epoch, or a registration racing with `unregister_reader`) would not be protected
    /// from garbage collection.
    pub fn register_reader(&self, epoch: u64) {
        let mut readers = self.active_readers.write();
        *readers.entry(epoch).or_insert(0) += 1;
        // Done while holding the write lock so it cannot interleave with the recomputation in
        // `unregister_reader`.
        self.min_safe_epoch.fetch_min(epoch, Ordering::AcqRel);
    }

    /// Removes one reader registered at `epoch`; unknown epochs are ignored.
    ///
    /// When the last reader of `epoch` leaves, the watermark is recomputed: it becomes the
    /// oldest epoch that still has readers, or the current epoch when none remain.
    pub fn unregister_reader(&self, epoch: u64) {
        let mut readers = self.active_readers.write();
        let remaining = match readers.get_mut(&epoch) {
            Some(count) => {
                *count = count.saturating_sub(1);
                *count
            }
            None => return,
        };
        if remaining > 0 {
            return;
        }
        readers.remove(&epoch);
        let watermark = readers
            .keys()
            .next()
            .copied()
            .unwrap_or_else(|| self.current_epoch.load(Ordering::Relaxed));
        self.min_safe_epoch.store(watermark, Ordering::Release);
    }

    /// Returns, in ascending order, the registered reader epochs that lie strictly below the
    /// watermark.
    ///
    /// NOTE(review): the watermark is kept at or below every registered epoch, so this is
    /// normally empty; confirm what callers expect from it.
    pub fn gc_eligible_epochs(&self) -> Vec<u64> {
        let min_safe = self.min_safe_epoch.load(Ordering::Acquire);
        let readers = self.active_readers.read();
        readers.range(..min_safe).map(|(&epoch, _)| epoch).collect()
    }

    /// Wall-clock time in nanoseconds since the Unix epoch.
    ///
    /// Returns 0 if the system clock is set before the Unix epoch and saturates at `u64::MAX`
    /// instead of silently truncating.
    #[inline]
    fn current_time_ns() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
            .unwrap_or(0)
    }
}
impl Default for EpochManager {
fn default() -> Self {
Self::new()
}
}
/// One version of a value, stamped with the epoch and transaction that produced it.
#[derive(Debug, Clone)]
pub struct VersionEntry<V> {
    /// The stored value; for a tombstone this is `V::default()` and carries no meaning.
    pub value: V,
    /// Epoch this version was written in.
    pub epoch: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
    /// `true` when this version marks a deletion.
    pub is_delete: bool,
}

impl<V> VersionEntry<V> {
    /// Builds a live (non-deleted) version holding `value`.
    pub fn new(value: V, epoch: u64, txn_id: u64) -> Self {
        let is_delete = false;
        Self {
            value,
            epoch,
            txn_id,
            is_delete,
        }
    }

    /// Builds a deletion marker; the value slot is filled with `V::default()`.
    pub fn tombstone(epoch: u64, txn_id: u64) -> Self
    where
        V: Default,
    {
        let mut marker = Self::new(V::default(), epoch, txn_id);
        marker.is_delete = true;
        marker
    }
}
/// All versions of a single key, ordered by the epoch they were written in.
pub struct EpochVersionChain<V> {
/// Epoch -> version written in that epoch (a later write in the same epoch replaces the earlier one).
versions: RwLock<BTreeMap<u64, VersionEntry<V>>>,
/// Highest epoch ever passed to `add_version`; 0 while nothing has been added.
latest_epoch: AtomicU64,
}
impl<V: Clone> EpochVersionChain<V> {
    /// Creates an empty chain.
    pub fn new() -> Self {
        Self {
            versions: RwLock::new(BTreeMap::new()),
            latest_epoch: AtomicU64::new(0),
        }
    }

    /// Stores `entry` as the version for `epoch`, replacing any version already stored for that
    /// epoch, and raises the recorded latest epoch if `epoch` is newer.
    pub fn add_version(&self, epoch: u64, entry: VersionEntry<V>) {
        let mut versions = self.versions.write();
        versions.insert(epoch, entry);
        // Still under the write lock, so the map and `latest_epoch` change together.
        self.latest_epoch.fetch_max(epoch, Ordering::AcqRel);
    }

    /// Returns the value visible at `target_epoch`: the newest version written at or before it.
    ///
    /// Returns `None` when there is no such version or when it is a tombstone.
    pub fn read_at_epoch(&self, target_epoch: u64) -> Option<V> {
        let latest = self.latest_epoch.load(Ordering::Acquire);
        let versions = self.versions.read();
        let visible = if target_epoch >= latest {
            // At or past the newest write: that version is the answer.
            versions.get(&latest)
        } else {
            versions
                .range(..=target_epoch)
                .next_back()
                .map(|(_, entry)| entry)
        };
        visible
            .filter(|entry| !entry.is_delete)
            .map(|entry| entry.value.clone())
    }

    /// Returns a copy of every stored version as `(epoch, entry)` pairs in ascending epoch order.
    pub fn all_versions(&self) -> Vec<(u64, VersionEntry<V>)> {
        self.versions
            .read()
            .iter()
            .map(|(&e, v)| (e, v.clone()))
            .collect()
    }

    /// Discards versions that no reader at `epoch` or later can observe and returns how many
    /// were removed.
    ///
    /// The newest version written at or before `epoch` is what a reader at `epoch` resolves to,
    /// so it is kept and only versions older than it are dropped. Dropping everything below
    /// `epoch` would make a key written in an earlier epoch vanish for current readers.
    /// If that newest version is a tombstone it is dropped as well: reads resolve to `None`
    /// with or without it, and removing it lets fully deleted keys become empty chains.
    pub fn gc_before_epoch(&self, epoch: u64) -> usize {
        let mut versions = self.versions.write();
        let before = versions.len();
        let floor = versions
            .range(..=epoch)
            .next_back()
            .map(|(&floor_epoch, entry)| (floor_epoch, entry.is_delete));
        if let Some((floor_epoch, floor_is_tombstone)) = floor {
            versions.retain(|&e, _| e >= floor_epoch);
            if floor_is_tombstone {
                versions.remove(&floor_epoch);
            }
        }
        before - versions.len()
    }

    /// Number of versions currently stored.
    pub fn version_count(&self) -> usize {
        self.versions.read().len()
    }

    /// Returns `true` when no versions are stored.
    pub fn is_empty(&self) -> bool {
        self.versions.read().is_empty()
    }
}
impl<V: Clone> Default for EpochVersionChain<V> {
fn default() -> Self {
Self::new()
}
}
/// Keys of the store are owned byte strings.
pub type Key = Vec<u8>;
/// Multi-version key/value store whose versions are stamped with epochs from an `EpochManager`.
pub struct EpochMvccStore<V> {
/// Key -> chain of versions written for that key.
data: dashmap::DashMap<Key, EpochVersionChain<V>>,
/// Source of epochs and of the GC watermark; shareable with other owners through the `Arc`.
epoch_manager: Arc<EpochManager>,
/// Next transaction id to hand out; starts at 1.
next_txn_id: AtomicU64,
}
impl<V: Clone + Send + Sync + 'static> EpochMvccStore<V> {
    /// Creates an empty store with its own [`EpochManager`] using the default epoch duration.
    pub fn new() -> Self {
        Self::with_epoch_manager(Arc::new(EpochManager::new()))
    }

    /// Creates an empty store that takes its epochs from the given manager.
    pub fn with_epoch_manager(epoch_manager: Arc<EpochManager>) -> Self {
        Self {
            data: dashmap::DashMap::new(),
            epoch_manager,
            next_txn_id: AtomicU64::new(1),
        }
    }

    /// Returns the epoch manager backing this store.
    pub fn epoch_manager(&self) -> &Arc<EpochManager> {
        &self.epoch_manager
    }

    /// Starts a transaction that reads at the current epoch.
    ///
    /// The transaction receives a fresh id and is registered with the epoch manager as a
    /// reader of that epoch.
    pub fn begin_txn(&self) -> EpochTransaction<'_, V> {
        let epoch = self.epoch_manager.current_epoch();
        let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
        self.epoch_manager.register_reader(epoch);
        EpochTransaction {
            txn_id,
            read_epoch: epoch,
            write_buffer: Vec::new(),
            store: self,
        }
    }

    /// Records `value` as the version of `key` for `epoch`, creating the chain if needed.
    fn write(&self, key: Key, value: V, epoch: u64, txn_id: u64) {
        let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
        chain.add_version(epoch, VersionEntry::new(value, epoch, txn_id));
    }

    /// Records a tombstone for `key` at `epoch`, creating the chain if needed.
    fn delete(&self, key: Key, epoch: u64, txn_id: u64)
    where
        V: Default,
    {
        let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
        chain.add_version(epoch, VersionEntry::tombstone(epoch, txn_id));
    }

    /// Returns the value of `key` visible at `epoch`, or `None` if the key is unknown, has no
    /// version at or before `epoch`, or is deleted there.
    pub fn read_at_epoch(&self, key: &[u8], epoch: u64) -> Option<V> {
        self.data.get(key).and_then(|chain| chain.read_at_epoch(epoch))
    }

    /// Advances the epoch if the current one has run for its full duration.
    ///
    /// Returns the new epoch number, or `None` when it was not yet time.
    pub fn maybe_advance_epoch(&self) -> Option<u64> {
        if self.epoch_manager.should_advance() {
            Some(self.epoch_manager.advance_epoch())
        } else {
            None
        }
    }

    /// Runs garbage collection against the epoch manager's current watermark.
    ///
    /// Every chain is asked to collect versions below the watermark
    /// ([`EpochVersionChain::gc_before_epoch`]); chains left empty are removed from the map.
    /// Collection, the emptiness check and the removal happen in one pass per entry, so the
    /// reported `chains_emptied` always matches the chains actually removed.
    pub fn gc(&self) -> GcStats {
        let min_safe = self.epoch_manager.min_safe_epoch();
        let mut stats = GcStats::default();
        self.data.retain(|_, chain| {
            stats.versions_removed += chain.gc_before_epoch(min_safe);
            let emptied = chain.is_empty();
            if emptied {
                stats.chains_emptied += 1;
            }
            !emptied
        });
        stats
    }

    /// Collects size figures for the store together with the current epoch and watermark.
    pub fn stats(&self) -> StoreStats {
        let (total_versions, max_versions_per_key) = self
            .data
            .iter()
            .map(|entry| entry.value().version_count())
            .fold((0usize, 0usize), |(total, max), count| {
                (total + count, max.max(count))
            });
        StoreStats {
            key_count: self.data.len(),
            total_versions,
            max_versions_per_key,
            current_epoch: self.epoch_manager.current_epoch(),
            min_safe_epoch: self.epoch_manager.min_safe_epoch(),
        }
    }
}
impl<V: Clone + Send + Sync + 'static> Default for EpochMvccStore<V> {
fn default() -> Self {
Self::new()
}
}
/// A transaction handed out by `EpochMvccStore::begin_txn`: reads observe the store as of
/// `read_epoch`, writes are buffered until `commit`.
pub struct EpochTransaction<'a, V> {
/// Id assigned by the store when the transaction began.
txn_id: u64,
/// Epoch the transaction reads at; it is registered as a reader of this epoch.
read_epoch: u64,
/// Pending operations in the order they were issued.
write_buffer: Vec<WriteOp<V>>,
/// Store the transaction belongs to.
store: &'a EpochMvccStore<V>,
}
/// A buffered write waiting for its transaction to commit.
enum WriteOp<V> {
/// Set the key to the given value.
Put(Key, V),
/// Delete the key.
Delete(Key),
}
impl<'a, V: Clone + Send + Sync + Default + 'static> EpochTransaction<'a, V> {
    /// Id assigned to this transaction when it began.
    pub fn txn_id(&self) -> u64 {
        self.txn_id
    }

    /// Epoch this transaction reads at.
    pub fn read_epoch(&self) -> u64 {
        self.read_epoch
    }

    /// Reads `key` as seen by this transaction.
    ///
    /// The transaction's own buffered writes win, newest first; otherwise the store is read at
    /// the transaction's read epoch.
    pub fn get(&self, key: &[u8]) -> Option<V> {
        for op in self.write_buffer.iter().rev() {
            match op {
                WriteOp::Put(k, v) if k.as_slice() == key => return Some(v.clone()),
                WriteOp::Delete(k) if k.as_slice() == key => return None,
                _ => {}
            }
        }
        self.store.read_at_epoch(key, self.read_epoch)
    }

    /// Buffers a write of `value` to `key`; nothing reaches the store until `commit`.
    pub fn put(&mut self, key: Key, value: V) {
        self.write_buffer.push(WriteOp::Put(key, value));
    }

    /// Buffers a deletion of `key`; nothing reaches the store until `commit`.
    pub fn delete(&mut self, key: Key) {
        self.write_buffer.push(WriteOp::Delete(key));
    }

    /// Applies all buffered operations, in order, stamped with the epoch current at the time
    /// of the call, and returns a summary.
    ///
    /// The reader registration is released when `self` is dropped at the end of this call.
    pub fn commit(mut self) -> CommitResult {
        let commit_epoch = self.store.epoch_manager.current_epoch();
        let write_count = self.write_buffer.len();
        for op in self.write_buffer.drain(..) {
            match op {
                WriteOp::Put(key, value) => {
                    self.store.write(key, value, commit_epoch, self.txn_id);
                }
                WriteOp::Delete(key) => {
                    self.store.delete(key, commit_epoch, self.txn_id);
                }
            }
        }
        CommitResult {
            txn_id: self.txn_id,
            commit_epoch,
            write_count,
        }
    }

    /// Discards all buffered operations and ends the transaction.
    pub fn abort(self) {
        // Dropping the transaction releases its reader registration.
        drop(self);
    }
}

impl<'a, V> Drop for EpochTransaction<'a, V> {
    /// Releases the reader registration taken when the transaction began.
    ///
    /// This is the only place the registration is released, so it happens exactly once no
    /// matter how the transaction ends: `commit`, `abort`, an early return, or a panic.
    /// Otherwise a transaction dropped without `commit`/`abort` would stay registered forever
    /// and keep the GC watermark from advancing.
    fn drop(&mut self) {
        self.store.epoch_manager.unregister_reader(self.read_epoch);
    }
}
/// Summary returned by `EpochTransaction::commit`.
#[derive(Debug)]
pub struct CommitResult {
/// Id of the committed transaction.
pub txn_id: u64,
/// Epoch the transaction's writes were stamped with.
pub commit_epoch: u64,
/// Number of buffered operations that were applied (puts and deletes, including repeated writes to one key).
pub write_count: usize,
}
/// Counters returned by `EpochMvccStore::gc`.
#[derive(Debug, Default)]
pub struct GcStats {
/// Total number of versions removed across all chains.
pub versions_removed: usize,
/// Number of chains found empty after collection.
pub chains_emptied: usize,
}
/// Point-in-time figures returned by `EpochMvccStore::stats`.
#[derive(Debug)]
pub struct StoreStats {
/// Number of keys that currently have a version chain.
pub key_count: usize,
/// Sum of the version counts of all chains.
pub total_versions: usize,
/// Largest version count of any single chain.
pub max_versions_per_key: usize,
/// Epoch manager's current epoch when the stats were taken.
pub current_epoch: u64,
/// Epoch manager's GC watermark when the stats were taken.
pub min_safe_epoch: u64,
}
/// Read-only view of a store pinned to one epoch; it is registered as a reader of that epoch
/// when created and unregistered when dropped.
pub struct EpochSnapshot<'a, V> {
/// Epoch all reads through this snapshot are evaluated at.
epoch: u64,
/// Store being read.
store: &'a EpochMvccStore<V>,
}
impl<'a, V: Clone + Send + Sync + 'static> EpochSnapshot<'a, V> {
    /// Takes a snapshot at the store's current epoch and registers it as a reader there.
    pub fn new(store: &'a EpochMvccStore<V>) -> Self {
        let now = store.epoch_manager.current_epoch();
        Self::at_epoch(store, now)
    }

    /// Takes a snapshot at an explicitly chosen epoch and registers it as a reader there.
    pub fn at_epoch(store: &'a EpochMvccStore<V>, epoch: u64) -> Self {
        let manager = &store.epoch_manager;
        manager.register_reader(epoch);
        Self { epoch, store }
    }

    /// Epoch this snapshot reads at.
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Reads `key` as of the snapshot's epoch.
    pub fn get(&self, key: &[u8]) -> Option<V> {
        let pinned = self.epoch;
        self.store.read_at_epoch(key, pinned)
    }

    /// Lists every key that has a live (non-deleted) value at the snapshot's epoch, in the
    /// map's iteration order.
    pub fn keys(&self) -> Vec<Key> {
        let mut visible = Vec::new();
        for entry in self.store.data.iter() {
            if entry.value().read_at_epoch(self.epoch).is_some() {
                visible.push(entry.key().clone());
            }
        }
        visible
    }
}
impl<'a, V> Drop for EpochSnapshot<'a, V> {
    /// Gives up the reader registration taken when the snapshot was created.
    fn drop(&mut self) {
        let manager = &self.store.epoch_manager;
        manager.unregister_reader(self.epoch);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The epoch counter starts at 1 and advances by one per `advance_epoch` call.
    #[test]
    fn test_epoch_manager_basics() {
        let manager = EpochManager::with_duration_ms(1);
        assert_eq!(manager.current_epoch(), 1);
        std::thread::sleep(std::time::Duration::from_millis(2));
        assert!(manager.should_advance());
        let new_epoch = manager.advance_epoch();
        assert_eq!(new_epoch, 2);
        assert_eq!(manager.current_epoch(), 2);
    }

    /// The watermark only moves once the last reader of the oldest epoch has left.
    #[test]
    fn test_epoch_reader_tracking() {
        let manager = EpochManager::new();
        manager.register_reader(1);
        manager.register_reader(1);
        manager.register_reader(2);
        assert_eq!(manager.min_safe_epoch(), 1);
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 1);
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 2);
    }

    /// A read resolves to the newest version written at or before the requested epoch.
    #[test]
    fn test_version_chain_read_at_epoch() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();
        chain.add_version(1, VersionEntry::new("v1".to_string(), 1, 1));
        chain.add_version(3, VersionEntry::new("v3".to_string(), 3, 3));
        chain.add_version(5, VersionEntry::new("v5".to_string(), 5, 5));
        assert_eq!(chain.read_at_epoch(0), None);
        assert_eq!(chain.read_at_epoch(1), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(2), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(3), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(4), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(5), Some("v5".to_string()));
        assert_eq!(chain.read_at_epoch(100), Some("v5".to_string()));
    }

    /// A tombstone hides the key at its epoch; a later write makes it visible again.
    #[test]
    fn test_version_chain_delete() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();
        chain.add_version(1, VersionEntry::new("value".to_string(), 1, 1));
        chain.add_version(2, VersionEntry::tombstone(2, 2));
        chain.add_version(3, VersionEntry::new("resurrected".to_string(), 3, 3));
        assert_eq!(chain.read_at_epoch(1), Some("value".to_string()));
        assert_eq!(chain.read_at_epoch(2), None);
        assert_eq!(chain.read_at_epoch(3), Some("resurrected".to_string()));
    }

    /// Collecting before epoch 5 removes the four older versions and keeps version 5.
    #[test]
    fn test_version_chain_gc() {
        let chain: EpochVersionChain<i32> = EpochVersionChain::new();
        for i in 1..=10 {
            chain.add_version(i, VersionEntry::new(i as i32, i, i));
        }
        assert_eq!(chain.version_count(), 10);
        let removed = chain.gc_before_epoch(5);
        assert_eq!(removed, 4);
        assert_eq!(chain.version_count(), 6);
        assert_eq!(chain.read_at_epoch(4), None);
        assert_eq!(chain.read_at_epoch(5), Some(5));
    }

    /// Committed writes are visible to a transaction started afterwards.
    #[test]
    fn test_mvcc_store_basic() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();
        let mut txn = store.begin_txn();
        txn.put(b"key1".to_vec(), "value1".to_string());
        txn.put(b"key2".to_vec(), "value2".to_string());
        let result = txn.commit();
        assert_eq!(result.write_count, 2);
        let txn2 = store.begin_txn();
        assert_eq!(txn2.get(b"key1"), Some("value1".to_string()));
        assert_eq!(txn2.get(b"key2"), Some("value2".to_string()));
        txn2.abort();
    }

    /// A snapshot keeps seeing the value of its epoch after a later epoch overwrites it.
    #[test]
    fn test_mvcc_store_snapshot_isolation() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();
        let mut txn1 = store.begin_txn();
        txn1.put(b"x".to_vec(), 1);
        txn1.commit();
        store.epoch_manager().advance_epoch();
        let snapshot = EpochSnapshot::new(&store);
        assert_eq!(snapshot.get(b"x"), Some(1));
        store.epoch_manager().advance_epoch();
        let mut txn2 = store.begin_txn();
        txn2.put(b"x".to_vec(), 2);
        txn2.commit();
        assert_eq!(snapshot.get(b"x"), Some(1));
        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"x"), Some(2));
        txn3.abort();
    }

    /// A snapshot taken before a delete still sees the value; later transactions do not.
    #[test]
    fn test_mvcc_store_delete() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();
        let mut txn1 = store.begin_txn();
        txn1.put(b"key".to_vec(), "value".to_string());
        txn1.commit();
        store.epoch_manager().advance_epoch();
        let snap = EpochSnapshot::new(&store);
        store.epoch_manager().advance_epoch();
        let mut txn2 = store.begin_txn();
        txn2.delete(b"key".to_vec());
        txn2.commit();
        assert_eq!(snap.get(b"key"), Some("value".to_string()));
        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"key"), None);
        txn3.abort();
    }

    /// A transaction reads its own buffered writes, newest first.
    #[test]
    fn test_mvcc_store_write_buffer() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();
        let mut txn = store.begin_txn();
        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);
        assert_eq!(txn.get(b"a"), Some(1));
        assert_eq!(txn.get(b"b"), Some(2));
        txn.put(b"a".to_vec(), 10);
        assert_eq!(txn.get(b"a"), Some(10));
        txn.delete(b"b".to_vec());
        assert_eq!(txn.get(b"b"), None);
        txn.commit();
    }

    /// Store-level GC reclaims superseded versions but keeps the value readers still see.
    #[test]
    fn test_mvcc_store_gc() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();
        for i in 0..5 {
            let mut txn = store.begin_txn();
            txn.put(b"key".to_vec(), i);
            txn.commit();
            store.epoch_manager().advance_epoch();
        }
        let stats = store.stats();
        assert!(stats.total_versions >= 5);
        let gc_stats = store.gc();
        // One version was written in each of epochs 1..=5 and the watermark sits at the last
        // commit epoch (5), so exactly the four superseded versions are reclaimed.
        assert_eq!(gc_stats.versions_removed, 4);
        assert_eq!(gc_stats.chains_emptied, 0);
        assert_eq!(store.stats().total_versions, 1);
        let current = store.epoch_manager().current_epoch();
        assert_eq!(store.read_at_epoch(b"key", current), Some(4));
    }

    /// `keys` lists every key with a live value at the snapshot's epoch.
    #[test]
    fn test_epoch_snapshot_keys() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();
        let mut txn = store.begin_txn();
        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);
        txn.put(b"c".to_vec(), 3);
        txn.commit();
        let snap = EpochSnapshot::new(&store);
        let keys = snap.keys();
        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&b"a".to_vec()));
        assert!(keys.contains(&b"b".to_vec()));
        assert!(keys.contains(&b"c".to_vec()));
    }
}