use crate::raw::{Pointer, Reclaim};
use crate::record::HazPtrRecord;
use crate::sync::atomic::{AtomicIsize, AtomicPtr, AtomicUsize};
use alloc::boxed::Box;
use alloc::collections::BTreeSet;
use core::marker::PhantomData;
use core::sync::atomic::Ordering;
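// Tuning constants. The thresholds and shard count are scaled down under loom
// so the model checker can actually reach the reclamation paths in a feasible
// number of steps. With std on 64-bit targets, a reclamation pass is also
// forced once SYNC_TIME_PERIOD nanoseconds (2s) have elapsed since the last one.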
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
const SYNC_TIME_PERIOD: u64 = std::time::Duration::from_secs(2).as_nanos() as u64;
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
use crate::sync::atomic::AtomicU64;
#[cfg(loom)]
const RCOUNT_THRESHOLD: isize = 5;
#[cfg(not(loom))]
const RCOUNT_THRESHOLD: isize = 1000;
const HCOUNT_MULTIPLIER: isize = 2;
#[cfg(loom)]
const NUM_SHARDS: usize = 2;
#[cfg(not(loom))]
const NUM_SHARDS: usize = 8;
const IGNORED_LOW_BITS: u8 = 8;
const SHARD_MASK: usize = NUM_SHARDS - 1;
const LOCK_BIT: usize = 1;
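/// Family type of the shared, process-wide [`Domain`] returned by [`Domain::global`].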
#[non_exhaustive]
pub struct Global;
impl Global {
const fn new() -> Self {
Global
}
}
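/// Marker for family types of which at most one domain can ever be constructed.
///
/// # Safety
///
/// Implementors must guarantee that only a single `Domain` is ever created for
/// the family type, so a hazard pointer acquired from one domain can never be
/// used to validate an object retired through another.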
pub unsafe trait Singleton {}
unsafe impl Singleton for Global {}
#[cfg(not(loom))]
static SHARED_DOMAIN: Domain<Global> = Domain::new(&Global::new());
#[cfg(loom)]
loom::lazy_static! {
static ref SHARED_DOMAIN: Domain<Global> = Domain::new(&Global::new());
static ref SHARD: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(0);
}
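// std's `AtomicPtr` has no `with_mut`; this shim mirrors loom's API so the
// same call sites compile against both the core and the loom atomics.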
trait WithMut<T> {
fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R;
}
impl<T> WithMut<T> for core::sync::atomic::AtomicPtr<T> {
fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R {
f(self.get_mut())
}
}
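/// A reclamation domain: the set of all hazard pointer records plus sharded
/// lists of retired objects awaiting reclamation.
///
/// Objects retired into a domain are only ever checked against hazard pointers
/// acquired from that same domain; the `F` family parameter ties the two
/// together at the type level.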
pub struct Domain<F> {
hazptrs: HazPtrRecords,
untagged: [RetiredList; NUM_SHARDS],
family: PhantomData<F>,
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
due_time: AtomicU64,
nbulk_reclaims: AtomicUsize,
count: AtomicIsize,
shutdown: bool,
}
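// Under miri, a static reached through raw pointers must be registered as a
// root so that what it points to is not reported as leaked.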
#[cfg(miri)]
extern "Rust" {
fn miri_static_root(ptr: *const u8);
}
impl Domain<Global> {
pub fn global() -> &'static Self {
#[cfg(miri)]
unsafe {
miri_static_root(&SHARED_DOMAIN as *const _ as *const u8);
};
&SHARED_DOMAIN
}
}
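/// Construct a [`Domain`] with a fresh, unnameable family type.
///
/// Each invocation mints a distinct `UniqueFamily` type, so two such domains
/// can never be confused with each other or with the global domain. Note that
/// the expansion expects `Domain` and `Singleton` to be in scope at the call
/// site. A minimal usage sketch:
///
/// ```ignore
/// let domain = unique_domain!();
/// // hazard pointers acquired from `domain` only guard objects
/// // retired through this same `domain`
/// ```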
#[macro_export]
macro_rules! unique_domain {
() => {{
struct UniqueFamily;
unsafe impl Singleton for UniqueFamily {}
Domain::new(&UniqueFamily)
}};
}
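// `Domain::new` is a `const fn` in normal builds, but loom's atomics cannot be
// constructed in const contexts, so this macro stamps out both variants from a
// single body.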
macro_rules! new {
($($decl:tt)*) => {
pub $($decl)*(_: &'_ F) -> Self {
#[cfg(not(loom))]
let untagged = {
#[allow(clippy::declare_interior_mutable_const)]
const RETIRED_LIST: RetiredList = RetiredList::new();
[RETIRED_LIST; NUM_SHARDS]
};
#[cfg(loom)]
let untagged = {
[(); NUM_SHARDS].map(|_| RetiredList::new())
};
Self {
hazptrs: HazPtrRecords {
head: AtomicPtr::new(core::ptr::null_mut()),
head_available: AtomicPtr::new(core::ptr::null_mut()),
count: AtomicIsize::new(0),
},
untagged,
count: AtomicIsize::new(0),
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
due_time: AtomicU64::new(0),
nbulk_reclaims: AtomicUsize::new(0),
family: PhantomData,
shutdown: false,
}
}
};
}
impl<F> Domain<F> {
#[cfg(not(loom))]
new!(const fn new);
#[cfg(loom)]
new!(fn new);
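// Acquire one (or `N`) hazard pointer records, preferring records from the
// available list and falling back to allocating fresh ones. The records handed
// out by `acquire_many` are chained through `available_next`.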
pub(crate) fn acquire(&self) -> &HazPtrRecord {
self.acquire_many::<1>()[0]
}
pub(crate) fn acquire_many<const N: usize>(&self) -> [&HazPtrRecord; N] {
debug_assert!(N >= 1);
let (mut head, n) = self.try_acquire_available::<N>();
assert!(n <= N);
let mut tail = core::ptr::null();
[(); N].map(|_| {
if !head.is_null() {
tail = head;
let rec = unsafe { &*head };
head = rec.available_next.load(Ordering::Relaxed);
rec
} else {
let rec = self.acquire_new();
if !tail.is_null() {
unsafe { &*tail }
.available_next
.store(rec as *const _ as *mut _, Ordering::Relaxed);
}
tail = rec as *const _;
rec
}
})
}
pub(crate) fn release(&self, rec: &HazPtrRecord) {
assert!(rec.available_next.load(Ordering::Relaxed).is_null());
self.push_available(rec, rec);
}
pub(crate) fn release_many<const N: usize>(&self, recs: [&HazPtrRecord; N]) {
let head = recs[0];
let tail = recs.last().expect("we only give out with N > 0");
assert!(tail.available_next.load(Ordering::Relaxed).is_null());
self.push_available(head, tail);
}
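// Take up to `N` records off the available list. The list head doubles as a
// spin lock: whoever CASes `LOCK_BIT` into the head pointer owns the list
// until they store a new (unlocked) head.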
fn try_acquire_available<const N: usize>(&self) -> (*const HazPtrRecord, usize) {
debug_assert!(N >= 1);
debug_assert_eq!(core::ptr::null::<HazPtrRecord>() as usize, 0);
loop {
let avail = self.hazptrs.head_available.load(Ordering::Acquire);
if avail.is_null() {
return (avail, 0);
}
debug_assert_ne!(avail, LOCK_BIT as *mut _);
if (avail as usize & LOCK_BIT) == 0 {
// Not locked: try to take the lock by setting LOCK_BIT.
if self
.hazptrs
.head_available
.compare_exchange_weak(
avail,
with_lock_bit(avail),
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
let (rec, n) = unsafe { self.try_acquire_available_locked::<N>(avail) };
debug_assert!(n >= 1, "head_available was not null");
debug_assert!(n <= N);
return (rec, n);
}
// CAS failed: the head changed under us, retry immediately.
} else {
// Another thread holds the list lock; yield before retrying.
#[cfg(not(any(loom, feature = "std")))]
core::hint::spin_loop();
#[cfg(any(loom, feature = "std"))]
crate::sync::yield_now();
}
}
}
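// Safety: the caller must hold the lock bit on `head_available`, and `head`
// must be the (non-null, unlocked) head it observed when taking the lock.
// Walks at most `N` records, stores the remainder as the new unlocked head
// (which releases the lock), and detaches the taken sublist.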
unsafe fn try_acquire_available_locked<const N: usize>(
&self,
head: *const HazPtrRecord,
) -> (*const HazPtrRecord, usize) {
debug_assert!(N >= 1);
debug_assert!(!head.is_null());
let mut tail = head;
let mut n = 1;
let mut next = unsafe { &*tail }.available_next.load(Ordering::Relaxed);
while !next.is_null() && n < N {
debug_assert_eq!((next as usize) & LOCK_BIT, 0);
tail = next;
next = unsafe { &*tail }.available_next.load(Ordering::Relaxed);
n += 1;
}
self.hazptrs.head_available.store(next, Ordering::Release);
unsafe { &*tail }
.available_next
.store(core::ptr::null_mut(), Ordering::Relaxed);
(head, n)
}
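// Prepend the `head..=tail` sublist to the available list. Spins while the
// list is locked by an in-progress `try_acquire_available`.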
fn push_available(&self, head: &HazPtrRecord, tail: &HazPtrRecord) {
debug_assert!(tail.available_next.load(Ordering::Relaxed).is_null());
debug_assert_eq!(head as *const _ as usize & LOCK_BIT, 0);
loop {
let avail = self.hazptrs.head_available.load(Ordering::Acquire);
if (avail as usize & LOCK_BIT) == 0 {
tail.available_next
.store(avail as *mut _, Ordering::Relaxed);
if self
.hazptrs
.head_available
.compare_exchange_weak(
avail,
head as *const _ as *mut _,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
} else {
#[cfg(not(any(loom, feature = "std")))]
core::hint::spin_loop();
#[cfg(any(loom, feature = "std"))]
crate::sync::yield_now();
}
}
}
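// Allocate a fresh record and CAS it onto the list of all records. That list
// only shrinks when the domain itself is dropped; `hazptrs.count` feeds the
// reclamation threshold.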
pub(crate) fn acquire_new(&self) -> &HazPtrRecord {
let hazptr = Box::into_raw(Box::new(HazPtrRecord {
ptr: AtomicPtr::new(core::ptr::null_mut()),
next: AtomicPtr::new(core::ptr::null_mut()),
available_next: AtomicPtr::new(core::ptr::null_mut()),
}));
let mut head = self.hazptrs.head.load(Ordering::Acquire);
loop {
unsafe { &mut *hazptr }.next.with_mut(|p| *p = head);
match self.hazptrs.head.compare_exchange_weak(
head,
hazptr,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
self.hazptrs.count.fetch_add(1, Ordering::SeqCst);
break unsafe { &*hazptr };
}
Err(head_now) => {
head = head_now
}
}
}
}
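/// Retire `ptr`, scheduling it for reclamation once no hazard pointer guards it.
///
/// Returns the number of objects reclaimed by any reclamation pass this call
/// happened to trigger.
///
/// # Safety
///
/// `ptr` must point to a valid, exclusively-owned allocation produced by `P`
/// (it will eventually be handed back to `P::from_raw`), it must no longer be
/// reachable to new readers, and it must not be retired more than once.
///
/// A minimal sketch, assuming `Box<T>` implements this crate's `Pointer` trait:
///
/// ```ignore
/// let p: *mut u64 = Box::into_raw(Box::new(42));
/// // ... after unlinking `p` from the shared data structure ...
/// unsafe { domain.retire_ptr::<u64, Box<u64>>(p) };
/// ```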
pub unsafe fn retire_ptr<T, P>(&self, ptr: *mut T) -> usize
where
T: Send,
P: Pointer<T>,
{
let retired = Box::new(unsafe {
Retired::new(self, ptr, |ptr: *mut dyn Reclaim| {
let _ = P::from_raw(ptr as *mut T);
})
});
self.push_list(retired)
}
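/// Trigger a reclamation pass immediately, regardless of thresholds, and
/// return how many retired objects were reclaimed.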
pub fn eager_reclaim(&self) -> usize {
self.nbulk_reclaims.fetch_add(1, Ordering::Acquire);
self.do_reclamation(0)
}
#[doc(hidden)]
pub fn cleanup(&self) {
self.eager_reclaim();
self.wait_for_zero_bulk_reclaims();
}
fn push_list(&self, mut retired: Box<Retired>) -> usize {
assert!(
retired.next.with_mut(|p| p.is_null()),
"only single item retiring is supported atm"
);
crate::asymmetric_light_barrier();
let retired = Box::into_raw(retired);
unsafe { self.untagged[Self::calc_shard(retired)].push(retired, retired) };
self.count.fetch_add(1, Ordering::Release);
self.check_threshold_and_reclaim()
}
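// Reclamation triggers when the number of retired objects exceeds
// max(RCOUNT_THRESHOLD, HCOUNT_MULTIPLIER * live hazard pointer records),
// or (with std on 64-bit targets) when SYNC_TIME_PERIOD has elapsed since
// the last pass.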
fn threshold(&self) -> isize {
RCOUNT_THRESHOLD.max(HCOUNT_MULTIPLIER * self.hazptrs.count.load(Ordering::Acquire))
}
fn check_count_threshold(&self) -> isize {
let mut rcount = self.count.load(Ordering::Acquire);
while rcount > self.threshold() {
match self
.count
.compare_exchange_weak(rcount, 0, Ordering::AcqRel, Ordering::Relaxed)
{
Ok(_) => {
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
{
self.due_time
.store(Self::now() + SYNC_TIME_PERIOD, Ordering::Release);
}
return rcount;
}
Err(rcount_now) => rcount = rcount_now,
}
}
0
}
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
fn check_due_time(&self) -> isize {
let time = Self::now();
let due = self.due_time.load(Ordering::Acquire);
if time < due
|| self
.due_time
.compare_exchange(
due,
time + SYNC_TIME_PERIOD,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_err()
{
return 0;
}
self.count.swap(0, Ordering::AcqRel)
}
fn check_threshold_and_reclaim(&self) -> usize {
#[allow(unused_mut)]
let mut rcount = self.check_count_threshold();
if rcount == 0 {
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom)))]
{
rcount = self.check_due_time();
}
if rcount == 0 {
return 0;
}
}
self.nbulk_reclaims.fetch_add(1, Ordering::Acquire);
self.do_reclamation(rcount)
}
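// The core reclamation loop: steal every shard's retired list, snapshot the
// set of currently guarded pointers, free everything unguarded, and push the
// survivors back. Repeats until the shards were observed empty (`done`) and
// the retire count is back under threshold.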
fn do_reclamation(&self, mut rcount: isize) -> usize {
let mut total_reclaimed = 0;
loop {
let mut done = true;
let mut stolen_heads = [core::ptr::null_mut(); NUM_SHARDS];
let mut empty = true;
for (stolen_head, untagged) in stolen_heads.iter_mut().zip(&self.untagged) {
*stolen_head = untagged.pop_all();
if !stolen_head.is_null() {
empty = false;
}
}
if !empty {
crate::asymmetric_heavy_barrier(crate::HeavyBarrierKind::Expedited);
#[allow(clippy::mutable_key_type)]
let mut guarded_ptrs = BTreeSet::new();
let mut node = self.hazptrs.head.load(Ordering::Acquire);
while !node.is_null() {
let n = unsafe { &*node };
guarded_ptrs.insert(n.ptr.load(Ordering::Acquire));
node = n.next.load(Ordering::Relaxed);
}
let (nreclaimed, is_done) =
self.match_reclaim_untagged(stolen_heads, &guarded_ptrs);
done = is_done;
rcount -= nreclaimed as isize;
total_reclaimed += nreclaimed;
}
if rcount != 0 {
self.count.fetch_add(rcount, Ordering::Release);
}
rcount = self.check_count_threshold();
if rcount == 0 && done {
break;
}
}
self.nbulk_reclaims.fetch_sub(1, Ordering::Acquire);
total_reclaimed
}
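// Partition the stolen lists into reclaimable (unguarded) and unreclaimed
// (guarded) nodes; frees the former and pushes the latter back onto shard 0.
// Returns (number reclaimed, whether all shards were observed empty before
// the survivors were pushed back).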
fn match_reclaim_untagged(
&self,
stolen_heads: [*mut Retired; NUM_SHARDS],
guarded_ptrs: &BTreeSet<*mut u8>,
) -> (usize, bool) {
let mut unreclaimed = core::ptr::null_mut();
let mut unreclaimed_tail = unreclaimed;
let mut nreclaimed = 0;
for mut node in stolen_heads {
let mut reclaimable = core::ptr::null_mut();
while !node.is_null() {
let n = unsafe { &*node };
let next = n.next.load(Ordering::Relaxed);
debug_assert_ne!(node, next);
if !guarded_ptrs.contains(&(n.ptr as *mut u8)) {
n.next.store(reclaimable, Ordering::Relaxed);
reclaimable = node;
nreclaimed += 1;
} else {
n.next.store(unreclaimed, Ordering::Relaxed);
unreclaimed = node;
if unreclaimed_tail.is_null() {
unreclaimed_tail = unreclaimed;
}
}
node = next;
}
unsafe { self.reclaim_unprotected(reclaimable) };
}
let done = self.untagged.iter().all(|u| u.is_empty());
unsafe { self.untagged[0].push(unreclaimed, unreclaimed_tail) };
(nreclaimed, done)
}
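// Safety: every node in `retired` must be unguarded by any hazard pointer;
// each is boxed back up and its deleter invoked exactly once.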
unsafe fn reclaim_unprotected(&self, mut retired: *mut Retired) {
while !retired.is_null() {
let next = unsafe { &*retired }.next.load(Ordering::Relaxed);
let n = unsafe { Box::from_raw(retired) };
unsafe { (n.deleter)(n.ptr) };
retired = next;
}
}
#[cfg(any(loom, miri))]
fn now() -> u64 {
0
}
#[cfg(all(feature = "std", target_pointer_width = "64", not(loom), not(miri)))]
fn now() -> u64 {
u64::try_from(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system time is set to before the epoch")
.as_nanos(),
)
.expect("system time is too far into the future")
}
fn reclaim_all_objects(&mut self) {
for shard in &self.untagged {
let head = shard.pop_all();
unsafe { self.reclaim_list_transitive(head) };
}
}
unsafe fn reclaim_list_transitive(&self, head: *mut Retired) {
unsafe { self.reclaim_unconditional(head) };
}
unsafe fn reclaim_unconditional(&self, head: *mut Retired) {
unsafe { self.reclaim_unprotected(head) };
}
fn wait_for_zero_bulk_reclaims(&self) {
while self.nbulk_reclaims.load(Ordering::Acquire) > 0 {
#[cfg(not(any(loom, feature = "std")))]
core::hint::spin_loop();
#[cfg(any(loom, feature = "std"))]
crate::sync::yield_now();
}
}
fn free_hazptr_recs(&mut self) {
let mut node: *mut HazPtrRecord = self.hazptrs.head.with_mut(|p| *p);
while !node.is_null() {
let mut n: Box<HazPtrRecord> = unsafe { Box::from_raw(node) };
node = n.next.with_mut(|p| *p);
drop(n);
}
}
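// Spread retired nodes across shards by address bits, skipping the low bits
// that carry little entropy for heap addresses. Under loom a simple
// round-robin counter is used instead.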
#[cfg(not(loom))]
fn calc_shard(input: *mut Retired) -> usize {
(input as usize >> IGNORED_LOW_BITS) & SHARD_MASK
}
#[cfg(loom)]
fn calc_shard(_input: *mut Retired) -> usize {
SHARD.fetch_add(1, Ordering::Relaxed) & SHARD_MASK
}
}
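// Dropping the domain reclaims all remaining retired objects unconditionally
// and frees every record; by this point no guards may be outstanding.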
impl<F> Drop for Domain<F> {
fn drop(&mut self) {
self.shutdown = true;
self.reclaim_all_objects();
self.free_hazptr_recs();
}
}
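// All hazard pointer records ever allocated (`head`), plus the sublist of
// records currently available for reuse (`head_available`, lock-bit tagged).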
struct HazPtrRecords {
head: AtomicPtr<HazPtrRecord>,
head_available: AtomicPtr<HazPtrRecord>,
count: AtomicIsize,
}
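// A type-erased retired allocation: the erased pointer, the deleter that
// knows how to free it, and an intrusive link for the retired lists.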
struct Retired {
ptr: *mut dyn Reclaim,
deleter: unsafe fn(ptr: *mut dyn Reclaim),
next: AtomicPtr<Retired>,
}
impl Retired {
unsafe fn new<'domain, F>(
_: &'domain Domain<F>,
ptr: *mut (dyn Reclaim + 'domain),
deleter: unsafe fn(ptr: *mut dyn Reclaim),
) -> Self {
Retired {
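// The lifetime transmute below erases `'domain` to `'static`. This relies on
// the domain running `deleter` before `'domain` ends (at the latest in
// `Drop for Domain`), so the trait object never outlives data it borrows.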
ptr: unsafe { core::mem::transmute::<_, *mut (dyn Reclaim + 'static)>(ptr) },
deleter,
next: AtomicPtr::new(core::ptr::null_mut()),
}
}
}
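// A lock-free, Treiber-style stack of retired nodes.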
struct RetiredList {
head: AtomicPtr<Retired>,
}
impl RetiredList {
#[cfg(not(loom))]
const fn new() -> Self {
Self {
head: AtomicPtr::new(core::ptr::null_mut()),
}
}
#[cfg(loom)]
fn new() -> Self {
Self {
head: AtomicPtr::new(core::ptr::null_mut()),
}
}
unsafe fn push(&self, sublist_head: *mut Retired, sublist_tail: *mut Retired) {
if sublist_head.is_null() {
return;
}
let head_ptr = &self.head;
let mut head = head_ptr.load(Ordering::Acquire);
loop {
unsafe { &*sublist_tail }
.next
.store(head, Ordering::Release);
match head_ptr.compare_exchange_weak(
head,
sublist_head,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(head_now) => {
head = head_now
}
}
}
}
fn pop_all(&self) -> *mut Retired {
self.head.swap(core::ptr::null_mut(), Ordering::Acquire)
}
fn is_empty(&self) -> bool {
self.head.load(Ordering::Relaxed).is_null()
}
}
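// Helpers for tagging the available-list head with LOCK_BIT without losing
// pointer provenance: the tag is applied as a wrapping byte offset from the
// original pointer rather than a plain int-to-pointer cast.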
fn with_lock_bit(ptr: *mut HazPtrRecord) -> *mut HazPtrRecord {
int_to_ptr_with_provenance(ptr as usize | LOCK_BIT, ptr)
}
fn without_lock_bit(ptr: *mut HazPtrRecord) -> *mut HazPtrRecord {
int_to_ptr_with_provenance(ptr as usize & !LOCK_BIT, ptr)
}
fn int_to_ptr_with_provenance<T>(addr: usize, prov: *mut T) -> *mut T {
let ptr = prov.cast::<u8>();
ptr.wrapping_add(addr.wrapping_sub(ptr as usize)).cast()
}
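// Anchors for compile-fail doctests (elided here) checking that hazard
// pointers and domains from different families cannot be mixed up.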
#[cfg(doctest)]
struct CannotConfuseGlobalWriter;
#[cfg(doctest)]
struct CannotConfuseGlobalReader;
#[cfg(doctest)]
struct CannotConfuseAcrossFamilies;
#[cfg(test)]
mod tests {
use super::Domain;
use core::{ptr::null_mut, sync::atomic::Ordering};
#[test]
fn acquire_many_skips_used_nodes() {
let domain = Domain::new(&());
let rec1 = domain.acquire();
let rec2 = domain.acquire();
let rec3 = domain.acquire();
assert_eq!(
rec3.next.load(Ordering::Relaxed),
rec2 as *const _ as *mut _
);
assert_eq!(
rec2.next.load(Ordering::Relaxed),
rec1 as *const _ as *mut _
);
assert_eq!(rec1.next.load(Ordering::Relaxed), core::ptr::null_mut());
domain.release(rec1);
domain.release(rec3);
drop(rec1);
drop(rec3);
let [one, two, three] = domain.acquire_many();
assert_eq!(
one.available_next.load(Ordering::Relaxed),
two as *const _ as *mut _
);
assert_eq!(
two.available_next.load(Ordering::Relaxed),
three as *const _ as *mut _
);
assert_eq!(
three.available_next.load(Ordering::Relaxed),
core::ptr::null_mut(),
);
assert_eq!(
three.next.load(Ordering::Relaxed),
one as *const _ as *mut _
);
assert_eq!(one.next.load(Ordering::Relaxed), rec2 as *const _ as *mut _);
assert_eq!(rec2.next.load(Ordering::Relaxed), two as *const _ as *mut _);
}
#[test]
fn acquire_many_orders_nodes_between_acquires() {
let domain = Domain::new(&());
let rec1 = domain.acquire();
let rec2 = domain.acquire();
assert_eq!(
rec2.next.load(Ordering::Relaxed),
rec1 as *const _ as *mut _
);
domain.release(rec2);
drop(rec2);
let [one, two] = domain.acquire_many();
assert_eq!(
one.available_next.load(Ordering::Relaxed),
two as *const _ as *mut _
);
assert_eq!(
two.available_next.load(Ordering::Relaxed),
core::ptr::null_mut(),
);
assert_eq!(two.next.load(Ordering::Relaxed), one as *const _ as *mut _);
assert_eq!(one.next.load(Ordering::Relaxed), rec1 as *const _ as *mut _);
}
#[test]
fn acquire_many_properly_orders_reused_nodes() {
let domain = Domain::new(&());
let rec1 = domain.acquire();
let rec2 = domain.acquire();
let rec3 = domain.acquire();
assert_eq!(rec1.next.load(Ordering::Relaxed), core::ptr::null_mut(),);
assert_eq!(
rec2.next.load(Ordering::Relaxed),
rec1 as *const _ as *mut _
);
assert_eq!(
rec3.next.load(Ordering::Relaxed),
rec2 as *const _ as *mut _
);
domain.release(rec1);
domain.release(rec2);
domain.release(rec3);
drop(rec1);
drop(rec2);
drop(rec3);
let [one, two, three, four, five] = domain.acquire_many();
assert_eq!(
one.available_next.load(Ordering::Relaxed),
two as *const _ as *mut _
);
assert_eq!(
two.available_next.load(Ordering::Relaxed),
three as *const _ as *mut _
);
assert_eq!(
three.available_next.load(Ordering::Relaxed),
four as *const _ as *mut _
);
assert_eq!(
four.available_next.load(Ordering::Relaxed),
five as *const _ as *mut _
);
assert_eq!(
five.available_next.load(Ordering::Relaxed),
core::ptr::null_mut(),
);
assert_eq!(
five.next.load(Ordering::Relaxed),
four as *const _ as *mut _
);
assert_eq!(four.next.load(Ordering::Relaxed), one as *const _ as *mut _);
assert_eq!(one.next.load(Ordering::Relaxed), two as *const _ as *mut _);
assert_eq!(
two.next.load(Ordering::Relaxed),
three as *const _ as *mut _
);
assert_eq!(three.next.load(Ordering::Relaxed), null_mut());
}
}