Skip to main content

scirs2_core/distributed/
lock_free.rs

1//! Lock-free concurrent data structures
2//!
3//! This module provides lock-free data structures built entirely on
4//! `std::sync::atomic` operations. They are `Send + Sync` and suitable for
5//! high-contention concurrent workloads.
6//!
7//! ## Structures
8//!
9//! - [`LockFreeStack`]: Treiber stack — LIFO, lock-free push/pop.
10//! - [`LockFreeQueue`]: Michael-Scott style queue — FIFO, lock-free
11//!   enqueue/dequeue.
12//!
//! Note: these implementations do not use tagged (ABA-counted) pointers.
//! The queue avoids use-after-free by deferring node reclamation until the
//! queue is dropped; the stack frees popped nodes eagerly.
15//!
16//! ## Safety
17//!
18//! The raw-pointer manipulations inside are `unsafe` but encapsulated behind
19//! safe public APIs. Manual memory management is handled in `Drop`
20//! implementations to prevent leaks.
21
22use std::fmt;
23use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
24
25// ─────────────────────────────────────────────────────────────────────────────
26// LockFreeStack (Treiber stack)
27// ─────────────────────────────────────────────────────────────────────────────
28
/// A node in the Treiber stack.
///
/// Nodes are heap-allocated in `push` via `Box::into_raw` and reclaimed in
/// `pop` (or in `Drop`) via `Box::from_raw`.
struct StackNode<T> {
    /// The element stored in this node; moved out when the node is popped.
    value: T,
    /// Raw pointer to the node below this one, or null at the bottom.
    next: *mut StackNode<T>,
}
34
/// A lock-free LIFO stack (Treiber stack).
///
/// All operations (`push`, `pop`, `is_empty`, `len`) are lock-free and use
/// compare-and-swap loops on the head pointer.
///
/// # Example
///
/// ```rust
/// use scirs2_core::distributed::lock_free::LockFreeStack;
///
/// let stack = LockFreeStack::new();
/// stack.push(1);
/// stack.push(2);
/// stack.push(3);
///
/// assert_eq!(stack.pop(), Some(3));
/// assert_eq!(stack.pop(), Some(2));
/// assert_eq!(stack.pop(), Some(1));
/// assert_eq!(stack.pop(), None);
/// ```
pub struct LockFreeStack<T> {
    /// Top-of-stack pointer; null when the stack is empty.
    head: AtomicPtr<StackNode<T>>,
    /// Approximate element count, maintained with relaxed updates
    /// after each successful push/pop CAS.
    len: AtomicUsize,
}
59
// Safety: StackNode pointers are only accessed through atomic operations
// and the values inside are T: Send.
// Only `T: Send` (not `T: Sync`) is required for `Sync` because the API
// never hands out shared references to stored values — `pop` moves them out.
unsafe impl<T: Send> Send for LockFreeStack<T> {}
unsafe impl<T: Send> Sync for LockFreeStack<T> {}
64
impl<T> LockFreeStack<T> {
    /// Create a new empty lock-free stack.
    pub fn new() -> Self {
        Self {
            head: AtomicPtr::new(std::ptr::null_mut()),
            len: AtomicUsize::new(0),
        }
    }

    /// Push a value onto the top of the stack.
    ///
    /// This operation is lock-free and will succeed even under contention.
    pub fn push(&self, value: T) {
        // Allocate the node up front; it is linked in by the CAS below.
        let new_node = Box::into_raw(Box::new(StackNode {
            value,
            next: std::ptr::null_mut(),
        }));

        loop {
            let current_head = self.head.load(Ordering::Acquire);
            // Safety: new_node is a valid, uniquely-owned pointer
            unsafe {
                (*new_node).next = current_head;
            }

            // Release on success publishes the node's contents (value + next)
            // to the thread that later pops it.
            if self
                .head
                .compare_exchange_weak(current_head, new_node, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // Length is advisory only, hence Relaxed.
                self.len.fetch_add(1, Ordering::Relaxed);
                return;
            }
            // CAS failed — retry with updated head
        }
    }

    /// Pop the top value from the stack, or return `None` if empty.
    ///
    /// This operation is lock-free.
    pub fn pop(&self) -> Option<T> {
        loop {
            let current_head = self.head.load(Ordering::Acquire);
            if current_head.is_null() {
                return None;
            }

            // Safety: current_head is non-null and was allocated by `push`
            // NOTE(review): a concurrent `pop` that already won the CAS may
            // have freed `current_head` (via `Box::from_raw` below) between
            // our load and this dereference — a use-after-free/ABA window.
            // Safe reclamation (hazard pointers / epochs) would close it;
            // confirm whether this is acceptable for this crate's use.
            let next = unsafe { (*current_head).next };

            if self
                .head
                .compare_exchange_weak(current_head, next, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully swapped head — take ownership of the node
                // Safety: we have exclusive access to current_head now
                let node = unsafe { Box::from_raw(current_head) };
                self.len.fetch_sub(1, Ordering::Relaxed);
                return Some(node.value);
            }
            // CAS failed — retry
        }
    }

    /// Returns `true` if the stack is empty.
    ///
    /// Note: due to concurrent operations, the result may be stale by the
    /// time it is observed.
    pub fn is_empty(&self) -> bool {
        self.head.load(Ordering::Acquire).is_null()
    }

    /// Approximate number of elements in the stack.
    ///
    /// This is an approximation because concurrent push/pop operations may
    /// be in progress.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }
}
146
147impl<T> Default for LockFreeStack<T> {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153impl<T: fmt::Debug> fmt::Debug for LockFreeStack<T> {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        f.debug_struct("LockFreeStack")
156            .field("len", &self.len())
157            .finish()
158    }
159}
160
161impl<T> Drop for LockFreeStack<T> {
162    fn drop(&mut self) {
163        // Drain all remaining nodes to free memory
164        while self.pop().is_some() {}
165    }
166}
167
168// ─────────────────────────────────────────────────────────────────────────────
169// LockFreeQueue (Michael-Scott style)
170// ─────────────────────────────────────────────────────────────────────────────
171
/// A node in the lock-free queue.
///
/// We use `ManuallyDrop` for the value so that we can read it out with
/// `ptr::read` exactly once — after the CAS that gives us exclusive
/// logical ownership — without triggering a double-drop.
struct QueueNode<T> {
    /// `None` while the node serves as the sentinel; `Some(v)` for a node
    /// carrying a real element (see `dequeue`, which clears it to `None`).
    value: std::mem::ManuallyDrop<Option<T>>,
    /// Next node in the list, or null at the tail.
    next: AtomicPtr<QueueNode<T>>,
}
181
182impl<T> QueueNode<T> {
183    fn new(value: Option<T>) -> *mut Self {
184        Box::into_raw(Box::new(Self {
185            value: std::mem::ManuallyDrop::new(value),
186            next: AtomicPtr::new(std::ptr::null_mut()),
187        }))
188    }
189}
190
/// A lock-free FIFO queue (Michael-Scott queue).
///
/// Uses a sentinel node so that `enqueue` and `dequeue` operate on
/// different ends of the linked list, reducing contention.
///
/// # Example
///
/// ```rust
/// use scirs2_core::distributed::lock_free::LockFreeQueue;
///
/// let queue = LockFreeQueue::new();
/// queue.enqueue(1);
/// queue.enqueue(2);
/// queue.enqueue(3);
///
/// assert_eq!(queue.dequeue(), Some(1));
/// assert_eq!(queue.dequeue(), Some(2));
/// assert_eq!(queue.dequeue(), Some(3));
/// assert_eq!(queue.dequeue(), None);
/// ```
pub struct LockFreeQueue<T> {
    /// Always points at the current sentinel; the first real element (if
    /// any) is the sentinel's `next`.
    head: AtomicPtr<QueueNode<T>>,
    /// Points at (or near) the last node; may lag one node behind, which
    /// both `enqueue` and `dequeue` help fix.
    tail: AtomicPtr<QueueNode<T>>,
    /// Approximate element count, maintained with relaxed updates.
    len: AtomicUsize,
    /// Retired nodes waiting for safe reclamation.
    /// We cannot free nodes immediately in `dequeue` because other threads
    /// may still be reading them (classic ABA / use-after-free).
    /// Nodes are freed when the queue is dropped.
    retired: std::sync::Mutex<Vec<*mut QueueNode<T>>>,
}
221
// Safety: the queue uses atomic operations for all pointer manipulations
// and the values are T: Send.
// Only `T: Send` (not `T: Sync`) is required for `Sync` because the API
// never exposes shared references to stored values — `dequeue` moves them.
unsafe impl<T: Send> Send for LockFreeQueue<T> {}
unsafe impl<T: Send> Sync for LockFreeQueue<T> {}
226
impl<T> LockFreeQueue<T> {
    /// Create a new empty lock-free queue.
    pub fn new() -> Self {
        // Sentinel node (dummy head): head and tail both start here, so the
        // empty-queue condition is `head == tail && head.next.is_null()`.
        let sentinel = QueueNode::new(None);
        Self {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
            len: AtomicUsize::new(0),
            retired: std::sync::Mutex::new(Vec::new()),
        }
    }

    /// Enqueue a value at the tail of the queue.
    ///
    /// This operation is lock-free.
    pub fn enqueue(&self, value: T) {
        let new_node = QueueNode::new(Some(value));

        loop {
            let tail = self.tail.load(Ordering::Acquire);
            // Safety: tail is always a valid pointer (sentinel or real node)
            let tail_next = unsafe { (*tail).next.load(Ordering::Acquire) };

            if tail_next.is_null() {
                // Tail is actually the last node — try to link new node
                // Safety: tail is valid
                if unsafe {
                    (*tail)
                        .next
                        .compare_exchange_weak(
                            std::ptr::null_mut(),
                            new_node,
                            Ordering::Release,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                } {
                    // Successfully linked — try to advance tail.
                    // Failure is fine: another thread has already helped.
                    let _ = self.tail.compare_exchange(
                        tail,
                        new_node,
                        Ordering::Release,
                        Ordering::Relaxed,
                    );
                    self.len.fetch_add(1, Ordering::Relaxed);
                    return;
                }
                // CAS on next failed — retry
            } else {
                // Tail is lagging — try to advance it (helping, per the
                // Michael-Scott algorithm), then retry the enqueue.
                let _ = self.tail.compare_exchange(
                    tail,
                    tail_next,
                    Ordering::Release,
                    Ordering::Relaxed,
                );
            }
        }
    }

    /// Dequeue a value from the head of the queue, or return `None` if empty.
    ///
    /// This operation is lock-free.
    ///
    /// NOTE(review): the retire step below briefly takes a `Mutex`, so
    /// strictly speaking this path is blocking, not lock-free — confirm
    /// whether the "lock-free" claim should be softened. Retired nodes also
    /// accumulate until the queue is dropped, so a long-lived queue's memory
    /// grows with the total number of dequeues.
    pub fn dequeue(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Acquire);
            // Safety: head is always valid (sentinel or real node)
            let head_next = unsafe { (*head).next.load(Ordering::Acquire) };

            // Re-check that head hasn't changed (consistency snapshot)
            if head != self.head.load(Ordering::Acquire) {
                continue;
            }

            if head == tail {
                if head_next.is_null() {
                    // Queue is truly empty
                    return None;
                }
                // Tail is lagging behind head — help advance it
                let _ = self.tail.compare_exchange(
                    tail,
                    head_next,
                    Ordering::Release,
                    Ordering::Relaxed,
                );
            } else if !head_next.is_null() {
                // Try to swing head to head_next (claiming the old sentinel).
                // We do NOT read the value until the CAS succeeds — reading
                // before CAS caused a data-loss race where a losing thread
                // would Option::take() the value out of a node that another
                // thread then legitimately dequeued, finding None.
                if self
                    .head
                    .compare_exchange_weak(head, head_next, Ordering::AcqRel, Ordering::Relaxed)
                    .is_ok()
                {
                    // CAS succeeded — we have exclusive logical ownership of
                    // `head` (the old sentinel) and the value inside
                    // `head_next` (which is now the new sentinel).
                    //
                    // Safety: head_next is valid and no other thread will read
                    // its value because it is now the sentinel (head).
                    // We use ptr::read on the ManuallyDrop to extract the
                    // Option<T> without running drop on the node's field.
                    let value = unsafe {
                        std::ptr::read(
                            &(*head_next).value as *const std::mem::ManuallyDrop<Option<T>>,
                        )
                    };
                    let value = std::mem::ManuallyDrop::into_inner(value);

                    // Clear the value slot in the node (it is now the sentinel
                    // and must not hold a value). Write None via ManuallyDrop.
                    unsafe {
                        std::ptr::write(
                            &mut (*head_next).value as *mut std::mem::ManuallyDrop<Option<T>>,
                            std::mem::ManuallyDrop::new(None),
                        );
                    }

                    // Retire the old sentinel node instead of freeing immediately.
                    // Other threads may still hold a pointer to it from an
                    // earlier load. Nodes are freed in Drop.
                    if let Ok(mut retired) = self.retired.lock() {
                        retired.push(head);
                    }
                    // else: if lock is poisoned, leak the node (safe but leaks)
                    self.len.fetch_sub(1, Ordering::Relaxed);
                    return value;
                }
                // CAS failed — another thread won; retry from the top
            }
        }
    }

    /// Returns `true` if the queue appears empty.
    ///
    /// Due to concurrent operations, this may be stale.
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        if head != tail {
            return false;
        }
        // Safety: head is valid
        let head_next = unsafe { (*head).next.load(Ordering::Acquire) };
        head_next.is_null()
    }

    /// Approximate number of elements in the queue.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }
}
384
385impl<T> Default for LockFreeQueue<T> {
386    fn default() -> Self {
387        Self::new()
388    }
389}
390
391impl<T: fmt::Debug> fmt::Debug for LockFreeQueue<T> {
392    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
393        f.debug_struct("LockFreeQueue")
394            .field("len", &self.len())
395            .finish()
396    }
397}
398
399impl<T> Drop for LockFreeQueue<T> {
400    fn drop(&mut self) {
401        // We have &mut self, so no concurrent access.
402        // Walk the linked list starting from head and free every node,
403        // dropping any remaining values stored in ManuallyDrop.
404        let mut current = *self.head.get_mut();
405        while !current.is_null() {
406            // Safety: we have exclusive access; current is a valid node pointer
407            unsafe {
408                let next = (*current).next.load(Ordering::Relaxed);
409                // Drop the inner Option<T> that ManuallyDrop is protecting
410                std::mem::ManuallyDrop::drop(&mut (*current).value);
411                // Free the node itself
412                let _ = Box::from_raw(current);
413                current = next;
414            }
415        }
416
417        // Free retired nodes (old sentinels from dequeue operations).
418        // Their values have already been extracted or cleared.
419        if let Ok(retired) = self.retired.get_mut() {
420            for &node in retired.iter() {
421                if !node.is_null() {
422                    unsafe {
423                        std::mem::ManuallyDrop::drop(&mut (*node).value);
424                        let _ = Box::from_raw(node);
425                    }
426                }
427            }
428        }
429    }
430}
431
432// ─────────────────────────────────────────────────────────────────────────────
433// LockFreeCounter — bonus utility
434// ─────────────────────────────────────────────────────────────────────────────
435
/// A lock-free atomic counter with fetch-add, fetch-sub, and CAS operations.
///
/// This is a thin wrapper around `AtomicUsize` that provides a clean API
/// for concurrent counting.
#[derive(Debug)]
pub struct LockFreeCounter {
    // Current counter value; all access goes through atomic operations.
    value: AtomicUsize,
}

impl LockFreeCounter {
    /// Create a new counter initialised to `initial`.
    pub fn new(initial: usize) -> Self {
        Self {
            value: AtomicUsize::new(initial),
        }
    }

    /// Atomically increment and return the previous value.
    ///
    /// Note: wraps on overflow, per `fetch_add` semantics.
    pub fn increment(&self) -> usize {
        self.value.fetch_add(1, Ordering::AcqRel)
    }

    /// Atomically decrement and return the previous value.
    ///
    /// Saturates at zero (will not wrap).
    pub fn decrement(&self) -> usize {
        // Idiom fix: `fetch_update` runs the CAS retry loop for us.
        // `checked_sub(1)` yields `None` at zero, which aborts the update,
        // so the counter saturates instead of wrapping. The `Err` branch
        // carries the observed value, which is always 0 here.
        self.value
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| v.checked_sub(1))
            .unwrap_or(0)
    }

    /// Get the current value.
    pub fn get(&self) -> usize {
        self.value.load(Ordering::Acquire)
    }

    /// Atomically add `n` and return the previous value.
    ///
    /// Note: wraps on overflow, per `fetch_add` semantics.
    pub fn add(&self, n: usize) -> usize {
        self.value.fetch_add(n, Ordering::AcqRel)
    }

    /// Compare-and-swap: if current value equals `expected`, set to `new_val`.
    ///
    /// Returns `Ok(expected)` on success, `Err(actual)` on failure.
    pub fn compare_and_swap(&self, expected: usize, new_val: usize) -> Result<usize, usize> {
        self.value
            .compare_exchange(expected, new_val, Ordering::AcqRel, Ordering::Acquire)
    }

    /// Reset the counter to zero and return the previous value.
    pub fn reset(&self) -> usize {
        self.value.swap(0, Ordering::AcqRel)
    }
}

impl Default for LockFreeCounter {
    /// A counter starting at zero.
    fn default() -> Self {
        Self::new(0)
    }
}
506
507// ─────────────────────────────────────────────────────────────────────────────
508// Tests
509// ─────────────────────────────────────────────────────────────────────────────
510
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    // ── LockFreeStack ────────────────────────────────────────────────────────

    #[test]
    fn test_stack_push_pop_basic() {
        let stack = LockFreeStack::new();
        stack.push(1);
        stack.push(2);
        stack.push(3);

        assert_eq!(stack.pop(), Some(3));
        assert_eq!(stack.pop(), Some(2));
        assert_eq!(stack.pop(), Some(1));
        assert_eq!(stack.pop(), None);
    }

    #[test]
    fn test_stack_empty() {
        let stack = LockFreeStack::<i32>::new();
        assert!(stack.is_empty());
        assert_eq!(stack.len(), 0);
        assert_eq!(stack.pop(), None);
    }

    #[test]
    fn test_stack_len() {
        let stack = LockFreeStack::new();
        assert_eq!(stack.len(), 0);
        stack.push(10);
        assert_eq!(stack.len(), 1);
        stack.push(20);
        assert_eq!(stack.len(), 2);
        stack.pop();
        assert_eq!(stack.len(), 1);
    }

    #[test]
    fn test_stack_concurrent_push() {
        let stack = Arc::new(LockFreeStack::new());
        let n_threads = 8;
        let n_items = 1000;

        let handles: Vec<_> = (0..n_threads)
            .map(|t| {
                let stack = Arc::clone(&stack);
                thread::spawn(move || {
                    for i in 0..n_items {
                        stack.push(t * n_items + i);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(stack.len(), n_threads * n_items);

        // Pop all elements
        let mut count = 0;
        while stack.pop().is_some() {
            count += 1;
        }
        assert_eq!(count, n_threads * n_items);
    }

    #[test]
    fn test_stack_concurrent_push_pop() {
        let stack = Arc::new(LockFreeStack::new());
        let n_threads = 4;
        let n_items = 500;

        // Producers
        let producers: Vec<_> = (0..n_threads)
            .map(|_| {
                let stack = Arc::clone(&stack);
                thread::spawn(move || {
                    for i in 0..n_items {
                        stack.push(i);
                    }
                })
            })
            .collect();

        // Consumers
        let consumers: Vec<_> = (0..n_threads)
            .map(|_| {
                let stack = Arc::clone(&stack);
                thread::spawn(move || {
                    let mut count = 0usize;
                    for _ in 0..n_items {
                        // Retry loop
                        loop {
                            if stack.pop().is_some() {
                                count += 1;
                                break;
                            }
                            thread::yield_now();
                        }
                    }
                    count
                })
            })
            .collect();

        for h in producers {
            h.join().expect("producer panicked");
        }

        let total_consumed: usize = consumers
            .into_iter()
            .map(|h| h.join().expect("consumer panicked"))
            .sum();
        assert_eq!(total_consumed, n_threads * n_items);
    }

    #[test]
    fn test_stack_drop_frees_memory() {
        // Just verify no panic/leak (Miri would catch leaks)
        let stack = LockFreeStack::new();
        for i in 0..100 {
            stack.push(format!("item_{i}"));
        }
        drop(stack);
    }

    #[test]
    fn test_stack_default() {
        let stack: LockFreeStack<i32> = Default::default();
        assert!(stack.is_empty());
    }

    // ── LockFreeQueue ────────────────────────────────────────────────────────

    #[test]
    fn test_queue_enqueue_dequeue_basic() {
        let queue = LockFreeQueue::new();
        queue.enqueue(1);
        queue.enqueue(2);
        queue.enqueue(3);

        assert_eq!(queue.dequeue(), Some(1));
        assert_eq!(queue.dequeue(), Some(2));
        assert_eq!(queue.dequeue(), Some(3));
        assert_eq!(queue.dequeue(), None);
    }

    #[test]
    fn test_queue_empty() {
        let queue = LockFreeQueue::<i32>::new();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.dequeue(), None);
    }

    #[test]
    fn test_queue_len() {
        let queue = LockFreeQueue::new();
        assert_eq!(queue.len(), 0);
        queue.enqueue(10);
        assert_eq!(queue.len(), 1);
        queue.enqueue(20);
        assert_eq!(queue.len(), 2);
        queue.dequeue();
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_queue_fifo_order() {
        let queue = LockFreeQueue::new();
        for i in 0..20 {
            queue.enqueue(i);
        }
        for i in 0..20 {
            assert_eq!(queue.dequeue(), Some(i));
        }
    }

    #[test]
    fn test_queue_concurrent_enqueue() {
        let queue = Arc::new(LockFreeQueue::new());
        let n_threads = 8;
        let n_items = 1000;

        let handles: Vec<_> = (0..n_threads)
            .map(|t| {
                let queue = Arc::clone(&queue);
                thread::spawn(move || {
                    for i in 0..n_items {
                        queue.enqueue(t * n_items + i);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        // Dequeue all
        let mut items = Vec::new();
        while let Some(v) = queue.dequeue() {
            items.push(v);
        }
        assert_eq!(items.len(), n_threads * n_items);

        // Verify all values are present
        items.sort_unstable();
        let mut expected: Vec<usize> = Vec::new();
        for t in 0..n_threads {
            for i in 0..n_items {
                expected.push(t * n_items + i);
            }
        }
        expected.sort_unstable();
        assert_eq!(items, expected);
    }

    #[test]
    fn test_queue_concurrent_enqueue_dequeue() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let queue = Arc::new(LockFreeQueue::new());
        let n_threads = 4;
        let n_items = 500;
        let total = n_threads * n_items;
        let remaining = Arc::new(AtomicUsize::new(total));

        let producers: Vec<_> = (0..n_threads)
            .map(|_| {
                let queue = Arc::clone(&queue);
                thread::spawn(move || {
                    for i in 0..n_items {
                        queue.enqueue(i);
                    }
                })
            })
            .collect();

        let consumers: Vec<_> = (0..n_threads)
            .map(|_| {
                let queue = Arc::clone(&queue);
                let remaining = Arc::clone(&remaining);
                thread::spawn(move || {
                    let mut count = 0usize;
                    loop {
                        // Check if all items have been claimed
                        let rem = remaining.load(Ordering::Acquire);
                        if rem == 0 {
                            break;
                        }
                        // Fix: `.is_some()` instead of `if let Some(_) = …`
                        // (clippy: redundant_pattern_matching)
                        if queue.dequeue().is_some() {
                            remaining.fetch_sub(1, Ordering::AcqRel);
                            count += 1;
                        } else {
                            thread::yield_now();
                        }
                    }
                    count
                })
            })
            .collect();

        for h in producers {
            h.join().expect("producer panicked");
        }

        let total_consumed: usize = consumers
            .into_iter()
            .map(|h| h.join().expect("consumer panicked"))
            .sum();
        assert_eq!(total_consumed, total);
    }

    #[test]
    fn test_queue_drop_frees_memory() {
        let queue = LockFreeQueue::new();
        for i in 0..100 {
            queue.enqueue(format!("item_{i}"));
        }
        drop(queue);
    }

    #[test]
    fn test_queue_default() {
        let queue: LockFreeQueue<i32> = Default::default();
        assert!(queue.is_empty());
    }

    // ── LockFreeCounter ─────────────────────────────────────────────────────

    #[test]
    fn test_counter_basic() {
        let counter = LockFreeCounter::new(0);
        assert_eq!(counter.get(), 0);
        assert_eq!(counter.increment(), 0);
        assert_eq!(counter.get(), 1);
        assert_eq!(counter.increment(), 1);
        assert_eq!(counter.get(), 2);
        assert_eq!(counter.decrement(), 2);
        assert_eq!(counter.get(), 1);
    }

    #[test]
    fn test_counter_concurrent() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let n_threads = 8;
        let n_increments = 10_000;

        let handles: Vec<_> = (0..n_threads)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..n_increments {
                        counter.increment();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(counter.get(), n_threads * n_increments);
    }

    #[test]
    fn test_counter_decrement_saturates() {
        let counter = LockFreeCounter::new(0);
        assert_eq!(counter.decrement(), 0);
        assert_eq!(counter.get(), 0);
    }

    #[test]
    fn test_counter_compare_and_swap() {
        let counter = LockFreeCounter::new(10);
        assert_eq!(counter.compare_and_swap(10, 20), Ok(10));
        assert_eq!(counter.get(), 20);
        assert_eq!(counter.compare_and_swap(10, 30), Err(20));
        assert_eq!(counter.get(), 20);
    }

    #[test]
    fn test_counter_reset() {
        let counter = LockFreeCounter::new(0);
        counter.add(100);
        assert_eq!(counter.reset(), 100);
        assert_eq!(counter.get(), 0);
    }

    #[test]
    fn test_counter_add() {
        let counter = LockFreeCounter::new(5);
        assert_eq!(counter.add(10), 5);
        assert_eq!(counter.get(), 15);
    }
}