// cordyceps/stack.rs

1//! [Intrusive], singly-linked first-in, first-out (FIFO) stacks.
2//!
3//! See the documentation for the [`Stack`] and [`TransferStack`] types for
4//! details.
5//!
6//! [intrusive]: crate#intrusive-data-structures
7#![warn(missing_debug_implementations)]
8
9use crate::{loom::cell::UnsafeCell, Linked};
10use core::{fmt, marker::PhantomPinned, ptr::NonNull};
11
12#[cfg(target_has_atomic = "ptr")]
13pub use has_cas_atomics::*;
14
15/// Items exclusive to targets with CAS atomics
16#[cfg(target_has_atomic = "ptr")]
17mod has_cas_atomics {
18    use core::{
19        fmt,
20        ptr::{self, NonNull},
21    };
22
23    use crate::{
24        loom::sync::atomic::{AtomicPtr, Ordering::*},
25        Linked,
26    };
27
28    use super::{Links, Stack};
29
30    /// An [intrusive] lock-free singly-linked FIFO stack, where all entries
31    /// currently in the stack are consumed in a single atomic operation.
32    ///
33    /// A transfer stack is perhaps the world's simplest lock-free concurrent data
34    /// structure. It provides two primary operations:
35    ///
36    /// - [`TransferStack::push`], which appends an element to the end of the
37    ///   transfer stack,
38    ///
39    /// - [`TransferStack::take_all`], which atomically takes all elements currently
40    ///   on the transfer stack and returns them as a new mutable [`Stack`].
41    ///
42    /// These are both *O*(1) operations, although `push` performs a
43    /// compare-and-swap loop that may be retried if another producer concurrently
44    /// pushed an element.
45    ///
46    /// In order to be part of a `TransferStack`, a type `T` must implement
47    /// the [`Linked`] trait for [`stack::Links<T>`](Links).
48    ///
49    /// Pushing elements into a `TransferStack` takes ownership of those elements
50    /// through an owning [`Handle` type](Linked::Handle). Dropping a
51    /// [`TransferStack`] drops all elements currently linked into the stack.
52    ///
53    /// A transfer stack is often useful in cases where a large number of resources
54    /// must be efficiently transferred from several producers to a consumer, such
55    /// as for reuse or cleanup. For example, a [`TransferStack`] can be used as the
56    /// "thread" (shared) free list in a [`mimalloc`-style sharded
57    /// allocator][mimalloc], with a mutable [`Stack`] used as the local
58    /// (unsynchronized) free list. When an allocation is freed from the same CPU
59    /// core that it was allocated on, it is pushed to the local free list, using an
60    /// unsynchronized mutable [`Stack::push`] operation. If an allocation is freed
61    /// from a different thread, it is instead pushed to that thread's shared free
62    /// list, a [`TransferStack`], using an atomic [`TransferStack::push`]
63    /// operation. New allocations are popped from the local unsynchronized free
64    /// list, and if the local free list is empty, the entire shared free list is
65    /// moved onto the local free list. This allows objects which do not leave the
66    /// CPU core they were allocated on to be both allocated and deallocated using
67    /// unsynchronized operations, and new allocations only perform an atomic
68    /// operation when the local free list is empty.
69    ///
70    /// [intrusive]: crate#intrusive-data-structures
71    /// [mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf
    pub struct TransferStack<T: Linked<Links<T>>> {
        /// The head of the stack: the most recently pushed node, or null when
        /// the stack is empty. Only ever accessed atomically, since multiple
        /// producers may push concurrently.
        head: AtomicPtr<T>,
    }
75
76    // === impl TransferStack ===
    impl<T> TransferStack<T>
    where
        T: Linked<Links<T>>,
    {
        /// Returns a new `TransferStack` with no elements.
        #[cfg(not(loom))]
        #[must_use]
        pub const fn new() -> Self {
            Self {
                head: AtomicPtr::new(ptr::null_mut()),
            }
        }

        /// Returns a new `TransferStack` with no elements.
        ///
        /// NOTE: this variant is not `const`, as loom's mocked `AtomicPtr`
        /// cannot be constructed in a `const fn`.
        #[cfg(loom)]
        #[must_use]
        pub fn new() -> Self {
            Self {
                head: AtomicPtr::new(ptr::null_mut()),
            }
        }

        /// Pushes `element` onto the end of this `TransferStack`, taking ownership
        /// of it.
        ///
        /// This is an *O*(1) operation, although it performs a compare-and-swap
        /// loop that may repeat if another producer is concurrently calling `push`
        /// on the same `TransferStack`.
        ///
        /// This takes ownership over `element` through its [owning `Handle`
        /// type](Linked::Handle). If the `TransferStack` is dropped before the
        /// pushed `element` is removed from the stack, the `element` will be dropped.
        #[inline]
        pub fn push(&self, element: T::Handle) {
            // Delegate to `push_was_empty`, discarding its "was the stack
            // previously empty?" return value.
            self.push_was_empty(element);
        }

        /// Pushes `element` onto the end of this `TransferStack`, taking ownership
        /// of it. Returns `true` if the stack was previously empty (the previous
        /// head was null).
        ///
        /// This is an *O*(1) operation, although it performs a compare-and-swap
        /// loop that may repeat if another producer is concurrently calling `push`
        /// on the same `TransferStack`.
        ///
        /// This takes ownership over `element` through its [owning `Handle`
        /// type](Linked::Handle). If the `TransferStack` is dropped before the
        /// pushed `element` is removed from the stack, the `element` will be dropped.
        pub fn push_was_empty(&self, element: T::Handle) -> bool {
            // Take ownership of `element` by converting it into a raw pointer.
            let ptr = T::into_ptr(element);
            test_trace!(?ptr, "TransferStack::push");
            // Safety: we now exclusively own the pushed node, so it is safe to
            // mutate its links.
            let links = unsafe { T::links(ptr).as_mut() };
            // A node being pushed must not already be linked into a stack.
            debug_assert!(links.next.with(|next| unsafe { (*next).is_none() }));

            let mut head = self.head.load(Relaxed);
            loop {
                test_trace!(?ptr, ?head, "TransferStack::push");
                // Point the new node at the current head *before* the CAS, so
                // the list is well-formed the instant the CAS succeeds.
                links.next.with_mut(|next| unsafe {
                    *next = NonNull::new(head);
                });

                // `compare_exchange_weak` may fail spuriously, which is fine
                // here since we retry in a loop anyway. `AcqRel` on success
                // publishes our write to `links.next`; `Acquire` on failure
                // ensures we observe the latest head before retrying.
                match self
                    .head
                    .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire)
                {
                    Ok(old) => {
                        // The stack was empty iff the displaced head was null.
                        let was_empty = old.is_null();
                        test_trace!(?ptr, ?head, was_empty, "TransferStack::push -> pushed");
                        return was_empty;
                    }
                    // Another producer won the race; retry with the head it
                    // installed.
                    Err(actual) => head = actual,
                }
            }
        }

        /// Takes all elements *currently* in this `TransferStack`, returning a new
        /// mutable [`Stack`] containing those elements.
        ///
        /// This is an *O*(1) operation which does not allocate memory. It will
        /// never loop and does not spin.
        #[must_use]
        pub fn take_all(&self) -> Stack<T> {
            // Claim the entire list in a single atomic swap, leaving the
            // shared stack empty. `AcqRel` acquires the producers' writes to
            // the taken nodes and publishes the now-empty head.
            let head = self.head.swap(ptr::null_mut(), AcqRel);
            let head = NonNull::new(head);
            Stack { head }
        }
    }
164
165    impl<T> Drop for TransferStack<T>
166    where
167        T: Linked<Links<T>>,
168    {
169        fn drop(&mut self) {
170            // The stack owns any entries that are still in the stack; ensure they
171            // are dropped before dropping the stack.
172            for entry in self.take_all() {
173                drop(entry);
174            }
175        }
176    }
177
178    impl<T> fmt::Debug for TransferStack<T>
179    where
180        T: Linked<Links<T>>,
181    {
182        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183            let Self { head } = self;
184            f.debug_struct("TransferStack").field("head", head).finish()
185        }
186    }
187
    impl<T> Default for TransferStack<T>
    where
        T: Linked<Links<T>>,
    {
        /// Returns a new, empty `TransferStack` (equivalent to
        /// [`TransferStack::new`]).
        fn default() -> Self {
            Self::new()
        }
    }
196}
197
198/// An [intrusive] singly-linked mutable FIFO stack.
199///
200/// This is a very simple implementation of a linked `Stack`, which provides
201/// *O*(1) [`push`](Self::push) and [`pop`](Self::pop) operations. Items are
202/// popped from the stack in the opposite order that they were pushed in.
203///
204/// A [`Stack`] also implements the [`Iterator`] trait, with the
205/// [`Iterator::next`] method popping elements from the end of the stack.
206///
207/// In order to be part of a `Stack`, a type `T` must implement
208/// the [`Linked`] trait for [`stack::Links<T>`](Links).
209///
210/// Pushing elements into a `Stack` takes ownership of those elements
211/// through an owning [`Handle` type](Linked::Handle). Dropping a
212/// `Stack` drops all elements currently linked into the stack.
213///
214/// [intrusive]: crate#intrusive-data-structures
pub struct Stack<T: Linked<Links<T>>> {
    /// The most recently pushed node, or `None` when the stack is empty.
    /// `pub(crate)` so that `TransferStack::take_all` can construct a `Stack`
    /// directly from the head pointer it swaps out.
    pub(crate) head: Option<NonNull<T>>,
}
218
219/// Singly-linked-list linkage
220///
221/// Links to other nodes in a [`TransferStack`], [`Stack`], or [`SortedList`].
222///
223/// In order to be part of a [`TransferStack`], [`Stack`], or [`SortedList`],
224/// a type must contain an instance of this type, and must implement the
225/// [`Linked`] trait for `Links<Self>`.
226///
227/// [`SortedList`]: crate::SortedList
228//
229// TODO(AJM): In the next breaking change, we might want to specifically have
230// a `SingleLinks` and `DoubleLinks` type to make the relationship more clear,
231// instead of "stack" being singly-flavored and "list" being doubly-flavored
pub struct Links<T> {
    /// The next node in the stack, or `None` if this node is the last one or
    /// is not currently linked into a stack.
    pub(crate) next: UnsafeCell<Option<NonNull<T>>>,

    /// Linked list links must always be `!Unpin`, in order to ensure that they
    /// never receive LLVM `noalias` annotations; see also
    /// <https://github.com/rust-lang/rust/issues/63818>.
    _unpin: PhantomPinned,
}
241
242// === impl Stack ===
243
impl<T> Stack<T>
where
    T: Linked<Links<T>>,
{
    /// Returns a new `Stack` with no elements in it.
    #[must_use]
    pub const fn new() -> Self {
        Self { head: None }
    }

    /// Pushes `element` onto the end of this `Stack`, taking ownership
    /// of it.
    ///
    /// This is an *O*(1) operation that does not allocate memory. It will never
    /// loop.
    ///
    /// This takes ownership over `element` through its [owning `Handle`
    /// type](Linked::Handle). If the `Stack` is dropped before the
    /// pushed `element` is [`pop`](Self::pop)pped from the stack, the `element`
    /// will be dropped.
    pub fn push(&mut self, element: T::Handle) {
        let ptr = T::into_ptr(element);
        test_trace!(?ptr, ?self.head, "Stack::push");
        unsafe {
            // Safety: we have exclusive mutable access to the stack, and
            // therefore can also mutate the stack's entries.
            let links = T::links(ptr).as_mut();
            links.next.with_mut(|next| {
                // A node being pushed must not already be linked into a stack.
                debug_assert!((*next).is_none());
                // Link the new node to the previous head, and make the new
                // node the head.
                *next = self.head.replace(ptr);
            })
        }
    }

    /// Returns the element most recently [push](Self::push)ed to this `Stack`,
    /// or `None` if the stack is empty.
    ///
    /// This is an *O*(1) operation which does not allocate memory. It will
    /// never loop and does not spin.
    #[must_use]
    pub fn pop(&mut self) -> Option<T::Handle> {
        test_trace!(?self.head, "Stack::pop");
        // If the head is `None`, the stack is empty and there is nothing to
        // pop.
        let head = self.head.take()?;
        unsafe {
            // Safety: we have exclusive ownership over this chunk of stack.

            // advance the head link to the next node after the current one (if
            // there is one).
            self.head = T::links(head).as_mut().next.with_mut(|next| (*next).take());

            test_trace!(?self.head, "Stack::pop -> popped");

            // return the current node
            Some(T::from_ptr(head))
        }
    }

    /// Takes all elements *currently* in this `Stack`, returning a new
    /// mutable `Stack` containing those elements.
    ///
    /// This is an *O*(1) operation which does not allocate memory. It will
    /// never loop and does not spin.
    #[must_use]
    pub fn take_all(&mut self) -> Self {
        // Simply move the head pointer into the new stack, leaving `self`
        // empty.
        Self {
            head: self.head.take(),
        }
    }

    /// Returns `true` if this `Stack` is empty.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.head.is_none()
    }
}
320
321impl<T> Drop for Stack<T>
322where
323    T: Linked<Links<T>>,
324{
325    fn drop(&mut self) {
326        // The stack owns any entries that are still in the stack; ensure they
327        // are dropped before dropping the stack.
328        for entry in self {
329            drop(entry);
330        }
331    }
332}
333
334impl<T> fmt::Debug for Stack<T>
335where
336    T: Linked<Links<T>>,
337{
338    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
339        let Self { head } = self;
340        f.debug_struct("Stack").field("head", head).finish()
341    }
342}
343
impl<T> Iterator for Stack<T>
where
    T: Linked<Links<T>>,
{
    /// Iterating over a `Stack` pops entries, yielding owned handles in LIFO
    /// order (most recently pushed first).
    type Item = T::Handle;

    fn next(&mut self) -> Option<Self::Item> {
        // Each iteration step simply pops the current head of the stack.
        self.pop()
    }
}
354
impl<T> Default for Stack<T>
where
    T: Linked<Links<T>>,
{
    /// Returns a new, empty `Stack` (equivalent to [`Stack::new`]).
    fn default() -> Self {
        Self::new()
    }
}
363
/// # Safety
///
/// A `Stack` is `Send` if `T` is `Send`, because moving it across threads
/// also implicitly moves any `T`s in the stack.
unsafe impl<T> Send for Stack<T>
where
    T: Send,
    T: Linked<Links<T>>,
{
}
374
/// # Safety
///
/// A `Stack` is `Sync` if `T` is `Sync`: sharing a `&Stack` across threads
/// only permits shared access to the `T`s in the stack. Every method that
/// links, unlinks, or mutates entries takes `&mut self`; the only `&self`
/// method ([`Stack::is_empty`]) merely reads the head pointer.
unsafe impl<T> Sync for Stack<T>
where
    T: Sync,
    T: Linked<Links<T>>,
{
}
381
382// === impl Links ===
383
impl<T> Links<T> {
    /// Returns new, unlinked links for a [`Stack`] or [`TransferStack`].
    #[cfg(not(loom))]
    #[must_use]
    pub const fn new() -> Self {
        Self {
            next: UnsafeCell::new(None),
            _unpin: PhantomPinned,
        }
    }

    /// Returns new, unlinked links for a [`Stack`] or [`TransferStack`].
    ///
    /// NOTE: this variant is not `const`, as loom's mocked `UnsafeCell`
    /// cannot be constructed in a `const fn`.
    #[cfg(loom)]
    #[must_use]
    pub fn new() -> Self {
        Self {
            next: UnsafeCell::new(None),
            _unpin: PhantomPinned,
        }
    }
}
405
/// # Safety
///
/// Types containing [`Links`] may be `Send`: the pointers within the `Links` may
/// mutably alias another value, but the links can only be _accessed_ by the
/// owner of the [`TransferStack`] itself, because the pointers are private. As
/// long as [`TransferStack`] upholds its own invariants, `Links` should not
/// make a type `!Send`.
unsafe impl<T: Send> Send for Links<T> {}
414
/// # Safety
///
/// Types containing [`Links`] may be `Sync`: the pointers within the `Links` may
/// mutably alias another value, but the links can only be _accessed_ by the
/// owner of the [`TransferStack`] itself, because the pointers are private. As
/// long as [`TransferStack`] upholds its own invariants, `Links` should not
/// make a type `!Sync`.
unsafe impl<T: Sync> Sync for Links<T> {}
423
424impl<T> fmt::Debug for Links<T> {
425    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426        f.write_str("transfer_stack::Links { ... }")
427    }
428}
429
impl<T> Default for Links<T> {
    /// Returns new, unlinked links (equivalent to [`Links::new`]).
    fn default() -> Self {
        Self::new()
    }
}
435
436#[cfg(test)]
437mod loom {
438    use super::*;
439    use crate::loom::{
440        self,
441        sync::{
442            atomic::{AtomicUsize, Ordering},
443            Arc,
444        },
445        thread,
446    };
447    use test_util::Entry;
448
    // Two producer threads push concurrently while the main thread repeatedly
    // drains the stack with `take_all`; every pushed value must be observed
    // exactly once across all drains.
    #[test]
    fn multithreaded_push() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let threads = Arc::new(AtomicUsize::new(2));
            let thread1 = thread::spawn({
                let stack = stack.clone();
                let threads = threads.clone();
                move || {
                    Entry::push_all(&stack, 1, PUSHES);
                    // Signal that this producer has finished pushing.
                    threads.fetch_sub(1, Ordering::Relaxed);
                }
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                let threads = threads.clone();
                move || {
                    Entry::push_all(&stack, 2, PUSHES);
                    threads.fetch_sub(1, Ordering::Relaxed);
                }
            });

            let mut seen = Vec::new();

            // Keep draining until both producers report completion.
            loop {
                seen.extend(stack.take_all().map(|entry| entry.val));

                if threads.load(Ordering::Relaxed) == 0 {
                    break;
                }

                thread::yield_now();
            }

            // Drain once more, in case entries were pushed between the last
            // `take_all` and the counter check that broke the loop.
            seen.extend(stack.take_all().map(|entry| entry.val));

            seen.sort();
            assert_eq!(seen, vec![10, 11, 20, 21]);

            thread1.join().unwrap();
            thread2.join().unwrap();
        })
    }
494
    // Two producers push concurrently while two consumers (a spawned thread
    // and the main thread) race to `take_all`; the union of everything the
    // consumers saw must be exactly the set of pushed values.
    #[test]
    fn multithreaded_pop() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            let thread3 = thread::spawn({
                let stack = stack.clone();
                move || stack.take_all().map(|entry| entry.val).collect::<Vec<_>>()
            });

            let seen_thread0 = stack.take_all().map(|entry| entry.val).collect::<Vec<_>>();
            let seen_thread3 = thread3.join().unwrap();

            thread1.join().unwrap();
            thread2.join().unwrap();

            // After both producers have finished, drain anything the racing
            // consumers missed.
            let seen_thread0_final = stack.take_all().map(|entry| entry.val).collect::<Vec<_>>();

            let mut all = dbg!(seen_thread0);
            all.extend(dbg!(seen_thread3));
            all.extend(dbg!(seen_thread0_final));

            all.sort();
            assert_eq!(all, vec![10, 11, 20, 21]);
        })
    }
531
    // Drops the main handle to the stack while producers still hold `Arc`
    // clones; all pushed entries must still be dropped when the last clone
    // goes away (each `Entry` carries an `alloc::Track`, which is presumably
    // how leaks are detected — see `test_util`).
    #[test]
    fn doesnt_leak() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            tracing::info!("dropping stack");
            drop(stack);

            thread1.join().unwrap();
            thread2.join().unwrap();
        })
    }
554
    // After both producers finish, `take_all` moves every entry into a local
    // `Stack`; dropping the (now-empty) shared stack and then the taken
    // `Stack` must free every entry.
    #[test]
    fn take_all_doesnt_leak() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            thread1.join().unwrap();
            thread2.join().unwrap();

            let take_all = stack.take_all();

            tracing::info!("dropping stack");
            drop(stack);

            tracing::info!("dropping take_all");
            drop(take_all);
        })
    }
582
    // Like `take_all_doesnt_leak`, but `take_all` races with the producers:
    // entries may end up split between the taken `Stack` and the shared
    // stack, and both must drop their share without leaking.
    #[test]
    fn take_all_doesnt_leak_racy() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            let take_all = stack.take_all();

            thread1.join().unwrap();
            thread2.join().unwrap();

            tracing::info!("dropping stack");
            drop(stack);

            tracing::info!("dropping take_all");
            drop(take_all);
        })
    }
610
    // Exercises the unsynchronized `Stack`: entries pop in LIFO order, and a
    // stack remains usable for further pushes after `take_all`.
    #[test]
    fn unsync() {
        loom::model(|| {
            let mut stack = Stack::<Entry>::new();
            stack.push(Entry::new(1));
            stack.push(Entry::new(2));
            stack.push(Entry::new(3));
            let mut take_all = stack.take_all();

            // Popping yields 3, 2, 1 (LIFO); meanwhile refill the original
            // stack with 13, 12, 11.
            for i in (1..=3).rev() {
                assert_eq!(take_all.next().unwrap().val, i);
                stack.push(Entry::new(10 + i));
            }

            // The refilled stack then pops 11, 12, 13 in LIFO order.
            let mut i = 11;
            for entry in stack.take_all() {
                assert_eq!(entry.val, i);
                i += 1;
            }
        })
    }
632
    // Entries still linked into a mutable `Stack` when it is dropped must be
    // dropped with it rather than leaked.
    #[test]
    fn unsync_doesnt_leak() {
        loom::model(|| {
            let mut stack = Stack::<Entry>::new();
            stack.push(Entry::new(1));
            stack.push(Entry::new(2));
            stack.push(Entry::new(3));
        })
    }
642}
643
#[cfg(test)]
mod test {
    use super::{test_util::Entry, *};

    // Compile-time assertion that `TransferStack` is `Send + Sync`, so it can
    // be shared across threads.
    #[test]
    fn stack_is_send_sync() {
        crate::util::assert_send_sync::<TransferStack<Entry>>()
    }

    // Compile-time assertion that `Links` does not make a containing type
    // `!Send` or `!Sync`.
    #[test]
    fn links_are_send_sync() {
        crate::util::assert_send_sync::<Links<Entry>>()
    }
}
658
659#[cfg(test)]
660pub(crate) mod test_util {
661    use super::*;
662    use crate::loom::alloc;
663    use core::pin::Pin;
664    use core::ptr;
665
    // A test node type that can be linked into a `Stack`/`TransferStack`.
    #[pin_project::pin_project]
    pub(crate) struct Entry {
        // The intrusive links (structurally pinned via `pin_project`).
        #[pin]
        links: Links<Entry>,
        // Payload used by tests to identify this entry.
        pub(crate) val: i32,
        // Allocation tracker; NOTE(review): presumably used by the loom tests
        // for leak detection — confirm in `crate::loom::alloc`.
        track: alloc::Track<()>,
    }
673
    // ----------------------------------------------------------------------
    // Helper impls for `sorted_list`
    //
    // Entries compare solely by their `val` payload.
    impl PartialEq for Entry {
        fn eq(&self, other: &Self) -> bool {
            self.val.eq(&other.val)
        }
    }

    impl Eq for Entry {}

    impl PartialOrd for Entry {
        // Delegate to the total order defined by `Ord`.
        fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Ord for Entry {
        fn cmp(&self, other: &Self) -> core::cmp::Ordering {
            self.val.cmp(&other.val)
        }
    }
    // ----------------------------------------------------------------------
696
    // Safety: `Entry` handles are pinned, heap-allocated boxes, so a node's
    // `links` field stays at a stable address while it is linked into a stack.
    unsafe impl Linked<Links<Self>> for Entry {
        type Handle = Pin<Box<Entry>>;

        fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Self> {
            // Leak the box to transfer ownership to the stack; the matching
            // `from_ptr` re-boxes it when the entry is removed.
            unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
        }

        unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
            // Safety: if this function is only called by the linked list
            // implementation (and it is not intended for external use), we can
            // expect that the `NonNull` was constructed from a reference which
            // was pinned.
            //
            // If other callers besides `List`'s internals were to call this on
            // some random `NonNull<Entry>`, this would not be the case, and
            // this could be constructing an erroneous `Pin` from a referent
            // that may not be pinned!
            Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
        }

        unsafe fn links(target: NonNull<Self>) -> NonNull<Links<Self>> {
            let links = ptr::addr_of_mut!((*target.as_ptr()).links);
            // Safety: it's fine to use `new_unchecked` here; if the pointer that we
            // offset to the `links` field is not null (which it shouldn't be, as we
            // received it as a `NonNull`), the offset pointer should therefore also
            // not be null.
            NonNull::new_unchecked(links)
        }
    }
726
    impl Entry {
        /// Allocates a new pinned `Entry` carrying `val`.
        pub(crate) fn new(val: i32) -> Pin<Box<Entry>> {
            Box::pin(Entry {
                links: Links::new(),
                val,
                track: alloc::Track::new(()),
            })
        }

        /// Pushes `n` entries onto `stack` with values `thread * 10 + (0..n)`,
        /// so each producer thread's entries are distinguishable in tests.
        pub(super) fn push_all(stack: &TransferStack<Self>, thread: i32, n: i32) {
            for i in 0..n {
                let entry = Self::new((thread * 10) + i);
                stack.push(entry);
            }
        }
    }
743}