use dashmap::DashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct VersionId {
pub epoch: u64,
pub sequence: u32,
}
impl VersionId {
pub fn new(epoch: u64, sequence: u32) -> Self {
Self { epoch, sequence }
}
pub fn is_stale(&self, watermark: u64) -> bool {
self.epoch < watermark
}
}
#[derive(Debug, Clone)]
pub struct VersionedValue<T> {
pub version: VersionId,
pub value: T,
pub deleted: bool,
}
impl<T> VersionedValue<T> {
pub fn new(version: VersionId, value: T) -> Self {
Self {
version,
value,
deleted: false,
}
}
pub fn tombstone(version: VersionId, value: T) -> Self {
Self {
version,
value,
deleted: true,
}
}
}
#[derive(Debug)]
pub struct VersionChain<T> {
versions: VecDeque<VersionedValue<T>>,
total_versions: u64,
}
impl<T: Clone> VersionChain<T> {
pub fn new() -> Self {
Self {
versions: VecDeque::new(),
total_versions: 0,
}
}
pub fn add_version(&mut self, version: VersionedValue<T>) {
self.versions.push_front(version);
self.total_versions += 1;
}
pub fn latest(&self) -> Option<&VersionedValue<T>> {
self.versions.front()
}
pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
for v in &self.versions {
if v.version.epoch < epoch {
if v.deleted {
return None;
}
return Some(v);
}
}
None
}
pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
let initial_len = self.versions.len();
let mut kept = 0;
let mut found_base = false;
for v in self.versions.iter() {
if v.version.epoch >= watermark {
kept += 1;
} else if !found_base {
found_base = true;
kept += 1;
}
}
self.versions.truncate(kept);
let removed = initial_len - self.versions.len();
let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
(removed, bytes_freed)
}
pub fn len(&self) -> usize {
self.versions.len()
}
pub fn is_empty(&self) -> bool {
self.versions.is_empty()
}
}
impl<T: Clone> Default for VersionChain<T> {
fn default() -> Self {
Self::new()
}
}
const MAX_READER_SLOTS: usize = 256;
const SLOT_EMPTY: u64 = u64::MAX;
#[derive(Debug)]
#[repr(C, align(64))]
struct EpochSlot {
epoch: AtomicU64,
}
impl EpochSlot {
const fn empty() -> Self {
Self {
epoch: AtomicU64::new(SLOT_EMPTY),
}
}
}
#[derive(Debug)]
pub struct ReaderRegistry {
slots: Box<[EpochSlot; MAX_READER_SLOTS]>,
active_count: AtomicUsize,
}
impl ReaderRegistry {
pub fn new() -> Self {
let slots: Box<[EpochSlot; MAX_READER_SLOTS]> = {
let mut v: Vec<EpochSlot> = Vec::with_capacity(MAX_READER_SLOTS);
for _ in 0..MAX_READER_SLOTS {
v.push(EpochSlot::empty());
}
v.into_boxed_slice().try_into().ok().unwrap()
};
Self {
slots,
active_count: AtomicUsize::new(0),
}
}
pub fn register(&self, epoch: u64) -> Option<u64> {
for (i, slot) in self.slots.iter().enumerate() {
if slot
.epoch
.compare_exchange(SLOT_EMPTY, epoch, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
self.active_count.fetch_add(1, Ordering::Relaxed);
return Some(i as u64);
}
}
None
}
pub fn unregister(&self, reader_id: u64) {
let idx = reader_id as usize;
if idx < MAX_READER_SLOTS {
let prev = self.slots[idx].epoch.swap(SLOT_EMPTY, Ordering::Release);
if prev != SLOT_EMPTY {
self.active_count.fetch_sub(1, Ordering::Relaxed);
}
}
}
pub fn min_active_epoch(&self) -> Option<u64> {
let mut min_epoch = u64::MAX;
for slot in self.slots.iter() {
let e = slot.epoch.load(Ordering::Relaxed);
if e != SLOT_EMPTY && e < min_epoch {
min_epoch = e;
}
}
if min_epoch == u64::MAX {
None
} else {
Some(min_epoch)
}
}
pub fn active_count(&self) -> usize {
self.active_count.load(Ordering::Relaxed)
}
}
impl Default for ReaderRegistry {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Default)]
pub struct GCStats {
pub gc_cycles: AtomicU64,
pub versions_collected: AtomicU64,
pub bytes_freed: AtomicU64,
pub chains_scanned: AtomicU64,
pub last_gc_epoch: AtomicU64,
pub last_gc_duration_us: AtomicU64,
}
impl GCStats {
pub fn snapshot(&self) -> GCStatsSnapshot {
GCStatsSnapshot {
gc_cycles: self.gc_cycles.load(Ordering::Relaxed),
versions_collected: self.versions_collected.load(Ordering::Relaxed),
bytes_freed: self.bytes_freed.load(Ordering::Relaxed),
chains_scanned: self.chains_scanned.load(Ordering::Relaxed),
last_gc_epoch: self.last_gc_epoch.load(Ordering::Relaxed),
last_gc_duration_us: self.last_gc_duration_us.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone)]
pub struct GCStatsSnapshot {
pub gc_cycles: u64,
pub versions_collected: u64,
pub bytes_freed: u64,
pub chains_scanned: u64,
pub last_gc_epoch: u64,
pub last_gc_duration_us: u64,
}
#[derive(Debug, Clone)]
pub struct GCConfig {
pub min_epochs_to_keep: u64,
pub gc_trigger_threshold: usize,
pub max_versions_per_cycle: usize,
}
impl Default for GCConfig {
fn default() -> Self {
Self {
min_epochs_to_keep: 2,
gc_trigger_threshold: 1000,
max_versions_per_cycle: 10000,
}
}
}
pub struct EpochGC<K, V>
where
K: Eq + std::hash::Hash + Clone,
V: Clone,
{
current_epoch: AtomicU64,
current_sequence: AtomicU64,
chains: DashMap<K, VersionChain<V>>,
readers: Arc<ReaderRegistry>,
config: GCConfig,
stats: GCStats,
pending_versions: AtomicUsize,
}
impl<K, V> EpochGC<K, V>
where
K: Eq + std::hash::Hash + Clone,
V: Clone,
{
pub fn new() -> Self {
Self::with_config(GCConfig::default())
}
pub fn with_config(config: GCConfig) -> Self {
Self {
current_epoch: AtomicU64::new(0),
current_sequence: AtomicU64::new(0),
chains: DashMap::new(),
readers: Arc::new(ReaderRegistry::new()),
config,
stats: GCStats::default(),
pending_versions: AtomicUsize::new(0),
}
}
pub fn current_epoch(&self) -> u64 {
self.current_epoch.load(Ordering::SeqCst)
}
pub fn advance_epoch(&self) -> u64 {
self.current_sequence.store(0, Ordering::SeqCst);
self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
}
pub fn next_version(&self) -> VersionId {
let epoch = self.current_epoch.load(Ordering::SeqCst);
let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
VersionId::new(epoch, seq)
}
pub fn insert(&self, key: K, value: V) -> VersionId {
let version = self.next_version();
let versioned = VersionedValue::new(version, value);
self.chains.entry(key).or_default().add_version(versioned);
let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
if pending >= self.config.gc_trigger_threshold {
self.try_gc();
}
version
}
pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
let version = self.next_version();
let versioned = VersionedValue::tombstone(version, tombstone_value);
self.chains.entry(key).or_default().add_version(versioned);
self.pending_versions.fetch_add(1, Ordering::Relaxed);
version
}
pub fn get(&self, key: &K) -> Option<V> {
self.chains
.get(key)
.and_then(|entry| entry.latest().cloned())
.filter(|v| !v.deleted)
.map(|v| v.value.clone())
}
pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
self.chains
.get(key)
.and_then(|entry| entry.version_at(epoch).cloned())
.map(|v| v.value.clone())
}
pub fn begin_read(&self) -> ReadGuard {
let epoch = self.current_epoch.load(Ordering::SeqCst);
let reader_id = self.readers.register(epoch).unwrap_or_else(|| {
panic!(
"EpochGC: all {} reader slots exhausted — too many concurrent readers. \
Ensure ReadGuards are dropped promptly.",
MAX_READER_SLOTS
)
});
ReadGuard {
epoch,
reader_id,
registry: Arc::clone(&self.readers),
}
}
pub fn watermark(&self) -> u64 {
let current = self.current_epoch.load(Ordering::SeqCst);
let min_reader = self.readers.min_active_epoch().unwrap_or(current);
let grace = current.saturating_sub(self.config.min_epochs_to_keep);
grace.min(min_reader)
}
pub fn try_gc(&self) -> GCResult {
let start = std::time::Instant::now();
let watermark = self.watermark();
let mut versions_collected = 0;
let mut bytes_freed = 0;
let mut chains_scanned = 0;
let keys: Vec<K> = self
.chains
.iter()
.map(|entry| entry.key().clone())
.collect();
for key in keys {
if chains_scanned >= self.config.max_versions_per_cycle {
break;
}
if let Some(mut entry) = self.chains.get_mut(&key) {
let (removed, freed) = entry.gc(watermark);
versions_collected += removed;
bytes_freed += freed;
chains_scanned += 1;
}
}
self.chains.retain(|_, chain| !chain.is_empty());
let duration = start.elapsed();
self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
self.stats
.versions_collected
.fetch_add(versions_collected as u64, Ordering::Relaxed);
self.stats
.bytes_freed
.fetch_add(bytes_freed as u64, Ordering::Relaxed);
self.stats
.chains_scanned
.fetch_add(chains_scanned as u64, Ordering::Relaxed);
self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
self.stats
.last_gc_duration_us
.store(duration.as_micros() as u64, Ordering::Relaxed);
self.pending_versions.store(0, Ordering::Relaxed);
GCResult {
versions_collected,
bytes_freed,
chains_scanned,
watermark,
duration_us: duration.as_micros() as u64,
}
}
pub fn force_gc(&self) -> GCResult {
let _old_limit = self.config.max_versions_per_cycle;
let _config = GCConfig {
max_versions_per_cycle: usize::MAX,
..self.config.clone()
};
let start = std::time::Instant::now();
let watermark = self.watermark();
let mut versions_collected = 0;
let mut bytes_freed = 0;
let mut chains_scanned = 0;
for mut entry in self.chains.iter_mut() {
let (removed, freed) = entry.value_mut().gc(watermark);
versions_collected += removed;
bytes_freed += freed;
chains_scanned += 1;
}
self.chains.retain(|_, chain| !chain.is_empty());
let duration = start.elapsed();
self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
self.stats
.versions_collected
.fetch_add(versions_collected as u64, Ordering::Relaxed);
self.stats
.bytes_freed
.fetch_add(bytes_freed as u64, Ordering::Relaxed);
self.stats
.chains_scanned
.fetch_add(chains_scanned as u64, Ordering::Relaxed);
self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
self.stats
.last_gc_duration_us
.store(duration.as_micros() as u64, Ordering::Relaxed);
self.pending_versions.store(0, Ordering::Relaxed);
GCResult {
versions_collected,
bytes_freed,
chains_scanned,
watermark,
duration_us: duration.as_micros() as u64,
}
}
pub fn stats(&self) -> GCStatsSnapshot {
self.stats.snapshot()
}
pub fn version_count(&self) -> usize {
self.chains.iter().map(|entry| entry.value().len()).sum()
}
pub fn chain_count(&self) -> usize {
self.chains.len()
}
}
impl<K, V> Default for EpochGC<K, V>
where
K: Eq + std::hash::Hash + Clone,
V: Clone,
{
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct GCResult {
pub versions_collected: usize,
pub bytes_freed: usize,
pub chains_scanned: usize,
pub watermark: u64,
pub duration_us: u64,
}
pub struct ReadGuard {
pub epoch: u64,
reader_id: u64,
registry: Arc<ReaderRegistry>,
}
impl Drop for ReadGuard {
fn drop(&mut self) {
self.registry.unregister(self.reader_id);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_version_id() {
let v1 = VersionId::new(1, 0);
let v2 = VersionId::new(2, 0);
assert!(v1 < v2);
assert!(v1.is_stale(2));
assert!(!v2.is_stale(2));
}
#[test]
fn test_version_chain_basic() {
let mut chain: VersionChain<String> = VersionChain::new();
chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));
assert_eq!(chain.len(), 2);
assert_eq!(chain.latest().unwrap().value, "v2");
}
#[test]
fn test_version_chain_gc() {
let mut chain: VersionChain<String> = VersionChain::new();
for epoch in 0..5 {
chain.add_version(VersionedValue::new(
VersionId::new(epoch, 0),
format!("v{}", epoch),
));
}
assert_eq!(chain.len(), 5);
let (removed, _) = chain.gc(3);
assert!(removed > 0);
assert!(chain.len() < 5);
}
#[test]
fn test_reader_registry() {
let registry = ReaderRegistry::new();
let r1 = registry.register(10).unwrap();
let _r2 = registry.register(20).unwrap();
assert_eq!(registry.active_count(), 2);
assert_eq!(registry.min_active_epoch(), Some(10));
registry.unregister(r1);
assert_eq!(registry.active_count(), 1);
assert_eq!(registry.min_active_epoch(), Some(20));
}
#[test]
fn test_epoch_gc_basic() {
let gc: EpochGC<String, i32> = EpochGC::new();
let _v1 = gc.insert("key1".to_string(), 100);
let _v2 = gc.insert("key1".to_string(), 200);
assert_eq!(gc.get(&"key1".to_string()), Some(200));
assert_eq!(gc.version_count(), 2);
}
#[test]
fn test_epoch_gc_delete() {
let gc: EpochGC<String, i32> = EpochGC::new();
gc.insert("key1".to_string(), 100);
gc.delete("key1".to_string(), 0);
assert_eq!(gc.get(&"key1".to_string()), None);
}
#[test]
fn test_epoch_gc_at_epoch() {
let gc: EpochGC<String, i32> = EpochGC::new();
gc.insert("key1".to_string(), 100);
gc.advance_epoch();
gc.insert("key1".to_string(), 200);
gc.advance_epoch();
gc.insert("key1".to_string(), 300);
assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(100)); assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(200)); assert_eq!(gc.get_at_epoch(&"key1".to_string(), 3), Some(300)); assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), None); }
#[test]
fn test_read_guard() {
let gc: EpochGC<String, i32> = EpochGC::new();
gc.insert("key1".to_string(), 100);
{
let _guard = gc.begin_read();
assert_eq!(gc.readers.active_count(), 1);
}
assert_eq!(gc.readers.active_count(), 0);
}
#[test]
fn test_watermark_calculation() {
let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
min_epochs_to_keep: 2,
..Default::default()
});
gc.insert("k".to_string(), 1);
gc.advance_epoch(); gc.insert("k".to_string(), 2);
gc.advance_epoch(); gc.insert("k".to_string(), 3);
gc.advance_epoch(); gc.insert("k".to_string(), 4);
gc.advance_epoch();
assert!(gc.watermark() <= 2);
let _guard = gc.begin_read();
assert!(gc.watermark() <= gc.current_epoch());
}
#[test]
fn test_gc_cycle() {
let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
min_epochs_to_keep: 1,
gc_trigger_threshold: 100,
max_versions_per_cycle: 100,
});
for i in 0..10 {
gc.insert("key".to_string(), i);
gc.advance_epoch();
}
assert_eq!(gc.version_count(), 10);
let result = gc.try_gc();
assert!(result.versions_collected > 0 || gc.version_count() < 10);
}
#[test]
fn test_gc_stats() {
let gc: EpochGC<String, i32> = EpochGC::new();
for i in 0..5 {
gc.insert("key".to_string(), i);
gc.advance_epoch();
}
gc.try_gc();
let stats = gc.stats();
assert!(stats.gc_cycles >= 1);
}
#[test]
fn test_force_gc() {
let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
min_epochs_to_keep: 0,
gc_trigger_threshold: 1000,
max_versions_per_cycle: 1, });
for i in 0..20 {
gc.insert(format!("key{}", i), i);
}
gc.advance_epoch();
gc.advance_epoch();
let initial_count = gc.version_count();
gc.force_gc();
let final_count = gc.version_count();
assert!(final_count <= initial_count);
}
#[test]
fn test_chain_count() {
let gc: EpochGC<String, i32> = EpochGC::new();
gc.insert("key1".to_string(), 1);
gc.insert("key2".to_string(), 2);
gc.insert("key3".to_string(), 3);
assert_eq!(gc.chain_count(), 3);
}
#[test]
fn test_version_at_respects_tombstone() {
let mut chain: VersionChain<i32> = VersionChain::new();
chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));
assert!(chain.version_at(2).is_none());
assert_eq!(chain.version_at(1).map(|v| v.value), Some(100));
assert!(chain.version_at(0).is_none());
}
#[test]
fn test_gc_result_fields() {
let gc: EpochGC<String, i32> = EpochGC::new();
for i in 0..5 {
gc.insert("key".to_string(), i);
gc.advance_epoch();
}
let result = gc.try_gc();
assert!(result.watermark <= gc.current_epoch());
assert!(result.chains_scanned <= gc.chain_count() + 1);
}
#[test]
fn test_lock_free_slot_registration() {
let registry = ReaderRegistry::new();
let id0 = registry.register(10).unwrap();
let id1 = registry.register(20).unwrap();
let id2 = registry.register(30).unwrap();
assert_ne!(id0, id1);
assert_ne!(id1, id2);
assert_eq!(registry.active_count(), 3);
assert_eq!(registry.min_active_epoch(), Some(10));
registry.unregister(id1);
assert_eq!(registry.active_count(), 2);
assert_eq!(registry.min_active_epoch(), Some(10));
registry.unregister(id0);
assert_eq!(registry.active_count(), 1);
assert_eq!(registry.min_active_epoch(), Some(30));
registry.unregister(id2);
assert_eq!(registry.active_count(), 0);
assert_eq!(registry.min_active_epoch(), None);
}
#[test]
fn test_concurrent_insert_and_gc() {
use std::sync::Arc;
use std::thread;
let gc = Arc::new(EpochGC::<u64, u64>::new());
let mut handles = Vec::new();
for t in 0..4 {
let gc = Arc::clone(&gc);
handles.push(thread::spawn(move || {
for i in 0..100 {
gc.insert(t * 1000 + i, i);
}
}));
}
{
let gc = Arc::clone(&gc);
handles.push(thread::spawn(move || {
for _ in 0..10 {
gc.advance_epoch();
gc.try_gc();
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(gc.chain_count(), 400);
}
}