#![deny(unsafe_op_in_unsafe_fn)]
#![warn(missing_docs)]
#![warn(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![cfg_attr(not(feature = "std"), no_std)]
/// Process-wide counter from which every `Pool` draws its `discriminator` in `Pool::new`.
///
/// The drawn value is copied into each node the pool allocates and compared against the
/// pool's own value when nodes are released, so a node handed to the wrong pool is detected.
static DISCRIMINATOR: AtomicUsize = AtomicUsize::new(0);
extern crate alloc;
mod sync;
use crate::sync::atomic::{AtomicPtr, AtomicUsize};
use alloc::boxed::Box;
use core::{marker::PhantomData, sync::atomic::Ordering};
/// Closure-based exclusive access to the raw pointer held by an atomic pointer.
///
/// NOTE(review): presumably this mirrors the `with_mut` method of loom's `AtomicPtr`, so the
/// same call sites compile against the `core` type too — confirm against `crate::sync`.
trait WithMut<T> {
    /// Runs `f` on a mutable reference to the stored pointer and returns `f`'s result.
    fn with_mut<Out>(&mut self, f: impl FnOnce(&mut *mut T) -> Out) -> Out;
}

impl<T> WithMut<T> for core::sync::atomic::AtomicPtr<T> {
    fn with_mut<Out>(&mut self, f: impl FnOnce(&mut *mut T) -> Out) -> Out {
        // `&mut self` already proves exclusive access, so no atomic operation is required.
        let slot: &mut *mut T = self.get_mut();
        f(slot)
    }
}
/// Tag kept in the least-significant bit of `Pool::head_released` while one thread is
/// detaching nodes from the released list; other threads that observe the bit retry.
///
/// Node addresses are expected to have this bit clear (checked by debug assertions).
const LOCK_BIT: usize = 1;
/// A single pool entry: the pooled value plus the two intrusive links the pool threads
/// through it.
#[derive(Debug)]
pub struct Node<T> {
/// The pooled value; this is what `Acquired` dereferences to.
v: T,
/// Link in the pool's list of every node it has allocated (`Pool::head`). It is written
/// before the node is published and followed by iteration and by `Pool`'s `Drop`.
next: AtomicPtr<Self>,
/// Link used while the node sits on the released list (`Pool::head_released`) and to
/// chain together the nodes handed out by one `acquire_many` call; null ends a chain.
available_next: AtomicPtr<Self>,
/// Copy of the owning pool's `discriminator`, taken when the node was created.
discriminator: usize,
}
/// Handle to a node that is currently checked out of a `Pool`.
///
/// It wraps a shared reference to the node (borrowed for `'pool`) and gives access to the
/// pooled value through `Deref`, `AsRef` and `into_ref`. Hand it back with `Pool::release`
/// or `Pool::release_many` to make the node reusable; no `Drop` impl for this type is
/// present in this file, so a handle that is simply dropped does not return its node.
#[derive(Debug)]
#[repr(transparent)]
pub struct Acquired<'pool, T>(&'pool Node<T>);
impl<T> AsRef<T> for Acquired<'_, T> {
    /// Borrows the pooled value behind this handle.
    fn as_ref(&self) -> &T {
        let node = self.0;
        &node.v
    }
}
impl<T> core::ops::Deref for Acquired<'_, T> {
    type Target = T;

    /// Dereferences to the pooled value behind this handle.
    fn deref(&self) -> &T {
        let node = self.0;
        &node.v
    }
}
impl<'pool, T> Acquired<'pool, T> {
#[inline]
pub fn into_ref(&self) -> &'pool T {
&self.0.v
}
}
// Generates `Node`'s constructor. The invocation supplies everything that precedes the
// parameter list (for example `const fn new` or `fn new`), so the same body can be emitted
// with or without `const`.
macro_rules! new_node {
($($decl:tt)*) => {
$($decl)*(discriminator: usize, v: T) -> Self {
Self {
v,
// A freshly built node is linked into neither list yet.
next: AtomicPtr::new(core::ptr::null_mut()),
available_next: AtomicPtr::new(core::ptr::null_mut()),
discriminator,
}
}
};
}
impl<T> Node<T> {
// Regular builds get a `const fn new(discriminator, v)` constructor.
#[cfg(not(loom))]
new_node!(const fn new);
// NOTE(review): under `cfg(loom)` the constructor is a plain `fn`, presumably because the
// loom atomics cannot be built in a `const` context — confirm.
#[cfg(loom)]
new_node!(fn new);
}
/// A concurrent pool of `T` values that are created on demand and recycled.
///
/// `acquire` reuses a previously released node when one is available and otherwise
/// allocates a new one; nodes are only deallocated when the pool itself is dropped.
pub struct Pool<T> {
/// Head of the list of all nodes allocated by this pool, linked through `Node::next`.
head: AtomicPtr<Node<T>>,
/// Head of the list of released (reusable) nodes, linked through `Node::available_next`.
/// Its lowest bit doubles as a lock (`LOCK_BIT`) while nodes are being taken off the list.
head_released: AtomicPtr<Node<T>>,
/// Id drawn from `DISCRIMINATOR` in `new` and stamped into every node this pool allocates.
discriminator: usize,
/// Number of nodes allocated so far; reported by `size`.
count: AtomicUsize,
}
// SAFETY (reviewer note): a shared `&Pool` hands out `&T` (through `Acquired` and `iter`) to
// whichever threads hold it, hence the `T: Sync` requirement.
unsafe impl<T> Sync for Pool<T> where T: Sync {}
// SAFETY (reviewer note): moving a `Pool` moves ownership of every `T` it allocated (they are
// dropped by `Pool`'s `Drop`), hence `T: Send`. NOTE(review): the additional `T: Sync` bound
// looks deliberately conservative — confirm the intent.
unsafe impl<T> Send for Pool<T> where T: Sync + Send {}
impl<T> core::fmt::Debug for Pool<T>
where
    T: core::fmt::Debug,
{
    /// Formats the pool as a list of every value it has allocated, in `iter` order.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut list = f.debug_list();
        for value in self.iter() {
            list.entry(value);
        }
        list.finish()
    }
}
impl<T> Default for Pool<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Pool<T> {
    /// Creates an empty pool.
    ///
    /// Nothing is allocated up front; nodes are created by `acquire`/`acquire_many` when no
    /// released node can be reused. Each pool draws its own id from a process-wide counter so
    /// that nodes can be matched to the pool that allocated them.
    pub fn new() -> Self {
        Self {
            head: AtomicPtr::new(core::ptr::null_mut()),
            head_released: AtomicPtr::new(core::ptr::null_mut()),
            count: AtomicUsize::new(0),
            discriminator: DISCRIMINATOR.fetch_add(1, Ordering::AcqRel),
        }
    }

    /// Returns how many nodes this pool has allocated so far (acquired and released alike).
    pub fn size(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Tries to detach up to `N` nodes from the released list.
    ///
    /// Returns the head of the detached chain (linked through `available_next`, with the last
    /// node's link reset to null) and the number of nodes in it. Returns `(null, 0)` when the
    /// released list is empty.
    ///
    /// The released list is locked by tagging `head_released` with `LOCK_BIT`. Whenever the
    /// lock is held by another thread, or the attempt to take it fails, this thread backs off
    /// (spin hint or yield) before retrying, so it never busy-polls the shared word.
    fn try_acquire_available<const N: usize>(&self) -> (*const Node<T>, usize) {
        debug_assert!(N >= 1);
        debug_assert_eq!(core::ptr::null::<Node<T>>() as usize, 0);
        loop {
            let avail = self.head_released.load(Ordering::Acquire);
            if avail.is_null() {
                return (avail, 0);
            }
            // A lone lock bit would mean "locked empty list", which must never be stored.
            debug_assert_ne!(avail, LOCK_BIT as *mut _);

            let unlocked = (avail as usize & LOCK_BIT) == 0;
            if unlocked
                && self
                    .head_released
                    .compare_exchange_weak(
                        avail,
                        with_lock_bit(avail),
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    )
                    .is_ok()
            {
                // SAFETY: the successful compare-exchange above installed the lock bit, so this
                // thread holds the released-list lock, and `avail` is the untagged head.
                let (rec, n) = unsafe { self.try_acquire_available_locked::<N>(avail) };
                debug_assert!(n >= 1, "head_available was not null");
                debug_assert!(n <= N);
                return (rec, n);
            }

            // Either another thread holds the lock or we lost the race to take it: give the
            // other thread a chance to make progress before looking again.
            #[cfg(not(any(loom, feature = "std")))]
            core::hint::spin_loop();
            #[cfg(any(loom, feature = "std"))]
            crate::sync::yield_now();
        }
    }

    /// Detaches up to `N` nodes starting at `head` and unlocks the released list.
    ///
    /// Returns `head` together with the number of nodes detached (at least 1, at most `N`
    /// for `N >= 1`).
    ///
    /// # Safety
    ///
    /// The caller must hold the released-list lock, i.e. it must have replaced `head` in
    /// `head_released` with its `LOCK_BIT`-tagged form, and `head` must be that untagged,
    /// non-null pointer.
    unsafe fn try_acquire_available_locked<const N: usize>(
        &self,
        head: *const Node<T>,
    ) -> (*const Node<T>, usize) {
        debug_assert!(N >= 1);
        debug_assert!(!head.is_null());
        let mut tail = head;
        let mut n = 1;
        // SAFETY: `head` is a node on this pool's released list; nodes are only freed when
        // the pool is dropped, which cannot happen while `&self` is alive.
        let mut next = unsafe { &*tail }.available_next.load(Ordering::Relaxed);
        while !next.is_null() && n < N {
            debug_assert_eq!((next as usize) & LOCK_BIT, 0);
            tail = next;
            // SAFETY: as above; `tail` was just read from the locked released list.
            next = unsafe { &*tail }.available_next.load(Ordering::Relaxed);
            n += 1;
        }
        // Publishing the untagged remainder both updates the list head and releases the lock.
        self.head_released.store(next, Ordering::Release);
        // The detached chain now belongs to this thread alone; terminate it.
        // SAFETY: as above.
        unsafe { &*tail }
            .available_next
            .store(core::ptr::null_mut(), Ordering::Relaxed);
        (head, n)
    }
}
impl<T> Pool<T>
where
    T: Default,
{
    /// Checks one value out of the pool.
    ///
    /// A previously released node is reused when one is available; otherwise a new node
    /// holding `T::default()` is allocated. A reused node keeps whatever state its value had
    /// when it was released.
    pub fn acquire(&self) -> Acquired<'_, T> {
        let [rec] = self.acquire_many::<1>();
        rec
    }

    /// Checks `N` values out of the pool at once.
    ///
    /// Released nodes are reused first and the remainder is newly allocated with
    /// `T::default()`. The returned handles are chained through `available_next` in array
    /// order (the last one's link is null), which is the shape `release_many` expects.
    ///
    /// `N == 0` yields an empty array without touching the pool, mirroring `release_many`.
    pub fn acquire_many<const N: usize>(&self) -> [Acquired<'_, T>; N] {
        // With `N == 0` the released list must not be consulted at all: detaching always
        // takes at least one node, which would then have nowhere to go.
        let (mut head, n) = if N == 0 {
            (core::ptr::null(), 0)
        } else {
            self.try_acquire_available::<N>()
        };
        assert!(n <= N);
        let mut tail = core::ptr::null();
        [(); N].map(|_| {
            if !head.is_null() {
                // Still consuming the chain detached from the released list.
                tail = head;
                // SAFETY: `head` is a node of this pool taken from its released list; nodes
                // are only freed when the pool is dropped, which `&self` prevents.
                let rec = unsafe { &*head };
                head = rec.available_next.load(Ordering::Relaxed);
                Acquired(rec)
            } else {
                // Reusable nodes ran out: allocate, and append to the chain built so far.
                let rec = self.acquire_new();
                if !tail.is_null() {
                    // SAFETY: `tail` is the node handed out by the previous iteration, which
                    // belongs to this pool and is owned by this call.
                    unsafe { &*tail }
                        .available_next
                        .store(rec as *const _ as *mut _, Ordering::Relaxed);
                }
                tail = rec as *const _;
                Acquired(rec)
            }
        })
    }

    /// Allocates a new node holding `T::default()`, publishes it at the front of the list of
    /// all nodes and bumps the node count.
    pub(crate) fn acquire_new(&self) -> &Node<T> {
        let node = Box::into_raw(Box::new(Node::new(self.discriminator, T::default())));
        let mut head = self.head.load(Ordering::Acquire);
        loop {
            // SAFETY: `node` came from `Box::into_raw` above and has not been published yet,
            // so this thread has exclusive access to it.
            unsafe { &mut *node }.next.with_mut(|p| *p = head);
            match self.head.compare_exchange_weak(
                head,
                node,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.count.fetch_add(1, Ordering::SeqCst);
                    // SAFETY: the node is now owned by the pool and stays allocated until
                    // the pool is dropped, which outlives the returned borrow of `self`.
                    break unsafe { &*node };
                }
                Err(head_now) => {
                    // Another node was published first; retry on top of the new head.
                    head = head_now
                }
            }
        }
    }
}
impl<T> Pool<T> {
    /// Returns a single node to the pool so that a later `acquire` can reuse it.
    ///
    /// # Panics
    ///
    /// Panics if `rec` still has a successor link (it is a non-last element of an
    /// `acquire_many` batch), or if it was obtained from a different `Pool`.
    pub fn release(&self, rec: Acquired<'_, T>) {
        let rec = rec.0;
        assert!(rec.available_next.load(Ordering::Relaxed).is_null());
        self.push_available(rec, rec);
    }

    /// Returns a whole batch of nodes to the pool in one operation.
    ///
    /// `recs` is expected to be a batch as produced by `acquire_many`: linked through
    /// `available_next` from the first element to the last. An empty array is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the last element still has a successor link, if the chain starting at the
    /// first element does not end at the last one, or if any node on that chain was obtained
    /// from a different `Pool`.
    pub fn release_many<const N: usize>(&self, recs: [Acquired<'_, T>; N]) {
        if N == 0 {
            return;
        }
        let head = recs[0].0;
        let tail = recs.last().expect("N > 0").0;
        assert!(tail.available_next.load(Ordering::Relaxed).is_null());
        self.push_available(head, tail);
    }

    /// Pushes the chain `head ..= tail` onto the front of the released list.
    ///
    /// The chain is validated in every build profile: accepting a node that belongs to
    /// another pool would leave this pool holding a pointer that dangles once that other
    /// pool is dropped, so the check cannot be limited to debug builds. The walk costs one
    /// step per released node.
    fn push_available(&self, head: &Node<T>, tail: &Node<T>) {
        debug_assert!(tail.available_next.load(Ordering::Relaxed).is_null());

        // Validate ownership of every node and that the chain really ends at `tail`.
        let mut node = head;
        loop {
            assert_eq!(
                self.discriminator, node.discriminator,
                "Tried to call Pool::release with Acquired object \
                that was obtained from a different Pool instance"
            );
            let next = node.available_next.load(Ordering::Acquire);
            if next.is_null() {
                break;
            }
            // SAFETY: `node` was just verified to belong to this pool, and links between
            // checked-out nodes are only ever written by this pool's acquire path, so `next`
            // points at another node of this pool, which lives as long as `&self`.
            node = unsafe { &*next };
        }
        assert_eq!(node as *const _, tail as *const _);

        debug_assert_eq!(head as *const _ as usize & LOCK_BIT, 0);
        loop {
            let avail = self.head_released.load(Ordering::Acquire);
            if (avail as usize & LOCK_BIT) == 0 {
                // Splice the current released list behind our chain, then try to publish
                // our head; on failure the link is simply rewritten on the next attempt.
                tail.available_next
                    .store(avail as *mut _, Ordering::Relaxed);
                if self
                    .head_released
                    .compare_exchange_weak(
                        avail,
                        head as *const _ as *mut _,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            } else {
                // Another thread is detaching nodes; wait for it to unlock the list.
                #[cfg(not(any(loom, feature = "std")))]
                core::hint::spin_loop();
                #[cfg(any(loom, feature = "std"))]
                crate::sync::yield_now();
            }
        }
    }
}
/// Iterator over every value a `Pool` has allocated, whether currently acquired or released.
///
/// Created by `Pool::iter`; it yields `&T` by following `Node::next` from the pool's `head`.
pub struct PoolIter<'a, T> {
/// Ties the iterator to a shared borrow of the pool without storing a reference.
list: PhantomData<&'a Pool<T>>,
/// Next node to yield, or null once the list is exhausted.
head: *const Node<T>,
}
impl<T> Pool<T> {
pub fn iter(&self) -> PoolIter<'_, T> {
PoolIter {
list: PhantomData,
head: self.head.load(Ordering::Acquire),
}
}
}
impl<'a, T> IntoIterator for &'a Pool<T> {
type IntoIter = PoolIter<'a, T>;
type Item = &'a T;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'pool, T> Iterator for PoolIter<'pool, T> {
type Item = &'pool T;
fn next(&mut self) -> Option<Self::Item> {
if self.head.is_null() {
None
} else {
let n = unsafe { &*self.head };
let v = &n.v;
self.head = n.next.load(Ordering::Relaxed);
Some(v)
}
}
}
impl<T> Drop for Pool<T> {
    /// Frees every node the pool has allocated by walking the list of all nodes.
    fn drop(&mut self) {
        let mut cursor: *mut Node<T> = self.head.with_mut(|slot| *slot);
        while !cursor.is_null() {
            // SAFETY: every node on this list was created with `Box::into_raw` in
            // `acquire_new`, and `&mut self` guarantees nobody else can still reach it.
            let mut owned: Box<Node<T>> = unsafe { Box::from_raw(cursor) };
            // Fetch the successor first; `owned` is freed when it goes out of scope at the
            // end of this iteration.
            cursor = owned.next.with_mut(|slot| *slot);
        }
    }
}
/// Returns `ptr` with `LOCK_BIT` set in its address, keeping `ptr`'s provenance.
fn with_lock_bit<T>(ptr: *mut Node<T>) -> *mut Node<T> {
    let tagged_addr = (ptr as usize) | LOCK_BIT;
    int_to_ptr_with_provenance(tagged_addr, ptr)
}
/// Returns `ptr` with `LOCK_BIT` cleared from its address, keeping `ptr`'s provenance.
// Nothing in this file calls this helper at present, hence the lint allowance.
#[allow(dead_code)]
fn without_lock_bit<T>(ptr: *mut Node<T>) -> *mut Node<T> {
    let untagged_addr = (ptr as usize) & !LOCK_BIT;
    int_to_ptr_with_provenance(untagged_addr, ptr)
}
/// Builds a pointer whose address is `addr` but which is derived from `prov`.
///
/// The result is obtained by offsetting `prov` byte-wise (with wrapping arithmetic) by the
/// distance between its own address and `addr`, rather than by casting the integer directly.
fn int_to_ptr_with_provenance<T>(addr: usize, prov: *mut T) -> *mut T {
    let base: *mut u8 = prov.cast();
    let delta = addr.wrapping_sub(base as usize);
    base.wrapping_add(delta).cast::<T>()
}
// Unit tests. Besides the public API they inspect the private `next` / `available_next`
// links directly to pin down the exact order in which nodes are linked.
#[cfg(test)]
mod tests {
use super::Pool;
use core::{
ptr::null_mut,
sync::atomic::{AtomicU8, Ordering},
};
// A released node is handed out again by the next `acquire`, with its value intact.
#[test]
fn simple() {
let list = Pool::<AtomicU8>::new();
let rec1 = list.acquire();
rec1.store(1, Ordering::Release);
let rec2 = list.acquire();
rec2.store(2, Ordering::Release);
list.release(rec1);
// Only rec1's node is on the released list, so rec3 must be that node.
let rec3 = list.acquire();
assert_eq!(rec3.load(Ordering::Acquire), 1);
list.release(rec2);
}
// With rec2 still checked out, `acquire_many` must reuse rec3 and rec1 (in that order, as
// rec3 was released last) and allocate one fresh node for the third slot.
#[test]
fn acquire_many_skips_used_nodes() {
let list = Pool::<()>::new();
let rec1 = list.acquire();
let rec2 = list.acquire();
let rec3 = list.acquire();
// New nodes are pushed at the front: the list of all nodes is rec3 -> rec2 -> rec1.
assert_eq!(
rec3.0.next.load(Ordering::Relaxed),
rec2.0 as *const _ as *mut _
);
assert_eq!(
rec2.0.next.load(Ordering::Relaxed),
rec1.0 as *const _ as *mut _
);
assert_eq!(rec1.0.next.load(Ordering::Relaxed), core::ptr::null_mut());
list.release(rec1);
list.release(rec3);
let [one, two, three] = list.acquire_many();
// The batch is chained one -> two -> three through `available_next`.
assert_eq!(
one.0.available_next.load(Ordering::Relaxed),
two.0 as *const _ as *mut _
);
assert_eq!(
two.0.available_next.load(Ordering::Relaxed),
three.0 as *const _ as *mut _
);
assert_eq!(
three.0.available_next.load(Ordering::Relaxed),
core::ptr::null_mut(),
);
// In the list of all nodes: three (new) -> one (old rec3) -> rec2 -> two (old rec1).
assert_eq!(
three.0.next.load(Ordering::Relaxed),
one.0 as *const _ as *mut _
);
assert_eq!(
one.0.next.load(Ordering::Relaxed),
rec2.0 as *const _ as *mut _
);
assert_eq!(
rec2.0.next.load(Ordering::Relaxed),
two.0 as *const _ as *mut _
);
}
// One reusable node plus one fresh allocation: the reused node comes first in the batch and
// the fresh node is linked in front of it in the list of all nodes.
#[test]
fn acquire_many_orders_nodes_between_acquires() {
let list = Pool::<()>::new();
let rec1 = list.acquire();
let rec2 = list.acquire();
assert_eq!(
rec2.0.next.load(Ordering::Relaxed),
rec1.0 as *const _ as *mut _
);
list.release(rec2);
let [one, two] = list.acquire_many();
assert_eq!(
one.0.available_next.load(Ordering::Relaxed),
two.0 as *const _ as *mut _
);
assert_eq!(
two.0.available_next.load(Ordering::Relaxed),
core::ptr::null_mut(),
);
// two (new) -> one (old rec2) -> rec1 in the list of all nodes.
assert_eq!(
two.0.next.load(Ordering::Relaxed),
one.0 as *const _ as *mut _
);
assert_eq!(
one.0.next.load(Ordering::Relaxed),
rec1.0 as *const _ as *mut _
);
}
// Three reusable nodes plus two fresh allocations: reused nodes come out most recently
// released first, followed by the new nodes, all chained in array order.
#[test]
fn acquire_many_properly_orders_reused_nodes() {
let list = Pool::<()>::new();
let rec1 = list.acquire();
let rec2 = list.acquire();
let rec3 = list.acquire();
assert_eq!(rec1.0.next.load(Ordering::Relaxed), core::ptr::null_mut(),);
assert_eq!(
rec2.0.next.load(Ordering::Relaxed),
rec1.0 as *const _ as *mut _
);
assert_eq!(
rec3.0.next.load(Ordering::Relaxed),
rec2.0 as *const _ as *mut _
);
list.release(rec1);
list.release(rec2);
list.release(rec3);
// Released list is now rec3 -> rec2 -> rec1, so one/two/three are rec3/rec2/rec1.
let [one, two, three, four, five] = list.acquire_many();
assert_eq!(
one.0.available_next.load(Ordering::Relaxed),
two.0 as *const _ as *mut _
);
assert_eq!(
two.0.available_next.load(Ordering::Relaxed),
three.0 as *const _ as *mut _
);
assert_eq!(
three.0.available_next.load(Ordering::Relaxed),
four.0 as *const _ as *mut _
);
assert_eq!(
four.0.available_next.load(Ordering::Relaxed),
five.0 as *const _ as *mut _
);
assert_eq!(
five.0.available_next.load(Ordering::Relaxed),
core::ptr::null_mut(),
);
// List of all nodes: five -> four (both new) -> one -> two -> three (the originals).
assert_eq!(
five.0.next.load(Ordering::Relaxed),
four.0 as *const _ as *mut _
);
assert_eq!(
four.0.next.load(Ordering::Relaxed),
one.0 as *const _ as *mut _
);
assert_eq!(
one.0.next.load(Ordering::Relaxed),
two.0 as *const _ as *mut _
);
assert_eq!(
two.0.next.load(Ordering::Relaxed),
three.0 as *const _ as *mut _
);
assert_eq!(three.0.next.load(Ordering::Relaxed), null_mut());
}
}