use core::cell::{Cell, UnsafeCell};
use core::mem::{forget, replace, ManuallyDrop};
use core::sync::atomic::{self, compiler_fence, Ordering};
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;
use memoffset::offset_of;

use super::collector::{Collector, LocalHandle};
use super::deferred::Deferred;
use super::epoch::{AtomicEpoch, Epoch};
use super::guard::{unprotected, Guard};
use super::sync::list::{Entry, IsElement, IterError, List};
use super::sync::queue::Queue;
use super::RawShared;
/// Maximum number of deferred functions a `Bag` can hold; used as the fixed
/// capacity of the bag's backing `Vec` (see `Bag::default`).
///
/// NOTE(review): this is a `static mut`, read via `unsafe` blocks below;
/// no writer is visible in this file — confirm it is only mutated before
/// any participant threads start, otherwise reads are a data race.
static mut MAX_OBJECTS: usize = 64;
/// Number of manual collection events between automatic flushes
/// (see `Local::incr_manual_collection`).
///
/// NOTE(review): same `static mut` caveat as `MAX_OBJECTS` above.
static mut MANUAL_EVENTS_BETWEEN_COLLECT: usize = 64;
/// A bag of deferred functions, buffered thread-locally before being pushed
/// to the global queue.
pub(crate) struct Bag(Vec<Deferred>);

/// `Bag::try_push()` is `unsafe`: callers promise the stored functions are
/// safe to execute from another thread, which is what makes this `Send`
/// assertion sound.
/// NOTE(review): soundness ultimately depends on `Deferred`'s contents —
/// confirm against `Deferred`'s definition.
unsafe impl Send for Bag {}
impl Bag {
    /// Returns a new, empty bag with the configured fixed capacity.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag holds no deferred functions.
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Attempts to store `deferred` without growing the backing vector.
    ///
    /// Returns `Err(deferred)`, handing the function back to the caller,
    /// when the bag is already at capacity.
    ///
    /// # Safety
    ///
    /// The caller must guarantee the deferred function is safe to execute
    /// later, possibly from another thread (see the `Send` impl for `Bag`).
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.0.len() == self.0.capacity() {
            return Err(deferred);
        }
        self.0.push(deferred);
        Ok(())
    }

    /// Consumes the bag, tagging it with the global `epoch` it was sealed in.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, _bag: self }
    }
}
impl Default for Bag {
    /// Builds an empty bag whose capacity is fixed up front, so `try_push`
    /// can treat a full `Vec` as the overflow signal without reallocating.
    fn default() -> Self {
        let capacity = unsafe { MAX_OBJECTS };
        Bag(Vec::with_capacity(capacity))
    }
}
impl Drop for Bag {
    /// Runs every pending deferred function, in insertion order.
    fn drop(&mut self) {
        self.0.drain(..).for_each(|deferred| deferred.call());
    }
}
impl fmt::Debug for Bag {
    /// Formats the bag by listing its pending deferred functions.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Bag").field("deferreds", &self.0).finish()
    }
}
/// A bag paired with the global epoch at which it was sealed.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    _bag: Bag,
}

/// NOTE(review): `Sync` is asserted manually; sealed bags sit in the global
/// queue and are only dropped once expired — confirm no shared `&SealedBag`
/// access touches the bag's contents.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks whether it is safe to drop (and thereby run) the bag with
    /// respect to the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // The bag is expired once the global epoch has advanced at least 3
        // steps past the epoch it was sealed with. This margin is a tuned
        // safety parameter of the epoch scheme — do not lower it casually.
        global_epoch.wrapping_sub(self.epoch) >= 3
    }
}
/// The global data shared by all participants of a garbage collector.
pub(crate) struct Global {
    /// The intrusive linked list of registered participants (`Local`s).
    locals: List<Local>,

    /// The global queue of sealed bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch, cache-padded to avoid false sharing.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}
impl Global {
    /// Maximum number of bags popped from the global queue per `collect` call.
    const COLLECTS_TRIALS: usize = 16;

    /// Creates fresh global state: no participants, an empty garbage queue,
    /// and the starting epoch.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Moves the contents of `bag` into the global queue, sealed with the
    /// current global epoch; `bag` is left empty (replaced with a new bag).
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = replace(bag, Bag::new());

        // Full fence so the writes that populated the bag are ordered before
        // the epoch load below — the bag must not be sealed with an epoch
        // older than its contents.
        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Tries to advance the epoch, then pops up to `COLLECTS_TRIALS` expired
    /// bags from the global queue; dropping each bag runs its deferreds.
    #[cold]
    pub(crate) fn collect(&self, guard: &Guard) {
        // Reset the per-participant counters that scheduled this collection.
        if let Some(local) = unsafe { guard.local.as_ref() } {
            local.manual_count.set(0);
            local.pin_count.set(0);
        }
        self.try_advance(guard);

        debug_assert!(
            !guard.local.is_null(),
            "An unprotected guard cannot be used to collect global garbages."
        );

        for _ in 0..Self::COLLECTS_TRIALS {
            // `try_pop_if` only yields a bag whose seal epoch is expired
            // relative to the current global epoch.
            match self.queue.try_pop_if(
                |sealed_bag: &SealedBag| sealed_bag.is_expired(self.epoch.load(Ordering::Relaxed)),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => {
                    // Dropping the sealed bag runs its deferred functions.
                    drop(sealed_bag);
                }
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The epoch only advances when every currently pinned participant is
    /// pinned in the current global epoch; otherwise (or when the list
    /// iterator reports `Stalled`) the unchanged epoch is returned.
    #[cold]
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        for local in self.locals.iter(guard) {
            match local {
                Err(IterError::Stalled) => {
                    // Concurrent modification of the participant list;
                    // give up without advancing.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // A pinned participant lagging behind the global epoch
                    // blocks advancement.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants agree with `global_epoch`; publish its
        // successor.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}
/// A participant in garbage collection (one per registered handle owner).
pub(crate) struct Local {
    /// Node in the global intrusive linked list of participants.
    entry: Entry,

    /// Reference to the owning collector; dropped manually in `finalize`.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The thread-local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// Number of guards currently keeping this participant pinned.
    guard_count: Cell<usize>,

    /// Number of active handles referencing this participant.
    handle_count: Cell<usize>,

    /// Deferral counter; reset when the pinned epoch changes, and every
    /// `COUNTS_BETWEEN_ADVANCE` increments trigger a `try_advance`.
    advance_count: Cell<usize>,

    /// The epoch this participant was last pinned in (to detect changes).
    prev_epoch: Cell<Epoch>,

    /// Pinning counter; reset by `Global::collect`.
    /// NOTE(review): no increment is visible in this file — presumably bumped
    /// by callers elsewhere; confirm.
    pin_count: Cell<usize>,

    /// Manual-collection event counter (see `incr_manual_collection`);
    /// reset by `Global::collect`.
    manual_count: Cell<usize>,

    /// Whether a collection has been requested (drained in `unpin`).
    must_collect: Cell<bool>,

    /// Whether this participant is currently running a collection loop
    /// (prevents recursive collection).
    collecting: Cell<bool>,

    /// This participant's local epoch, cache-padded to avoid false sharing.
    epoch: CachePadded<AtomicEpoch>,
}
impl Local {
    /// Number of deferrals between consecutive attempts to advance the
    /// global epoch (compared against `advance_count`).
    const COUNTS_BETWEEN_ADVANCE: usize = 64;

    /// Registers a new participant in the collector's global list and
    /// returns a handle to it. The participant starts unpinned, with one
    /// handle and an empty bag.
    pub(crate) fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            let local = RawShared::from_owned(Local {
                entry: Entry::default(),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                advance_count: Cell::new(0),
                prev_epoch: Cell::new(Epoch::starting()),
                pin_count: Cell::new(0),
                manual_count: Cell::new(0),
                must_collect: Cell::new(false),
                collecting: Cell::new(false),
                epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
            });
            collector.global.locals.insert(local, &unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` this participant belongs to.
    #[inline]
    pub(crate) fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` this participant belongs to.
    #[inline]
    pub(crate) fn collector(&self) -> &Collector {
        unsafe { &*self.collector.get() }
    }

    /// Returns `true` if this participant is currently pinned.
    #[inline]
    #[cfg(test)]
    pub(crate) fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag; when the bag is full it is
    /// flushed to the global queue (and a collection scheduled) until the
    /// function fits.
    ///
    /// # Safety
    ///
    /// The deferred function must be safe to execute later, possibly from
    /// another thread (see `Bag::try_push`).
    pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = &mut *self.bag.get();
        while let Err(d) = bag.try_push(deferred) {
            // Bag is full: push it to the global queue, reclaim ownership of
            // the rejected deferred, and retry with the now-empty bag.
            self.global().push_bag(bag, guard);
            deferred = d;
            self.schedule_collection();
        }
        self.incr_advance(guard);
    }

    /// Flushes the local bag to the global queue and schedules a collection.
    pub(crate) fn flush(&self, guard: &Guard) {
        self.push_to_global(guard);
        self.schedule_collection();
    }

    /// Pushes the local bag to the global queue if it is non-empty.
    pub(crate) fn push_to_global(&self, guard: &Guard) {
        let bag = unsafe { &mut *self.bag.get() };
        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }
    }

    /// Requests a collection at the next unpin. If a collection loop is
    /// already running, re-pins to the current global epoch so that loop
    /// observes fresh state.
    pub(crate) fn schedule_collection(&self) {
        self.must_collect.set(true);
        if self.collecting.get() {
            self.repin_without_collect();
        }
    }

    /// Bumps the deferral counter and tries to advance the global epoch
    /// every `COUNTS_BETWEEN_ADVANCE` deferrals.
    pub(crate) fn incr_advance(&self, guard: &Guard) {
        let advance_count = self.advance_count.get().wrapping_add(1);
        self.advance_count.set(advance_count);
        if advance_count % Self::COUNTS_BETWEEN_ADVANCE == 0 {
            self.global().try_advance(guard);
        }
    }

    /// Pins this participant and returns a guard.
    ///
    /// Only the outermost guard publishes a pinned epoch; nested pins just
    /// bump `guard_count`. The loop retries until the epoch we published is
    /// still the global epoch when re-checked.
    #[inline]
    pub(crate) fn pin(&self) -> Guard {
        let guard = Guard { local: self };
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());
        if guard_count == 0 {
            let new_epoch = loop {
                let global_epoch = self.global().epoch.load(Ordering::Relaxed);
                let new_epoch = global_epoch.pinned();

                // Publish `new_epoch` into `self.epoch` together with a
                // SeqCst fence, so later loads cannot be reordered before
                // the store.
                if cfg!(all(
                    any(target_arch = "x86", target_arch = "x86_64"),
                    not(miri)
                )) {
                    // On x86, a SeqCst compare-exchange acts as both the
                    // store and the full barrier, and is typically cheaper
                    // than a separate store + `MFENCE`. The CAS cannot fail:
                    // an unpinned participant's epoch is `Epoch::starting()`.
                    let current = Epoch::starting();
                    let res = self.epoch.compare_exchange(
                        current,
                        new_epoch,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    );
                    debug_assert!(res.is_ok(), "participant was expected to be unpinned");
                    // Compiler-only fence to discourage harmful reordering
                    // by the optimizer; not needed for hardware ordering.
                    compiler_fence(Ordering::SeqCst);
                } else {
                    // Portable path: relaxed store followed by a full fence.
                    self.epoch.store(new_epoch, Ordering::Relaxed);
                    atomic::fence(Ordering::SeqCst);
                }

                // If the global epoch did not move while we were publishing,
                // we are properly pinned in the current epoch.
                if new_epoch.value() == self.global().epoch.load(Ordering::Acquire).value() {
                    break new_epoch;
                }
                // Otherwise roll back to "unpinned" and retry.
                self.epoch.store(Epoch::starting(), Ordering::Release);
            };
            // Pinning in a new epoch resets the advance counter.
            if new_epoch != self.prev_epoch.get() {
                self.prev_epoch.set(new_epoch);
                self.advance_count.set(0);
            }
        }
        guard
    }

    /// Unpins this participant.
    ///
    /// When the last guard is dropped, pending collection requests are
    /// drained first (while still pinned), then the local epoch is reset to
    /// "unpinned". If no handles remain either, the participant finalizes.
    #[inline]
    pub(crate) fn unpin(&self) {
        let guard_count = self.guard_count.get();
        if guard_count == 1 && !self.collecting.get() {
            self.collecting.set(true);
            // `collect` may schedule more work, hence the loop.
            while self.must_collect.get() {
                self.must_collect.set(false);
                debug_assert!(self.epoch.load(Ordering::Relaxed).is_pinned());
                // `ManuallyDrop` stops this temporary guard from recursively
                // calling `unpin` when it goes out of scope.
                let guard = ManuallyDrop::new(Guard { local: self });
                self.global().collect(&guard);
                self.repin_without_collect();
            }
            self.collecting.set(false);
        }
        self.guard_count.set(guard_count - 1);
        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);
            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and immediately re-pins, giving the global epoch a chance to
    /// advance in between.
    #[inline]
    pub(crate) fn repin(&self) {
        // Hold an extra handle so `unpin` cannot finalize this participant.
        self.acquire_handle();
        self.unpin();
        compiler_fence(Ordering::SeqCst);
        // Forget the fresh guard so `guard_count` stays balanced against the
        // caller's outstanding guard.
        forget(self.pin());
        self.release_handle();
    }

    /// Moves the local epoch forward to the current global epoch (still
    /// pinned) without collecting. Returns the epoch pinned to.
    #[inline]
    pub(crate) fn repin_without_collect(&self) -> Epoch {
        let epoch = self.epoch.load(Ordering::Relaxed);
        let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
        // Skip the store when the epoch is already current.
        if epoch != global_epoch {
            self.epoch.store(global_epoch, Ordering::Release);
        }
        global_epoch
    }

    /// Increments the handle count.
    #[inline]
    pub(crate) fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count; finalizes this participant when the
    /// last handle is released while unpinned.
    #[inline]
    pub(crate) fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);
        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes this participant from the global list and drops its collector
    /// reference. The `Local` itself is destroyed later through
    /// `IsElement::finalize`.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);
        // Temporarily restore a handle so the pin below cannot re-enter
        // `finalize` from `unpin`.
        self.handle_count.set(1);
        {
            // Pin once more to flush the local bag so no deferreds are lost.
            let guard = &self.pin();
            self.push_to_global(guard);
        }
        self.handle_count.set(0);
        unsafe {
            // Take ownership of the collector reference out of the
            // `ManuallyDrop` before unlinking the entry, then drop it.
            let collector: Collector = ptr::read(&**self.collector.get());
            self.entry.delete(&unprotected());
            drop(collector);
        }
    }

    /// Bumps the manual-collection counter and flushes every
    /// `MANUAL_EVENTS_BETWEEN_COLLECT` events.
    pub(crate) fn incr_manual_collection(&self, guard: &Guard) {
        let manual_count = self.manual_count.get().wrapping_add(1);
        self.manual_count.set(manual_count);
        if manual_count % unsafe { MANUAL_EVENTS_BETWEEN_COLLECT } == 0 {
            self.flush(guard);
        }
    }
}
impl IsElement<Local> for Local {
    /// Returns the `entry` field of the given `Local`, located via its byte
    /// offset inside the struct.
    fn entry_of(local: &Local) -> &Entry {
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        // SAFETY: `entry_ptr` addresses the `entry` field of the live `Local`
        // behind `local`, so the reference is valid for the same lifetime.
        unsafe { &*entry_ptr }
    }

    /// Recovers the `Local` containing `entry` by subtracting the field
    /// offset (inverse of `entry_of`).
    ///
    /// # Safety
    ///
    /// `entry` must be the `entry` field of a live `Local`.
    unsafe fn element_of(entry: &Entry) -> &Local {
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    /// Defers destruction of the `Local` containing `entry` until no pinned
    /// participant can still reach it.
    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(RawShared::from(Self::element_of(entry) as *const Local));
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// A deferred function must not run until `call` is invoked.
    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let deferred = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        deferred.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    /// A bag accepts exactly `MAX_OBJECTS` deferreds, rejects the next one,
    /// and runs everything it holds on drop.
    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        let limit = unsafe { MAX_OBJECTS };
        for _ in 0..limit {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        // One past capacity: the deferred is handed back, nothing runs yet.
        let overflow = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(overflow.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), limit);
    }
}