//! Unified memory reclamation for lock-free data structures: hazard pointers
//! for per-pointer protection plus epoch-based reclamation for batched,
//! deferred destruction.

use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};

/// Hazard-pointer slots reserved per participating thread.
const MAX_HAZARD_POINTERS: usize = 4;

/// Epochs that must elapse after retirement before an object may be freed.
const EPOCH_GRACE_PERIODS: u64 = 2;

/// Pending retired-object count that triggers an opportunistic reclaim pass.
const RECLAIM_THRESHOLD: usize = 64;

/// One hazard-pointer slot: the protected pointer plus the id of the owning
/// thread, where an owner of 0 marks the slot as free.
#[derive(Debug)]
struct HazardSlot {
    ptr: AtomicPtr<()>,
    owner: AtomicU64,
}

impl HazardSlot {
    fn new() -> Self {
        Self {
            ptr: AtomicPtr::new(std::ptr::null_mut()),
            owner: AtomicU64::new(0),
        }
    }

    /// Try to claim this slot for `thread_id`; succeeds only if the slot is
    /// free. The scheme assumes thread ids are never 0, since 0 is the
    /// "free" sentinel.
    fn acquire(&self, thread_id: u64) -> bool {
        self.owner
            .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }

    fn release(&self) {
        self.ptr.store(std::ptr::null_mut(), Ordering::Release);
        self.owner.store(0, Ordering::Release);
    }

    fn protect(&self, ptr: *mut ()) {
        self.ptr.store(ptr, Ordering::Release);
    }

    fn is_protecting(&self, ptr: *mut ()) -> bool {
        self.ptr.load(Ordering::Acquire) == ptr
    }
}

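/// A shared, fixed-capacity pool of hazard-pointer slots. Threads claim a
/// slot, publish the pointer they are about to dereference, and release the
/// slot when finished; reclaimers must not free a pointer while any slot
/// protects it.
///
/// A minimal sketch of the intended flow (using the crate-internal methods,
/// so not compilable from outside this module):
///
/// ```ignore
/// let domain = HazardDomain::new(4);
/// let slot = domain.acquire_slot(thread_id).expect("slots available");
/// domain.protect(slot, ptr);
/// assert!(domain.is_protected(ptr));
/// domain.release_slot(thread_id, slot);
/// ```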
pub struct HazardDomain {
    slots: Vec<HazardSlot>,
    active_count: AtomicUsize,
    slot_registry: Mutex<Vec<(u64, usize)>>,
}

impl HazardDomain {
    pub fn new(max_threads: usize) -> Self {
        let capacity = max_threads * MAX_HAZARD_POINTERS;
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(HazardSlot::new());
        }

        Self {
            slots,
            active_count: AtomicUsize::new(0),
            slot_registry: Mutex::new(Vec::new()),
        }
    }

    /// Claim a slot for `thread_id`, reusing the thread's existing slot when
    /// it already holds one. Returns `None` if every slot is taken.
    fn acquire_slot(&self, thread_id: u64) -> Option<usize> {
        {
            let registry = self.slot_registry.lock();
            for &(tid, idx) in registry.iter() {
                if tid == thread_id {
                    return Some(idx);
                }
            }
        }

        for (idx, slot) in self.slots.iter().enumerate() {
            if slot.acquire(thread_id) {
                let mut registry = self.slot_registry.lock();
                registry.push((thread_id, idx));
                self.active_count.fetch_add(1, Ordering::Relaxed);
                return Some(idx);
            }
        }

        None
    }

    fn release_slot(&self, thread_id: u64, slot_idx: usize) {
        if slot_idx < self.slots.len() {
            self.slots[slot_idx].release();
        }

        let mut registry = self.slot_registry.lock();
        let before = registry.len();
        registry.retain(|&(tid, _)| tid != thread_id);
        // Only decrement when an entry was actually removed, so a double
        // release cannot underflow the counter.
        if registry.len() < before {
            self.active_count.fetch_sub(1, Ordering::Relaxed);
        }
    }

    fn protect(&self, slot_idx: usize, ptr: *mut ()) {
        if slot_idx < self.slots.len() {
            self.slots[slot_idx].protect(ptr);
        }
    }

    /// Linear scan over every slot; called by reclaimers before freeing.
    fn is_protected(&self, ptr: *mut ()) -> bool {
        for slot in &self.slots {
            if slot.is_protecting(ptr) {
                return true;
            }
        }
        false
    }

    pub fn active_count(&self) -> usize {
        self.active_count.load(Ordering::Relaxed)
    }
}

impl Default for HazardDomain {
    fn default() -> Self {
        Self::new(64)
    }
}

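/// Epoch-based reclamation: readers pin the global epoch, retired objects
/// are batched per epoch, and a reclaim pass frees every batch at least
/// `EPOCH_GRACE_PERIODS` epochs behind the oldest pinned reader.
///
/// A minimal lifecycle sketch (assuming a hypothetical `fn drop_u64(ptr: *mut ())`
/// that frees a `Box<u64>`):
///
/// ```ignore
/// let domain = EpochDomain::new();
/// let idx = domain.register_thread();
///
/// domain.pin(idx);                        // reads are now protected
/// let data = Box::into_raw(Box::new(7u64));
/// domain.retire(data as *mut (), drop_u64, 8);
/// domain.unpin(idx);
///
/// for _ in 0..EPOCH_GRACE_PERIODS + 1 {
///     domain.advance_epoch();             // move past the grace period
/// }
/// let freed = domain.try_reclaim();       // drop_u64 runs here
/// ```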
pub struct EpochDomain {
    global_epoch: AtomicU64,
    local_epochs: RwLock<Vec<AtomicU64>>,
    limbo: Mutex<VecDeque<(u64, Vec<RetiredObject>)>>,
    retired_count: AtomicUsize,
}

/// An object queued for deferred destruction.
#[allow(dead_code)]
struct RetiredObject {
    ptr: *mut (),
    destructor: fn(*mut ()),
    size: usize,
}

// SAFETY: a retired pointer is only touched again by its destructor during a
// reclaim pass; callers of `retire` must guarantee the pointee can be dropped
// on any thread.
unsafe impl Send for RetiredObject {}

impl EpochDomain {
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            local_epochs: RwLock::new(Vec::new()),
            limbo: Mutex::new(VecDeque::new()),
            retired_count: AtomicUsize::new(0),
        }
    }

    /// Register the calling thread and return its index into the local-epoch
    /// table. `u64::MAX` marks a thread as unpinned.
    pub fn register_thread(&self) -> usize {
        let mut epochs = self.local_epochs.write();
        let idx = epochs.len();
        epochs.push(AtomicU64::new(u64::MAX));
        idx
    }

    pub fn pin(&self, thread_idx: usize) {
        let current = self.global_epoch.load(Ordering::SeqCst);
        let epochs = self.local_epochs.read();
        if thread_idx < epochs.len() {
            epochs[thread_idx].store(current, Ordering::SeqCst);
        }
    }

    pub fn unpin(&self, thread_idx: usize) {
        let epochs = self.local_epochs.read();
        if thread_idx < epochs.len() {
            epochs[thread_idx].store(u64::MAX, Ordering::SeqCst);
        }
    }

    /// Queue `ptr` for deferred destruction in the current epoch's limbo
    /// batch, kicking off a reclaim pass once enough objects are pending.
    pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        let current_epoch = self.global_epoch.load(Ordering::SeqCst);
        let obj = RetiredObject {
            ptr,
            destructor,
            size,
        };

        let mut limbo = self.limbo.lock();

        // Start a new batch whenever the epoch has moved on since the last
        // retire.
        if limbo.back().is_none_or(|(e, _)| *e != current_epoch) {
            limbo.push_back((current_epoch, Vec::new()));
        }

        if let Some((_, objects)) = limbo.back_mut() {
            objects.push(obj);
        }

        let count = self.retired_count.fetch_add(1, Ordering::Relaxed);

        if count >= RECLAIM_THRESHOLD {
            // Release the limbo lock first; `try_reclaim` re-acquires it, and
            // parking_lot mutexes are not reentrant.
            drop(limbo);
            self.try_reclaim();
        }
    }

    pub fn advance_epoch(&self) {
        self.global_epoch.fetch_add(1, Ordering::SeqCst);
    }

    /// The newest epoch whose batches are safe to free: the minimum over all
    /// pinned threads, pushed back by the grace period. `None` means not
    /// enough epochs have elapsed for anything to be safe yet; `checked_sub`
    /// (rather than a saturating subtraction) prevents epoch-0 batches from
    /// being freed while a reader pinned at epoch 0 is still active.
    fn safe_epoch(&self) -> Option<u64> {
        let epochs = self.local_epochs.read();
        let mut min = self.global_epoch.load(Ordering::SeqCst);

        for epoch in epochs.iter() {
            let e = epoch.load(Ordering::SeqCst);
            if e != u64::MAX && e < min {
                min = e;
            }
        }

        min.checked_sub(EPOCH_GRACE_PERIODS)
    }

    /// Free every limbo batch at or below the safe epoch, returning how many
    /// objects were reclaimed.
    pub fn try_reclaim(&self) -> usize {
        let Some(safe) = self.safe_epoch() else {
            return 0;
        };
        let mut reclaimed = 0;

        let mut limbo = self.limbo.lock();

        while let Some((epoch, _)) = limbo.front() {
            if *epoch > safe {
                break;
            }

            if let Some((_, objects)) = limbo.pop_front() {
                for obj in objects {
                    (obj.destructor)(obj.ptr);
                    reclaimed += 1;
                    self.retired_count.fetch_sub(1, Ordering::Relaxed);
                }
            }
        }

        reclaimed
    }

    pub fn current_epoch(&self) -> u64 {
        self.global_epoch.load(Ordering::SeqCst)
    }

    pub fn pending_count(&self) -> usize {
        self.retired_count.load(Ordering::Relaxed)
    }
}

impl Default for EpochDomain {
    fn default() -> Self {
        Self::new()
    }
}

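/// RAII guard over one hazard slot; dropping the guard releases the slot.
///
/// A minimal usage sketch via `UnifiedReclaimer` (`node_ptr` is hypothetical):
///
/// ```ignore
/// let reclaimer = UnifiedReclaimer::new();
/// let guard = reclaimer.pin_hazard().expect("slot available");
/// guard.protect_typed(node_ptr);
/// // ... dereference node_ptr, re-validating it is still reachable ...
/// drop(guard);                            // protection ends here
/// ```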
pub struct HazardGuard<'a> {
    domain: &'a HazardDomain,
    slot_idx: usize,
    thread_id: u64,
}

impl<'a> HazardGuard<'a> {
    /// Publish `ptr` as protected. As with any hazard-pointer scheme, the
    /// caller must re-validate the pointer after publishing it.
    pub fn protect(&self, ptr: *mut ()) {
        self.domain.protect(self.slot_idx, ptr);
    }

    pub fn protect_typed<T>(&self, ptr: *mut T) {
        self.protect(ptr as *mut ());
    }
}

impl<'a> Drop for HazardGuard<'a> {
    fn drop(&mut self) {
        self.domain.release_slot(self.thread_id, self.slot_idx);
    }
}

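/// RAII guard that keeps the calling thread pinned in the epoch domain; the
/// thread is unpinned when the guard is dropped.
///
/// ```ignore
/// let reclaimer = UnifiedReclaimer::new();
/// {
///     let _guard = reclaimer.pin_epoch();
///     // reads of epoch-protected data are safe inside this scope
/// } // unpinned here
/// ```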
pub struct EpochGuard<'a> {
    domain: &'a EpochDomain,
    thread_idx: usize,
}

impl<'a> Drop for EpochGuard<'a> {
    fn drop(&mut self) {
        self.domain.unpin(self.thread_idx);
    }
}

/// Which reclamation scheme a caller prefers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReclaimStrategy {
    /// Protect individual pointers with hazard slots.
    HazardPointer,
    /// Pin epochs and defer destruction in batches.
    Epoch,
    /// Let the reclaimer choose per call site.
    Auto,
}

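/// Front end combining both domains: hazard pointers for per-pointer
/// protection and epochs for bulk deferred destruction, with usage counters
/// kept in `ReclaimStats`.
///
/// A minimal retire/reclaim sketch:
///
/// ```ignore
/// let reclaimer = UnifiedReclaimer::with_capacity(8);
///
/// let node = Box::into_raw(Box::new(42u64));
/// unsafe { reclaimer.retire(node) };      // deferred drop, not freed yet
///
/// for _ in 0..3 {
///     reclaimer.advance_epoch();
/// }
/// let freed = reclaimer.try_reclaim();    // node is dropped here
/// ```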
pub struct UnifiedReclaimer {
    hazard: Arc<HazardDomain>,
    epoch: Arc<EpochDomain>,
    thread_epochs: Mutex<std::collections::HashMap<u64, usize>>,
    default_strategy: ReclaimStrategy,
    stats: ReclaimStats,
}

#[derive(Debug, Default)]
pub struct ReclaimStats {
    pub hazard_pins: AtomicU64,
    pub epoch_pins: AtomicU64,
    pub objects_retired: AtomicU64,
    pub objects_reclaimed: AtomicU64,
    pub reclaim_cycles: AtomicU64,
}

impl ReclaimStats {
    fn record_hazard_pin(&self) {
        self.hazard_pins.fetch_add(1, Ordering::Relaxed);
    }

    fn record_epoch_pin(&self) {
        self.epoch_pins.fetch_add(1, Ordering::Relaxed);
    }

    fn record_retire(&self) {
        self.objects_retired.fetch_add(1, Ordering::Relaxed);
    }

    fn record_reclaim(&self, count: usize) {
        self.objects_reclaimed
            .fetch_add(count as u64, Ordering::Relaxed);
        self.reclaim_cycles.fetch_add(1, Ordering::Relaxed);
    }

    pub fn snapshot(&self) -> ReclaimStatsSnapshot {
        ReclaimStatsSnapshot {
            hazard_pins: self.hazard_pins.load(Ordering::Relaxed),
            epoch_pins: self.epoch_pins.load(Ordering::Relaxed),
            objects_retired: self.objects_retired.load(Ordering::Relaxed),
            objects_reclaimed: self.objects_reclaimed.load(Ordering::Relaxed),
            reclaim_cycles: self.reclaim_cycles.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time copy of the counters in `ReclaimStats`.
#[derive(Debug, Clone)]
pub struct ReclaimStatsSnapshot {
    pub hazard_pins: u64,
    pub epoch_pins: u64,
    pub objects_retired: u64,
    pub objects_reclaimed: u64,
    pub reclaim_cycles: u64,
}

impl UnifiedReclaimer {
    pub fn new() -> Self {
        Self {
            hazard: Arc::new(HazardDomain::default()),
            epoch: Arc::new(EpochDomain::default()),
            thread_epochs: Mutex::new(std::collections::HashMap::new()),
            default_strategy: ReclaimStrategy::Auto,
            stats: ReclaimStats::default(),
        }
    }

    pub fn with_capacity(max_threads: usize) -> Self {
        Self {
            hazard: Arc::new(HazardDomain::new(max_threads)),
            epoch: Arc::new(EpochDomain::default()),
            thread_epochs: Mutex::new(std::collections::HashMap::new()),
            default_strategy: ReclaimStrategy::Auto,
            stats: ReclaimStats::default(),
        }
    }

    pub fn with_strategy(mut self, strategy: ReclaimStrategy) -> Self {
        self.default_strategy = strategy;
        self
    }

    /// Claim a hazard slot for the calling thread. Returns `None` when the
    /// domain is out of slots.
    pub fn pin_hazard(&self) -> Option<HazardGuard<'_>> {
        let thread_id = self.current_thread_id();
        let slot_idx = self.hazard.acquire_slot(thread_id)?;

        self.stats.record_hazard_pin();

        Some(HazardGuard {
            domain: &self.hazard,
            slot_idx,
            thread_id,
        })
    }

    /// Pin the calling thread in the epoch domain, registering it on first
    /// use, and return a guard that unpins on drop.
    pub fn pin_epoch(&self) -> EpochGuard<'_> {
        let thread_id = self.current_thread_id();

        let thread_idx = {
            let mut epochs = self.thread_epochs.lock();
            *epochs
                .entry(thread_id)
                .or_insert_with(|| self.epoch.register_thread())
        };

        self.epoch.pin(thread_idx);
        self.stats.record_epoch_pin();

        EpochGuard {
            domain: &self.epoch,
            thread_idx,
        }
    }

    /// Retire a `Box`-allocated object for deferred destruction.
    ///
    /// # Safety
    ///
    /// `ptr` must have come from `Box::into_raw`, must not be used again by
    /// the caller, and `T` must be safe to drop on another thread.
    pub unsafe fn retire<T>(&self, ptr: *mut T) {
        // A non-capturing closure coerces to the `fn(*mut ())` destructor.
        let destructor = |p: *mut ()| {
            unsafe { drop(Box::from_raw(p as *mut T)) };
        };

        self.epoch
            .retire(ptr as *mut (), destructor, std::mem::size_of::<T>());
        self.stats.record_retire();
    }

    pub fn retire_with_destructor(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        self.epoch.retire(ptr, destructor, size);
        self.stats.record_retire();
    }

    pub fn is_protected(&self, ptr: *mut ()) -> bool {
        self.hazard.is_protected(ptr)
    }

    pub fn try_reclaim(&self) -> usize {
        let reclaimed = self.epoch.try_reclaim();
        if reclaimed > 0 {
            self.stats.record_reclaim(reclaimed);
        }
        reclaimed
    }

    pub fn advance_epoch(&self) {
        self.epoch.advance_epoch();
    }

    pub fn current_epoch(&self) -> u64 {
        self.epoch.current_epoch()
    }

    pub fn pending_count(&self) -> usize {
        self.epoch.pending_count()
    }

    pub fn stats(&self) -> ReclaimStatsSnapshot {
        self.stats.snapshot()
    }

    /// Derive a stable per-thread id by hashing `ThreadId`. Collisions are
    /// possible in principle but assumed not to matter in practice.
    fn current_thread_id(&self) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::thread::current().id().hash(&mut hasher);
        hasher.finish()
    }
}

impl Default for UnifiedReclaimer {
    fn default() -> Self {
        Self::new()
    }
}

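/// Per-thread handle that caches the thread's hazard slot and epoch index so
/// hot paths skip the registry lookups done by `UnifiedReclaimer`.
///
/// ```ignore
/// let shared = Arc::new(UnifiedReclaimer::new());
/// let mut local = ThreadLocalReclaimer::new(Arc::clone(&shared));
///
/// if local.pin_hazard() {
///     local.protect(ptr);                 // ptr: *mut (), hypothetical
/// }
/// local.pin_epoch();
/// // ... read epoch-protected data ...
/// local.unpin_epoch();
/// ```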
pub struct ThreadLocalReclaimer {
    reclaimer: Arc<UnifiedReclaimer>,
    hazard_slot: Option<usize>,
    epoch_idx: usize,
    thread_id: u64,
}

impl ThreadLocalReclaimer {
    pub fn new(reclaimer: Arc<UnifiedReclaimer>) -> Self {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::thread::current().id().hash(&mut hasher);
        let thread_id = hasher.finish();

        let epoch_idx = reclaimer.epoch.register_thread();

        Self {
            reclaimer,
            hazard_slot: None,
            epoch_idx,
            thread_id,
        }
    }

    /// Lazily claim a hazard slot; returns `false` when the domain is full.
    pub fn pin_hazard(&mut self) -> bool {
        if self.hazard_slot.is_some() {
            return true;
        }

        if let Some(slot) = self.reclaimer.hazard.acquire_slot(self.thread_id) {
            self.hazard_slot = Some(slot);
            true
        } else {
            false
        }
    }

    pub fn protect(&self, ptr: *mut ()) {
        if let Some(slot) = self.hazard_slot {
            self.reclaimer.hazard.protect(slot, ptr);
        }
    }

    pub fn pin_epoch(&self) {
        self.reclaimer.epoch.pin(self.epoch_idx);
    }

    pub fn unpin_epoch(&self) {
        self.reclaimer.epoch.unpin(self.epoch_idx);
    }

    pub fn retire(&self, ptr: *mut (), destructor: fn(*mut ()), size: usize) {
        self.reclaimer.epoch.retire(ptr, destructor, size);
    }
}
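
// A cleanup sketch, not part of the original API: release the cached hazard
// slot and unpin the epoch index when the handle goes away, so slots can be
// reused by other threads. Assumes epoch indices are intentionally never
// recycled by `EpochDomain`.
impl Drop for ThreadLocalReclaimer {
    fn drop(&mut self) {
        if let Some(slot) = self.hazard_slot.take() {
            self.reclaimer.hazard.release_slot(self.thread_id, slot);
        }
        self.reclaimer.epoch.unpin(self.epoch_idx);
    }
}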

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicBool;

    #[test]
    fn test_hazard_domain_basic() {
        let domain = HazardDomain::new(4);

        let thread_id = 12345u64;
        let slot = domain.acquire_slot(thread_id).unwrap();

        assert_eq!(domain.active_count(), 1);

        let data = Box::into_raw(Box::new(42u64));
        domain.protect(slot, data as *mut ());

        assert!(domain.is_protected(data as *mut ()));

        domain.release_slot(thread_id, slot);
        assert_eq!(domain.active_count(), 0);
        assert!(!domain.is_protected(data as *mut ()));

        unsafe { drop(Box::from_raw(data)) };
    }

    #[test]
    fn test_epoch_domain_basic() {
        let domain = EpochDomain::new();

        let idx = domain.register_thread();
        assert_eq!(domain.current_epoch(), 0);

        domain.pin(idx);
        domain.advance_epoch();
        assert_eq!(domain.current_epoch(), 1);

        domain.unpin(idx);
    }

    #[test]
    fn test_epoch_retirement() {
        static DROPPED: AtomicBool = AtomicBool::new(false);
        DROPPED.store(false, Ordering::SeqCst);

        fn drop_test(ptr: *mut ()) {
            DROPPED.store(true, Ordering::SeqCst);
            unsafe { drop(Box::from_raw(ptr as *mut u64)) };
        }

        let domain = EpochDomain::new();
        let idx = domain.register_thread();

        domain.pin(idx);
        let data = Box::into_raw(Box::new(42u64));
        domain.retire(data as *mut (), drop_test, 8);

        // While the thread is still pinned, the object must survive.
        let reclaimed_while_pinned = domain.try_reclaim();

        // Unpin and advance past the grace period so the batch becomes safe.
        domain.unpin(idx);
        for _ in 0..EPOCH_GRACE_PERIODS + 2 {
            domain.advance_epoch();
        }

        let reclaimed = domain.try_reclaim();

        assert!(DROPPED.load(Ordering::SeqCst) || reclaimed > 0 || reclaimed_while_pinned > 0);
    }

    #[test]
    fn test_unified_reclaimer_hazard() {
        let reclaimer = UnifiedReclaimer::new();

        let guard = reclaimer.pin_hazard();
        assert!(guard.is_some());

        let guard = guard.unwrap();
        let data = Box::into_raw(Box::new(String::from("test")));
        guard.protect_typed(data);

        assert!(reclaimer.is_protected(data as *mut ()));

        drop(guard);
        assert!(!reclaimer.is_protected(data as *mut ()));

        unsafe { drop(Box::from_raw(data)) };
    }

    #[test]
    fn test_unified_reclaimer_epoch() {
        let reclaimer = UnifiedReclaimer::new();

        {
            let _guard = reclaimer.pin_epoch();
            assert_eq!(reclaimer.current_epoch(), 0);
        }

        reclaimer.advance_epoch();
        assert_eq!(reclaimer.current_epoch(), 1);
    }

    #[test]
    fn test_stats_tracking() {
        let reclaimer = UnifiedReclaimer::new();

        {
            let _guard = reclaimer.pin_epoch();
        }

        let _ = reclaimer.pin_hazard();

        let stats = reclaimer.stats();
        assert!(stats.epoch_pins >= 1);
        assert!(stats.hazard_pins >= 1);
    }

    #[test]
    fn test_thread_local_reclaimer() {
        let reclaimer = Arc::new(UnifiedReclaimer::new());
        let mut local = ThreadLocalReclaimer::new(Arc::clone(&reclaimer));

        assert!(local.pin_hazard());

        let data = Box::into_raw(Box::new(100u32));
        local.protect(data as *mut ());

        assert!(reclaimer.is_protected(data as *mut ()));

        unsafe { drop(Box::from_raw(data)) };
    }

    #[test]
    fn test_multiple_hazard_slots() {
        let domain = HazardDomain::new(2);

        let slot1 = domain.acquire_slot(1).unwrap();
        let slot2 = domain.acquire_slot(2).unwrap();

        let data1 = Box::into_raw(Box::new(1u64));
        let data2 = Box::into_raw(Box::new(2u64));

        domain.protect(slot1, data1 as *mut ());
        domain.protect(slot2, data2 as *mut ());

        assert!(domain.is_protected(data1 as *mut ()));
        assert!(domain.is_protected(data2 as *mut ()));

        domain.release_slot(1, slot1);
        assert!(!domain.is_protected(data1 as *mut ()));
        assert!(domain.is_protected(data2 as *mut ()));

        domain.release_slot(2, slot2);

        unsafe {
            drop(Box::from_raw(data1));
            drop(Box::from_raw(data2));
        }
    }

    #[test]
    fn test_reclaim_stats_snapshot() {
        let stats = ReclaimStats::default();

        stats.record_hazard_pin();
        stats.record_hazard_pin();
        stats.record_epoch_pin();
        stats.record_retire();
        stats.record_reclaim(5);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.hazard_pins, 2);
        assert_eq!(snapshot.epoch_pins, 1);
        assert_eq!(snapshot.objects_retired, 1);
        assert_eq!(snapshot.objects_reclaimed, 5);
        assert_eq!(snapshot.reclaim_cycles, 1);
    }

    #[test]
    fn test_strategy_configuration() {
        let reclaimer = UnifiedReclaimer::new().with_strategy(ReclaimStrategy::Epoch);

        assert_eq!(reclaimer.default_strategy, ReclaimStrategy::Epoch);
    }
}