use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
/// Multiplier applied to `max_threads` when `HazardDomain::new` sizes its slot table.
const MAX_HAZARD_POINTERS: usize = 4;
/// Number of epochs `EpochDomain::safe_epoch` subtracts from the oldest observed epoch.
const EPOCH_GRACE_PERIODS: u64 = 2;
/// Pending-object count (as observed before the increment) at or above which
/// `EpochDomain::retire` also attempts a reclamation pass.
const RECLAIM_THRESHOLD: usize = 64;
/// One hazard-pointer slot: a published pointer plus the id of the thread that owns the slot.
///
/// An `owner` of `0` means the slot is free, so `0` is never a valid thread id.
/// The struct is aligned to 64 bytes (presumably so that neighbouring slots do
/// not share a cache line — confirm for the target platform).
#[derive(Debug)]
#[repr(C, align(64))]
struct HazardSlot {
    /// Pointer currently published through this slot; null when nothing is published.
    ptr: AtomicPtr<()>,
    /// Id of the owning thread, or `0` while the slot is free.
    owner: AtomicU64,
}

impl HazardSlot {
    /// Owner value that marks a slot as unclaimed.
    const FREE: u64 = 0;

    /// Creates a free slot that publishes no pointer.
    fn new() -> Self {
        Self {
            ptr: AtomicPtr::new(std::ptr::null_mut()),
            owner: AtomicU64::new(Self::FREE),
        }
    }

    /// Tries to claim the slot for `thread_id`.
    ///
    /// Returns `true` only if the slot was free and is now owned by `thread_id`.
    /// A `thread_id` of `0` is rejected: it is the "free" marker, and swapping
    /// "free" for "free" would report success without claiming anything.
    fn acquire(&self, thread_id: u64) -> bool {
        if thread_id == Self::FREE {
            return false;
        }
        self.owner
            .compare_exchange(Self::FREE, thread_id, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }

    /// Clears the published pointer and then marks the slot free.
    fn release(&self) {
        self.ptr.store(std::ptr::null_mut(), Ordering::SeqCst);
        self.owner.store(Self::FREE, Ordering::Release);
    }

    /// Publishes `ptr` as the pointer protected by this slot.
    fn protect(&self, ptr: *mut ()) {
        // SeqCst rather than Release: a reader must publish the hazard *before*
        // it re-validates the pointer it is about to use, and Release alone does
        // not order this store ahead of a later load.
        self.ptr.store(ptr, Ordering::SeqCst);
    }

    /// Returns `true` if this slot currently publishes exactly `ptr`.
    fn is_protecting(&self, ptr: *mut ()) -> bool {
        // SeqCst so the scan takes part in the same total order as `protect`.
        self.ptr.load(Ordering::SeqCst) == ptr
    }
}
/// Fixed-size table of hazard slots plus bookkeeping of which thread id holds which slot.
pub struct HazardDomain {
/// Slot table; populated once in `new` and never resized afterwards in this file.
slots: Vec<HazardSlot>,
/// Counter adjusted by `acquire_slot` and `release_slot`.
active_count: AtomicUsize,
/// `(thread_id, slot_index)` pairs recorded when `acquire_slot` claims a slot.
slot_registry: Mutex<Vec<(u64, usize)>>,
}
impl HazardDomain {
pub fn new(max_threads: usize) -> Self {
let capacity = max_threads * MAX_HAZARD_POINTERS;
let mut slots = Vec::with_capacity(capacity);
for _ in 0..capacity {
slots.push(HazardSlot::new());
}
Self {
slots,
active_count: AtomicUsize::new(0),
slot_registry: Mutex::new(Vec::new()),
}
}
fn acquire_slot(&self, thread_id: u64) -> Option<usize> {
{
let registry = self.slot_registry.lock();
for &(tid, idx) in registry.iter() {
if tid == thread_id {
return Some(idx);
}
}
}
for (idx, slot) in self.slots.iter().enumerate() {
if slot.acquire(thread_id) {
let mut registry = self.slot_registry.lock();
registry.push((thread_id, idx));
self.active_count.fetch_add(1, Ordering::Relaxed);
return Some(idx);
}
}
None
}
fn release_slot(&self, thread_id: u64, slot_idx: usize) {
if slot_idx < self.slots.len() {
self.slots[slot_idx].release();
}
let mut registry = self.slot_registry.lock();
registry.retain(|&(tid, _)| tid != thread_id);
self.active_count.fetch_sub(1, Ordering::Relaxed);
}
fn protect(&self, slot_idx: usize, ptr: *mut ()) {
if slot_idx < self.slots.len() {
self.slots[slot_idx].protect(ptr);
}
}
fn is_protected(&self, ptr: *mut ()) -> bool {
for slot in &self.slots {
if slot.is_protecting(ptr) {
return true;
}
}
false
}
pub fn active_count(&self) -> usize {
self.active_count.load(Ordering::Relaxed)
}
}
impl Default for HazardDomain {
fn default() -> Self {
Self::new(64) }
}
/// Epoch bookkeeping: a global epoch counter, one epoch cell per registered thread, and a
/// queue of retired objects grouped by the epoch recorded when they were retired.
pub struct EpochDomain {
/// Global epoch; starts at 0 and is incremented by `advance_epoch`.
global_epoch: AtomicU64,
/// One cell per `register_thread` call; `u64::MAX` marks a thread that is not pinned.
local_epochs: RwLock<Vec<AtomicU64>>,
/// Retired objects batched as `(epoch, objects)`; new batches are pushed at the back.
limbo: Mutex<VecDeque<(u64, Vec<RetiredObject>)>>,
/// Number of retired objects that have not been reclaimed yet.
retired_count: AtomicUsize,
}
/// A retired allocation waiting in the limbo queue, paired with the function that frees it.
// `size` is stored by `retire` but not read anywhere in this file, hence the lint allowance.
#[allow(dead_code)]
struct RetiredObject {
/// Type-erased pointer handed to `destructor` at reclamation time.
ptr: *mut (),
/// Function invoked with `ptr` when the object is reclaimed.
destructor: fn(*mut ()),
/// Size value supplied by the caller of `retire`.
size: usize,
}
// Raw pointers are not `Send` by default; this impl lets retired objects sit in the shared
// limbo queue and be destroyed by whichever thread runs reclamation.
// NOTE(review): soundness relies on every retired pointer being safe to destroy from any
// thread — nothing in this file enforces that; confirm against callers.
unsafe impl Send for RetiredObject {}
impl EpochDomain {
pub fn new() -> Self {
Self {
global_epoch: AtomicU64::new(0),
local_epochs: RwLock::new(Vec::new()),
limbo: Mutex::new(VecDeque::new()),
retired_count: AtomicUsize::new(0),
}
}
pub fn register_thread(&self) -> usize {
let mut epochs = self.local_epochs.write();
let idx = epochs.len();
epochs.push(AtomicU64::new(u64::MAX)); idx
}
pub fn pin(&self, thread_idx: usize) {
let current = self.global_epoch.load(Ordering::SeqCst);
let epochs = self.local_epochs.read();
if thread_idx < epochs.len() {
epochs[thread_idx].store(current, Ordering::SeqCst);
}
}
pub fn unpin(&self, thread_idx: usize) {
let epochs = self.local_epochs.read();
if thread_idx < epochs.len() {
epochs[thread_idx].store(u64::MAX, Ordering::SeqCst);
}
}
pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
let current_epoch = self.global_epoch.load(Ordering::SeqCst);
let obj = RetiredObject {
ptr,
destructor,
size,
};
let mut limbo = self.limbo.lock();
if limbo.back().is_none_or(|(e, _)| *e != current_epoch) {
limbo.push_back((current_epoch, Vec::new()));
}
if let Some((_, objects)) = limbo.back_mut() {
objects.push(obj);
}
let count = self.retired_count.fetch_add(1, Ordering::Relaxed);
if count >= RECLAIM_THRESHOLD {
drop(limbo);
self.try_reclaim();
}
}
pub fn advance_epoch(&self) {
self.global_epoch.fetch_add(1, Ordering::SeqCst);
}
fn safe_epoch(&self) -> u64 {
let epochs = self.local_epochs.read();
let mut min = self.global_epoch.load(Ordering::SeqCst);
for epoch in epochs.iter() {
let e = epoch.load(Ordering::SeqCst);
if e != u64::MAX && e < min {
min = e;
}
}
min.saturating_sub(EPOCH_GRACE_PERIODS)
}
pub fn try_reclaim(&self) -> usize {
let safe = self.safe_epoch();
let mut reclaimed = 0;
let mut limbo = self.limbo.lock();
while let Some((epoch, _)) = limbo.front() {
if *epoch > safe {
break;
}
if let Some((_, objects)) = limbo.pop_front() {
for obj in objects {
(obj.destructor)(obj.ptr);
reclaimed += 1;
self.retired_count.fetch_sub(1, Ordering::Relaxed);
}
}
}
reclaimed
}
pub fn current_epoch(&self) -> u64 {
self.global_epoch.load(Ordering::SeqCst)
}
pub fn pending_count(&self) -> usize {
self.retired_count.load(Ordering::Relaxed)
}
}
impl Default for EpochDomain {
fn default() -> Self {
Self::new()
}
}
/// Handle for a hazard slot; dropping it calls `HazardDomain::release_slot` for that slot.
pub struct HazardGuard<'a> {
/// Domain that owns the slot.
domain: &'a HazardDomain,
/// Index of the slot inside `domain`.
slot_idx: usize,
/// Thread id the slot was acquired under.
thread_id: u64,
}
impl<'a> HazardGuard<'a> {
    /// Publishes `ptr` through this guard's slot, replacing whatever it published before.
    pub fn protect(&self, ptr: *mut ()) {
        let slot = self.slot_idx;
        self.domain.protect(slot, ptr);
    }

    /// Typed convenience wrapper: erases `T` and forwards to [`HazardGuard::protect`].
    pub fn protect_typed<T>(&self, ptr: *mut T) {
        let erased = ptr.cast::<()>();
        self.protect(erased);
    }
}
impl Drop for HazardGuard<'_> {
    /// Hands the slot back to the domain under the thread id it was acquired with.
    fn drop(&mut self) {
        let (owner, slot) = (self.thread_id, self.slot_idx);
        self.domain.release_slot(owner, slot);
    }
}
/// Handle for a pinned epoch cell; dropping it calls `EpochDomain::unpin` on that cell.
pub struct EpochGuard<'a> {
/// Domain the cell belongs to.
domain: &'a EpochDomain,
/// Index of the cell, as returned by `EpochDomain::register_thread`.
thread_idx: usize,
}
impl Drop for EpochGuard<'_> {
    /// Marks the guarded epoch cell as no longer pinned.
    fn drop(&mut self) {
        let idx = self.thread_idx;
        self.domain.unpin(idx);
    }
}
/// Strategy tag stored in `UnifiedReclaimer::default_strategy`.
///
/// Within this file the value is only stored (by the constructors and `with_strategy`)
/// and compared in a test; no reclamation path branches on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReclaimStrategy {
// NOTE(review): variant meanings below are inferred from their names — confirm.
/// Presumably selects hazard-pointer protection.
HazardPointer,
/// Presumably selects epoch-based reclamation.
Epoch,
/// Value the constructors use by default.
Auto,
}
/// Front end that owns one hazard domain and one epoch domain and records usage statistics.
pub struct UnifiedReclaimer {
/// Hazard-slot table used by `pin_hazard` and `is_protected`.
hazard: Arc<HazardDomain>,
/// Epoch domain that receives every retirement and performs reclamation.
epoch: Arc<EpochDomain>,
/// Maps a hashed thread id to the epoch-cell index registered for that thread.
thread_epochs: Mutex<std::collections::HashMap<u64, usize>>,
/// Strategy tag set by the constructors and `with_strategy`.
default_strategy: ReclaimStrategy,
/// Counters updated by the pin / retire / reclaim methods.
stats: ReclaimStats,
}
/// Event counters for a reclaimer; the methods below touch them with relaxed atomics only.
#[derive(Debug, Default)]
pub struct ReclaimStats {
    /// Incremented by `record_hazard_pin`.
    pub hazard_pins: AtomicU64,
    /// Incremented by `record_epoch_pin`.
    pub epoch_pins: AtomicU64,
    /// Incremented by `record_retire`.
    pub objects_retired: AtomicU64,
    /// Increased by the `count` passed to `record_reclaim`.
    pub objects_reclaimed: AtomicU64,
    /// Incremented once per `record_reclaim` call.
    pub reclaim_cycles: AtomicU64,
}

impl ReclaimStats {
    /// Adds `by` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, Ordering::Relaxed);
    }

    /// Counts one hazard pin.
    fn record_hazard_pin(&self) {
        Self::bump(&self.hazard_pins, 1);
    }

    /// Counts one epoch pin.
    fn record_epoch_pin(&self) {
        Self::bump(&self.epoch_pins, 1);
    }

    /// Counts one retired object.
    fn record_retire(&self) {
        Self::bump(&self.objects_retired, 1);
    }

    /// Counts one reclamation cycle that reclaimed `count` objects.
    fn record_reclaim(&self, count: usize) {
        Self::bump(&self.objects_reclaimed, count as u64);
        Self::bump(&self.reclaim_cycles, 1);
    }

    /// Copies every counter into a plain-value snapshot.
    ///
    /// Each counter is read independently, so the snapshot is not an atomic view
    /// of all five values.
    pub fn snapshot(&self) -> ReclaimStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        ReclaimStatsSnapshot {
            hazard_pins: read(&self.hazard_pins),
            epoch_pins: read(&self.epoch_pins),
            objects_retired: read(&self.objects_retired),
            objects_reclaimed: read(&self.objects_reclaimed),
            reclaim_cycles: read(&self.reclaim_cycles),
        }
    }
}

/// Plain-value copy of [`ReclaimStats`] produced by `ReclaimStats::snapshot`.
#[derive(Debug, Clone)]
pub struct ReclaimStatsSnapshot {
    pub hazard_pins: u64,
    pub epoch_pins: u64,
    pub objects_retired: u64,
    pub objects_reclaimed: u64,
    pub reclaim_cycles: u64,
}
impl UnifiedReclaimer {
pub fn new() -> Self {
Self {
hazard: Arc::new(HazardDomain::default()),
epoch: Arc::new(EpochDomain::default()),
thread_epochs: Mutex::new(std::collections::HashMap::new()),
default_strategy: ReclaimStrategy::Auto,
stats: ReclaimStats::default(),
}
}
pub fn with_capacity(max_threads: usize) -> Self {
Self {
hazard: Arc::new(HazardDomain::new(max_threads)),
epoch: Arc::new(EpochDomain::default()),
thread_epochs: Mutex::new(std::collections::HashMap::new()),
default_strategy: ReclaimStrategy::Auto,
stats: ReclaimStats::default(),
}
}
pub fn with_strategy(mut self, strategy: ReclaimStrategy) -> Self {
self.default_strategy = strategy;
self
}
pub fn pin_hazard(&self) -> Option<HazardGuard<'_>> {
let thread_id = self.current_thread_id();
let slot_idx = self.hazard.acquire_slot(thread_id)?;
self.stats.record_hazard_pin();
Some(HazardGuard {
domain: &self.hazard,
slot_idx,
thread_id,
})
}
pub fn pin_epoch(&self) -> EpochGuard<'_> {
let thread_id = self.current_thread_id();
let thread_idx = {
let mut epochs = self.thread_epochs.lock();
*epochs
.entry(thread_id)
.or_insert_with(|| self.epoch.register_thread())
};
self.epoch.pin(thread_idx);
self.stats.record_epoch_pin();
EpochGuard {
domain: &self.epoch,
thread_idx,
}
}
pub unsafe fn retire<T>(&self, ptr: *mut T) {
let destructor = |p: *mut ()| {
unsafe { drop(Box::from_raw(p as *mut T)) };
};
self.epoch
.retire(ptr as *mut (), destructor, std::mem::size_of::<T>());
self.stats.record_retire();
}
pub fn retire_with_destructor(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
self.epoch.retire(ptr, destructor, size);
self.stats.record_retire();
}
pub fn is_protected(&self, ptr: *mut ()) -> bool {
self.hazard.is_protected(ptr)
}
pub fn try_reclaim(&self) -> usize {
let reclaimed = self.epoch.try_reclaim();
if reclaimed > 0 {
self.stats.record_reclaim(reclaimed);
}
reclaimed
}
pub fn advance_epoch(&self) {
self.epoch.advance_epoch();
}
pub fn current_epoch(&self) -> u64 {
self.epoch.current_epoch()
}
pub fn pending_count(&self) -> usize {
self.epoch.pending_count()
}
pub fn stats(&self) -> ReclaimStatsSnapshot {
self.stats.snapshot()
}
fn current_thread_id(&self) -> u64 {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
std::thread::current().id().hash(&mut hasher);
hasher.finish()
}
}
impl Default for UnifiedReclaimer {
fn default() -> Self {
Self::new()
}
}
/// Per-thread handle onto a shared `UnifiedReclaimer` that caches the thread's identifiers.
pub struct ThreadLocalReclaimer {
/// Shared reclaimer every operation is forwarded to.
reclaimer: Arc<UnifiedReclaimer>,
/// Hazard slot index once `pin_hazard` has succeeded; `None` before that.
hazard_slot: Option<usize>,
/// Epoch-cell index obtained from `register_thread` in `new`.
epoch_idx: usize,
/// Hash of the `ThreadId` of the thread that called `new`.
thread_id: u64,
}
impl ThreadLocalReclaimer {
    /// Creates a handle for the calling thread and registers an epoch cell for it.
    ///
    /// The thread id comes from `UnifiedReclaimer::current_thread_id`, so this handle
    /// and the reclaimer's own `pin_hazard` agree on the id of the calling thread.
    pub fn new(reclaimer: Arc<UnifiedReclaimer>) -> Self {
        let thread_id = reclaimer.current_thread_id();
        let epoch_idx = reclaimer.epoch.register_thread();
        Self {
            reclaimer,
            hazard_slot: None,
            epoch_idx,
            thread_id,
        }
    }

    /// Ensures this handle owns a hazard slot.
    ///
    /// Returns `true` if a slot is held after the call (acquired now or earlier),
    /// and `false` if the hazard domain had no free slot.
    pub fn pin_hazard(&mut self) -> bool {
        if self.hazard_slot.is_none() {
            self.hazard_slot = self.reclaimer.hazard.acquire_slot(self.thread_id);
        }
        self.hazard_slot.is_some()
    }

    /// Publishes `ptr` through the held hazard slot; does nothing if no slot is held.
    pub fn protect(&self, ptr: *mut ()) {
        if let Some(slot) = self.hazard_slot {
            self.reclaimer.hazard.protect(slot, ptr);
        }
    }

    /// Pins this handle's epoch cell at the current global epoch.
    pub fn pin_epoch(&self) {
        self.reclaimer.epoch.pin(self.epoch_idx);
    }

    /// Marks this handle's epoch cell as not pinned.
    pub fn unpin_epoch(&self) {
        self.reclaimer.epoch.unpin(self.epoch_idx);
    }

    /// Forwards a retirement to the shared epoch domain.
    ///
    /// Unlike `UnifiedReclaimer::retire_with_destructor`, this path does not update
    /// the reclaimer's statistics.
    pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        self.reclaimer.epoch.retire(ptr, destructor, size);
    }
}

impl Drop for ThreadLocalReclaimer {
    /// Gives back what the handle holds in the shared domains.
    ///
    /// Without this, a dropped handle would keep its hazard slot claimed forever, and
    /// a handle dropped while pinned would hold back epoch reclamation indefinitely.
    fn drop(&mut self) {
        self.reclaimer.epoch.unpin(self.epoch_idx);
        if let Some(slot) = self.hazard_slot.take() {
            self.reclaimer.hazard.release_slot(self.thread_id, slot);
        }
    }
}
// Unit tests for the hazard domain, the epoch domain, and the wrappers built on them.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicBool;
// Acquire a slot, publish a pointer, release the slot; checks the counter and
// protection status at each step.
#[test]
fn test_hazard_domain_basic() {
let domain = HazardDomain::new(4);
let thread_id = 12345u64;
let slot = domain.acquire_slot(thread_id).unwrap();
assert_eq!(domain.active_count(), 1);
let data = Box::into_raw(Box::new(42u64));
domain.protect(slot, data as *mut ());
assert!(domain.is_protected(data as *mut ()));
domain.release_slot(thread_id, slot);
assert_eq!(domain.active_count(), 0);
assert!(!domain.is_protected(data as *mut ()));
// The allocation was never retired, so the test frees it directly.
unsafe { drop(Box::from_raw(data)) };
}
// The epoch starts at 0 and `advance_epoch` moves it to 1, also while a thread is pinned.
#[test]
fn test_epoch_domain_basic() {
let domain = EpochDomain::new();
let idx = domain.register_thread();
assert_eq!(domain.current_epoch(), 0);
domain.pin(idx);
domain.advance_epoch();
assert_eq!(domain.current_epoch(), 1);
domain.unpin(idx);
}
// Retires one object and checks that its destructor eventually runs.
#[test]
fn test_epoch_retirement() {
static DROPPED: AtomicBool = AtomicBool::new(false);
DROPPED.store(false, Ordering::SeqCst);
fn drop_test(ptr: *mut ()) {
DROPPED.store(true, Ordering::SeqCst);
unsafe { drop(Box::from_raw(ptr as *mut u64)) };
}
let domain = EpochDomain::new();
let idx = domain.register_thread();
domain.pin(idx);
let data = Box::into_raw(Box::new(42u64));
domain.retire(data as *mut (), drop_test, 8);
let reclaimed_while_pinned = domain.try_reclaim();
domain.unpin(idx);
for _ in 0..EPOCH_GRACE_PERIODS + 2 {
domain.advance_epoch();
}
let reclaimed = domain.try_reclaim();
// Passes if the destructor ran at any point, including during the reclaim
// attempt made while the thread was still pinned.
assert!(DROPPED.load(Ordering::SeqCst) || reclaimed > 0 || reclaimed_while_pinned > 0);
}
// A guard from `pin_hazard` protects a pointer until the guard is dropped.
#[test]
fn test_unified_reclaimer_hazard() {
let reclaimer = UnifiedReclaimer::new();
let guard = reclaimer.pin_hazard();
assert!(guard.is_some());
let guard = guard.unwrap();
let data = Box::into_raw(Box::new(String::from("test")));
guard.protect_typed(data);
assert!(reclaimer.is_protected(data as *mut ()));
drop(guard);
assert!(!reclaimer.is_protected(data as *mut ()));
unsafe { drop(Box::from_raw(data)) };
}
// Pinning does not change the epoch; `advance_epoch` does.
#[test]
fn test_unified_reclaimer_epoch() {
let reclaimer = UnifiedReclaimer::new();
{
let _guard = reclaimer.pin_epoch();
assert_eq!(reclaimer.current_epoch(), 0);
}
reclaimer.advance_epoch();
assert_eq!(reclaimer.current_epoch(), 1);
}
// Both pin paths are reflected in the statistics snapshot.
#[test]
fn test_stats_tracking() {
let reclaimer = UnifiedReclaimer::new();
{
let _guard = reclaimer.pin_epoch();
}
// `let _ =` drops the returned guard immediately; only the counter matters here.
let _ = reclaimer.pin_hazard();
let stats = reclaimer.stats();
assert!(stats.epoch_pins >= 1);
assert!(stats.hazard_pins >= 1);
}
// A pointer protected through a thread-local handle is visible via the shared reclaimer.
#[test]
fn test_thread_local_reclaimer() {
let reclaimer = Arc::new(UnifiedReclaimer::new());
let mut local = ThreadLocalReclaimer::new(Arc::clone(&reclaimer));
assert!(local.pin_hazard());
let data = Box::into_raw(Box::new(100u32));
local.protect(data as *mut ());
assert!(reclaimer.is_protected(data as *mut ()));
unsafe { drop(Box::from_raw(data)) };
}
// Two thread ids get independent slots; releasing one leaves the other protected.
#[test]
fn test_multiple_hazard_slots() {
let domain = HazardDomain::new(2);
let slot1 = domain.acquire_slot(1).unwrap();
let slot2 = domain.acquire_slot(2).unwrap();
let data1 = Box::into_raw(Box::new(1u64));
let data2 = Box::into_raw(Box::new(2u64));
domain.protect(slot1, data1 as *mut ());
domain.protect(slot2, data2 as *mut ());
assert!(domain.is_protected(data1 as *mut ()));
assert!(domain.is_protected(data2 as *mut ()));
domain.release_slot(1, slot1);
assert!(!domain.is_protected(data1 as *mut ()));
assert!(domain.is_protected(data2 as *mut ()));
domain.release_slot(2, slot2);
unsafe {
drop(Box::from_raw(data1));
drop(Box::from_raw(data2));
}
}
// Each `record_*` call shows up in the matching snapshot field.
#[test]
fn test_reclaim_stats_snapshot() {
let stats = ReclaimStats::default();
stats.record_hazard_pin();
stats.record_hazard_pin();
stats.record_epoch_pin();
stats.record_retire();
stats.record_reclaim(5);
let snapshot = stats.snapshot();
assert_eq!(snapshot.hazard_pins, 2);
assert_eq!(snapshot.epoch_pins, 1);
assert_eq!(snapshot.objects_retired, 1);
assert_eq!(snapshot.objects_reclaimed, 5);
assert_eq!(snapshot.reclaim_cycles, 1);
}
// `with_strategy` overrides the strategy chosen by the constructor.
#[test]
fn test_strategy_configuration() {
let reclaimer = UnifiedReclaimer::new().with_strategy(ReclaimStrategy::Epoch);
assert_eq!(reclaimer.default_strategy, ReclaimStrategy::Epoch);
}
}