1use parking_lot::{Mutex, RwLock};
74use std::collections::VecDeque;
75use std::sync::Arc;
76use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
77
78const MAX_HAZARD_POINTERS: usize = 4;
80
81const EPOCH_GRACE_PERIODS: u64 = 2;
83
84const RECLAIM_THRESHOLD: usize = 64;
86
87#[derive(Debug)]
96#[repr(C, align(64))]
97struct HazardSlot {
98 ptr: AtomicPtr<()>,
100 owner: AtomicU64,
102 }
104
105impl HazardSlot {
106 fn new() -> Self {
107 Self {
108 ptr: AtomicPtr::new(std::ptr::null_mut()),
109 owner: AtomicU64::new(0),
110 }
111 }
112
113 fn acquire(&self, thread_id: u64) -> bool {
114 self.owner
115 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
116 .is_ok()
117 }
118
119 fn release(&self) {
120 self.ptr.store(std::ptr::null_mut(), Ordering::Release);
121 self.owner.store(0, Ordering::Release);
122 }
123
124 fn protect(&self, ptr: *mut ()) {
125 self.ptr.store(ptr, Ordering::Release);
126 }
127
128 fn is_protecting(&self, ptr: *mut ()) -> bool {
129 self.ptr.load(Ordering::Acquire) == ptr
130 }
131}
132
133pub struct HazardDomain {
135 slots: Vec<HazardSlot>,
137 active_count: AtomicUsize,
139 slot_registry: Mutex<Vec<(u64, usize)>>,
141}
142
143impl HazardDomain {
144 pub fn new(max_threads: usize) -> Self {
146 let capacity = max_threads * MAX_HAZARD_POINTERS;
147 let mut slots = Vec::with_capacity(capacity);
148 for _ in 0..capacity {
149 slots.push(HazardSlot::new());
150 }
151
152 Self {
153 slots,
154 active_count: AtomicUsize::new(0),
155 slot_registry: Mutex::new(Vec::new()),
156 }
157 }
158
159 fn acquire_slot(&self, thread_id: u64) -> Option<usize> {
161 {
163 let registry = self.slot_registry.lock();
164 for &(tid, idx) in registry.iter() {
165 if tid == thread_id {
166 return Some(idx);
167 }
168 }
169 }
170
171 for (idx, slot) in self.slots.iter().enumerate() {
173 if slot.acquire(thread_id) {
174 let mut registry = self.slot_registry.lock();
175 registry.push((thread_id, idx));
176 self.active_count.fetch_add(1, Ordering::Relaxed);
177 return Some(idx);
178 }
179 }
180
181 None
182 }
183
184 fn release_slot(&self, thread_id: u64, slot_idx: usize) {
186 if slot_idx < self.slots.len() {
187 self.slots[slot_idx].release();
188 }
189
190 let mut registry = self.slot_registry.lock();
191 registry.retain(|&(tid, _)| tid != thread_id);
192 self.active_count.fetch_sub(1, Ordering::Relaxed);
193 }
194
195 fn protect(&self, slot_idx: usize, ptr: *mut ()) {
197 if slot_idx < self.slots.len() {
198 self.slots[slot_idx].protect(ptr);
199 }
200 }
201
202 fn is_protected(&self, ptr: *mut ()) -> bool {
204 for slot in &self.slots {
205 if slot.is_protecting(ptr) {
206 return true;
207 }
208 }
209 false
210 }
211
212 pub fn active_count(&self) -> usize {
214 self.active_count.load(Ordering::Relaxed)
215 }
216}
217
218impl Default for HazardDomain {
219 fn default() -> Self {
220 Self::new(64) }
222}
223
224pub struct EpochDomain {
226 global_epoch: AtomicU64,
228 local_epochs: RwLock<Vec<AtomicU64>>,
230 limbo: Mutex<VecDeque<(u64, Vec<RetiredObject>)>>,
232 retired_count: AtomicUsize,
234}
235
236#[allow(dead_code)]
238struct RetiredObject {
239 ptr: *mut (),
240 destructor: fn(*mut ()),
241 size: usize,
242}
243
244unsafe impl Send for RetiredObject {}
247
248impl EpochDomain {
249 pub fn new() -> Self {
251 Self {
252 global_epoch: AtomicU64::new(0),
253 local_epochs: RwLock::new(Vec::new()),
254 limbo: Mutex::new(VecDeque::new()),
255 retired_count: AtomicUsize::new(0),
256 }
257 }
258
259 pub fn register_thread(&self) -> usize {
261 let mut epochs = self.local_epochs.write();
262 let idx = epochs.len();
263 epochs.push(AtomicU64::new(u64::MAX)); idx
265 }
266
267 pub fn pin(&self, thread_idx: usize) {
269 let current = self.global_epoch.load(Ordering::SeqCst);
270 let epochs = self.local_epochs.read();
271 if thread_idx < epochs.len() {
272 epochs[thread_idx].store(current, Ordering::SeqCst);
273 }
274 }
275
276 pub fn unpin(&self, thread_idx: usize) {
278 let epochs = self.local_epochs.read();
279 if thread_idx < epochs.len() {
280 epochs[thread_idx].store(u64::MAX, Ordering::SeqCst);
281 }
282 }
283
284 pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
286 let current_epoch = self.global_epoch.load(Ordering::SeqCst);
287 let obj = RetiredObject {
288 ptr,
289 destructor,
290 size,
291 };
292
293 let mut limbo = self.limbo.lock();
294
295 if limbo.back().is_none_or(|(e, _)| *e != current_epoch) {
297 limbo.push_back((current_epoch, Vec::new()));
298 }
299
300 if let Some((_, objects)) = limbo.back_mut() {
301 objects.push(obj);
302 }
303
304 let count = self.retired_count.fetch_add(1, Ordering::Relaxed);
305
306 if count >= RECLAIM_THRESHOLD {
308 drop(limbo);
309 self.try_reclaim();
310 }
311 }
312
313 pub fn advance_epoch(&self) {
315 self.global_epoch.fetch_add(1, Ordering::SeqCst);
316 }
317
318 fn safe_epoch(&self) -> u64 {
320 let epochs = self.local_epochs.read();
321 let mut min = self.global_epoch.load(Ordering::SeqCst);
322
323 for epoch in epochs.iter() {
324 let e = epoch.load(Ordering::SeqCst);
325 if e != u64::MAX && e < min {
326 min = e;
327 }
328 }
329
330 min.saturating_sub(EPOCH_GRACE_PERIODS)
332 }
333
334 pub fn try_reclaim(&self) -> usize {
336 let safe = self.safe_epoch();
337 let mut reclaimed = 0;
338
339 let mut limbo = self.limbo.lock();
340
341 while let Some((epoch, _)) = limbo.front() {
342 if *epoch > safe {
343 break;
344 }
345
346 if let Some((_, objects)) = limbo.pop_front() {
347 for obj in objects {
348 (obj.destructor)(obj.ptr);
350 reclaimed += 1;
351 self.retired_count.fetch_sub(1, Ordering::Relaxed);
352 }
353 }
354 }
355
356 reclaimed
357 }
358
359 pub fn current_epoch(&self) -> u64 {
361 self.global_epoch.load(Ordering::SeqCst)
362 }
363
364 pub fn pending_count(&self) -> usize {
366 self.retired_count.load(Ordering::Relaxed)
367 }
368}
369
370impl Default for EpochDomain {
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
376pub struct HazardGuard<'a> {
378 domain: &'a HazardDomain,
379 slot_idx: usize,
380 thread_id: u64,
381}
382
383impl<'a> HazardGuard<'a> {
384 pub fn protect(&self, ptr: *mut ()) {
386 self.domain.protect(self.slot_idx, ptr);
387 }
388
389 pub fn protect_typed<T>(&self, ptr: *mut T) {
391 self.protect(ptr as *mut ());
392 }
393}
394
395impl<'a> Drop for HazardGuard<'a> {
396 fn drop(&mut self) {
397 self.domain.release_slot(self.thread_id, self.slot_idx);
398 }
399}
400
401pub struct EpochGuard<'a> {
403 domain: &'a EpochDomain,
404 thread_idx: usize,
405}
406
407impl<'a> Drop for EpochGuard<'a> {
408 fn drop(&mut self) {
409 self.domain.unpin(self.thread_idx);
410 }
411}
412
413#[derive(Debug, Clone, Copy, PartialEq, Eq)]
415pub enum ReclaimStrategy {
416 HazardPointer,
418 Epoch,
420 Auto,
422}
423
424pub struct UnifiedReclaimer {
426 hazard: Arc<HazardDomain>,
427 epoch: Arc<EpochDomain>,
428 thread_epochs: Mutex<std::collections::HashMap<u64, usize>>,
430 default_strategy: ReclaimStrategy,
432 stats: ReclaimStats,
434}
435
436#[derive(Debug, Default)]
438pub struct ReclaimStats {
439 pub hazard_pins: AtomicU64,
440 pub epoch_pins: AtomicU64,
441 pub objects_retired: AtomicU64,
442 pub objects_reclaimed: AtomicU64,
443 pub reclaim_cycles: AtomicU64,
444}
445
446impl ReclaimStats {
447 fn record_hazard_pin(&self) {
448 self.hazard_pins.fetch_add(1, Ordering::Relaxed);
449 }
450
451 fn record_epoch_pin(&self) {
452 self.epoch_pins.fetch_add(1, Ordering::Relaxed);
453 }
454
455 fn record_retire(&self) {
456 self.objects_retired.fetch_add(1, Ordering::Relaxed);
457 }
458
459 fn record_reclaim(&self, count: usize) {
460 self.objects_reclaimed
461 .fetch_add(count as u64, Ordering::Relaxed);
462 self.reclaim_cycles.fetch_add(1, Ordering::Relaxed);
463 }
464
465 pub fn snapshot(&self) -> ReclaimStatsSnapshot {
466 ReclaimStatsSnapshot {
467 hazard_pins: self.hazard_pins.load(Ordering::Relaxed),
468 epoch_pins: self.epoch_pins.load(Ordering::Relaxed),
469 objects_retired: self.objects_retired.load(Ordering::Relaxed),
470 objects_reclaimed: self.objects_reclaimed.load(Ordering::Relaxed),
471 reclaim_cycles: self.reclaim_cycles.load(Ordering::Relaxed),
472 }
473 }
474}
475
476#[derive(Debug, Clone)]
477pub struct ReclaimStatsSnapshot {
478 pub hazard_pins: u64,
479 pub epoch_pins: u64,
480 pub objects_retired: u64,
481 pub objects_reclaimed: u64,
482 pub reclaim_cycles: u64,
483}
484
485impl UnifiedReclaimer {
486 pub fn new() -> Self {
488 Self {
489 hazard: Arc::new(HazardDomain::default()),
490 epoch: Arc::new(EpochDomain::default()),
491 thread_epochs: Mutex::new(std::collections::HashMap::new()),
492 default_strategy: ReclaimStrategy::Auto,
493 stats: ReclaimStats::default(),
494 }
495 }
496
497 pub fn with_capacity(max_threads: usize) -> Self {
499 Self {
500 hazard: Arc::new(HazardDomain::new(max_threads)),
501 epoch: Arc::new(EpochDomain::default()),
502 thread_epochs: Mutex::new(std::collections::HashMap::new()),
503 default_strategy: ReclaimStrategy::Auto,
504 stats: ReclaimStats::default(),
505 }
506 }
507
508 pub fn with_strategy(mut self, strategy: ReclaimStrategy) -> Self {
510 self.default_strategy = strategy;
511 self
512 }
513
514 pub fn pin_hazard(&self) -> Option<HazardGuard<'_>> {
516 let thread_id = self.current_thread_id();
517 let slot_idx = self.hazard.acquire_slot(thread_id)?;
518
519 self.stats.record_hazard_pin();
520
521 Some(HazardGuard {
522 domain: &self.hazard,
523 slot_idx,
524 thread_id,
525 })
526 }
527
528 pub fn pin_epoch(&self) -> EpochGuard<'_> {
530 let thread_id = self.current_thread_id();
531
532 let thread_idx = {
533 let mut epochs = self.thread_epochs.lock();
534 *epochs
535 .entry(thread_id)
536 .or_insert_with(|| self.epoch.register_thread())
537 };
538
539 self.epoch.pin(thread_idx);
540 self.stats.record_epoch_pin();
541
542 EpochGuard {
543 domain: &self.epoch,
544 thread_idx,
545 }
546 }
547
548 pub unsafe fn retire<T>(&self, ptr: *mut T) {
554 let destructor = |p: *mut ()| {
555 unsafe { drop(Box::from_raw(p as *mut T)) };
557 };
558
559 self.epoch
560 .retire(ptr as *mut (), destructor, std::mem::size_of::<T>());
561 self.stats.record_retire();
562 }
563
564 pub fn retire_with_destructor(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
566 self.epoch.retire(ptr, destructor, size);
567 self.stats.record_retire();
568 }
569
570 pub fn is_protected(&self, ptr: *mut ()) -> bool {
572 self.hazard.is_protected(ptr)
573 }
574
575 pub fn try_reclaim(&self) -> usize {
577 let reclaimed = self.epoch.try_reclaim();
578 if reclaimed > 0 {
579 self.stats.record_reclaim(reclaimed);
580 }
581 reclaimed
582 }
583
584 pub fn advance_epoch(&self) {
586 self.epoch.advance_epoch();
587 }
588
589 pub fn current_epoch(&self) -> u64 {
591 self.epoch.current_epoch()
592 }
593
594 pub fn pending_count(&self) -> usize {
596 self.epoch.pending_count()
597 }
598
599 pub fn stats(&self) -> ReclaimStatsSnapshot {
601 self.stats.snapshot()
602 }
603
604 fn current_thread_id(&self) -> u64 {
606 use std::hash::{Hash, Hasher};
608 let mut hasher = std::collections::hash_map::DefaultHasher::new();
609 std::thread::current().id().hash(&mut hasher);
610 hasher.finish()
611 }
612}
613
614impl Default for UnifiedReclaimer {
615 fn default() -> Self {
616 Self::new()
617 }
618}
619
620pub struct ThreadLocalReclaimer {
622 reclaimer: Arc<UnifiedReclaimer>,
623 hazard_slot: Option<usize>,
624 epoch_idx: usize,
625 thread_id: u64,
626}
627
628impl ThreadLocalReclaimer {
629 pub fn new(reclaimer: Arc<UnifiedReclaimer>) -> Self {
631 use std::hash::{Hash, Hasher};
633 let mut hasher = std::collections::hash_map::DefaultHasher::new();
634 std::thread::current().id().hash(&mut hasher);
635 let thread_id = hasher.finish();
636
637 let epoch_idx = reclaimer.epoch.register_thread();
638
639 Self {
640 reclaimer,
641 hazard_slot: None,
642 epoch_idx,
643 thread_id,
644 }
645 }
646
647 pub fn pin_hazard(&mut self) -> bool {
649 if self.hazard_slot.is_some() {
650 return true;
651 }
652
653 if let Some(slot) = self.reclaimer.hazard.acquire_slot(self.thread_id) {
654 self.hazard_slot = Some(slot);
655 true
656 } else {
657 false
658 }
659 }
660
661 pub fn protect(&self, ptr: *mut ()) {
663 if let Some(slot) = self.hazard_slot {
664 self.reclaimer.hazard.protect(slot, ptr);
665 }
666 }
667
668 pub fn pin_epoch(&self) {
670 self.reclaimer.epoch.pin(self.epoch_idx);
671 }
672
673 pub fn unpin_epoch(&self) {
675 self.reclaimer.epoch.unpin(self.epoch_idx);
676 }
677
678 pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
680 self.reclaimer.epoch.retire(ptr, destructor, size);
681 }
682}
683
684#[cfg(test)]
685mod tests {
686 use super::*;
687 use std::sync::atomic::AtomicBool;
688
689 #[test]
690 fn test_hazard_domain_basic() {
691 let domain = HazardDomain::new(4);
692
693 let thread_id = 12345u64;
694 let slot = domain.acquire_slot(thread_id).unwrap();
695
696 assert_eq!(domain.active_count(), 1);
697
698 let data = Box::into_raw(Box::new(42u64));
700 domain.protect(slot, data as *mut ());
701
702 assert!(domain.is_protected(data as *mut ()));
703
704 domain.release_slot(thread_id, slot);
705 assert_eq!(domain.active_count(), 0);
706 assert!(!domain.is_protected(data as *mut ()));
707
708 unsafe { drop(Box::from_raw(data)) };
710 }
711
712 #[test]
713 fn test_epoch_domain_basic() {
714 let domain = EpochDomain::new();
715
716 let idx = domain.register_thread();
717 assert_eq!(domain.current_epoch(), 0);
718
719 domain.pin(idx);
720 domain.advance_epoch();
721 assert_eq!(domain.current_epoch(), 1);
722
723 domain.unpin(idx);
724 }
725
726 #[test]
727 fn test_epoch_retirement() {
728 static DROPPED: AtomicBool = AtomicBool::new(false);
729 DROPPED.store(false, Ordering::SeqCst); fn drop_test(ptr: *mut ()) {
732 DROPPED.store(true, Ordering::SeqCst);
733 unsafe { drop(Box::from_raw(ptr as *mut u64)) };
734 }
735
736 let domain = EpochDomain::new();
737 let idx = domain.register_thread();
738
739 domain.pin(idx);
741 let data = Box::into_raw(Box::new(42u64));
742 domain.retire(data as *mut (), drop_test, 8);
743
744 let reclaimed_while_pinned = domain.try_reclaim();
747
748 domain.unpin(idx);
750 for _ in 0..EPOCH_GRACE_PERIODS + 2 {
751 domain.advance_epoch();
752 }
753
754 let reclaimed = domain.try_reclaim();
756
757 assert!(DROPPED.load(Ordering::SeqCst) || reclaimed > 0 || reclaimed_while_pinned > 0);
760 }
761
762 #[test]
763 fn test_unified_reclaimer_hazard() {
764 let reclaimer = UnifiedReclaimer::new();
765
766 let guard = reclaimer.pin_hazard();
767 assert!(guard.is_some());
768
769 let guard = guard.unwrap();
770 let data = Box::into_raw(Box::new(String::from("test")));
771 guard.protect_typed(data);
772
773 assert!(reclaimer.is_protected(data as *mut ()));
774
775 drop(guard);
776 assert!(!reclaimer.is_protected(data as *mut ()));
777
778 unsafe { drop(Box::from_raw(data)) };
780 }
781
782 #[test]
783 fn test_unified_reclaimer_epoch() {
784 let reclaimer = UnifiedReclaimer::new();
785
786 {
787 let _guard = reclaimer.pin_epoch();
788 assert_eq!(reclaimer.current_epoch(), 0);
789 }
790
791 reclaimer.advance_epoch();
792 assert_eq!(reclaimer.current_epoch(), 1);
793 }
794
795 #[test]
796 fn test_stats_tracking() {
797 let reclaimer = UnifiedReclaimer::new();
798
799 {
800 let _guard = reclaimer.pin_epoch();
801 }
802
803 let _ = reclaimer.pin_hazard();
804
805 let stats = reclaimer.stats();
806 assert!(stats.epoch_pins >= 1);
807 assert!(stats.hazard_pins >= 1);
808 }
809
810 #[test]
811 fn test_thread_local_reclaimer() {
812 let reclaimer = Arc::new(UnifiedReclaimer::new());
813 let mut local = ThreadLocalReclaimer::new(Arc::clone(&reclaimer));
814
815 assert!(local.pin_hazard());
816
817 let data = Box::into_raw(Box::new(100u32));
818 local.protect(data as *mut ());
819
820 assert!(reclaimer.is_protected(data as *mut ()));
821
822 unsafe { drop(Box::from_raw(data)) };
824 }
825
826 #[test]
827 fn test_multiple_hazard_slots() {
828 let domain = HazardDomain::new(2);
829
830 let slot1 = domain.acquire_slot(1).unwrap();
831 let slot2 = domain.acquire_slot(2).unwrap();
832
833 let data1 = Box::into_raw(Box::new(1u64));
834 let data2 = Box::into_raw(Box::new(2u64));
835
836 domain.protect(slot1, data1 as *mut ());
837 domain.protect(slot2, data2 as *mut ());
838
839 assert!(domain.is_protected(data1 as *mut ()));
840 assert!(domain.is_protected(data2 as *mut ()));
841
842 domain.release_slot(1, slot1);
843 assert!(!domain.is_protected(data1 as *mut ()));
844 assert!(domain.is_protected(data2 as *mut ()));
845
846 domain.release_slot(2, slot2);
847
848 unsafe {
850 drop(Box::from_raw(data1));
851 drop(Box::from_raw(data2));
852 }
853 }
854
855 #[test]
856 fn test_reclaim_stats_snapshot() {
857 let stats = ReclaimStats::default();
858
859 stats.record_hazard_pin();
860 stats.record_hazard_pin();
861 stats.record_epoch_pin();
862 stats.record_retire();
863 stats.record_reclaim(5);
864
865 let snapshot = stats.snapshot();
866 assert_eq!(snapshot.hazard_pins, 2);
867 assert_eq!(snapshot.epoch_pins, 1);
868 assert_eq!(snapshot.objects_retired, 1);
869 assert_eq!(snapshot.objects_reclaimed, 5);
870 assert_eq!(snapshot.reclaim_cycles, 1);
871 }
872
873 #[test]
874 fn test_strategy_configuration() {
875 let reclaimer = UnifiedReclaimer::new().with_strategy(ReclaimStrategy::Epoch);
876
877 assert_eq!(reclaimer.default_strategy, ReclaimStrategy::Epoch);
878 }
879}