use super::augmented::fence as augmented_fence;
use super::augmented::thread_local as augmented_thread_local;
use super::augmented::AtomicPtr as AugmentedAtomicPtr;
use super::augmented::AtomicU8 as AugmentedAtomicU8;
use super::exit_guard::ExitGuard;
use super::{Collectible, Epoch, Tag};
use std::ptr::{self, NonNull};
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
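/// [`Collector`] is a per-thread garbage collector for epoch-based reclamation.
///
/// Every thread owns one `Collector` through a thread-local variable, and all
/// live collectors form a global singly-linked list that a scanning thread
/// traverses when it tries to advance the global epoch.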
#[derive(Debug)]
pub(super) struct Collector {
/// The announced epoch in the low bits, plus the `INACTIVE`/`INVALID` flags.
state: AugmentedAtomicU8,
/// A cached copy of the epoch this collector last announced.
announcement: Epoch,
/// Countdown until the next attempt to advance the global epoch.
next_epoch_update: u8,
/// `true` if any of the three garbage generations is non-empty.
has_garbage: bool,
/// The number of nested guards currently held by the owner thread.
num_readers: u32,
/// Garbage generations: instances queued in the previous, current, and next
/// epochs, reclaimed oldest-first as the epoch advances.
previous_instance_link: Option<NonNull<dyn Collectible>>,
current_instance_link: Option<NonNull<dyn Collectible>>,
next_instance_link: Option<NonNull<dyn Collectible>>,
/// The next `Collector` in the global list of collectors.
next_link: AtomicPtr<Collector>,
/// Allows a `Collector` itself to be queued as garbage.
link: Option<NonNull<dyn Collectible>>,
}
impl Collector {
/// The default number of guard drops between two epoch-advance attempts.
const CADENCE: u8 = u8::MAX;
/// A state bit indicating that the owner thread holds no guard.
const INACTIVE: u8 = 1_u8 << 2;
/// A state bit indicating that the collector has been invalidated and may be
/// unlinked from the global list.
const INVALID: u8 = 1_u8 << 3;
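/// Acknowledges a new guard being created on the owner thread, announcing
/// the current global epoch if this is the outermost guard.
///
/// A hedged sketch of the intended call pattern, assuming `new_guard` and
/// `end_guard` are paired by a guard type's constructor and destructor:
///
/// ```ignore
/// let collector = unsafe { &mut *Collector::current() };
/// collector.new_guard(true);
/// // ... read epoch-protected shared data ...
/// collector.end_guard();
/// ```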
#[inline]
pub(super) fn new_guard(&mut self, collect_garbage: bool) {
if self.num_readers == 0 {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, Self::INACTIVE);
self.num_readers = 1;
let new_epoch = Epoch::from_u8(epoch().load(Relaxed));
// Announcing the epoch must be sequentially consistent with the scanner's
// loads. On x86, a `SeqCst` swap compiles to a single locked instruction,
// which avoids a separate full fence; elsewhere, a relaxed store followed
// by a `SeqCst` fence is used instead.
if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
self.state.swap(new_epoch.into(), SeqCst);
} else {
self.state.store(new_epoch.into(), Relaxed);
augmented_fence(SeqCst);
}
if self.announcement != new_epoch {
self.announcement = new_epoch;
if collect_garbage {
// `epoch_updated` drops garbage, and a destructor may panic; the
// `ExitGuard` guarantees that `end_guard` still runs in that case.
let mut exit_guard = ExitGuard::new((self, false), |(guard, result)| {
if !result {
guard.end_guard();
}
});
exit_guard.0.epoch_updated();
exit_guard.1 = true;
}
}
} else {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
assert_ne!(self.num_readers, u32::MAX, "Too many EBR guards");
self.num_readers += 1;
}
}
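/// Accelerates garbage collection by enforcing a scan and zeroing the
/// epoch-update countdown.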
#[inline]
pub(super) fn accelerate(&mut self) {
mark_scan_enforced();
self.next_epoch_update = 0;
}
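/// Acknowledges an existing guard being dropped by the owner thread.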
#[inline]
pub(super) fn end_guard(&mut self) {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
debug_assert_eq!(self.state.load(Relaxed), u8::from(self.announcement));
if self.num_readers == 1 {
if self.next_epoch_update == 0 {
// Scan unless another thread already holds the scan lock (`Tag::First`)
// and this collector has no garbage of its own.
if self.has_garbage || Tag::into_tag(global_anchor().load(Relaxed)) != Tag::First {
self.try_scan();
}
self.next_epoch_update = if self.has_garbage {
Self::CADENCE / 4
} else {
Self::CADENCE
};
} else {
self.next_epoch_update = self.next_epoch_update.saturating_sub(1);
}
// The `Release` store orders everything above before other threads can
// observe this collector as inactive.
self.state
.store(u8::from(self.announcement) | Self::INACTIVE, Release);
self.num_readers = 0;
} else {
self.num_readers -= 1;
}
}
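/// Returns the current global epoch.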
#[inline]
pub(super) fn current_epoch() -> Epoch {
Epoch::from_u8(epoch().load(Relaxed))
}
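/// Queues a garbage instance in the current generation; it is dropped only
/// after the generations have rotated far enough that no reader can still
/// hold a reference to it.
///
/// A hedged sketch, where `instance` is a hypothetical value implementing
/// [`Collectible`]; `epoch_updated` frees instances via `Box::from_raw`, so
/// `Box::into_raw` is the matching allocation path:
///
/// ```ignore
/// let garbage: *mut dyn Collectible = Box::into_raw(Box::new(instance));
/// collector.reclaim(garbage);
/// ```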
#[inline]
pub(super) fn reclaim(&mut self, instance_ptr: *mut dyn Collectible) {
if let Some(mut ptr) = NonNull::new(instance_ptr) {
unsafe {
*ptr.as_mut().next_ptr_mut() = self.current_instance_link.take();
self.current_instance_link.replace(ptr);
self.next_epoch_update = self
.next_epoch_update
.saturating_sub(1)
.min(Self::CADENCE / 4);
self.has_garbage = true;
}
}
}
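/// Returns the thread-local [`Collector`], allocating one on first use.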
#[inline]
pub(super) fn current() -> *mut Collector {
LOCAL_COLLECTOR.with(|local_collector| {
let mut collector_ptr = local_collector.load(Relaxed);
if collector_ptr.is_null() {
collector_ptr = COLLECTOR_ANCHOR.with(CollectorAnchor::alloc);
local_collector.store(collector_ptr, Relaxed);
}
collector_ptr
})
}
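/// Passes the thread-local collector's garbage on to the surviving
/// collectors; returns `false` if the collector still has active readers.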
#[inline]
pub(super) fn pass_garbage() -> bool {
LOCAL_COLLECTOR.with(|local_collector| {
let collector_ptr = local_collector.load(Relaxed);
if let Some(collector) = unsafe { collector_ptr.as_mut() } {
if collector.num_readers != 0 {
return false;
}
if collector.has_garbage {
collector.state.fetch_or(Collector::INVALID, Release);
local_collector.store(ptr::null_mut(), Relaxed);
mark_scan_enforced();
}
}
true
})
}
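/// Tells the collector that the global epoch has advanced: the garbage
/// generations are rotated and the oldest generation is dropped.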
pub(super) fn epoch_updated(&mut self) {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
debug_assert_eq!(self.state.load(Relaxed), u8::from(self.announcement));
// Rotate the three garbage generations; the oldest becomes reclaimable
// because no reader from two epochs ago can still be active.
let mut garbage_link = self.next_instance_link.take();
self.next_instance_link = self.previous_instance_link.take();
self.previous_instance_link = self.current_instance_link.take();
self.has_garbage =
self.next_instance_link.is_some() || self.previous_instance_link.is_some();
while let Some(mut instance_ptr) = garbage_link.take() {
garbage_link = unsafe { *instance_ptr.as_mut().next_ptr_mut() };
// If dropping an instance below panics, re-queue the rest of the chain
// instead of leaking it.
let mut guard = ExitGuard::new(garbage_link, |mut garbage_link| {
while let Some(mut instance_ptr) = garbage_link.take() {
garbage_link = unsafe { *instance_ptr.as_mut().next_ptr_mut() };
std::sync::atomic::compiler_fence(Acquire);
self.reclaim(instance_ptr.as_ptr());
}
});
// Keep the next-link read above from being reordered past the drop of
// the instance by the compiler.
std::sync::atomic::compiler_fence(Acquire);
unsafe {
drop(Box::from_raw(instance_ptr.as_ptr()));
}
garbage_link = guard.take();
}
}
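/// Scans the global list of collectors and tries to advance the global
/// epoch; returns `true` if the epoch was updated.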
pub(super) fn try_scan(&mut self) -> bool {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
debug_assert_eq!(self.state.load(Relaxed), u8::from(self.announcement));
// Lock the global list by tagging the anchor with `Tag::First`; give up
// if another thread is already scanning.
let lock_result = global_anchor()
.fetch_update(Acquire, Acquire, |p| {
let tag = Tag::into_tag(p);
if tag == Tag::First || tag == Tag::Both {
None
} else {
Some(Tag::update_tag(p, Tag::First).cast_mut())
}
})
.map(|p| Tag::unset_tag(p).cast_mut());
if let Ok(mut collector_ptr) = lock_result {
// Unlock the anchor on exit: `First` becomes `None`, and `Both` becomes
// `Second` so that an enforced scan request is not lost.
let _guard = ExitGuard::new(global_anchor(), |a| {
loop {
let result = a.fetch_update(Release, Relaxed, |p| {
let tag = Tag::into_tag(p);
debug_assert!(tag == Tag::First || tag == Tag::Both);
let new_tag = if tag == Tag::First {
Tag::None
} else {
Tag::Second
};
Some(Tag::update_tag(p, new_tag).cast_mut())
});
if result.is_ok() {
break;
}
}
});
let known_epoch = self.state.load(Relaxed);
let mut update_global_epoch = true;
let mut prev_collector_ptr: *mut Collector = ptr::null_mut();
while !collector_ptr.is_null() {
// Skip this collector itself; its announcement is `known_epoch`.
if ptr::eq(self, collector_ptr) {
prev_collector_ptr = collector_ptr;
collector_ptr = self.next_link.load(Relaxed);
continue;
}
let collector_state = unsafe { (*collector_ptr).state.load(Relaxed) };
let next_collector_ptr = unsafe { (*collector_ptr).next_link.load(Relaxed) };
// Unlink invalidated collectors and queue them as garbage so that
// their remaining instances are eventually dropped.
if (collector_state & Self::INVALID) != 0 {
let result = if prev_collector_ptr.is_null() {
global_anchor()
.fetch_update(Release, Relaxed, |p| {
let tag = Tag::into_tag(p);
debug_assert!(tag == Tag::First || tag == Tag::Both);
if ptr::eq(Tag::unset_tag(p), collector_ptr) {
Some(Tag::update_tag(next_collector_ptr, tag).cast_mut())
} else {
None
}
})
.is_ok()
} else {
unsafe {
(*prev_collector_ptr)
.next_link
.store(next_collector_ptr, Relaxed);
}
true
};
if result {
self.reclaim(collector_ptr);
collector_ptr = next_collector_ptr;
continue;
}
} else if (collector_state & Self::INACTIVE) == 0 && collector_state != known_epoch
{
// An active collector that has not yet observed the current epoch
// blocks the epoch from advancing.
update_global_epoch = false;
break;
}
prev_collector_ptr = collector_ptr;
collector_ptr = next_collector_ptr;
}
if update_global_epoch {
// Order the collector-state loads above before publishing the next
// global epoch.
augmented_fence(SeqCst);
epoch().store(Epoch::from_u8(known_epoch).next().into(), Relaxed);
return true;
}
}
false
}
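/// Allocates a new [`Collector`] and pushes it onto the global list.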
fn alloc() -> *mut Collector {
let boxed = Box::new(Collector {
state: AugmentedAtomicU8::new(Self::INACTIVE),
announcement: Epoch::default(),
next_epoch_update: Self::CADENCE,
has_garbage: false,
num_readers: 0,
previous_instance_link: None,
current_instance_link: None,
next_instance_link: None,
next_link: AtomicPtr::default(),
link: None,
});
let ptr = Box::into_raw(boxed);
// Insert the new collector at the head of the global list with a CAS
// loop, preserving the anchor's current tag bits.
let mut current = global_anchor().load(Relaxed);
loop {
unsafe {
(*ptr)
.next_link
.store(Tag::unset_tag(current).cast_mut(), Relaxed);
}
let tag = Tag::into_tag(current);
let new = Tag::update_tag(ptr, tag).cast_mut();
if let Err(actual) =
global_anchor().compare_exchange_weak(current, new, Release, Relaxed)
{
current = actual;
} else {
break;
}
}
ptr
}
}
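// Dropping a `Collector` flushes whatever garbage it still owns.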
impl Drop for Collector {
#[inline]
fn drop(&mut self) {
// No other thread can reach this collector anymore; flush all remaining
// garbage generations.
self.state.store(0, Relaxed);
self.announcement = Epoch::default();
while self.has_garbage {
self.epoch_updated();
}
}
}
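// A `Collector` is itself `Collectible` so that an invalidated collector can
// be queued as garbage on another collector's list (see `try_scan`).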
impl Collectible for Collector {
#[inline]
fn next_ptr_mut(&mut self) -> &mut Option<NonNull<dyn Collectible>> {
&mut self.link
}
}
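/// A zero-sized thread-local whose destructor tears down the thread's
/// [`Collector`] when the thread exits.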
struct CollectorAnchor;
impl CollectorAnchor {
fn alloc(&self) -> *mut Collector {
// Nominally use `self` so that allocation stays tied to the anchor.
let _: &CollectorAnchor = self;
Collector::alloc()
}
}
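// Runs on thread exit: tears down the thread's collector or hands its
// garbage over to the remaining collectors.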
impl Drop for CollectorAnchor {
#[inline]
fn drop(&mut self) {
unsafe {
try_drop_local_collector();
}
}
}
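/// Requests a scan by tagging the global anchor with `Tag::Second` (or
/// `Tag::Both` if a scan is currently in progress).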
fn mark_scan_enforced() {
let _result = global_anchor().fetch_update(Release, Relaxed, |p| {
let new_tag = match Tag::into_tag(p) {
Tag::None => Tag::Second,
Tag::First => Tag::Both,
Tag::Second | Tag::Both => return None,
};
Some(Tag::update_tag(p, new_tag).cast_mut())
});
}
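/// Tries to drop the thread-local [`Collector`] on thread exit.
///
/// If this collector is the only one left in the global list, it is flushed
/// and deallocated in place; otherwise it is marked `INVALID` so that another
/// thread's scan reclaims it.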
unsafe fn try_drop_local_collector() {
#[cfg(all(loom, test))]
{
return;
}
#[cfg(not(all(loom, test)))]
{
let collector_ptr = LOCAL_COLLECTOR.with(|local_collector| local_collector.load(Relaxed));
if collector_ptr.is_null() {
return;
}
let mut anchor_ptr = global_anchor().load(Relaxed);
// A scan has been enforced: perform it now on behalf of this thread.
if Tag::into_tag(anchor_ptr) == Tag::Second {
let guard = super::Guard::new_for_drop(collector_ptr);
(*collector_ptr).try_scan();
drop(guard);
anchor_ptr = global_anchor().load(Relaxed);
}
// If this collector is the last one in the global list, drop it in place.
if (*collector_ptr).next_link.load(Relaxed).is_null()
&& ptr::eq(collector_ptr, anchor_ptr)
&& global_anchor()
.compare_exchange(anchor_ptr, ptr::null_mut(), Relaxed, Relaxed)
.is_ok()
{
while (*collector_ptr).has_garbage {
let guard = super::Guard::new_for_drop(collector_ptr);
(*collector_ptr).epoch_updated();
drop(guard);
}
drop(Box::from_raw(collector_ptr));
return;
}
// Other collectors remain: mark this one `INVALID` and request a scan so
// that another thread takes over its garbage.
(*collector_ptr).state.fetch_or(Collector::INVALID, Release);
mark_scan_enforced();
}
}
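// Per-thread state: `COLLECTOR_ANCHOR` exists solely for its destructor,
// which cleans up the collector cached in `LOCAL_COLLECTOR` on thread exit.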
augmented_thread_local! {
#[allow(clippy::thread_local_initializer_can_be_made_const)]
static COLLECTOR_ANCHOR: CollectorAnchor = CollectorAnchor;
static LOCAL_COLLECTOR: AtomicPtr<Collector> = AtomicPtr::default();
}
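/// Returns a reference to the global epoch counter.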
fn epoch() -> &'static AugmentedAtomicU8 {
#[cfg(not(all(loom, test)))]
{
static EPOCH: AugmentedAtomicU8 = AugmentedAtomicU8::new(0);
&EPOCH
}
#[cfg(all(loom, test))]
{
static EPOCH: std::sync::OnceLock<AugmentedAtomicU8> = std::sync::OnceLock::new();
EPOCH.get_or_init(|| AugmentedAtomicU8::new(0))
}
}
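/// Returns a reference to the global anchor: the head of the linked list of
/// all `Collector` instances, with scan-coordination flags in its tag bits.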
fn global_anchor() -> &'static AugmentedAtomicPtr<Collector> {
#[cfg(not(all(loom, test)))]
{
static GLOBAL_ANCHOR: AugmentedAtomicPtr<Collector> =
AugmentedAtomicPtr::new(ptr::null_mut());
&GLOBAL_ANCHOR
}
#[cfg(all(loom, test))]
{
static GLOBAL_ANCHOR: std::sync::OnceLock<AugmentedAtomicPtr<Collector>> =
std::sync::OnceLock::new();
GLOBAL_ANCHOR.get_or_init(|| AugmentedAtomicPtr::new(ptr::null_mut()))
}
}