use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
/// Number of slots in the tracker's ring buffer (2^16). Record positions are
/// reduced modulo this value, so only this many of the newest records survive.
const NUM_SLOTS: usize = 1 << 16;
/// Size of the rotating per-epoch tables (epoch boundaries and per-epoch Bloom
/// filters), indexed by `epoch % MAX_EPOCHS`.
const MAX_EPOCHS: usize = 8;
/// One ring-buffer entry: a recorded key hash plus the epoch it was recorded in.
///
/// `repr(align(64))` by itself rounds the struct's size up to 64 bytes, so
/// neighbouring slots never share a 64-byte cache line. Assuming 64-byte lines,
/// concurrent writers to adjacent slots therefore do not false-share. No manual
/// padding field is needed.
#[repr(align(64))]
#[derive(Debug)]
struct CacheAlignedSlot {
    /// Hash of the recorded key.
    key_hash: AtomicU64,
    /// Epoch the hash was recorded in; `u32::MAX` for a slot that still holds
    /// its default (never-written) value.
    epoch: AtomicU32,
}

// Guard the one-slot-per-cache-line layout: fail the build if a future field
// pushes the slot past 64 bytes.
const _: () = assert!(std::mem::size_of::<CacheAlignedSlot>() == 64);

impl Default for CacheAlignedSlot {
    /// Creates an unwritten slot: hash `0`, epoch `u32::MAX`.
    fn default() -> Self {
        Self {
            key_hash: AtomicU64::new(0),
            epoch: AtomicU32::new(u32::MAX),
        }
    }
}
/// Lock-free, fixed-capacity log of key hashes, each tagged with the epoch that
/// was current when it was recorded.
///
/// Writers claim positions from a monotonically increasing cursor. Position `p`
/// is stored in slot `p % NUM_SLOTS`, so only the newest `NUM_SLOTS` records are
/// retained. [`advance_epoch`](Self::advance_epoch) closes the current epoch and
/// returns an [`EpochDrain`] over the hashes recorded in it.
pub struct LockFreeEpochTracker {
    /// Ring buffer of recorded hashes.
    slots: Box<[CacheAlignedSlot; NUM_SLOTS]>,
    /// Number of positions claimed so far (only ever incremented).
    write_cursor: AtomicU64,
    /// Entry `e % MAX_EPOCHS` holds the cursor value at which epoch `e` was closed.
    epoch_boundaries: [AtomicU64; MAX_EPOCHS],
    /// Epoch that new records are tagged with.
    current_epoch: AtomicU32,
}

impl LockFreeEpochTracker {
    /// Creates an empty tracker at epoch 0 with every slot in its default state.
    pub fn new() -> Self {
        // Go through a Vec so the large slot array is built on the heap directly.
        // Constructing `[CacheAlignedSlot; NUM_SLOTS]` by value could place it on
        // the stack first.
        let slots: Vec<CacheAlignedSlot> = (0..NUM_SLOTS)
            .map(|_| CacheAlignedSlot::default())
            .collect();
        // The length is NUM_SLOTS by construction, so this cannot fail. `panic!`
        // is used instead of `expect` so the whole slice is never Debug-formatted.
        let slots_array: Box<[CacheAlignedSlot; NUM_SLOTS]> = slots
            .into_boxed_slice()
            .try_into()
            .unwrap_or_else(|_| panic!("Failed to create slots array"));
        Self {
            slots: slots_array,
            write_cursor: AtomicU64::new(0),
            epoch_boundaries: std::array::from_fn(|_| AtomicU64::new(0)),
            current_epoch: AtomicU32::new(0),
        }
    }

    /// Records an already-computed key hash in the current epoch.
    ///
    /// NOTE(review): reading the epoch and claiming a cursor position are two
    /// separate atomic steps. A record that races with `advance_epoch` can land
    /// outside the position range of the epoch it is tagged with, and then no
    /// drain yields it. A drain may also run before a claimed slot has been
    /// stored. Confirm that callers tolerate such lost records.
    #[inline]
    pub fn record_version_hash(&self, key_hash: u64) {
        let epoch = self.current_epoch.load(Ordering::Acquire);
        let slot_idx = self.write_cursor.fetch_add(1, Ordering::Relaxed) as usize % NUM_SLOTS;
        self.slots[slot_idx].key_hash.store(key_hash, Ordering::Relaxed);
        // Release publishes the hash stored above to readers that Acquire-load `epoch`.
        self.slots[slot_idx].epoch.store(epoch, Ordering::Release);
    }

    /// Hashes `key` and records it in the current epoch.
    #[inline]
    pub fn record_version(&self, key: &[u8]) {
        let hash = Self::hash_key(key);
        self.record_version_hash(hash);
    }

    /// Closes the current epoch and returns an iterator over the hashes recorded in it.
    ///
    /// The drain covers cursor positions from the previous epoch's boundary up to
    /// the cursor value read at the start of this call. If more than `NUM_SLOTS`
    /// records were made in that range, only the newest `NUM_SLOTS` are still in
    /// the ring. The range is clamped to those positions so that no slot is
    /// visited (and yielded) twice.
    ///
    /// Overlapping calls are not coordinated. The boundary of an epoch is stored
    /// after the epoch counter is incremented, so a second concurrent caller can
    /// read a stale boundary and compute a wrong range.
    pub fn advance_epoch(&self) -> EpochDrain<'_> {
        let old_cursor = self.write_cursor.load(Ordering::Acquire);
        let old_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel);
        let boundary_idx = (old_epoch as usize) % MAX_EPOCHS;
        self.epoch_boundaries[boundary_idx].store(old_cursor, Ordering::Release);
        // NOTE(review): the purpose of this full fence is not evident from the
        // surrounding code; it is left unchanged.
        std::sync::atomic::fence(Ordering::SeqCst);
        let start = if old_epoch == 0 {
            0
        } else {
            let prev_boundary_idx = (old_epoch.wrapping_sub(1) as usize) % MAX_EPOCHS;
            self.epoch_boundaries[prev_boundary_idx].load(Ordering::Acquire)
        };
        // Positions p and p + NUM_SLOTS share a slot, so anything older than the
        // last NUM_SLOTS positions has been overwritten. Visiting those positions
        // would re-read the newer entries and yield them a second time.
        let start = start.max(old_cursor.saturating_sub(NUM_SLOTS as u64));
        EpochDrain {
            tracker: self,
            epoch: old_epoch,
            current: start,
            end: old_cursor,
        }
    }

    /// Returns the epoch that new records are currently tagged with.
    #[inline]
    pub fn current(&self) -> u32 {
        self.current_epoch.load(Ordering::Relaxed)
    }

    /// Returns the number of records ever made, including ones since overwritten.
    #[inline]
    pub fn total_recorded(&self) -> u64 {
        self.write_cursor.load(Ordering::Relaxed)
    }

    /// Hashes a key to 64 bits (fast, not cryptographic).
    ///
    /// The key is consumed in little-endian 8-byte words, with the last partial
    /// word zero-padded, using an FxHash-style rotate/xor/multiply round.
    ///
    /// The key length is folded in as a final round. Without it, keys that differ
    /// only by trailing zero bytes (`b"a"` vs `b"a\0"`) produce the same padded
    /// word and collide.
    ///
    /// A MurmurHash3 `fmix64` finalizer then spreads entropy into the low bits.
    /// Without it, the low bits of a short key's hash depend only on its first
    /// few bytes, which hurts consumers that reduce the hash modulo a table size.
    #[inline]
    fn hash_key(key: &[u8]) -> u64 {
        const K: u64 = 0x517c_c1b7_2722_0a95;
        let round = |hash: u64, word: u64| (hash.rotate_left(5) ^ word).wrapping_mul(K);
        // Little-endian load of up to 8 bytes, zero-padding a short tail.
        let load = |bytes: &[u8]| {
            let mut buf = [0u8; 8];
            buf[..bytes.len()].copy_from_slice(bytes);
            u64::from_le_bytes(buf)
        };

        let chunks = key.chunks_exact(8);
        let tail = chunks.remainder();
        let mut hash = chunks.fold(0u64, |acc, chunk| round(acc, load(chunk)));
        if !tail.is_empty() {
            hash = round(hash, load(tail));
        }
        hash = round(hash, key.len() as u64);

        // MurmurHash3 fmix64.
        hash ^= hash >> 33;
        hash = hash.wrapping_mul(0xff51_afd7_ed55_8ccd);
        hash ^= hash >> 33;
        hash = hash.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
        hash ^ (hash >> 33)
    }
}

impl Default for LockFreeEpochTracker {
    /// Equivalent to [`LockFreeEpochTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}
pub struct EpochDrain<'a> {
tracker: &'a LockFreeEpochTracker,
epoch: u32,
current: u64,
end: u64,
}
impl<'a> Iterator for EpochDrain<'a> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
while self.current < self.end {
let slot_idx = (self.current as usize) % NUM_SLOTS;
self.current += 1;
let slot_epoch = self.tracker.slots[slot_idx].epoch.load(Ordering::Acquire);
if slot_epoch == self.epoch {
let hash = self.tracker.slots[slot_idx].key_hash.load(Ordering::Relaxed);
return Some(hash);
}
}
None
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = (self.end - self.current) as usize;
(0, Some(remaining))
}
}
impl EpochDrain<'_> {
pub fn epoch(&self) -> u32 {
self.epoch
}
pub fn remaining_estimate(&self) -> u64 {
self.end.saturating_sub(self.current)
}
}
/// Epoch tracker that pairs the slot log of [`LockFreeEpochTracker`] with Bloom
/// filters for quick "might this key be dirty?" checks.
///
/// * `bloom` accumulates every key ever recorded. Nothing in this type clears it.
///   NOTE(review): its false-positive rate therefore only grows over the
///   tracker's lifetime; confirm that this is intended.
/// * `epoch_bloom[e % MAX_EPOCHS]` accumulates keys recorded while the current
///   epoch was `e`. It is cleared when `advance_epoch` closes that epoch.
pub struct HybridEpochTracker {
    /// Slot log from which epoch drains are produced.
    slots: LockFreeEpochTracker,
    /// Filter over all keys ever recorded.
    bloom: AtomicBloomFilter,
    /// Rotating per-epoch filters, indexed by `epoch % MAX_EPOCHS`.
    epoch_bloom: [AtomicBloomFilter; MAX_EPOCHS],
}

impl HybridEpochTracker {
    /// Bits per Bloom filter: 2^20 bits, i.e. 128 KiB per filter.
    const BLOOM_BITS: usize = 1 << 20;
    /// Probes per key. Informational only: `AtomicBloomFilter` derives its three
    /// probes internally, so nothing reads this constant.
    #[allow(dead_code)] // kept to document the probe count used by AtomicBloomFilter
    const BLOOM_K: usize = 3;

    /// Creates a tracker at epoch 0 with all filters empty.
    pub fn new() -> Self {
        let make_filter = || AtomicBloomFilter::new(Self::BLOOM_BITS);
        Self {
            slots: LockFreeEpochTracker::new(),
            bloom: make_filter(),
            epoch_bloom: std::array::from_fn(|_| make_filter()),
        }
    }

    /// Records `key` in the slot log, the global filter and the current epoch's filter.
    #[inline]
    pub fn record_version(&self, key: &[u8]) {
        let key_hash = LockFreeEpochTracker::hash_key(key);
        self.slots.record_version_hash(key_hash);
        self.bloom.insert(key_hash);
        // The epoch is read again here, separately from the read inside the slot
        // log, so a concurrent `advance_epoch` can make the two disagree.
        let filter_idx = (self.slots.current() as usize) % MAX_EPOCHS;
        self.epoch_bloom[filter_idx].insert(key_hash);
    }

    /// Returns `false` only if `key` has never been recorded by this tracker.
    /// A `true` result may be a Bloom-filter false positive.
    #[inline]
    pub fn might_be_dirty(&self, key: &[u8]) -> bool {
        self.bloom.may_contain(LockFreeEpochTracker::hash_key(key))
    }

    /// Checks the per-epoch filter slot `epoch % MAX_EPOCHS` for `key`.
    ///
    /// Epochs that differ by a multiple of `MAX_EPOCHS` share a filter. The filter
    /// of a closed epoch has already been cleared by `advance_epoch`.
    #[inline]
    pub fn might_be_dirty_in_epoch(&self, key: &[u8], epoch: u32) -> bool {
        let key_hash = LockFreeEpochTracker::hash_key(key);
        let filter = &self.epoch_bloom[(epoch as usize) % MAX_EPOCHS];
        filter.may_contain(key_hash)
    }

    /// Closes the current epoch, clears that epoch's filter and returns the drain
    /// of its recorded hashes.
    ///
    /// NOTE(review): because each per-epoch filter is cleared as soon as its epoch
    /// closes, only the current epoch's filter is ever populated. Confirm that the
    /// other `MAX_EPOCHS - 1` filters are not meant to retain recent history.
    pub fn advance_epoch(&self) -> EpochDrain<'_> {
        let drain = self.slots.advance_epoch();
        let closed_idx = (drain.epoch() as usize) % MAX_EPOCHS;
        self.epoch_bloom[closed_idx].clear();
        drain
    }

    /// Returns the epoch that new records are currently tagged with.
    #[inline]
    pub fn current(&self) -> u32 {
        self.slots.current()
    }
}

impl Default for HybridEpochTracker {
    /// Equivalent to [`HybridEpochTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Fixed-size Bloom filter whose bits can be set and queried through shared
/// references. All bit operations use relaxed atomics.
///
/// Items are pre-computed 64-bit hashes. Each hash is probed at three bit
/// positions: the hash and two rotations of it, reduced modulo the filter size.
/// Absent a concurrent `clear`, a hash that was inserted is always reported as
/// possibly present. Hashes never inserted may still be reported (false positives).
pub struct AtomicBloomFilter {
    /// Bit storage, 64 bits per word.
    bits: Box<[AtomicU64]>,
    /// Number of addressable bits; probe positions are reduced modulo this value.
    num_bits: usize,
}

impl AtomicBloomFilter {
    /// Creates an empty filter with `num_bits` addressable bits.
    ///
    /// A request for 0 bits is rounded up to 1, so that probe positions
    /// (`hash % num_bits`) are always defined instead of panicking with a division
    /// by zero. Such a filter reports every hash as present once anything has been
    /// inserted.
    pub fn new(num_bits: usize) -> Self {
        let num_bits = num_bits.max(1);
        let num_words = (num_bits + 63) / 64;
        let bits: Vec<AtomicU64> = (0..num_words).map(|_| AtomicU64::new(0)).collect();
        Self {
            bits: bits.into_boxed_slice(),
            num_bits,
        }
    }

    /// Sets the three probe bits for `hash`.
    #[inline]
    pub fn insert(&self, hash: u64) {
        for bit in self.probes(hash) {
            self.set_bit(bit);
        }
    }

    /// Returns `true` if all three probe bits for `hash` are set.
    /// Stops at the first unset bit.
    #[inline]
    pub fn may_contain(&self, hash: u64) -> bool {
        self.probes(hash).iter().all(|&bit| self.get_bit(bit))
    }

    /// Resets every bit to zero, one word at a time. This is not atomic as a
    /// whole with respect to concurrent inserts.
    pub fn clear(&self) {
        for word in self.bits.iter() {
            word.store(0, Ordering::Relaxed);
        }
    }

    /// Bit positions probed for `hash`: the hash itself and its rotations by
    /// 21 and 42 bits, each reduced modulo `num_bits`.
    #[inline]
    fn probes(&self, hash: u64) -> [usize; 3] {
        [hash, hash.rotate_left(21), hash.rotate_left(42)].map(|h| (h as usize) % self.num_bits)
    }

    /// Sets bit `bit` (must be `< num_bits`).
    #[inline]
    fn set_bit(&self, bit: usize) {
        self.bits[bit / 64].fetch_or(1u64 << (bit % 64), Ordering::Relaxed);
    }

    /// Reads bit `bit` (must be `< num_bits`).
    #[inline]
    fn get_bit(&self, bit: usize) -> bool {
        (self.bits[bit / 64].load(Ordering::Relaxed) & (1u64 << (bit % 64))) != 0
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Records each key, in order, on `tracker`.
    fn record_all(tracker: &LockFreeEpochTracker, keys: &[&str]) {
        for key in keys {
            tracker.record_version(key.as_bytes());
        }
    }

    #[test]
    fn test_lockfree_epoch_basic() {
        let tracker = LockFreeEpochTracker::new();
        record_all(&tracker, &["key1", "key2", "key3"]);
        assert_eq!(tracker.total_recorded(), 3);
        assert_eq!(tracker.current(), 0);

        let drain = tracker.advance_epoch();
        assert_eq!(drain.epoch(), 0);
        assert_eq!(drain.count(), 3);
        assert_eq!(tracker.current(), 1);
    }

    #[test]
    fn test_lockfree_epoch_concurrent() {
        const THREADS: usize = 8;
        const OPS_PER_THREAD: usize = 10_000;

        let tracker = Arc::new(LockFreeEpochTracker::new());
        let workers: Vec<_> = (0..THREADS)
            .map(|t| {
                let tracker = Arc::clone(&tracker);
                thread::spawn(move || {
                    for i in 0..OPS_PER_THREAD {
                        tracker.record_version(format!("thread{}:key{}", t, i).as_bytes());
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(tracker.total_recorded(), (THREADS * OPS_PER_THREAD) as u64);
    }

    #[test]
    fn test_bloom_filter() {
        let bloom = AtomicBloomFilter::new(1 << 16);
        let member = |i: u64| i * 12345;

        (0..1000u64).map(member).for_each(|h| bloom.insert(h));
        for h in (0..1000u64).map(member) {
            assert!(bloom.may_contain(h));
        }

        let false_positives = (1000..2000u64)
            .filter(|&i| bloom.may_contain(i * 12345 + 1))
            .count();
        assert!(false_positives < 100, "False positive rate too high: {}", false_positives);
    }

    #[test]
    fn test_hybrid_tracker() {
        let tracker = HybridEpochTracker::new();
        let keys = ["users/1", "users/2", "orders/1"];
        for key in keys {
            tracker.record_version(key.as_bytes());
        }
        for key in keys {
            assert!(tracker.might_be_dirty(key.as_bytes()));
        }

        let non_existent_count = (0..100)
            .filter(|i| tracker.might_be_dirty(format!("nonexistent/{}", i).as_bytes()))
            .count();
        assert!(non_existent_count < 10);
    }

    #[test]
    fn test_epoch_boundary_handling() {
        let tracker = LockFreeEpochTracker::new();

        record_all(&tracker, &["epoch0_key1", "epoch0_key2"]);
        assert_eq!(tracker.advance_epoch().count(), 2);

        record_all(&tracker, &["epoch1_key1", "epoch1_key2", "epoch1_key3"]);
        assert_eq!(tracker.advance_epoch().count(), 3);

        assert_eq!(tracker.current(), 2);
    }

    #[test]
    fn test_hash_consistency() {
        let key = b"test_key_for_hashing";
        let first = LockFreeEpochTracker::hash_key(key);
        assert_eq!(first, LockFreeEpochTracker::hash_key(key));
        assert_ne!(first, LockFreeEpochTracker::hash_key(b"different_key"));
    }
}