#![cfg(test)]
use crate::{cursor::Cursor, sync::Node};
use std::{
collections::HashSet,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Barrier, Mutex,
},
thread::{self, yield_now},
};
fn push_before_anywhere(root: &Node<usize>, mut value: usize) {
for _ in 0..1_000 {
match root.push_before(value, |_| true) {
Ok(_) => return,
Err(v) => {
value = v;
yield_now();
}
}
}
panic!("unable to insert {value} after retries");
}
#[test]
fn extend_preserves_order() {
let mut root = Node::new(0usize);
root.extend([1usize, 2, 3]);
let values: Vec<_> = root.unique_iter().map(|node| *node).collect();
assert_eq!(values, vec![0, 1, 2, 3]);
}
#[test]
fn extend_splices_before_existing_successor() {
let mut root = Node::new(0usize);
root.push_before(99usize, |_| true).unwrap();
root.extend([1usize, 2]);
let values: Vec<_> = root.unique_iter().map(|node| *node).collect();
assert_eq!(values, vec![0, 1, 2, 99]);
}
#[test]
fn extend_using_applies_predicate() {
let mut root = Node::new(0usize);
root.push_before(99usize, |_| true).unwrap();
let failed = root.extend_using([1usize, 2], |cur| *cur == 99);
assert!(failed.is_empty());
let values: Vec<_> = root.unique_iter().map(|node| *node).collect();
assert_eq!(values, vec![0, 1, 2, 99]);
}
#[test]
fn extend_tuple_uses_per_item_predicate() {
let mut root = Node::new(0usize);
root.push_before(99usize, |_| true).unwrap();
let before_99: fn(&usize) -> bool = |cur| *cur == 99;
let failed = root.extend_with_predicates(vec![(1usize, before_99), (2usize, before_99)]);
assert!(failed.is_empty());
let values: Vec<_> = root.unique_iter().map(|node| *node).collect();
assert_eq!(values, vec![0, 1, 2, 99]);
}
#[test]
fn extend_using_returns_failures() {
let mut root = Node::new(0usize);
let failed = root.extend_using([1usize, 2], |cur| *cur == 99);
assert_eq!(failed, vec![1, 2]);
let values: Vec<_> = root.unique_iter().map(|node| *node).collect();
assert_eq!(values, vec![0]);
}
#[test]
fn extend_with_predicates_returns_failures() {
let mut root = Node::new(0usize);
let never: fn(&usize) -> bool = |_| false;
let failed = root.extend_with_predicates(vec![(1usize, never), (2usize, never)]);
assert_eq!(failed.len(), 2);
assert_eq!(failed[0].0, 1);
assert_eq!(failed[1].0, 2);
let values: Vec<_> = root.unique_iter().map(|node| *node).collect();
assert_eq!(values, vec![0]);
}
#[test]
fn concurrent_push_before_keeps_all_nodes() {
let root = Node::new(0usize);
let threads = 8;
let per_thread = 25;
let next_id = Arc::new(AtomicUsize::new(1));
let barrier = Arc::new(Barrier::new(threads));
let mut handles = Vec::new();
for _ in 0..threads {
let root = root.clone();
let barrier = Arc::clone(&barrier);
let next_id = Arc::clone(&next_id);
handles.push(thread::spawn(move || {
barrier.wait();
for _ in 0..per_thread {
let id = next_id.fetch_add(1, Ordering::Relaxed);
push_before_anywhere(&root, id);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
let mut seen = HashSet::new();
for node in root.unique_iter() {
seen.insert(*node);
}
assert_eq!(seen.len(), threads * per_thread + 1);
for id in 0..=threads * per_thread {
assert!(seen.contains(&id), "missing {id} after concurrent pushes");
}
}
#[test]
fn push_and_pop_race_leaves_singleton_ring() {
let root = Node::new(0usize);
let initial = 4usize;
for i in 1..=initial {
push_before_anywhere(&root, i);
}
let push_total = 20usize;
let next_id = Arc::new(AtomicUsize::new(initial + 1));
let remaining = Arc::new(AtomicUsize::new(initial));
let popped_values: Arc<Mutex<Vec<Node<usize>>>> = Arc::new(Mutex::new(Vec::new()));
let pushers = 2;
let poppers = 2;
let barrier = Arc::new(Barrier::new(pushers + poppers));
let mut handles = Vec::new();
for _ in 0..pushers {
let root = root.clone();
let barrier = Arc::clone(&barrier);
let next_id = Arc::clone(&next_id);
let remaining = Arc::clone(&remaining);
handles.push(thread::spawn(move || {
barrier.wait();
loop {
let id = next_id.fetch_add(1, Ordering::Relaxed);
if id > initial + push_total {
break;
}
push_before_anywhere(&root, id);
remaining.fetch_add(1, Ordering::AcqRel);
}
}));
}
for _ in 0..poppers {
let root = root.clone();
let barrier = Arc::clone(&barrier);
let remaining = Arc::clone(&remaining);
let popped_values = Arc::clone(&popped_values);
handles.push(thread::spawn(move || {
barrier.wait();
let max_attempts = (initial + push_total) * 20;
for _ in 0..max_attempts {
if remaining.load(Ordering::Acquire) == 0 {
break;
}
if let Some(node) = Node::pop_when(&root, |cur| *cur != 0) {
popped_values.lock().unwrap().push(node);
remaining.fetch_sub(1, Ordering::AcqRel);
} else {
yield_now();
}
}
}));
}
for handle in handles {
handle.join().unwrap();
}
while let Some(node) = Node::pop_when(&root, |cur| *cur != 0) {
popped_values.lock().unwrap().push(node);
}
let popped = popped_values.lock().unwrap();
let values: Vec<_> = popped.iter().map(|node| **node).collect();
let unique: HashSet<_> = values.iter().copied().collect();
assert_eq!(unique.len(), popped.len());
assert!(popped.len() >= initial);
assert!(popped.len() <= initial + push_total);
assert!(values.iter().all(|v| *v != 0));
assert!(Node::ptr_eq(&Node::load_next_strong(&root), &root));
assert!(Node::resolve_next(&root).is_none());
}
#[test]
fn independent_rings_handle_concurrent_work() {
let ring_a = Node::new(0usize);
let ring_b = Node::new(10_000usize);
let initial = 8usize;
for i in 1..=initial {
push_before_anywhere(&ring_a, i);
push_before_anywhere(&ring_b, 10_000 + i);
}
let extra = 16usize;
let barrier = Arc::new(Barrier::new(4));
let popped_a: Arc<Mutex<Vec<Node<usize>>>> = Arc::new(Mutex::new(Vec::new()));
let popped_b: Arc<Mutex<Vec<Node<usize>>>> = Arc::new(Mutex::new(Vec::new()));
let next_a = Arc::new(AtomicUsize::new(1_000));
let next_b = Arc::new(AtomicUsize::new(20_000));
let mut handles = Vec::new();
{
let ring_a = ring_a.clone();
let barrier = Arc::clone(&barrier);
let next_a = Arc::clone(&next_a);
handles.push(thread::spawn(move || {
barrier.wait();
for _ in 0..extra {
let id = next_a.fetch_add(1, Ordering::Relaxed);
push_before_anywhere(&ring_a, id);
}
}));
}
{
let ring_b = ring_b.clone();
let barrier = Arc::clone(&barrier);
let next_b = Arc::clone(&next_b);
handles.push(thread::spawn(move || {
barrier.wait();
for _ in 0..extra {
let id = next_b.fetch_add(1, Ordering::Relaxed);
push_before_anywhere(&ring_b, id);
}
}));
}
{
let ring_a = ring_a.clone();
let barrier = Arc::clone(&barrier);
let popped_a = Arc::clone(&popped_a);
handles.push(thread::spawn(move || {
barrier.wait();
let mut popped = 0;
let mut attempts = 0;
while popped < initial + extra && attempts < (initial + extra) * 20 {
if let Some(node) = Node::pop_when(&ring_a, |cur| *cur != 0) {
popped += 1;
popped_a.lock().unwrap().push(node);
} else {
attempts += 1;
yield_now();
}
}
}));
}
{
let ring_b = ring_b.clone();
let barrier = Arc::clone(&barrier);
let popped_b = Arc::clone(&popped_b);
handles.push(thread::spawn(move || {
barrier.wait();
let mut popped = 0;
let mut attempts = 0;
while popped < initial + extra && attempts < (initial + extra) * 20 {
if let Some(node) = Node::pop_when(&ring_b, |cur| *cur != 10_000) {
popped += 1;
popped_b.lock().unwrap().push(node);
} else {
attempts += 1;
yield_now();
}
}
}));
}
for handle in handles {
handle.join().unwrap();
}
while let Some(node) = Node::pop_when(&ring_a, |cur| *cur != 0) {
popped_a.lock().unwrap().push(node);
}
while let Some(node) = Node::pop_when(&ring_b, |cur| *cur != 10_000) {
popped_b.lock().unwrap().push(node);
}
let popped_a = popped_a.lock().unwrap();
let popped_b = popped_b.lock().unwrap();
assert!(popped_a.len() >= initial);
assert!(popped_a.len() <= initial + extra);
assert!(popped_b.len() >= initial);
assert!(popped_b.len() <= initial + extra);
let values_a: Vec<_> = popped_a.iter().map(|n| **n).collect();
let values_b: Vec<_> = popped_b.iter().map(|n| **n).collect();
assert!(values_a.iter().all(|v| *v < 10_000));
assert!(values_b.iter().all(|v| *v > 10_000));
assert!(Node::ptr_eq(&Node::load_next_strong(&ring_a), &ring_a));
assert!(Node::ptr_eq(&Node::load_next_strong(&ring_b), &ring_b));
}
#[test]
fn ownership_helpers_only_succeed_when_unique() {
let with_alias = Node::new(7usize);
let alias = with_alias.clone();
assert!(Node::into_inner(with_alias).is_none());
assert_eq!(Node::into_inner(alias), Some(7));
let unique = Node::new(11usize);
assert_eq!(Node::try_unwrap(unique), Ok(11));
let cursor = Cursor::new(Node::new(25usize));
let cursor_clone = cursor.clone();
assert!(Cursor::into_current(cursor).is_none());
assert_eq!(*Cursor::into_current(cursor_clone).unwrap(), 25);
}