#![no_std]
#![deny(clippy::correctness)]
#![deny(
clippy::perf,
clippy::complexity,
clippy::style,
clippy::nursery,
clippy::pedantic,
clippy::clone_on_ref_ptr,
clippy::decimal_literal_representation,
clippy::float_cmp_const,
clippy::missing_docs_in_private_items,
clippy::multiple_inherent_impl,
clippy::unwrap_used,
clippy::cargo_common_metadata,
clippy::used_underscore_binding
)]
extern crate alloc;
use alloc::boxed::Box;
use core::ptr::null_mut;
use core::sync::atomic::Ordering::SeqCst;
use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use defer_heavy::defer;
#[derive(Debug, Default)]
pub struct AtomicLifo<T: Sync + Send + 'static> {
concurrent_pop_count: AtomicUsize,
hazard_generation: AtomicUsize,
hazard_threshold: AtomicUsize,
hazard_lock: AtomicBool,
hazard_head: AtomicPtr<HazardNode<T>>,
head: AtomicPtr<Node<T>>,
}
impl<T: Sync + Send + 'static> Drop for AtomicLifo<T> {
    /// Releases every node still linked into the stack together with the
    /// boxed value it owns, then releases the list of retired nodes.
    fn drop(&mut self) {
        // `&mut self` proves no other thread can touch the stack any more, so
        // the pointers can be read without an atomic operation.
        let mut cursor = *self.head.get_mut();
        while !cursor.is_null() {
            // SAFETY: every non-null pointer reachable from `head` was
            // produced by `Box::into_raw` in `push` and is owned by the list.
            let entry = unsafe { Box::from_raw(cursor) };
            cursor = entry.next;
            // SAFETY: a node that is still linked has never been popped, so
            // it still owns the boxed value allocated for it in `push`.
            drop(unsafe { Box::from_raw(entry.value) });
        }

        let retired = *self.hazard_head.get_mut();
        if !retired.is_null() {
            // SAFETY: entries of the retired list are created by
            // `Box::into_raw` in `pop`. Dropping the first entry tears down
            // the entries chained behind it through `HazardNode`'s own `Drop`.
            drop(unsafe { Box::from_raw(retired) });
        }
    }
}
/// One entry of the list of retired stack nodes.
///
/// An entry owns the stack node it refers to and links to the entry that was
/// retired before it.
#[derive(Debug)]
struct HazardNode<T>
where
    T: Sync + Send + 'static,
{
    /// Value of the reclamation generation counter at the time the node was
    /// retired.
    generation: usize,
    /// The retired stack node; freed when this entry is dropped (may be null).
    node: *mut Node<T>,
    /// The entry retired before this one, or null at the end of the list.
    next: *mut Self,
}
impl<T: Sync + Send + 'static> Drop for HazardNode<T> {
    /// Frees the retired stack node owned by this entry and then every entry
    /// chained behind it.
    fn drop(&mut self) {
        if !self.node.is_null() {
            // SAFETY: a non-null `node` is a pointer obtained from
            // `Box::into_raw` whose ownership was handed to this entry.
            drop(unsafe { Box::from_raw(self.node) });
        }

        // The chain is walked with a loop rather than by letting each entry
        // drop its successor, so the depth of the call stack does not grow
        // with the length of the list.
        let mut cursor = self.next;
        while !cursor.is_null() {
            // SAFETY: every non-null `next` pointer was obtained from
            // `Box::into_raw` and is owned by the entry that links to it.
            let mut successor = unsafe { Box::from_raw(cursor) };
            // Detach the successor from the rest of the chain before it is
            // dropped at the end of this iteration; its own `drop` then frees
            // only its stack node and finds nothing further to walk.
            cursor = core::mem::replace(&mut successor.next, null_mut());
        }
    }
}
/// A single link of the stack.
#[derive(Debug)]
struct Node<T>
where
    T: Sync + Send + 'static,
{
    /// The node directly below this one, or null at the bottom of the stack.
    next: *mut Self,
    /// Heap-allocated payload carried by this node.
    value: *mut T,
}
impl<T: Sync + Send + 'static> AtomicLifo<T> {
    /// Creates an empty stack.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            concurrent_pop_count: AtomicUsize::new(0),
            hazard_generation: AtomicUsize::new(0),
            hazard_threshold: AtomicUsize::new(0),
            hazard_lock: AtomicBool::new(false),
            hazard_head: AtomicPtr::new(null_mut()),
            head: AtomicPtr::new(null_mut()),
        }
    }

    /// Frees retired nodes that were stamped with a generation older than
    /// `count`.
    ///
    /// The retired list is walked from its newest entry. At the first entry
    /// whose successor is older than `count`, the list is cut and the
    /// successor is freed together with everything chained behind it. The
    /// newest entry itself is never freed here. If another thread is already
    /// reclaiming, the call returns without doing anything.
    ///
    /// # Safety
    ///
    /// `count` must be a value of `hazard_generation` that the caller read
    /// while it was itself still counted in `concurrent_pop_count`, and the
    /// caller must afterwards have observed that counter reach zero. Under
    /// that condition every entry older than `count`, and every entry retired
    /// before it, was unlinked from the stack before a moment at which no
    /// `pop` call was running, so no thread can still hold a pointer to it.
    unsafe fn free_hazard_list(&self, count: usize) {
        /// Largest generation distance that is still interpreted as "older"
        /// rather than as "newer, seen across a counter wrap-around".
        const MAX_DIFF: usize = usize::MAX / 2;

        // Only one reclaimer at a time; a concurrent one makes this call moot.
        if self.hazard_lock.swap(true, SeqCst) {
            return;
        }
        defer! {
            self.hazard_lock.store(false, SeqCst);
        }

        // A reclamation pass is under way, so restart the backlog counter.
        self.hazard_threshold.store(0, SeqCst);

        let mut cur_ptr = self.hazard_head.load(SeqCst);
        // SAFETY: entries are only freed by the holder of `hazard_lock`
        // (this thread) or by `Drop`, so every entry reachable from the
        // loaded head is alive. `next` links of published entries are only
        // written by the lock holder.
        while let Some(cur) = unsafe { cur_ptr.as_mut() } {
            let next_ptr = cur.next;
            // SAFETY: same argument as for `cur_ptr` above.
            let next_ref = unsafe { next_ptr.as_ref() };
            let Some(next) = next_ref else {
                return;
            };
            // Wrap-aware age: a distance of 1..=MAX_DIFF means the entry is
            // older than `count` even if the generation counter has wrapped
            // around in between.
            let age = count.wrapping_sub(next.generation);
            if (1..=MAX_DIFF).contains(&age) {
                cur.next = null_mut();
                // SAFETY: `next_ptr` came from `Box::into_raw` in `pop` and
                // has just been cut out of the list, so this is the only
                // owner. The caller's contract guarantees nobody still reads
                // the stack nodes owned by it or by the entries behind it.
                drop(unsafe { Box::from_raw(next_ptr) });
                return;
            }
            cur_ptr = next_ptr;
        }
    }

    /// Pushes `value` onto the top of the stack.
    ///
    /// The value is moved into a fresh heap allocation and linked in with a
    /// compare-and-exchange loop.
    pub fn push(&self, value: T) {
        let mut expected = self.head.load(SeqCst);
        let node = Box::into_raw(Box::new(Node {
            value: Box::into_raw(Box::new(value)),
            next: expected,
        }));
        // A failed exchange reports the head it found, so no extra load is
        // needed before retrying.
        while let Err(current) = self.head.compare_exchange(expected, node, SeqCst, SeqCst) {
            expected = current;
            // SAFETY: `node` has not been published yet (the exchange
            // failed), so this thread still has exclusive access to it.
            unsafe {
                (*node).next = current;
            }
        }
    }

    /// Removes the most recently pushed value, or returns `None` if the stack
    /// is empty.
    ///
    /// The unlinked node is not freed immediately, because other threads
    /// inside `pop` may still be reading it. It is appended to the list of
    /// retired nodes, and the last `pop` call to leave triggers a reclamation
    /// pass over that list.
    ///
    /// # Panics
    ///
    /// Panics if the counter of concurrently running `pop` calls would
    /// overflow.
    pub fn pop(&self) -> Option<T> {
        /// Number of retired nodes above which new callers wait for a
        /// reclamation pass before adding to the backlog.
        const MAX_PENDING: usize = 500_000;

        // Throttle while the backlog is large. Waiting only makes sense while
        // some `pop` call is still active, because only a departing `pop`
        // call resets the backlog counter; without the second condition all
        // callers could end up spinning here forever.
        while self.hazard_threshold.load(SeqCst) > MAX_PENDING
            && self.concurrent_pop_count.load(SeqCst) != 0
        {
            core::hint::spin_loop();
        }

        assert_ne!(
            self.concurrent_pop_count.fetch_add(1, SeqCst),
            usize::MAX,
            "Too many threads calling pop concurrently"
        );
        defer! {
            // The generation is sampled BEFORE this call stops being counted
            // as active. Every node stamped with an older generation was
            // therefore unlinked before the decrement below, and if that
            // decrement brings the counter to zero, no thread can still hold
            // a pointer to such a node. Sampling after the decrement would
            // allow a stalled thread to free nodes that were unlinked after
            // its zero-point while another `pop` call still reads them.
            let observed = self.hazard_generation.load(SeqCst);
            let sub = self.concurrent_pop_count.fetch_sub(1, SeqCst);
            debug_assert_ne!(sub, 0, "AtomicLifo::pop UNDERFLOW");
            if sub == 1 {
                self.hazard_generation.fetch_add(1, SeqCst);
                // SAFETY: `observed` was read while this call was still
                // counted as active, and `sub == 1` shows the counter reached
                // zero afterwards.
                unsafe {
                    self.free_hazard_list(observed);
                }
            }
        }

        let mut head = self.head.load(SeqCst);
        let removed = loop {
            // SAFETY: a non-null `head` was observed as the top of the stack
            // while this call is counted as active; unlinked nodes are only
            // freed after a moment at which no `pop` call was running.
            // An empty stack ends the call here with `None`.
            let next = unsafe { head.as_ref() }?.next;
            match self.head.compare_exchange(head, next, SeqCst, SeqCst) {
                Ok(_) => break head,
                Err(current) => head = current,
            }
        };

        // SAFETY: `removed` is non-null (checked in the loop) and this thread
        // won the exchange that unlinked it, so it alone takes the value that
        // `push` boxed for this node.
        let removed_obj = unsafe { Box::from_raw((*removed).value) };

        // Retire the node: stamp it with the current generation and link it
        // in front of the retired list.
        let count = self.hazard_generation.load(SeqCst);
        let mut expected = self.hazard_head.load(SeqCst);
        let hazard_node = Box::into_raw(Box::new(HazardNode {
            generation: count,
            node: removed,
            next: expected,
        }));
        while let Err(current) =
            self.hazard_head
                .compare_exchange(expected, hazard_node, SeqCst, SeqCst)
        {
            expected = current;
            // SAFETY: `hazard_node` has not been published yet (the exchange
            // failed), so this thread still has exclusive access to it.
            unsafe {
                (*hazard_node).next = current;
            }
        }
        self.hazard_threshold.fetch_add(1, SeqCst);

        Some(*removed_obj)
    }
}