//! Hybrid safe-memory-reclamation primitives: hazard-pointer slots combined
//! with epoch-based deferred destruction behind a unified reclaimer facade.

use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};

/// Hazard slots reserved per thread when sizing the pool.
const MAX_HAZARD_POINTERS: usize = 4;

/// Number of whole epochs a retired object must age before it can be freed.
const EPOCH_GRACE_PERIODS: u64 = 2;

/// Pending retired-object count that triggers an inline reclaim pass in `retire`.
const RECLAIM_THRESHOLD: usize = 64;

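/// One hazard-pointer slot: the pointer it currently protects plus the
/// hashed id of the owning thread (0 when the slot is free).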
#[derive(Debug)]
struct HazardSlot {
    /// Pointer currently protected by this slot; null when idle.
    ptr: AtomicPtr<()>,
    /// Hashed id of the owning thread; 0 means the slot is free.
    owner: AtomicU64,
}

impl HazardSlot {
    fn new() -> Self {
        Self {
            ptr: AtomicPtr::new(std::ptr::null_mut()),
            owner: AtomicU64::new(0),
        }
    }

    /// Claims the slot for `thread_id` if it is currently free.
    fn acquire(&self, thread_id: u64) -> bool {
        self.owner
            .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }

    fn release(&self) {
        self.ptr.store(std::ptr::null_mut(), Ordering::Release);
        self.owner.store(0, Ordering::Release);
    }

    fn protect(&self, ptr: *mut ()) {
        self.ptr.store(ptr, Ordering::Release);
    }

    fn is_protecting(&self, ptr: *mut ()) -> bool {
        self.ptr.load(Ordering::Acquire) == ptr
    }
}

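/// A fixed pool of hazard slots shared by up to `max_threads` threads. A
/// thread claims a slot via `acquire_slot`, publishes the pointer it is
/// about to dereference, and callers can check `is_protected` before freeing.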
pub struct HazardDomain {
    /// Fixed pool of slots, sized once at construction.
    slots: Vec<HazardSlot>,
    /// Number of threads currently holding a slot.
    active_count: AtomicUsize,
    /// `(thread_id, slot_index)` pairs for threads that own a slot.
    slot_registry: Mutex<Vec<(u64, usize)>>,
}

impl HazardDomain {
    pub fn new(max_threads: usize) -> Self {
        let capacity = max_threads * MAX_HAZARD_POINTERS;
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(HazardSlot::new());
        }

        Self {
            slots,
            active_count: AtomicUsize::new(0),
            slot_registry: Mutex::new(Vec::new()),
        }
    }

    /// Returns the slot index owned by `thread_id`, claiming a free slot on
    /// first use. Returns `None` when the pool is exhausted.
    fn acquire_slot(&self, thread_id: u64) -> Option<usize> {
        // Re-entrant case: this thread already owns a slot.
        {
            let registry = self.slot_registry.lock();
            for &(tid, idx) in registry.iter() {
                if tid == thread_id {
                    return Some(idx);
                }
            }
        }

        // Otherwise claim the first free slot via CAS on its owner word.
        for (idx, slot) in self.slots.iter().enumerate() {
            if slot.acquire(thread_id) {
                let mut registry = self.slot_registry.lock();
                registry.push((thread_id, idx));
                self.active_count.fetch_add(1, Ordering::Relaxed);
                return Some(idx);
            }
        }

        None
    }

    fn release_slot(&self, thread_id: u64, slot_idx: usize) {
        if slot_idx < self.slots.len() {
            self.slots[slot_idx].release();
        }

        // Only decrement the active count if the thread was actually
        // registered; this keeps a double release from underflowing it.
        let mut registry = self.slot_registry.lock();
        let before = registry.len();
        registry.retain(|&(tid, _)| tid != thread_id);
        if registry.len() < before {
            self.active_count.fetch_sub(1, Ordering::Relaxed);
        }
    }

    fn protect(&self, slot_idx: usize, ptr: *mut ()) {
        if slot_idx < self.slots.len() {
            self.slots[slot_idx].protect(ptr);
        }
    }

    /// Linear scan over every slot; O(slots), called on the reclaim path.
    fn is_protected(&self, ptr: *mut ()) -> bool {
        for slot in &self.slots {
            if slot.is_protecting(ptr) {
                return true;
            }
        }
        false
    }

    pub fn active_count(&self) -> usize {
        self.active_count.load(Ordering::Relaxed)
    }
}

impl Default for HazardDomain {
    fn default() -> Self {
        Self::new(64)
    }
}

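/// Epoch-based reclamation: readers pin themselves to the global epoch, and
/// retired objects wait in per-epoch limbo buckets until every pinned thread
/// has moved past them by at least `EPOCH_GRACE_PERIODS` epochs.
///
/// A minimal single-threaded sketch (marked `ignore` since it is
/// illustrative only; the module path is omitted):
///
/// ```ignore
/// fn free_u64(p: *mut ()) {
///     unsafe { drop(Box::from_raw(p as *mut u64)) }
/// }
///
/// let domain = EpochDomain::new();
/// let me = domain.register_thread();
///
/// domain.pin(me);
/// // ... read shared data ...
/// domain.unpin(me);
///
/// let node = Box::into_raw(Box::new(7u64));
/// domain.retire(node as *mut (), free_u64, 8);
///
/// for _ in 0..3 {
///     domain.advance_epoch();
/// }
/// assert_eq!(domain.try_reclaim(), 1);
/// ```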
pub struct EpochDomain {
    /// Monotonically increasing global epoch counter.
    global_epoch: AtomicU64,
    /// Per-thread local epochs; `u64::MAX` marks an unpinned thread.
    local_epochs: RwLock<Vec<AtomicU64>>,
    /// Retired objects grouped by the epoch in which they were retired.
    limbo: Mutex<VecDeque<(u64, Vec<RetiredObject>)>>,
    /// Total number of objects awaiting reclamation.
    retired_count: AtomicUsize,
}

#[allow(dead_code)] // `size` is stored for accounting but not yet read.
struct RetiredObject {
    ptr: *mut (),
    destructor: fn(*mut ()),
    size: usize,
}

// SAFETY: the pointer is owned exclusively by this record once retired and
// is only passed to `destructor`, exactly once, on the reclaiming thread.
unsafe impl Send for RetiredObject {}

impl EpochDomain {
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            local_epochs: RwLock::new(Vec::new()),
            limbo: Mutex::new(VecDeque::new()),
            retired_count: AtomicUsize::new(0),
        }
    }

    /// Registers the calling thread and returns its index into the local
    /// epoch table. `u64::MAX` marks the thread as unpinned.
    pub fn register_thread(&self) -> usize {
        let mut epochs = self.local_epochs.write();
        let idx = epochs.len();
        epochs.push(AtomicU64::new(u64::MAX));
        idx
    }

    pub fn pin(&self, thread_idx: usize) {
        let current = self.global_epoch.load(Ordering::SeqCst);
        let epochs = self.local_epochs.read();
        if thread_idx < epochs.len() {
            epochs[thread_idx].store(current, Ordering::SeqCst);
        }
    }

    pub fn unpin(&self, thread_idx: usize) {
        let epochs = self.local_epochs.read();
        if thread_idx < epochs.len() {
            epochs[thread_idx].store(u64::MAX, Ordering::SeqCst);
        }
    }

    pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        let current_epoch = self.global_epoch.load(Ordering::SeqCst);
        let obj = RetiredObject {
            ptr,
            destructor,
            size,
        };

        let mut limbo = self.limbo.lock();

        // Start a new bucket whenever the epoch has moved on since the last
        // retirement.
        if limbo.back().is_none_or(|(e, _)| *e != current_epoch) {
            limbo.push_back((current_epoch, Vec::new()));
        }

        if let Some((_, objects)) = limbo.back_mut() {
            objects.push(obj);
        }

        let count = self.retired_count.fetch_add(1, Ordering::Relaxed);

        if count >= RECLAIM_THRESHOLD {
            // Release the lock first; `try_reclaim` re-acquires it.
            drop(limbo);
            self.try_reclaim();
        }
    }

    pub fn advance_epoch(&self) {
        self.global_epoch.fetch_add(1, Ordering::SeqCst);
    }

    /// Smallest epoch any pinned thread might still be reading in, minus the
    /// grace period. Buckets at or below this epoch are safe to free.
    fn safe_epoch(&self) -> u64 {
        let epochs = self.local_epochs.read();
        let mut min = self.global_epoch.load(Ordering::SeqCst);

        for epoch in epochs.iter() {
            let e = epoch.load(Ordering::SeqCst);
            if e != u64::MAX && e < min {
                min = e;
            }
        }

        min.saturating_sub(EPOCH_GRACE_PERIODS)
    }

    pub fn try_reclaim(&self) -> usize {
        let safe = self.safe_epoch();
        let mut reclaimed = 0;

        let mut limbo = self.limbo.lock();

        while let Some((epoch, _)) = limbo.front() {
            if *epoch > safe {
                break;
            }

            if let Some((_, objects)) = limbo.pop_front() {
                for obj in objects {
                    // NOTE: destructors run while the limbo lock is held;
                    // they must not call back into this domain.
                    (obj.destructor)(obj.ptr);
                    reclaimed += 1;
                    self.retired_count.fetch_sub(1, Ordering::Relaxed);
                }
            }
        }

        reclaimed
    }

    pub fn current_epoch(&self) -> u64 {
        self.global_epoch.load(Ordering::SeqCst)
    }

    pub fn pending_count(&self) -> usize {
        self.retired_count.load(Ordering::Relaxed)
    }
}

impl Default for EpochDomain {
    fn default() -> Self {
        Self::new()
    }
}

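/// RAII guard over a hazard slot; dropping it releases the slot and clears
/// any protected pointer.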
pub struct HazardGuard<'a> {
    domain: &'a HazardDomain,
    slot_idx: usize,
    thread_id: u64,
}

impl<'a> HazardGuard<'a> {
    pub fn protect(&self, ptr: *mut ()) {
        self.domain.protect(self.slot_idx, ptr);
    }

    pub fn protect_typed<T>(&self, ptr: *mut T) {
        self.protect(ptr as *mut ());
    }
}

impl<'a> Drop for HazardGuard<'a> {
    fn drop(&mut self) {
        self.domain.release_slot(self.thread_id, self.slot_idx);
    }
}

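/// RAII guard that keeps a thread pinned to its epoch; dropping it unpins.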
pub struct EpochGuard<'a> {
    domain: &'a EpochDomain,
    thread_idx: usize,
}

impl<'a> Drop for EpochGuard<'a> {
    fn drop(&mut self) {
        self.domain.unpin(self.thread_idx);
    }
}

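/// Caller preference for which reclamation scheme to use.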
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReclaimStrategy {
    /// Protect individual pointers via hazard slots.
    HazardPointer,
    /// Defer frees until all pinned threads have advanced.
    Epoch,
    /// Choose automatically (currently only recorded, not acted on).
    Auto,
}

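/// Facade exposing both schemes: hazard-pointer guards for protecting
/// individual pointers and epoch pins for read-side critical sections, with
/// shared statistics.
///
/// A minimal usage sketch (marked `ignore` since it is illustrative only;
/// the module path is omitted):
///
/// ```ignore
/// let r = UnifiedReclaimer::new();
///
/// // Epoch scheme: pin while reading, retire when unlinking, reclaim later.
/// {
///     let _guard = r.pin_epoch();
///     // ... traverse a lock-free structure ...
/// }
/// let unlinked = Box::into_raw(Box::new(42u64));
/// unsafe { r.retire(unlinked) };
/// r.advance_epoch();
/// let freed = r.try_reclaim();
///
/// // Hazard scheme: protect a pointer for the guard's lifetime.
/// let guard = r.pin_hazard().expect("slot available");
/// ```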
pub struct UnifiedReclaimer {
    hazard: Arc<HazardDomain>,
    epoch: Arc<EpochDomain>,
    /// Maps hashed thread ids to their registered epoch index.
    thread_epochs: Mutex<std::collections::HashMap<u64, usize>>,
    /// Preferred strategy; stored but not yet consulted by any method here.
    default_strategy: ReclaimStrategy,
    stats: ReclaimStats,
}

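/// Monotonic activity counters, updated with relaxed atomics; read a
/// consistent-enough view via `snapshot`.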
#[derive(Debug, Default)]
pub struct ReclaimStats {
    pub hazard_pins: AtomicU64,
    pub epoch_pins: AtomicU64,
    pub objects_retired: AtomicU64,
    pub objects_reclaimed: AtomicU64,
    pub reclaim_cycles: AtomicU64,
}

impl ReclaimStats {
    fn record_hazard_pin(&self) {
        self.hazard_pins.fetch_add(1, Ordering::Relaxed);
    }

    fn record_epoch_pin(&self) {
        self.epoch_pins.fetch_add(1, Ordering::Relaxed);
    }

    fn record_retire(&self) {
        self.objects_retired.fetch_add(1, Ordering::Relaxed);
    }

    fn record_reclaim(&self, count: usize) {
        self.objects_reclaimed
            .fetch_add(count as u64, Ordering::Relaxed);
        self.reclaim_cycles.fetch_add(1, Ordering::Relaxed);
    }

    pub fn snapshot(&self) -> ReclaimStatsSnapshot {
        ReclaimStatsSnapshot {
            hazard_pins: self.hazard_pins.load(Ordering::Relaxed),
            epoch_pins: self.epoch_pins.load(Ordering::Relaxed),
            objects_retired: self.objects_retired.load(Ordering::Relaxed),
            objects_reclaimed: self.objects_reclaimed.load(Ordering::Relaxed),
            reclaim_cycles: self.reclaim_cycles.load(Ordering::Relaxed),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ReclaimStatsSnapshot {
    pub hazard_pins: u64,
    pub epoch_pins: u64,
    pub objects_retired: u64,
    pub objects_reclaimed: u64,
    pub reclaim_cycles: u64,
}

impl UnifiedReclaimer {
    pub fn new() -> Self {
        Self {
            hazard: Arc::new(HazardDomain::default()),
            epoch: Arc::new(EpochDomain::default()),
            thread_epochs: Mutex::new(std::collections::HashMap::new()),
            default_strategy: ReclaimStrategy::Auto,
            stats: ReclaimStats::default(),
        }
    }

    pub fn with_capacity(max_threads: usize) -> Self {
        Self {
            hazard: Arc::new(HazardDomain::new(max_threads)),
            epoch: Arc::new(EpochDomain::default()),
            thread_epochs: Mutex::new(std::collections::HashMap::new()),
            default_strategy: ReclaimStrategy::Auto,
            stats: ReclaimStats::default(),
        }
    }

    pub fn with_strategy(mut self, strategy: ReclaimStrategy) -> Self {
        self.default_strategy = strategy;
        self
    }

    /// Pins the current thread with a hazard slot, or returns `None` if the
    /// pool is exhausted. At most one live guard per thread is supported: a
    /// second guard would share the same slot and release it on drop.
    pub fn pin_hazard(&self) -> Option<HazardGuard<'_>> {
        let thread_id = self.current_thread_id();
        let slot_idx = self.hazard.acquire_slot(thread_id)?;

        self.stats.record_hazard_pin();

        Some(HazardGuard {
            domain: &self.hazard,
            slot_idx,
            thread_id,
        })
    }

    pub fn pin_epoch(&self) -> EpochGuard<'_> {
        let thread_id = self.current_thread_id();

        let thread_idx = {
            let mut epochs = self.thread_epochs.lock();
            *epochs
                .entry(thread_id)
                .or_insert_with(|| self.epoch.register_thread())
        };

        self.epoch.pin(thread_idx);
        self.stats.record_epoch_pin();

        EpochGuard {
            domain: &self.epoch,
            thread_idx,
        }
    }
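
    /// # Safety
    ///
    /// `ptr` must come from `Box::into_raw`, must not be freed elsewhere, and
    /// must not be dereferenced after this call except by readers that were
    /// already protected or pinned before it was unlinked.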
    pub unsafe fn retire<T>(&self, ptr: *mut T) {
        // Non-capturing closure, so it coerces to `fn(*mut ())`: it rebuilds
        // the `Box<T>` and drops it once the grace period has passed.
        let destructor: fn(*mut ()) = |p| unsafe { drop(Box::from_raw(p as *mut T)) };

        self.epoch
            .retire(ptr as *mut (), destructor, std::mem::size_of::<T>());
        self.stats.record_retire();
    }

    pub fn retire_with_destructor(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        self.epoch.retire(ptr, destructor, size);
        self.stats.record_retire();
    }

    pub fn is_protected(&self, ptr: *mut ()) -> bool {
        self.hazard.is_protected(ptr)
    }

    pub fn try_reclaim(&self) -> usize {
        let reclaimed = self.epoch.try_reclaim();
        if reclaimed > 0 {
            self.stats.record_reclaim(reclaimed);
        }
        reclaimed
    }

    pub fn advance_epoch(&self) {
        self.epoch.advance_epoch();
    }

    pub fn current_epoch(&self) -> u64 {
        self.epoch.current_epoch()
    }

    pub fn pending_count(&self) -> usize {
        self.epoch.pending_count()
    }

    pub fn stats(&self) -> ReclaimStatsSnapshot {
        self.stats.snapshot()
    }

    fn current_thread_id(&self) -> u64 {
        use std::hash::{Hash, Hasher};

        // Hash the opaque `ThreadId` into a `u64`. Collisions (including a
        // hash of 0, the free-slot sentinel) are possible in principle but
        // vanishingly unlikely.
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::thread::current().id().hash(&mut hasher);
        hasher.finish()
    }
}

impl Default for UnifiedReclaimer {
    fn default() -> Self {
        Self::new()
    }
}

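/// Per-thread handle that caches its hazard slot and epoch index, avoiding
/// the registry lookups `UnifiedReclaimer` performs on every pin.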
pub struct ThreadLocalReclaimer {
    reclaimer: Arc<UnifiedReclaimer>,
    hazard_slot: Option<usize>,
    epoch_idx: usize,
    thread_id: u64,
}

impl ThreadLocalReclaimer {
    pub fn new(reclaimer: Arc<UnifiedReclaimer>) -> Self {
        use std::hash::{Hash, Hasher};

        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::thread::current().id().hash(&mut hasher);
        let thread_id = hasher.finish();

        let epoch_idx = reclaimer.epoch.register_thread();

        Self {
            reclaimer,
            hazard_slot: None,
            epoch_idx,
            thread_id,
        }
    }

    /// Claims (or re-uses) a hazard slot for this thread. Returns `false`
    /// when the slot pool is exhausted.
    pub fn pin_hazard(&mut self) -> bool {
        if self.hazard_slot.is_some() {
            return true;
        }

        if let Some(slot) = self.reclaimer.hazard.acquire_slot(self.thread_id) {
            self.hazard_slot = Some(slot);
            true
        } else {
            false
        }
    }

    pub fn protect(&self, ptr: *mut ()) {
        if let Some(slot) = self.hazard_slot {
            self.reclaimer.hazard.protect(slot, ptr);
        }
    }

    pub fn pin_epoch(&self) {
        self.reclaimer.epoch.pin(self.epoch_idx);
    }

    pub fn unpin_epoch(&self) {
        self.reclaimer.epoch.unpin(self.epoch_idx);
    }

    pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        self.reclaimer.epoch.retire(ptr, destructor, size);
    }
}
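
// Minimal cleanup sketch: release the cached hazard slot when the handle
// drops so the fixed slot pool is not leaked. Assumes at most one
// `ThreadLocalReclaimer` per thread, since slots are keyed by thread id.
impl Drop for ThreadLocalReclaimer {
    fn drop(&mut self) {
        if let Some(slot) = self.hazard_slot.take() {
            self.reclaimer.hazard.release_slot(self.thread_id, slot);
        }
    }
}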

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicBool;

    #[test]
    fn test_hazard_domain_basic() {
        let domain = HazardDomain::new(4);

        let thread_id = 12345u64;
        let slot = domain.acquire_slot(thread_id).unwrap();

        assert_eq!(domain.active_count(), 1);

        let data = Box::into_raw(Box::new(42u64));
        domain.protect(slot, data as *mut ());

        assert!(domain.is_protected(data as *mut ()));

        domain.release_slot(thread_id, slot);
        assert_eq!(domain.active_count(), 0);
        assert!(!domain.is_protected(data as *mut ()));

        unsafe { drop(Box::from_raw(data)) };
    }

    #[test]
    fn test_epoch_domain_basic() {
        let domain = EpochDomain::new();

        let idx = domain.register_thread();
        assert_eq!(domain.current_epoch(), 0);

        domain.pin(idx);
        domain.advance_epoch();
        assert_eq!(domain.current_epoch(), 1);

        domain.unpin(idx);
    }

    #[test]
    fn test_epoch_retirement() {
        static DROPPED: AtomicBool = AtomicBool::new(false);
        DROPPED.store(false, Ordering::SeqCst);

        fn drop_test(ptr: *mut ()) {
            DROPPED.store(true, Ordering::SeqCst);
            unsafe { drop(Box::from_raw(ptr as *mut u64)) };
        }

        let domain = EpochDomain::new();
        let idx = domain.register_thread();

        domain.pin(idx);
        let data = Box::into_raw(Box::new(42u64));
        domain.retire(data as *mut (), drop_test, 8);

        // At epoch 0, `safe_epoch` saturates to 0, so reclamation may happen
        // here despite the pin; the assertion below allows either path.
        let reclaimed_while_pinned = domain.try_reclaim();

        domain.unpin(idx);
        for _ in 0..EPOCH_GRACE_PERIODS + 2 {
            domain.advance_epoch();
        }

        let reclaimed = domain.try_reclaim();

        assert!(DROPPED.load(Ordering::SeqCst) || reclaimed > 0 || reclaimed_while_pinned > 0);
    }

    #[test]
    fn test_unified_reclaimer_hazard() {
        let reclaimer = UnifiedReclaimer::new();

        let guard = reclaimer.pin_hazard();
        assert!(guard.is_some());

        let guard = guard.unwrap();
        let data = Box::into_raw(Box::new(String::from("test")));
        guard.protect_typed(data);

        assert!(reclaimer.is_protected(data as *mut ()));

        drop(guard);
        assert!(!reclaimer.is_protected(data as *mut ()));

        unsafe { drop(Box::from_raw(data)) };
    }

    #[test]
    fn test_unified_reclaimer_epoch() {
        let reclaimer = UnifiedReclaimer::new();

        {
            let _guard = reclaimer.pin_epoch();
            assert_eq!(reclaimer.current_epoch(), 0);
        }

        reclaimer.advance_epoch();
        assert_eq!(reclaimer.current_epoch(), 1);
    }

    #[test]
    fn test_stats_tracking() {
        let reclaimer = UnifiedReclaimer::new();

        {
            let _guard = reclaimer.pin_epoch();
        }

        let _ = reclaimer.pin_hazard();

        let stats = reclaimer.stats();
        assert!(stats.epoch_pins >= 1);
        assert!(stats.hazard_pins >= 1);
    }

    #[test]
    fn test_thread_local_reclaimer() {
        let reclaimer = Arc::new(UnifiedReclaimer::new());
        let mut local = ThreadLocalReclaimer::new(Arc::clone(&reclaimer));

        assert!(local.pin_hazard());

        let data = Box::into_raw(Box::new(100u32));
        local.protect(data as *mut ());

        assert!(reclaimer.is_protected(data as *mut ()));

        unsafe { drop(Box::from_raw(data)) };
    }

    #[test]
    fn test_multiple_hazard_slots() {
        let domain = HazardDomain::new(2);

        let slot1 = domain.acquire_slot(1).unwrap();
        let slot2 = domain.acquire_slot(2).unwrap();

        let data1 = Box::into_raw(Box::new(1u64));
        let data2 = Box::into_raw(Box::new(2u64));

        domain.protect(slot1, data1 as *mut ());
        domain.protect(slot2, data2 as *mut ());

        assert!(domain.is_protected(data1 as *mut ()));
        assert!(domain.is_protected(data2 as *mut ()));

        domain.release_slot(1, slot1);
        assert!(!domain.is_protected(data1 as *mut ()));
        assert!(domain.is_protected(data2 as *mut ()));

        domain.release_slot(2, slot2);

        unsafe {
            drop(Box::from_raw(data1));
            drop(Box::from_raw(data2));
        }
    }

    #[test]
    fn test_reclaim_stats_snapshot() {
        let stats = ReclaimStats::default();

        stats.record_hazard_pin();
        stats.record_hazard_pin();
        stats.record_epoch_pin();
        stats.record_retire();
        stats.record_reclaim(5);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.hazard_pins, 2);
        assert_eq!(snapshot.epoch_pins, 1);
        assert_eq!(snapshot.objects_retired, 1);
        assert_eq!(snapshot.objects_reclaimed, 5);
        assert_eq!(snapshot.reclaim_cycles, 1);
    }

    #[test]
    fn test_strategy_configuration() {
        let reclaimer = UnifiedReclaimer::new().with_strategy(ReclaimStrategy::Epoch);

        assert_eq!(reclaimer.default_strategy, ReclaimStrategy::Epoch);
    }
}