Skip to main content

atomic_lifo/
lib.rs

//! # atomic lifo
//! Lock-free, thread-safe LIFO stack for Rust.
4//!
5//! ## Example
6//! ```rust
7//! use std::thread;
8//! use atomic_lifo::AtomicLifo;
9//!
10//! static MT_LIFO: AtomicLifo<u32> = AtomicLifo::new();
11//!
12//! pub fn example() {
13//!     MT_LIFO.push(456);
14//!     MT_LIFO.push(123);
15//!     let th = {
16//!         thread::spawn(move || {
17//!             assert_eq!(MT_LIFO.pop(), Some(123));
18//!             assert_eq!(MT_LIFO.pop(), Some(456));
19//!             assert_eq!(MT_LIFO.pop(), None);
20//!         })
21//!     };
22//!
23//!     th.join().unwrap();
//! }
//! ```
25#![no_std]
26#![deny(clippy::correctness)]
27#![deny(
28    clippy::perf,
29    clippy::complexity,
30    clippy::style,
31    clippy::nursery,
32    clippy::pedantic,
33    clippy::clone_on_ref_ptr,
34    clippy::decimal_literal_representation,
35    clippy::float_cmp_const,
36    clippy::missing_docs_in_private_items,
37    clippy::multiple_inherent_impl,
38    clippy::unwrap_used,
39    clippy::cargo_common_metadata,
40    clippy::used_underscore_binding
41)]
42extern crate alloc;
43
44use alloc::boxed::Box;
45use core::ptr::null_mut;
46use core::sync::atomic::Ordering::SeqCst;
47use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
48use defer_heavy::defer;
49
50/// Thread Safe LIFO Stack/Single linked list.
51#[derive(Debug, Default)]
52pub struct AtomicLifo<T: Sync + Send + 'static> {
53    /// amount of concurrent ongoing calls to pop.
54    concurrent_pop_count: AtomicUsize,
55    /// current generation of hazard nodes
56    hazard_generation: AtomicUsize,
57    /// threshold counter to catch an edge case when generation never increments to force it to increment and the hazard list to be freed.
58    hazard_threshold: AtomicUsize,
59    /// provides mutual exclusion to free some elements in the hazard list.
60    hazard_lock: AtomicBool,
61    /// the head of the hazard list
62    hazard_head: AtomicPtr<HazardNode<T>>,
63    /// the head of the queue
64    head: AtomicPtr<Node<T>>,
65}
66
67impl<T: Sync + Send + 'static> Drop for AtomicLifo<T> {
68    fn drop(&mut self) {
69        unsafe {
70            let mut current_free = self.head.load(SeqCst);
71            loop {
72                if current_free.is_null() {
73                    break;
74                }
75
76                let node = Box::from_raw(current_free);
77                current_free = node.next;
78                _ = Box::from_raw(node.value);
79            }
80
81            let hazard_head = self.hazard_head.load(SeqCst);
82            if !hazard_head.is_null() {
83                _ = Box::from_raw(hazard_head);
84            }
85        }
86    }
87}
88
89/// Node that contains normal nodes that should be freed later.
90#[derive(Debug)]
91struct HazardNode<T: Sync + Send + 'static> {
92    /// the generation of this hazard node
93    generation: usize,
94    /// the node we want to free later
95    node: *mut Node<T>,
96    /// next hazard node
97    next: *mut HazardNode<T>,
98}
99
100impl<T: Sync + Send + 'static> Drop for HazardNode<T> {
101    fn drop(&mut self) {
102        if !self.node.is_null() {
103            unsafe {
104                _ = Box::from_raw(self.node);
105            }
106        }
107
108        let mut cur_free = self.next;
109        while !cur_free.is_null() {
110            unsafe {
111                let mut cur_free_unbox = Box::from_raw(cur_free);
112                cur_free = cur_free_unbox.next;
113                cur_free_unbox.next = null_mut();
114            }
115        }
116    }
117}
118
/// A single element of the LIFO stack.
#[derive(Debug)]
struct Node<T: Sync + Send + 'static> {
    /// Pointer to the node below this one, or null for the bottom of the stack.
    next: *mut Node<T>,
    /// Pointer to the heap-allocated value carried by this node.
    value: *mut T,
}
127
128impl<T: Sync + Send + 'static> AtomicLifo<T> {
129    /// Constructs a new empty `AtomicLifo`
130    #[must_use]
131    pub const fn new() -> Self {
132        Self {
133            concurrent_pop_count: AtomicUsize::new(0),
134            hazard_generation: AtomicUsize::new(0),
135            hazard_threshold: AtomicUsize::new(0),
136            hazard_lock: AtomicBool::new(false),
137            hazard_head: AtomicPtr::new(null_mut()),
138            head: AtomicPtr::new(null_mut()),
139        }
140    }
141
142    /// Free the hazard list if possible.
143    unsafe fn free_hazard_list(&self, count: usize) {
144        /// To handle overflow we only consider elements to be of an old generation
145        /// If the abs diff to the current generation is less than half the possible values.
146        const MAX_DIFF: usize = usize::MAX / 2;
147
148        if self.hazard_lock.swap(true, SeqCst) {
149            return;
150        }
151
152        defer! {
153            self.hazard_lock.store(false, SeqCst);
154        }
155
156        self.hazard_threshold.store(0, SeqCst);
157
158        //The hazard head may be in flux and I don't bother trying to free it here.
159        //The drop of the entire thing will free it.
160        let mut cur_ptr = self.hazard_head.load(SeqCst);
161
162        //This is unlikely to iterate too many elements as we call this fn right after we increment count.
163        //Meaning the only elements that we have to skip are the ones that other threads pop in the meantime!
164        while let Some(cur) = cur_ptr.as_mut() {
165            let next_ptr = cur.next;
166            let Some(next) = next_ptr.as_ref() else {
167                return;
168            };
169
170            //Second check prevents funny overflow things.
171            if next.generation < count && next.generation.abs_diff(count) <= MAX_DIFF {
172                cur.next = null_mut();
173                _ = Box::from_raw(next_ptr);
174                return;
175            }
176
177            cur_ptr = next_ptr;
178        }
179    }
180
181    pub fn push(&self, value: T) {
182        let node = Box::into_raw(Box::new(Node {
183            value: Box::into_raw(Box::new(value)),
184            next: self.head.load(SeqCst),
185        }));
186
187        let node_ref = unsafe { node.as_mut().unwrap_unchecked() };
188
189        loop {
190            if self
191                .head
192                .compare_exchange(node_ref.next, node, SeqCst, SeqCst)
193                .is_err()
194            {
195                node_ref.next = self.head.load(SeqCst);
196                continue;
197            }
198
199            return;
200        }
201    }
202
203    ///
204    /// Pops the top of the lifo stack
205    ///
206    /// # Panics
207    /// if more than `usize::MAX` concurrent calls in different threads to this fn are made.
208    ///
209    pub fn pop(&self) -> Option<T> {
210        while self.hazard_threshold.load(SeqCst) > 500_000 {
211            //This is an edge case where we have an absurd amount of threads spinning
212            //on pop and actually succeed in removing elements.
213            //This will make acc_count never reach 0 all while the hazard list grows without it ever being freed.
214            //To break this we just spin here until the acc_count reaches 0 and the hazard free is invoked by some thread currently still in pop.
215            core::hint::spin_loop();
216        }
217
218        assert_ne!(
219            self.concurrent_pop_count.fetch_add(1, SeqCst),
220            usize::MAX,
221            "Too many threads calling pop concurrently"
222        );
223
224        defer! {
225            let sub = self.concurrent_pop_count.fetch_sub(1, SeqCst);
226            debug_assert_ne!(sub, 0, "AtomicLifo::poll UNDERFLOW");
227            if sub != 1 {
228                return;
229            }
230
231            let haz_cnt = self.hazard_generation.fetch_add(1, SeqCst);
232            unsafe {
233                self.free_hazard_list(haz_cnt);
234            }
235        }
236
237        let removed = loop {
238            let head = self.head.load(SeqCst);
239            let next = unsafe { head.as_ref()?.next };
240
241            if self
242                .head
243                .compare_exchange(head, next, SeqCst, SeqCst)
244                .is_err()
245            {
246                continue;
247            }
248
249            break head;
250        };
251
252        //Safe, removed must be non-null and we "own" it here for a very short time. Other thread may be currently looking at the next pointer only.
253        let removed_obj = unsafe { Box::from_raw(removed.as_ref().unwrap_unchecked().value) };
254
255        let count = self.hazard_generation.load(SeqCst);
256
257        let hazard_node = Box::into_raw(Box::new(HazardNode {
258            generation: count,
259            node: removed,
260            next: self.hazard_head.load(SeqCst),
261        }));
262
263        loop {
264            let node_ref = unsafe { hazard_node.as_mut().unwrap_unchecked() };
265
266            if self
267                .hazard_head
268                .compare_exchange(node_ref.next, hazard_node, SeqCst, SeqCst)
269                .is_err()
270            {
271                node_ref.next = self.hazard_head.load(SeqCst);
272                continue;
273            }
274
275            break;
276        }
277
278        self.hazard_threshold.fetch_add(1, SeqCst);
279
280        Some(*removed_obj)
281    }
282}