use crate::{
atm_p::{CASErr, NonNullAtomicNode, NonNullAtomicWeakNode},
cursor::Cursor,
};
use std::{
borrow::Borrow,
fmt::{self, Debug},
hash::{Hash, Hasher},
hint,
mem::{ManuallyDrop, offset_of},
ops::Deref,
ptr::{self, NonNull},
sync::{
Arc, Weak,
atomic::{self, AtomicUsize, Ordering},
},
};
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
const INTERNAL_OVERFLOW_ERROR: &str = "Node counter overflow";
macro_rules! acquire {
($x:expr) => {
atomic::fence(Ordering::Acquire)
};
}
pub trait RawExt<T>
where
Self: Sized,
{
fn as_ptr(this: &Self) -> *const T;
fn ptr_eq(lhs: &Self, rhs: &Self) -> bool {
ptr::addr_eq(Self::as_ptr(lhs), Self::as_ptr(rhs))
}
fn into_raw(this: Self) -> *const T;
unsafe fn from_raw(ptr: *const T) -> Self;
}
impl<T> RawExt<T> for Arc<T> {
fn as_ptr(this: &Self) -> *const T {
Self::as_ptr(this)
}
fn ptr_eq(lhs: &Self, rhs: &Self) -> bool {
Self::ptr_eq(lhs, rhs)
}
fn into_raw(this: Self) -> *const T {
Self::into_raw(this)
}
unsafe fn from_raw(ptr: *const T) -> Self {
unsafe { Self::from_raw(ptr) }
}
}
impl<T> RawExt<T> for Weak<T> {
fn as_ptr(this: &Self) -> *const T {
Self::as_ptr(this)
}
fn ptr_eq(lhs: &Self, rhs: &Self) -> bool {
Self::ptr_eq(lhs, rhs)
}
fn into_raw(this: Self) -> *const T {
Self::into_raw(this)
}
unsafe fn from_raw(ptr: *const T) -> Self {
unsafe { Self::from_raw(ptr) }
}
}
impl<T> RawExt<T> for Node<T> {
fn as_ptr(this: &Self) -> *const T {
Self::as_ptr(this)
}
fn into_raw(this: Self) -> *const T {
Self::into_raw(this)
}
unsafe fn from_raw(ptr: *const T) -> Self {
unsafe { Self::from_raw(ptr) }
}
}
impl<T> RawExt<T> for WeakNode<T> {
fn as_ptr(this: &Self) -> *const T {
Self::as_ptr(this)
}
fn into_raw(this: Self) -> *const T {
Self::into_raw(this)
}
unsafe fn from_raw(ptr: *const T) -> Self {
unsafe { Self::from_raw(ptr) }
}
}
pub struct Next<'a, Strong, Weak> {
pub strong: &'a Strong,
pub weak: &'a Weak,
}
#[repr(C)]
struct NodeInner<T> {
strong: AtomicUsize,
weak: AtomicUsize,
next: NonNullAtomicNode<T>,
weak_next: NonNullAtomicWeakNode<T>,
data: T,
}
#[repr(transparent)]
pub struct Node<T> {
ptr: NonNull<NodeInner<T>>,
}
impl<T> Debug for Node<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Node").field("ptr", &self.ptr).finish()
}
}
pub struct UniqueNodeIter<T> {
start: Option<Node<T>>,
next: Option<Node<T>>,
}
pub struct NodeIter<T> {
current: Option<Node<T>>,
}
impl<T> Deref for Node<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner().data
}
}
impl<T> Node<T> {
pub fn new(data: T) -> Self {
let x = Box::new(NodeInner {
strong: AtomicUsize::new(1),
weak: AtomicUsize::new(1),
next: unsafe { NonNullAtomicNode::null() },
weak_next: unsafe { NonNullAtomicWeakNode::null() },
data,
});
let this = Self {
ptr: Box::leak(x).into(),
};
let Next { strong, weak } = Self::next(&this);
strong.store(this.clone(), Ordering::Relaxed);
weak.store(Self::downgrade(&this), Ordering::Relaxed);
this
}
pub fn into_new_ring(this: Self) -> Self {
drop(Self::remove(&this));
Self::next(&this)
.weak
.store(Self::downgrade(&this), Ordering::Release);
this
}
pub fn is_list(this: &Self) -> bool {
Self::ptr_eq(this, &Self::next(this).strong.load(Ordering::Acquire))
&& Self::weak_ptr_eq(this, &Self::next(this).weak.load(Ordering::Acquire))
}
pub fn push_before<F: Fn(&T) -> bool>(&self, elem: T, predicate: F) -> Result<Self, T> {
let new_node = Node::new(elem);
let mut current = self.clone();
loop {
let next = Self::load_next_strong(¤t);
if predicate(&next) {
let Next { strong, weak: _ } = Self::next(&new_node);
strong.store(next.clone(), Ordering::Release);
match Self::next(¤t).strong.compare_exchange(
&next,
new_node.clone(),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
break Ok(new_node);
}
Err(CASErr { new, .. }) => drop(new),
}
}
current = next;
if Node::ptr_eq(self, ¤t) {
break Err(Node::try_unwrap(new_node)
.map_err(|_| "unwrapping Node<T> failure")
.expect("new_node has not been given to any AtomicNode's"));
}
}
}
pub fn pop_when<F: Fn(&T) -> bool>(this: &Self, predicate: F) -> Option<Self> {
let mut current = this.clone();
loop {
let next = Self::load_next_strong(¤t);
if predicate(&next) {
let new_next = Self::load_next_strong(&next);
match Self::next(¤t).strong.compare_exchange(
&next,
new_next.clone(),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(old) => {
let Next { strong, weak } = Self::next(&old);
weak.store(Self::downgrade(&new_next), Ordering::Release);
strong.store(old.clone(), Ordering::Release);
break Some(old);
}
Err(CASErr {
actual: next,
new: _,
}) => {
current = next;
}
}
}
if Node::ptr_eq(this, ¤t) {
break None;
}
}
}
pub fn remove(this: &Self) -> Option<Self> {
let successor = Self::load_next_strong(this);
if Node::ptr_eq(this, &successor) {
Self::next(this)
.weak
.store(Self::downgrade(this), Ordering::Release);
Self::next(this).strong.store(successor, Ordering::Release);
return None;
}
let mut pred = successor.clone();
loop {
let next = Self::load_next_strong(&pred);
if Node::ptr_eq(&next, this) {
let new_next = successor.clone();
match Self::next(&pred).strong.compare_exchange(
&next,
new_next.clone(),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
let Next { strong, weak } = Self::next(this);
weak.store(Self::downgrade(&new_next), Ordering::Release);
strong.store(this.clone(), Ordering::Release);
return Some(new_next);
}
Err(CASErr { actual, .. }) => {
pred = actual;
continue;
}
}
}
pred = next;
if Node::ptr_eq(&pred, this) {
return None;
}
}
}
pub fn as_ptr(this: &Self) -> *const T {
let ptr: *mut NodeInner<T> = NonNull::as_ptr(this.ptr);
unsafe { &raw mut (*ptr).data }
}
pub fn ptr_eq(lhs: &Self, rhs: &Self) -> bool {
RawExt::ptr_eq(lhs, rhs)
}
pub fn into_inner(this: Self) -> Option<T> {
let this = ManuallyDrop::new(this);
match this.inner().strong.fetch_sub(1, Ordering::Release) {
1 => {
acquire!(this.inner().strong);
let elem = unsafe { ptr::read(&mut (*this.ptr.as_ptr()).data) };
drop(WeakNode { ptr: this.ptr });
Some(elem)
}
2 => {
let next = Self::next(&this).strong.load_noclone(Ordering::Acquire);
if next
.inner()
.strong
.compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
return None;
}
let is_self = Self::ptr_eq(&next, &this);
next.inner().strong.store(1, Ordering::Release);
if !is_self {
return None;
}
return Self::into_inner(unsafe {
Self::next(&this).strong.take(Ordering::Relaxed)
});
}
_ => None,
}
}
pub fn try_unwrap(this: Self) -> Result<T, Self> {
match this
.inner()
.strong
.compare_exchange(1, 0, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => {
acquire!(this.inner().strong);
let this = ManuallyDrop::new(this);
let elem: T = unsafe { ptr::read(&this.ptr.as_ref().data) };
drop(WeakNode { ptr: this.ptr });
Ok(elem)
}
Err(actual) => {
if actual != 2 {
return Err(this);
}
let next = Self::next(&this).strong.load_noclone(Ordering::Acquire);
if !Self::ptr_eq(&next, &this) {
return Err(this);
}
let this = ManuallyDrop::new(this);
this.inner().strong.fetch_sub(1, Ordering::Release);
let next_self = unsafe { Self::next(&this).strong.take(Ordering::Relaxed) };
match Self::try_unwrap(next_self) {
Ok(value) => Ok(value),
Err(_) => unreachable!("self-referential Node::try_unwrap failed"),
}
}
}
}
#[must_use = "losing the pointer will leak memory"]
pub fn into_raw(this: Self) -> *const T {
let this = ManuallyDrop::new(this);
Self::as_ptr(&*this)
}
pub unsafe fn from_raw(ptr: *const T) -> Self {
Self {
ptr: unsafe {
NonNull::new_unchecked(
ptr.byte_sub(offset_of!(NodeInner<T>, data)) as *mut NodeInner<T>
)
},
}
}
fn inner(&self) -> &NodeInner<T> {
unsafe { self.ptr.as_ref() }
}
pub fn strong_count(this: &Self) -> usize {
this.inner().strong.load(Ordering::Relaxed)
}
pub fn weak_count(this: &Self) -> usize {
this.inner().weak.load(Ordering::Relaxed)
}
pub fn next(this: &Self) -> Next<'_, NonNullAtomicNode<T>, NonNullAtomicWeakNode<T>> {
let inner = this.inner();
Next {
strong: &inner.next,
weak: &inner.weak_next,
}
}
pub fn downgrade(this: &Self) -> WeakNode<T> {
let inner = this.inner();
let mut cur = inner.weak.load(Ordering::Relaxed);
loop {
if cur == usize::MAX {
hint::spin_loop();
cur = inner.weak.load(Ordering::Relaxed);
continue;
}
assert!(cur <= MAX_REFCOUNT, "{}", INTERNAL_OVERFLOW_ERROR);
match inner.weak.compare_exchange_weak(
cur,
cur + 1,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
return WeakNode { ptr: this.ptr };
}
Err(old) => cur = old,
}
}
}
pub fn load_next_strong(this: &Self) -> Self {
Self::next(this).strong.load(Ordering::Acquire)
}
pub fn load_next_strong_unique(this: &Self) -> Option<Self> {
let next = Self::load_next_strong(this);
if Self::ptr_eq(this, &next) {
None
} else {
Some(next)
}
}
pub fn load_next_weak(this: &Self) -> WeakNode<T> {
Self::next(this).weak.load(Ordering::Acquire)
}
pub fn load_next_weak_unique(this: &Self) -> Option<WeakNode<T>> {
let next = Self::load_next_weak(this);
if Self::weak_ptr_eq(this, &next) {
None
} else {
Some(next)
}
}
pub fn resolve_next(this: &Self) -> Option<Self> {
if let Some(strong_next) = Self::load_next_strong_unique(this) {
Some(strong_next)
} else {
let weak_next = Self::load_next_weak(this);
if let Some(strong_next) = weak_next.upgrade() {
Some(strong_next)
} else {
weak_next.find_next_strong()
}
}
}
pub fn unique_iter(&self) -> UniqueNodeIter<T> {
UniqueNodeIter {
start: Some(self.clone()),
next: Some(self.clone()),
}
}
pub fn cursor(&self) -> Cursor<T> {
Cursor::new(self.clone())
}
pub fn weak_ptr_eq(strong: &Self, weak: &WeakNode<T>) -> bool {
ptr::addr_eq(Node::as_ptr(strong), WeakNode::as_ptr(weak))
}
}
impl<T: PartialEq> PartialEq for Node<T> {
fn eq(&self, other: &Self) -> bool {
(**self).eq(&**other)
}
fn ne(&self, other: &Self) -> bool {
(**self).ne(&**other)
}
}
impl<T: Eq> Eq for Node<T> {}
impl<T: PartialOrd> PartialOrd for Node<T> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
(**self).partial_cmp(&**other)
}
fn lt(&self, other: &Self) -> bool {
(**self).lt(&**other)
}
fn le(&self, other: &Self) -> bool {
(**self).le(&**other)
}
fn gt(&self, other: &Self) -> bool {
(**self).gt(&**other)
}
fn ge(&self, other: &Self) -> bool {
(**self).ge(&**other)
}
}
impl<T: Ord> Ord for Node<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(**self).cmp(&**other)
}
}
impl<T: Hash> Hash for Node<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
(**self).hash(state)
}
}
impl<T: fmt::Display> fmt::Display for Node<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}
impl<T> fmt::Pointer for Node<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = Self::as_ptr(self);
fmt::Pointer::fmt(&ptr, f)
}
}
impl<T: Default> Default for Node<T> {
fn default() -> Self {
Self::new(T::default())
}
}
impl<T> From<T> for Node<T> {
fn from(value: T) -> Self {
Self::new(value)
}
}
impl<T> AsRef<T> for Node<T> {
fn as_ref(&self) -> &T {
self
}
}
impl<T> Borrow<T> for Node<T> {
fn borrow(&self) -> &T {
self
}
}
impl<T> Node<T> {
pub fn extend_using<I, F>(&mut self, iter: I, predicate: F) -> Vec<T>
where
I: IntoIterator<Item = T>,
F: Fn(&T) -> bool,
{
let mut tail = self.clone();
let mut failed = Vec::new();
for value in iter {
match tail.push_before(value, &predicate) {
Ok(_) => tail = Node::load_next_strong(&tail),
Err(v) => failed.push(v),
}
}
failed
}
pub fn extend_with_predicates<I, F>(&mut self, iter: I) -> Vec<(T, F)>
where
I: IntoIterator<Item = (T, F)>,
F: Fn(&T) -> bool,
{
let mut tail = self.clone();
let mut failed = Vec::new();
for (value, predicate) in iter {
match tail.push_before(value, &predicate) {
Ok(_) => tail = Node::load_next_strong(&tail),
Err(v) => failed.push((v, predicate)),
}
}
failed
}
}
impl<T> Extend<T> for Node<T> {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
let _ = self.extend_using(iter, |_| true);
}
}
unsafe impl<T: Send + Sync> Send for Node<T> {}
unsafe impl<T: Send + Sync> Sync for Node<T> {}
impl<T> Clone for Node<T> {
fn clone(&self) -> Self {
let old_size = self.inner().strong.fetch_add(1, Ordering::Relaxed);
assert!(old_size <= MAX_REFCOUNT, "{}", INTERNAL_OVERFLOW_ERROR);
Self { ptr: self.ptr }
}
}
impl<T> Iterator for NodeIter<T> {
type Item = Node<T>;
fn next(&mut self) -> Option<Self::Item> {
let current = self.current.take()?;
self.current = Node::resolve_next(¤t);
self.current.clone()
}
}
impl<T> Iterator for UniqueNodeIter<T> {
type Item = Node<T>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.as_ref()?;
let current = self.next.take()?;
let successor = Node::resolve_next(¤t);
self.next = successor.and_then(|next| {
if Node::ptr_eq(&next, &start) {
None
} else {
Some(next)
}
});
if self.next.is_none() {
drop(self.start.take());
}
Some(current)
}
}
impl<T> Drop for Node<T> {
#[inline(never)]
fn drop(&mut self) {
match self.inner().strong.fetch_sub(1, Ordering::Release) {
1 => {
acquire!(self.inner().strong);
drop(WeakNode { ptr: self.ptr });
unsafe { ptr::drop_in_place(&raw mut self.ptr.as_mut().next) };
unsafe { ptr::drop_in_place(&raw mut self.ptr.as_mut().data) };
}
2 => {
let next = Self::next(self).strong.load_noclone(Ordering::Acquire);
if next
.inner()
.strong
.compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
if Self::ptr_eq(&next, &self) {
next.inner().strong.store(1, Ordering::Release);
unsafe { drop(Self::next(self).strong.take(Ordering::Relaxed)) }
} else {
next.inner().strong.store(1, Ordering::Release);
}
}
}
_ => return,
}
}
}
#[repr(transparent)]
pub struct WeakNode<T> {
ptr: NonNull<NodeInner<T>>,
}
struct WeakInner<'a, T> {
strong: &'a AtomicUsize,
weak: &'a AtomicUsize,
next: &'a NonNullAtomicWeakNode<T>,
}
impl<T> WeakNode<T> {
pub fn as_ptr(&self) -> *const T {
let ptr: *mut NodeInner<T> = NonNull::as_ptr(self.ptr);
unsafe { &raw mut (*ptr).data }
}
#[must_use = "losing the pointer will leak memory"]
pub fn into_raw(self) -> *const T {
ManuallyDrop::new(self).as_ptr()
}
pub unsafe fn from_raw(ptr: *const T) -> Self {
Self {
ptr: unsafe {
NonNull::new_unchecked(
ptr.byte_sub(offset_of!(NodeInner<T>, data)) as *mut NodeInner<T>
)
},
}
}
pub fn upgrade(&self) -> Option<Node<T>> {
#[inline]
fn checked_increment(n: usize) -> Option<usize> {
if n == 0 {
return None;
}
assert!(n <= MAX_REFCOUNT, "{}", INTERNAL_OVERFLOW_ERROR);
Some(n + 1)
}
if self
.inner()
.strong
.fetch_update(Ordering::Acquire, Ordering::Relaxed, checked_increment)
.is_ok()
{
Some(Node { ptr: self.ptr })
} else {
None
}
}
pub fn next(&self) -> &NonNullAtomicWeakNode<T> {
self.inner().next
}
pub fn load_next(&self) -> WeakNode<T> {
self.next().load(Ordering::Acquire)
}
pub fn load_next_unique(&self) -> Option<WeakNode<T>> {
let next = self.load_next();
if Self::ptr_eq(self, &next) {
None
} else {
Some(next)
}
}
pub fn find_next_strong(&self) -> Option<Node<T>> {
let mut next = self.load_next_unique()?;
loop {
if let Some(strong_next) = next.upgrade() {
break Some(strong_next);
} else {
if let Some(weak_next) = next.load_next_unique() {
next = weak_next;
continue;
} else {
break None;
}
}
}
}
#[inline]
fn inner(&self) -> WeakInner<'_, T> {
let ptr = self.ptr.as_ptr();
unsafe {
WeakInner {
strong: &(*ptr).strong,
weak: &(*ptr).weak,
next: &(*ptr).weak_next,
}
}
}
}
impl<T> fmt::Debug for WeakNode<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("(WeakNode)")
}
}
impl<T> fmt::Pointer for WeakNode<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.as_ptr();
fmt::Pointer::fmt(&ptr, f)
}
}
unsafe impl<T: Send + Sync> Send for WeakNode<T> {}
unsafe impl<T: Send + Sync> Sync for WeakNode<T> {}
impl<T> Clone for WeakNode<T> {
fn clone(&self) -> Self {
let old_size = self.inner().weak.fetch_add(1, Ordering::Relaxed);
assert!(old_size <= MAX_REFCOUNT, "{}", INTERNAL_OVERFLOW_ERROR);
WeakNode { ptr: self.ptr }
}
}
impl<T> Drop for WeakNode<T> {
fn drop(&mut self) {
match self.inner().weak.fetch_sub(1, Ordering::Release) {
1 => {
acquire!(self.inner().weak);
let NodeInner { next, data, .. } = unsafe { *Box::from_raw(self.ptr.as_ptr()) };
let _ = ManuallyDrop::new(next);
let _ = ManuallyDrop::new(data);
}
2 => {
let next = self.next().load_noclone(Ordering::Acquire);
if next
.inner()
.weak
.compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
if Self::ptr_eq(&next, &self) {
next.inner().weak.store(1, Ordering::Release);
unsafe { drop(self.next().take(Ordering::Relaxed)) }
} else {
next.inner().weak.store(1, Ordering::Release);
}
}
}
_ => return,
}
}
}