use arc_swap::ArcSwapOption;
use std::{
fmt::{Debug, Display, Write},
sync::Arc,
};
/// Link from one cell to the cell beneath it; `None` marks the bottom of a chain.
type CellInner<T> = Option<Arc<TreiberCell<T>>>;
/// A stack of `Arc<T>` values whose entire state is one atomically swappable head pointer.
///
/// Every operation takes `&self` and goes through the `ArcSwapOption` stored in `head`.
pub struct TreiberStack<T: Send + Sync> {
// Topmost cell of the stack; holds `None` while the stack is empty.
head: ArcSwapOption<TreiberCell<T>>,
}
/// One immutable node of a stack: a shared value plus the link to the cell beneath it.
struct TreiberCell<T: Send + Sync> {
    /// The element stored in this cell.
    value: Arc<T>,
    /// The cell below this one, or `None` at the bottom of the chain.
    next: CellInner<T>,
}

/// Tears a chain down with a loop instead of the default field-by-field recursion.
///
/// Without this, dropping the last reference to the top of a long chain drops `next`,
/// which drops its `next`, and so on: one nested call per cell, which can exhaust the
/// call stack for large stacks.
impl<T: Send + Sync> Drop for TreiberCell<T> {
    fn drop(&mut self) {
        let mut link = self.next.take();
        while let Some(cell) = link {
            link = match Arc::try_unwrap(cell) {
                // We held the only reference: detach its tail before it is dropped so its
                // own `drop` has nothing left to walk, then continue with that tail.
                Ok(mut owned) => owned.next.take(),
                // Someone else (a snapshot, an iterator, another chain) still uses the
                // rest of the chain; releasing our reference is all that is needed.
                Err(_still_shared) => None,
            };
        }
    }
}
/// The default stack is empty: its head holds no cell.
impl<T: Send + Sync> Default for TreiberStack<T> {
    fn default() -> Self {
        let head = ArcSwapOption::new(None);
        Self { head }
    }
}
impl<T: Send + Sync> Into<Vec<Arc<T>>> for TreiberStack<T> {
fn into(self) -> Vec<Arc<T>> {
self.drain()
}
}
/// Builds a stack by pushing every item of `value` in iteration order, so the last
/// item yielded ends up on top.
impl<T: Send + Sync, I: IntoIterator<Item = J>, J: Into<T>> From<I> for TreiberStack<T> {
    fn from(value: I) -> Self {
        let stack = Self::default();
        value.into_iter().for_each(|item| stack.push(item));
        stack
    }
}
impl<T: Send + Sync> TreiberStack<T> {
/// Creates a stack that already contains `item` as its only element.
pub fn initialized_with(item: T) -> Self {
    let only_cell = TreiberCell {
        value: Arc::new(item),
        next: None,
    };
    let head = ArcSwapOption::new(Some(Arc::new(only_cell)));
    Self { head }
}
/// Converts `val` into a `T`, wraps it in an `Arc`, and pushes it on top of the stack.
pub fn push<I: Into<T>>(&self, val: I) {
    self.push_arc(Arc::new(val.into()));
}
/// Pushes an already shared value on top of the stack.
///
/// The new cell is built inside the `rcu` closure; since that closure is `FnMut` and
/// may be invoked more than once, `a` is cloned on each invocation rather than moved.
pub fn push_arc(&self, a: Arc<T>) {
    self.head.rcu(|current| {
        let cell = match current {
            // Non-empty stack: the new cell links to the current top.
            Some(top) => prepend(Arc::clone(top), Arc::clone(&a)),
            // Empty stack: the new cell is the whole chain.
            None => TreiberCell {
                value: Arc::clone(&a),
                next: None,
            },
        };
        Some(Arc::new(cell))
    });
}
/// Removes the top element and returns it, or returns `None` if the stack is empty.
///
/// The head is replaced by the cell beneath it; `rcu` hands back the head that was
/// replaced, and that cell's value is the result.
pub fn pop(&self) -> Option<Arc<T>> {
    self.head
        .rcu(|current| current.as_ref().and_then(|top| top.next.clone()))
        .map(|removed| Arc::clone(&removed.value))
}
/// Like `pop`, but copies the value out of its `Arc`.
///
/// Only available when `T: Copy`. Returns `None` if the stack is empty.
pub fn pop_raw(&self) -> Option<T>
where
    T: Copy,
{
    self.pop().map(|shared| *shared)
}
/// Detaches the whole chain in one swap and feeds every value to `f`, topmost first.
///
/// Returns the number of values passed to `f`.
pub fn drain_all_into<F: FnMut(Arc<T>)>(&self, mut f: F) -> usize {
    let mut count = 0_usize;
    let mut cursor = self.head.swap(None);
    while let Some(cell) = cursor.take() {
        f(Arc::clone(&cell.value));
        count += 1;
        cursor = cell.next.clone();
    }
    count
}
/// Detaches the whole chain in one swap and feeds a copy of every value to `f`,
/// topmost first.
///
/// Only available when `T: Copy`. Returns the number of values passed to `f`.
pub fn drain_all_copy<F: FnMut(T)>(&self, mut f: F) -> usize
where
    T: Copy,
{
    let mut count = 0_usize;
    let mut cursor = self.head.swap(None);
    while let Some(cell) = cursor.take() {
        f(*cell.value);
        count += 1;
        cursor = cell.next.clone();
    }
    count
}
/// Pops elements one at a time and hands each to `f` until the stack is empty or `f`
/// returns `false`.
///
/// The element for which `f` returned `false` has already been removed and is included
/// in the returned count of popped elements.
pub fn drain_into<F: FnMut(Arc<T>) -> bool>(&self, mut f: F) -> usize {
    let mut processed = 0_usize;
    while let Some(item) = self.pop() {
        processed += 1;
        if !f(item) {
            break;
        }
    }
    processed
}
/// Returns `true` if the stack currently has no head cell.
#[inline]
pub fn is_empty(&self) -> bool {
    // A temporary guard is enough to inspect the head; there is no need to take an
    // owned clone of the `Arc` just to test for `None`.
    self.head.load().is_none()
}
/// Counts the elements by walking the chain from the head loaded at call time.
///
/// Returns `0` for an empty stack.
pub fn len(&self) -> usize {
    self.head.load_full().map_or(0, |top| top.len())
}
/// Empties the stack by replacing the head with `None`; the previous chain is released.
pub fn clear(&self) {
    drop(self.head.swap(None));
}
/// Returns `true` if `predicate` accepts any element, testing from the top down and
/// stopping at the first match.
///
/// The walk uses the head loaded at call time.
pub fn contains<F: FnMut(&T) -> bool>(&self, mut predicate: F) -> bool {
    let top = self.head.load_full();
    let mut cursor = top.as_deref();
    while let Some(cell) = cursor {
        if predicate(&cell.value) {
            return true;
        }
        cursor = cell.next.as_deref();
    }
    false
}
/// Removes every element and returns them, topmost first.
///
/// Returns an empty vector if the stack was empty.
pub fn drain(&self) -> Vec<Arc<T>> {
    // `drain_all_into` detaches the chain and walks it with an owned cursor, so each
    // cell is released as soon as it has been visited.
    let mut drained = Vec::new();
    self.drain_all_into(|value| drained.push(value));
    drained
}
/// Replaces the whole contents with the single element `new_head` and returns the
/// previous elements, topmost first.
///
/// Returns an empty vector if the stack was empty before the call.
pub fn drain_replace(&self, new_head: T) -> Vec<Arc<T>> {
    let replacement = TreiberCell {
        value: Arc::new(new_head),
        next: None,
    };
    let mut cursor = self.head.swap(Some(Arc::new(replacement)));
    let mut drained = Vec::new();
    // Walk with an owned cursor so each old cell is released right after its value has
    // been collected, rather than the whole chain being released at the end.
    while let Some(cell) = cursor.take() {
        drained.push(Arc::clone(&cell.value));
        cursor = cell.next.clone();
    }
    drained
}
/// Removes every element, passes each through `transform` (topmost first), and returns
/// the results in that order.
///
/// Returns an empty vector if the stack was empty.
pub fn drain_transforming<R, F: FnMut(Arc<T>) -> R>(&self, mut transform: F) -> Vec<R> {
    let mut transformed = Vec::new();
    self.drain_all_into(|value| transformed.push(transform(value)));
    transformed
}
/// Returns the elements reachable from the head loaded at call time, topmost first,
/// without removing anything.
pub fn snapshot(&self) -> Vec<Arc<T>> {
    let mut copied = Vec::new();
    if let Some(top) = self.head.load_full() {
        top.copy_into(&mut copied);
    }
    copied
}
/// Returns the top element without removing it, or `None` if the stack is empty.
pub fn peek(&self) -> Option<Arc<T>> {
    self.head.load_full().map(|top| Arc::clone(&top.value))
}
/// Returns an iterator over the elements, topmost first, starting from the head loaded
/// at call time. Nothing is removed from the stack.
pub fn iter(&self) -> TreiberStackIterator<T> {
    let curr = self.head.load_full();
    TreiberStackIterator { curr }
}
pub fn exchange_contents(&self, other: &Self) {
let _ = self.head.rcu(|my_head| other.head.rcu(|_| my_head.clone()));
}
}
impl<T: Send + Sync> TreiberCell<T> {
    /// Number of cells in the chain that starts at this cell (always at least 1).
    fn len(&self) -> usize {
        std::iter::successors(Some(self), |cell| cell.next.as_deref()).count()
    }

    /// Appends the value of this cell and of every cell beneath it to `into`, in
    /// top-to-bottom order.
    fn copy_into(&self, into: &mut Vec<Arc<T>>) {
        let cells = std::iter::successors(Some(self), |cell| cell.next.as_deref());
        into.extend(cells.map(|cell| Arc::clone(&cell.value)));
    }
}
/// Iterator created by `TreiberStack::iter`; yields each stored `Arc<T>` starting from the head it was created with.
pub struct TreiberStackIterator<T: Send + Sync> {
// The cell whose value is yielded next; `None` once the chain is exhausted.
curr: Option<Arc<TreiberCell<T>>>,
}
impl<'l, T: Send + Sync + 'l> Iterator for TreiberStackIterator<T> {
type Item = Arc<T>;
fn next(&mut self) -> Option<Self::Item> {
let mut old: Option<Arc<TreiberCell<T>>> = None;
std::mem::swap(&mut old, &mut self.curr);
if let Some(node) = old {
self.curr = node.next.clone();
Some(node.value.clone())
} else {
None
}
}
}
/// Builds a new cell holding `t` whose `next` link points at `old`.
fn prepend<T: Send + Sync>(old: Arc<TreiberCell<T>>, t: Arc<T>) -> TreiberCell<T> {
    TreiberCell {
        value: t,
        next: Some(old),
    }
}
impl<T: Send + Sync> TreiberCell<T>
where
    T: Display,
{
    /// Appends the `Display` form of every value in the chain starting at this cell to
    /// `into`, separated by commas, in top-to-bottom order.
    fn stringify(&self, into: &mut String) {
        let cells = std::iter::successors(Some(self), |cell| cell.next.as_deref());
        for (position, cell) in cells.enumerate() {
            if position > 0 {
                into.push(',');
            }
            into.push_str(&cell.value.to_string());
        }
    }
}
impl<T: Send + Sync> TreiberCell<T>
where
    T: Debug,
{
    /// Appends the `Debug` form of every value in the chain starting at this cell to
    /// `into`, separated by commas, in top-to-bottom order.
    fn debugify(&self, into: &mut String) {
        let cells = std::iter::successors(Some(self), |cell| cell.next.as_deref());
        for (position, cell) in cells.enumerate() {
            if position > 0 {
                into.push(',');
            }
            into.push_str(&format!("{:?}", cell.value));
        }
    }
}
/// Formats a chain as its comma-separated `Display` values, top first, without any
/// surrounding delimiters.
impl<T: Send + Sync> Display for TreiberCell<T>
where
    T: Display,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = String::new();
        self.stringify(&mut rendered);
        f.write_str(&rendered)
    }
}
/// Formats a chain as its comma-separated `Debug` values, top first, without any
/// surrounding delimiters.
impl<T: Send + Sync + Debug> Debug for TreiberCell<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = String::new();
        self.debugify(&mut rendered);
        f.write_str(&rendered)
    }
}
/// Formats the stack as `(a,b,c)`, top element first, using each value's `Display`
/// form; an empty stack is rendered as `()`.
impl<T: Send + Sync> Display for TreiberStack<T>
where
    T: Display,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_char('(')?;
        let top = self.head.load();
        if let Some(cell) = top.as_ref() {
            let mut body = String::new();
            cell.stringify(&mut body);
            f.write_str(body.as_str())?;
        }
        f.write_char(')')
    }
}
/// Formats the stack as `(a,b,c)`, top element first, using each value's `Debug`
/// form; an empty stack is rendered as `()`.
impl<T: Send + Sync> Debug for TreiberStack<T>
where
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_char('(')?;
        let top = self.head.load();
        if let Some(cell) = top.as_ref() {
            let mut body = String::new();
            cell.debugify(&mut body);
            f.write_str(body.as_str())?;
        }
        f.write_char(')')
    }
}
#[cfg(test)]
#[allow(unused_imports, dead_code, clippy::vec_init_then_push)]
mod treiber_stack_tests {
use std::{
fmt::Display,
sync::{atomic::AtomicUsize, Arc},
thread,
};
use super::TreiberStack;
// Single-threaded walk through the basic API: push, peek, len, contains, formatting,
// pop order, and clear.
#[test]
fn test_simple() {
// A freshly created stack is empty and has nothing to peek at.
let ts: TreiberStack<&str> = TreiberStack::default();
assert!(ts.is_empty());
assert!(ts.peek().is_none());
// After one push the element is visible through peek/contains and is not removed by them.
ts.push("one");
assert!(!ts.is_empty());
assert_eq!(1_usize, ts.len());
assert!(ts.peek().is_some());
assert_eq!(Some(Arc::new("one")), ts.peek());
assert!(
ts.contains(|o| "one".eq(*o)),
"Not present or unequal: 'one'"
);
assert_eq!(1_usize, ts.len());
// Each further push grows the length by one.
ts.push("two");
assert!(!ts.is_empty());
assert_eq!(2_usize, ts.len());
ts.push("three");
assert!(!ts.is_empty());
assert_eq!(3_usize, ts.len());
ts.push("four");
assert!(!ts.is_empty());
assert_eq!(4_usize, ts.len());
// The most recently pushed element is on top.
assert!(ts.peek().is_some());
assert_eq!(Some(Arc::new("four")), ts.peek());
// Display and Debug list the elements top-first inside parentheses.
let text = ts.to_string();
assert_eq!("(four,three,two,one)", &text);
let dbg_text = format!("{:?}", ts);
assert_eq!("(\"four\",\"three\",\"two\",\"one\")", &dbg_text);
// Elements come back in reverse push order, then the stack reports empty.
let a = ts.pop();
assert!(a.is_some());
assert_eq!(&"four", a.as_ref().unwrap().as_ref());
let a = ts.pop();
assert!(a.is_some());
assert_eq!(&"three", a.as_ref().unwrap().as_ref());
let b = ts.pop();
assert_eq!(&"two", b.as_ref().unwrap().as_ref());
let c = ts.pop();
assert_eq!(&"one", c.as_ref().unwrap().as_ref());
assert_eq!(None, ts.pop());
// Clearing an empty stack is harmless; clearing a non-empty one removes everything.
ts.clear();
ts.push("five");
assert_eq!(1, ts.len());
assert!(!ts.is_empty());
ts.clear();
assert_eq!(0, ts.len());
assert!(ts.is_empty());
}
// Round trip: building a stack from a Vec pushes in order, so converting it back
// yields the elements reversed (last pushed first).
#[test]
fn test_from_and_into() {
    let source: Vec<usize> = vec![6, 5, 4, 3, 2, 1];
    let stack: TreiberStack<usize> = TreiberStack::from(source);
    assert!(!stack.is_empty());
    assert_eq!(6, stack.len());
    let drained: Vec<Arc<usize>> = stack.into();
    assert_eq!(6, drained.len());
    println!("{:?}", drained);
    let plain: Vec<usize> = drained.iter().map(|item| **item).collect();
    assert_eq!(vec![1, 2, 3, 4, 5, 6], plain);
}
// `drain_into` with a callback that always continues empties the stack, top first.
#[test]
fn test_pop_fn() {
    let stack: TreiberStack<usize> = TreiberStack::from(vec![6_usize, 5, 4, 3, 2, 1]);
    let mut seen = Vec::with_capacity(6);
    let collect_everything = |item: Arc<usize>| {
        seen.push(*item);
        true
    };
    stack.drain_into(collect_everything);
    assert_eq!(6, seen.len());
    assert_eq!(vec![1, 2, 3, 4, 5, 6], seen);
}
// `drain_into` stops after the first element for which the callback returns `false`;
// that element is still removed, and everything beneath it stays on the stack.
#[test]
fn test_pop_fn_filter() {
    let stack: TreiberStack<usize> = TreiberStack::from(vec![6_usize, 5, 4, 3, 2, 1]);
    let mut taken = Vec::with_capacity(6);
    let take_while_below_three = |item: Arc<usize>| {
        let number = *item;
        taken.push(number);
        number < 3
    };
    stack.drain_into(take_while_below_three);
    assert_eq!(vec![1, 2, 3], taken);
    assert_eq!(3, stack.len());
    let remaining = stack.drain_transforming(|item| *item);
    assert_eq!(vec![4_usize, 5, 6], remaining);
}
// Several threads claim numbers from a shared counter and push them concurrently;
// afterwards the iterator, the snapshot, and the drain must each contain every number
// from 0 through MAX exactly once.
#[test]
fn test_threaded() {
    const THREADS: usize = 8;
    const MAX: usize = 1000;
    let ts: TreiberStack<Thing> = TreiberStack::default();
    let counter = AtomicUsize::new(0);
    let thread_id = AtomicUsize::new(0);
    thread::scope(|scope| {
        for _ in 0..THREADS {
            scope.spawn(|| {
                let id = thread_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                let mut pushed: usize = 0;
                loop {
                    let claimed = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    if claimed <= MAX {
                        // Yield between claiming and pushing to encourage interleaving.
                        thread::yield_now();
                        ts.push(Thing { value: claimed });
                        pushed += 1;
                    } else {
                        break;
                    }
                }
                assert!(pushed > 0, "No items added by thread {}", id);
            });
        }
    });
    // Non-destructive views first, then the destructive drain.
    let mut from_iter: Vec<usize> = ts.iter().map(|item| item.value).collect();
    let mut from_copy: Vec<usize> = ts.snapshot().into_iter().map(|item| item.value).collect();
    from_copy.sort();
    let expected: Vec<usize> = (0_usize..(MAX + 1)).collect();
    let mut got = ts.drain_transforming(|t| t.value);
    got.sort();
    from_iter.sort();
    assert_eq!(expected, got, "Contents do not match");
    assert_eq!(expected, from_copy, "Contents from copy do not match");
    assert_eq!(expected, from_iter, "Contents from iterator do not match");
    assert!(ts.is_empty(), "Should be empty");
}
// Minimal payload type for the threaded test: wraps the number a thread claimed.
#[derive(Debug)]
struct Thing {
// The counter value this item was created from.
value: usize,
}
// Displays a `Thing` as the decimal form of its `value`.
impl Display for Thing {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value)
    }
}
}