#[cfg(feature = "std")]
use std::error;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, vec::Vec};
use core::cell::UnsafeCell;
use core::fmt;
use core::mem::ManuallyDrop;
use core::ptr::{self, NonNull};
use core::sync::atomic::{
self,
Ordering::{Release, SeqCst},
};
use arrayvec::{ArrayVec, CapacityError};
use crate::global::GLOBAL;
use crate::hazard::{Hazard, Protected};
use crate::retired::{ReclaimOnDrop, Retired, RetiredBag};
use crate::sanitize;
/// Maximum number of hazard pointers a thread caches for reuse before
/// returning further ones to the global list.
const HAZARD_CACHE: usize = 16;
/// Initial capacity of the per-thread buffer used to collect all currently
/// protected pointers during a scan.
const SCAN_CACHE: usize = 128;
// Pulls in constants generated by the build script (e.g. `SCAN_THRESHOLD`).
include!(concat!(env!("OUT_DIR"), "/build_constants.rs"));
/// Returns the number of operations a thread performs before it attempts to
/// flush (scan and reclaim) its locally retired records.
const fn scan_threshold() -> u32 {
    SCAN_THRESHOLD
}
/// A trait abstracting over the means of accessing thread-local state
/// (e.g. directly by reference or through some TLS mechanism).
pub trait LocalAccess
where
    Self: Clone + Copy + Sized,
{
    /// Acquires a hazard pointer, optionally already set to protect `protect`.
    fn get_hazard(self, protect: Option<NonNull<()>>) -> &'static Hazard;
    /// Attempts to store `hazard` in the thread-local cache for later reuse.
    fn try_recycle_hazard(self, hazard: &'static Hazard) -> Result<(), RecycleError>;
    /// Advances the thread-local operations counter.
    fn increase_ops_count(self);
}
/// Container for the thread-local state of the reclamation scheme.
///
/// The inner state lives in an `UnsafeCell`; the methods below access it
/// mutably through raw pointers. NOTE(review): soundness relies on this type
/// only ever being used from its owning thread — confirm it is not shared
/// across threads by the public API.
#[derive(Debug)]
pub struct Local(UnsafeCell<LocalInner>);
impl Default for Local {
    /// Equivalent to [`Local::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
impl Local {
    /// Creates a new container for thread-local state.
    ///
    /// Prefers adopting a bag of retired records abandoned by some exited
    /// thread over allocating a fresh (empty) one.
    #[inline]
    pub fn new() -> Self {
        let bag = GLOBAL
            .try_adopt_abandoned_records()
            .unwrap_or_else(|| Box::new(RetiredBag::new()));

        Self(UnsafeCell::new(LocalInner {
            ops_count: 0,
            hazard_cache: ArrayVec::new(),
            scan_cache: Vec::with_capacity(SCAN_CACHE),
            retired_bag: ManuallyDrop::new(bag),
        }))
    }

    /// Attempts to reclaim the thread's retired records.
    #[inline]
    pub(crate) fn try_flush(&self) {
        // NOTE(review): exclusive access is assumed to be safe because the
        // state is only touched by the owning thread — confirm.
        let inner = unsafe { &mut *self.0.get() };
        inner.try_flush();
    }

    /// Adds `record` to the thread-local bag of retired records.
    #[inline]
    pub(crate) fn retire_record(&self, record: Retired) {
        let inner = unsafe { &mut *self.0.get() };
        inner.retired_bag.inner.push(ReclaimOnDrop::from(record));
        // With the "count-release" feature, operations are counted on
        // release instead of on retire.
        #[cfg(not(feature = "count-release"))]
        inner.increase_ops_count();
    }
}
impl<'a> LocalAccess for &'a Local {
    /// Takes a hazard pointer from the thread-local cache, falling back to
    /// the global list when the cache is empty.
    #[inline]
    fn get_hazard(self, protect: Option<NonNull<()>>) -> &'static Hazard {
        let inner = unsafe { &mut *self.0.get() };
        inner.hazard_cache.pop().unwrap_or_else(|| GLOBAL.get_hazard(protect))
    }

    /// Attempts to store `hazard` in the thread-local cache for later reuse.
    ///
    /// Fails with [`RecycleError::Capacity`] when the cache is already full,
    /// in which case the hazard's state is left untouched.
    #[inline]
    fn try_recycle_hazard(self, hazard: &'static Hazard) -> Result<(), RecycleError> {
        let inner = unsafe { &mut *self.0.get() };
        inner.hazard_cache.try_push(hazard)?;
        hazard.set_thread_reserved(Release);
        Ok(())
    }

    /// Advances the thread-local operations counter.
    #[inline]
    fn increase_ops_count(self) {
        let inner = unsafe { &mut *self.0.get() };
        inner.increase_ops_count();
    }
}
/// The actual thread-local state.
#[derive(Debug)]
struct LocalInner {
    // Operations performed since the last flush; compared against the
    // build-time `scan_threshold()`.
    ops_count: u32,
    // Fixed-size stack of hazard pointers reserved for reuse by this thread.
    hazard_cache: ArrayVec<[&'static Hazard; HAZARD_CACHE]>,
    // Reusable buffer for collecting all protected pointers during a scan.
    scan_cache: Vec<Protected>,
    // Retired records awaiting reclamation; `ManuallyDrop` because the drop
    // handler moves the bag out with `ptr::read` in order to abandon it.
    retired_bag: ManuallyDrop<Box<RetiredBag>>,
}
impl LocalInner {
    /// Merges any bag of retired records abandoned by an exited thread into
    /// the local one, then scans and reclaims unprotected records.
    #[inline]
    fn try_flush(&mut self) {
        if let Some(abandoned_bag) = GLOBAL.try_adopt_abandoned_records() {
            self.retired_bag.merge(abandoned_bag.inner);
        }
        // The number of reclaimed records is irrelevant here.
        let _ = self.scan_hazards();
    }

    /// Increments the operations counter and, once the scan threshold is
    /// reached, flushes and resets the counter.
    #[inline]
    fn increase_ops_count(&mut self) {
        self.ops_count += 1;
        if self.ops_count == scan_threshold() {
            self.try_flush();
            self.ops_count = 0;
        }
    }

    /// Collects all currently protected pointers and reclaims every local
    /// retired record not among them, returning the number of records
    /// reclaimed.
    #[inline]
    fn scan_hazards(&mut self) -> usize {
        let len = self.retired_bag.inner.len();
        // Nothing retired: skip the comparatively expensive global scan.
        if len == 0 {
            return 0;
        }
        // NOTE(review): assumes `collect_protected_hazards` replaces the
        // previous contents of `scan_cache` — confirm in `global`.
        GLOBAL.collect_protected_hazards(&mut self.scan_cache, SeqCst);
        // Sorting enables the binary search below.
        self.scan_cache.sort_unstable();
        unsafe { self.reclaim_unprotected_records() };
        len - self.retired_bag.inner.len()
    }

    /// Retains only the retired records that are still protected by some
    /// hazard pointer; removed entries are dropped, and dropping a
    /// `ReclaimOnDrop` reclaims the record.
    ///
    /// # Safety
    ///
    /// The scan cache must hold a current (and sorted) snapshot of all
    /// protected pointers, otherwise still-protected records could be
    /// reclaimed while in use.
    #[allow(unused_unsafe)]
    unsafe fn reclaim_unprotected_records(&mut self) {
        let scan_cache = &self.scan_cache;
        self.retired_bag.inner.retain(|retired| {
            scan_cache.binary_search_by(|&protected| retired.compare_with(protected)).is_ok()
        });
    }
}
impl Drop for LocalInner {
    #[inline]
    fn drop(&mut self) {
        // Return all cached hazard pointers to the globally free state.
        // (`sanitize::RELAXED_STORE` presumably selects a stronger ordering
        // under sanitizer builds — confirm in the `sanitize` module.)
        for hazard in &self.hazard_cache {
            hazard.set_free(sanitize::RELAXED_STORE);
        }
        // Make the `set_free` stores visible before the final scan.
        atomic::fence(Release);
        // Final reclamation attempt; the count of reclaimed records is
        // irrelevant at this point.
        let _ = self.scan_hazards();
        // Move the bag out of the `ManuallyDrop` wrapper; sound because
        // `retired_bag` is never accessed again after this read.
        let bag = unsafe { ptr::read(&*self.retired_bag) };
        if !bag.inner.is_empty() {
            // Records that could not be reclaimed are handed to the global
            // state for adoption by other (or future) threads.
            GLOBAL.abandon_retired_bag(bag);
        }
    }
}
/// Error type for failed attempts at recycling a hazard pointer into the
/// thread-local cache.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum RecycleError {
    /// The thread-local storage was already destroyed.
    Access,
    /// The thread-local hazard cache is at capacity.
    Capacity,
}
impl From<CapacityError<&'static Hazard>> for RecycleError {
#[inline]
fn from(_: CapacityError<&'static Hazard>) -> Self {
RecycleError::Capacity
}
}
impl fmt::Display for RecycleError {
    /// Writes a human-readable description of the error.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let msg = match self {
            RecycleError::Access => "failed to access already destroyed thread local storage",
            RecycleError::Capacity => "thread local cache for hazard pointer already full",
        };
        f.write_str(msg)
    }
}
// `std::error::Error` is only implemented when the `std` feature is enabled,
// since the trait is unavailable in `no_std` builds.
#[cfg(feature = "std")]
impl error::Error for RecycleError {}
#[cfg(test)]
mod tests {
    use std::mem;
    use std::ptr::NonNull;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::retired::Retired;

    use super::{scan_threshold, Local, LocalAccess, HAZARD_CACHE, SCAN_CACHE};

    // Increments the wrapped counter when dropped, used to observe
    // reclamation of retired records.
    struct DropCount<'a>(&'a AtomicUsize);

    impl Drop for DropCount<'_> {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    // Hazard pointers must be served from (and returned to) the thread-local
    // cache, and recycling must fail once the cache is full.
    #[test]
    fn acquire_local() {
        let local = Local::new();
        let ptr = NonNull::from(&());
        // Acquire and recycle exactly as many hazards as the cache holds.
        (0..HAZARD_CACHE)
            .map(|_| local.get_hazard(Some(ptr.cast())))
            .collect::<Box<[_]>>()
            .iter()
            .try_for_each(|hazard| local.try_recycle_hazard(hazard))
            .unwrap();
        {
            let inner = unsafe { &*local.0.get() };
            assert_eq!(0, inner.ops_count);
            assert_eq!(HAZARD_CACHE, inner.hazard_cache.len());
            assert_eq!(SCAN_CACHE, inner.scan_cache.capacity());
            assert_eq!(0, inner.scan_cache.len());
        }
        // Drain the cache entirely, plus one extra hazard from the global
        // list.
        let hazards: Box<[_]> =
            (0..HAZARD_CACHE).map(|_| local.get_hazard(Some(ptr.cast()))).collect();
        let extra = local.get_hazard(Some(ptr.cast()));
        {
            let inner = unsafe { &*local.0.get() };
            assert_eq!(0, inner.ops_count);
            assert_eq!(0, inner.hazard_cache.len());
            assert_eq!(SCAN_CACHE, inner.scan_cache.capacity());
            assert_eq!(0, inner.scan_cache.len());
        }
        hazards.iter().try_for_each(|hazard| local.try_recycle_hazard(*hazard)).unwrap();
        // The cache is full again, so the extra hazard cannot be recycled.
        local.try_recycle_hazard(extra).unwrap_err();
    }

    // Retiring records must trigger reclamation exactly when the scan
    // threshold is reached (only meaningful without "count-release").
    #[test]
    #[cfg_attr(feature = "count-release", ignore)]
    fn retire() {
        const THRESHOLD: usize = scan_threshold() as usize;
        let count = AtomicUsize::new(0);
        let local = Local::new();
        // One record below the threshold: nothing is reclaimed yet.
        (0..THRESHOLD - 1)
            .map(|_| Box::new(DropCount(&count)))
            .map(|record| unsafe { Retired::new_unchecked(NonNull::from(Box::leak(record))) })
            .for_each(|retired| local.retire_record(retired));
        {
            let inner = unsafe { &*local.0.get() };
            assert_eq!(THRESHOLD - 1, inner.ops_count as usize);
            assert_eq!(THRESHOLD - 1, inner.retired_bag.inner.len());
        }
        assert_eq!(0, count.load(Ordering::Relaxed));
        // The next retire reaches the threshold and reclaims everything.
        local.retire_record(unsafe {
            Retired::new_unchecked(NonNull::from(Box::leak(Box::new(DropCount(&count)))))
        });
        {
            let inner = unsafe { &*local.0.get() };
            assert_eq!(0, inner.ops_count as usize);
            assert_eq!(0, inner.retired_bag.inner.len());
        }
        assert_eq!(THRESHOLD, count.load(Ordering::Relaxed));
    }

    // Dropping the local state must reclaim all unprotected records even
    // when the scan threshold was never reached.
    #[test]
    fn drop() {
        const BELOW_THRESHOLD: usize = scan_threshold() as usize / 2;
        let count = AtomicUsize::new(0);
        let local = Local::new();
        (0..BELOW_THRESHOLD)
            .map(|_| Box::new(DropCount(&count)))
            .map(|record| unsafe { Retired::new_unchecked(NonNull::from(Box::leak(record))) })
            .for_each(|retired| local.retire_record(retired));
        mem::drop(local);
        assert_eq!(BELOW_THRESHOLD, count.load(Ordering::Relaxed));
    }
}