cordyceps/stack.rs
//! [Intrusive], singly-linked last-in, first-out (LIFO) stacks.
2//!
3//! See the documentation for the [`Stack`] and [`TransferStack`] types for
4//! details.
5//!
6//! [intrusive]: crate#intrusive-data-structures
7#![warn(missing_debug_implementations)]
8
9use crate::{loom::cell::UnsafeCell, Linked};
10use core::{fmt, marker::PhantomPinned, ptr::NonNull};
11
12#[cfg(target_has_atomic = "ptr")]
13pub use has_cas_atomics::*;
14
15/// Items exclusive to targets with CAS atomics
16#[cfg(target_has_atomic = "ptr")]
17mod has_cas_atomics {
18 use core::{
19 fmt,
20 ptr::{self, NonNull},
21 };
22
23 use crate::{
24 loom::sync::atomic::{AtomicPtr, Ordering::*},
25 Linked,
26 };
27
28 use super::{Links, Stack};
29
    /// An [intrusive] lock-free singly-linked LIFO stack, where all entries
31 /// currently in the stack are consumed in a single atomic operation.
32 ///
33 /// A transfer stack is perhaps the world's simplest lock-free concurrent data
34 /// structure. It provides two primary operations:
35 ///
36 /// - [`TransferStack::push`], which appends an element to the end of the
37 /// transfer stack,
38 ///
39 /// - [`TransferStack::take_all`], which atomically takes all elements currently
40 /// on the transfer stack and returns them as a new mutable [`Stack`].
41 ///
42 /// These are both *O*(1) operations, although `push` performs a
43 /// compare-and-swap loop that may be retried if another producer concurrently
44 /// pushed an element.
45 ///
46 /// In order to be part of a `TransferStack`, a type `T` must implement
47 /// the [`Linked`] trait for [`stack::Links<T>`](Links).
48 ///
49 /// Pushing elements into a `TransferStack` takes ownership of those elements
50 /// through an owning [`Handle` type](Linked::Handle). Dropping a
51 /// [`TransferStack`] drops all elements currently linked into the stack.
52 ///
53 /// A transfer stack is often useful in cases where a large number of resources
54 /// must be efficiently transferred from several producers to a consumer, such
55 /// as for reuse or cleanup. For example, a [`TransferStack`] can be used as the
56 /// "thread" (shared) free list in a [`mimalloc`-style sharded
57 /// allocator][mimalloc], with a mutable [`Stack`] used as the local
58 /// (unsynchronized) free list. When an allocation is freed from the same CPU
59 /// core that it was allocated on, it is pushed to the local free list, using an
60 /// unsynchronized mutable [`Stack::push`] operation. If an allocation is freed
61 /// from a different thread, it is instead pushed to that thread's shared free
62 /// list, a [`TransferStack`], using an atomic [`TransferStack::push`]
63 /// operation. New allocations are popped from the local unsynchronized free
64 /// list, and if the local free list is empty, the entire shared free list is
65 /// moved onto the local free list. This allows objects which do not leave the
66 /// CPU core they were allocated on to be both allocated and deallocated using
67 /// unsynchronized operations, and new allocations only perform an atomic
68 /// operation when the local free list is empty.
69 ///
70 /// [intrusive]: crate#intrusive-data-structures
71 /// [mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf
    pub struct TransferStack<T: Linked<Links<T>>> {
        /// The current top of the stack: the most recently pushed element, or
        /// null when the stack is empty. All concurrent access goes through
        /// atomic operations on this pointer.
        head: AtomicPtr<T>,
    }
75
76 // === impl TransferStack ===
77 impl<T> TransferStack<T>
78 where
79 T: Linked<Links<T>>,
80 {
81 /// Returns a new `TransferStack` with no elements.
82 #[cfg(not(loom))]
83 #[must_use]
84 pub const fn new() -> Self {
85 Self {
86 head: AtomicPtr::new(ptr::null_mut()),
87 }
88 }
89
90 /// Returns a new `TransferStack` with no elements.
91 #[cfg(loom)]
92 #[must_use]
93 pub fn new() -> Self {
94 Self {
95 head: AtomicPtr::new(ptr::null_mut()),
96 }
97 }
98
99 /// Pushes `element` onto the end of this `TransferStack`, taking ownership
100 /// of it.
101 ///
102 /// This is an *O*(1) operation, although it performs a compare-and-swap
103 /// loop that may repeat if another producer is concurrently calling `push`
104 /// on the same `TransferStack`.
105 ///
106 /// This takes ownership over `element` through its [owning `Handle`
107 /// type](Linked::Handle). If the `TransferStack` is dropped before the
108 /// pushed `element` is removed from the stack, the `element` will be dropped.
109 #[inline]
110 pub fn push(&self, element: T::Handle) {
111 self.push_was_empty(element);
112 }
113
114 /// Pushes `element` onto the end of this `TransferStack`, taking ownership
115 /// of it. Returns `true` if the stack was previously empty (the previous
116 /// head was null).
117 ///
118 /// This is an *O*(1) operation, although it performs a compare-and-swap
119 /// loop that may repeat if another producer is concurrently calling `push`
120 /// on the same `TransferStack`.
121 ///
122 /// This takes ownership over `element` through its [owning `Handle`
123 /// type](Linked::Handle). If the `TransferStack` is dropped before the
124 /// pushed `element` is removed from the stack, the `element` will be dropped.
125 pub fn push_was_empty(&self, element: T::Handle) -> bool {
126 let ptr = T::into_ptr(element);
127 test_trace!(?ptr, "TransferStack::push");
128 let links = unsafe { T::links(ptr).as_mut() };
129 debug_assert!(links.next.with(|next| unsafe { (*next).is_none() }));
130
131 let mut head = self.head.load(Relaxed);
132 loop {
133 test_trace!(?ptr, ?head, "TransferStack::push");
134 links.next.with_mut(|next| unsafe {
135 *next = NonNull::new(head);
136 });
137
138 match self
139 .head
140 .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire)
141 {
142 Ok(old) => {
143 let was_empty = old.is_null();
144 test_trace!(?ptr, ?head, was_empty, "TransferStack::push -> pushed");
145 return was_empty;
146 }
147 Err(actual) => head = actual,
148 }
149 }
150 }
151
152 /// Takes all elements *currently* in this `TransferStack`, returning a new
153 /// mutable [`Stack`] containing those elements.
154 ///
155 /// This is an *O*(1) operation which does not allocate memory. It will
156 /// never loop and does not spin.
157 #[must_use]
158 pub fn take_all(&self) -> Stack<T> {
159 let head = self.head.swap(ptr::null_mut(), AcqRel);
160 let head = NonNull::new(head);
161 Stack { head }
162 }
163 }
164
165 impl<T> Drop for TransferStack<T>
166 where
167 T: Linked<Links<T>>,
168 {
169 fn drop(&mut self) {
170 // The stack owns any entries that are still in the stack; ensure they
171 // are dropped before dropping the stack.
172 for entry in self.take_all() {
173 drop(entry);
174 }
175 }
176 }
177
178 impl<T> fmt::Debug for TransferStack<T>
179 where
180 T: Linked<Links<T>>,
181 {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 let Self { head } = self;
184 f.debug_struct("TransferStack").field("head", head).finish()
185 }
186 }
187
188 impl<T> Default for TransferStack<T>
189 where
190 T: Linked<Links<T>>,
191 {
192 fn default() -> Self {
193 Self::new()
194 }
195 }
196}
197
/// An [intrusive] singly-linked mutable LIFO stack.
199///
200/// This is a very simple implementation of a linked `Stack`, which provides
201/// *O*(1) [`push`](Self::push) and [`pop`](Self::pop) operations. Items are
202/// popped from the stack in the opposite order that they were pushed in.
203///
204/// A [`Stack`] also implements the [`Iterator`] trait, with the
205/// [`Iterator::next`] method popping elements from the end of the stack.
206///
207/// In order to be part of a `Stack`, a type `T` must implement
208/// the [`Linked`] trait for [`stack::Links<T>`](Links).
209///
210/// Pushing elements into a `Stack` takes ownership of those elements
211/// through an owning [`Handle` type](Linked::Handle). Dropping a
212/// `Stack` drops all elements currently linked into the stack.
213///
214/// [intrusive]: crate#intrusive-data-structures
pub struct Stack<T: Linked<Links<T>>> {
    /// The top of the stack: the most recently pushed element, or `None` when
    /// the stack is empty.
    ///
    /// `pub(crate)` so that in-crate code (e.g. `TransferStack::take_all`) can
    /// construct a `Stack` directly from a raw head pointer.
    pub(crate) head: Option<NonNull<T>>,
}
218
219/// Singly-linked-list linkage
220///
221/// Links to other nodes in a [`TransferStack`], [`Stack`], or [`SortedList`].
222///
223/// In order to be part of a [`TransferStack`], [`Stack`], or [`SortedList`],
224/// a type must contain an instance of this type, and must implement the
225/// [`Linked`] trait for `Links<Self>`.
226///
227/// [`SortedList`]: crate::SortedList
228//
229// TODO(AJM): In the next breaking change, we might want to specifically have
230// a `SingleLinks` and `DoubleLinks` type to make the relationship more clear,
231// instead of "stack" being singly-flavored and "list" being doubly-flavored
pub struct Links<T> {
    /// The next node in the stack.
    pub(crate) next: UnsafeCell<Option<NonNull<T>>>,

    /// Linked list links must always be `!Unpin`, in order to ensure that they
    /// never receive LLVM `noalias` annotations; see also
    /// <https://github.com/rust-lang/rust/issues/63818>.
    _unpin: PhantomPinned,
}
241
242// === impl Stack ===
243
244impl<T> Stack<T>
245where
246 T: Linked<Links<T>>,
247{
248 /// Returns a new `Stack` with no elements in it.
249 #[must_use]
250 pub const fn new() -> Self {
251 Self { head: None }
252 }
253
254 /// Pushes `element` onto the end of this `Stack`, taking ownership
255 /// of it.
256 ///
257 /// This is an *O*(1) operation that does not allocate memory. It will never
258 /// loop.
259 ///
260 /// This takes ownership over `element` through its [owning `Handle`
261 /// type](Linked::Handle). If the `Stack` is dropped before the
262 /// pushed `element` is [`pop`](Self::pop)pped from the stack, the `element`
263 /// will be dropped.
264 pub fn push(&mut self, element: T::Handle) {
265 let ptr = T::into_ptr(element);
266 test_trace!(?ptr, ?self.head, "Stack::push");
267 unsafe {
268 // Safety: we have exclusive mutable access to the stack, and
269 // therefore can also mutate the stack's entries.
270 let links = T::links(ptr).as_mut();
271 links.next.with_mut(|next| {
272 debug_assert!((*next).is_none());
273 *next = self.head.replace(ptr);
274 })
275 }
276 }
277
278 /// Returns the element most recently [push](Self::push)ed to this `Stack`,
279 /// or `None` if the stack is empty.
280 ///
281 /// This is an *O*(1) operation which does not allocate memory. It will
282 /// never loop and does not spin.
283 #[must_use]
284 pub fn pop(&mut self) -> Option<T::Handle> {
285 test_trace!(?self.head, "Stack::pop");
286 let head = self.head.take()?;
287 unsafe {
288 // Safety: we have exclusive ownership over this chunk of stack.
289
290 // advance the head link to the next node after the current one (if
291 // there is one).
292 self.head = T::links(head).as_mut().next.with_mut(|next| (*next).take());
293
294 test_trace!(?self.head, "Stack::pop -> popped");
295
296 // return the current node
297 Some(T::from_ptr(head))
298 }
299 }
300
301 /// Takes all elements *currently* in this `Stack`, returning a new
302 /// mutable `Stack` containing those elements.
303 ///
304 /// This is an *O*(1) operation which does not allocate memory. It will
305 /// never loop and does not spin.
306 #[must_use]
307 pub fn take_all(&mut self) -> Self {
308 Self {
309 head: self.head.take(),
310 }
311 }
312
313 /// Returns `true` if this `Stack` is empty.
314 #[inline]
315 #[must_use]
316 pub fn is_empty(&self) -> bool {
317 self.head.is_none()
318 }
319}
320
321impl<T> Drop for Stack<T>
322where
323 T: Linked<Links<T>>,
324{
325 fn drop(&mut self) {
326 // The stack owns any entries that are still in the stack; ensure they
327 // are dropped before dropping the stack.
328 for entry in self {
329 drop(entry);
330 }
331 }
332}
333
334impl<T> fmt::Debug for Stack<T>
335where
336 T: Linked<Links<T>>,
337{
338 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
339 let Self { head } = self;
340 f.debug_struct("Stack").field("head", head).finish()
341 }
342}
343
344impl<T> Iterator for Stack<T>
345where
346 T: Linked<Links<T>>,
347{
348 type Item = T::Handle;
349
350 fn next(&mut self) -> Option<Self::Item> {
351 self.pop()
352 }
353}
354
355impl<T> Default for Stack<T>
356where
357 T: Linked<Links<T>>,
358{
359 fn default() -> Self {
360 Self::new()
361 }
362}
363
/// # Safety
///
/// A `Stack` is `Send` if `T` is `Send`, because moving it across threads
/// also implicitly moves any `T`s in the stack.
unsafe impl<T> Send for Stack<T>
where
    T: Send,
    T: Linked<Links<T>>,
{
}
374
/// # Safety
///
/// A `Stack` is `Sync` if `T` is `Sync`: a shared `&Stack` provides only
/// shared access to the entries, since every mutating operation (`push`,
/// `pop`, `take_all`) requires `&mut self`.
unsafe impl<T> Sync for Stack<T>
where
    T: Sync,
    T: Linked<Links<T>>,
{
}
381
382// === impl Links ===
383
impl<T> Links<T> {
    /// Returns new links for a [`Stack`] or [`TransferStack`].
    #[cfg(not(loom))]
    #[must_use]
    pub const fn new() -> Self {
        Self {
            next: UnsafeCell::new(None),
            _unpin: PhantomPinned,
        }
    }

    /// Returns new links for a [`Stack`] or [`TransferStack`].
    ///
    /// Non-`const` variant: loom's mocked `UnsafeCell` cannot be constructed
    /// in a `const fn`.
    #[cfg(loom)]
    #[must_use]
    pub fn new() -> Self {
        Self {
            next: UnsafeCell::new(None),
            _unpin: PhantomPinned,
        }
    }
}
405
/// # Safety
///
/// Types containing [`Links`] may be `Send`: the pointers within the `Links` may
/// mutably alias another value, but the links can only be _accessed_ by the
/// owner of the [`Stack`] or [`TransferStack`] itself, because the pointers are
/// private. As long as those containers uphold their own invariants, `Links`
/// should not make a type `!Send`.
unsafe impl<T: Send> Send for Links<T> {}
414
/// # Safety
///
/// Types containing [`Links`] may be `Sync`: the pointers within the `Links` may
/// mutably alias another value, but the links can only be _accessed_ by the
/// owner of the [`Stack`] or [`TransferStack`] itself, because the pointers are
/// private. As long as those containers uphold their own invariants, `Links`
/// should not make a type `!Sync`.
unsafe impl<T: Sync> Sync for Links<T> {}
423
424impl<T> fmt::Debug for Links<T> {
425 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426 f.write_str("transfer_stack::Links { ... }")
427 }
428}
429
430impl<T> Default for Links<T> {
431 fn default() -> Self {
432 Self::new()
433 }
434}
435
#[cfg(test)]
mod loom {
    //! Loom models exercising `TransferStack` under concurrent interleavings,
    //! plus unsynchronized `Stack` tests. Entries are `alloc::Track`ed, so
    //! loom's leak checker verifies that no entry is leaked in any
    //! interleaving.
    use super::*;
    use crate::loom::{
        self,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        thread,
    };
    use test_util::Entry;

    // Two producers race `push` while the main thread repeatedly drains via
    // `take_all`; every pushed value must be observed exactly once.
    #[test]
    fn multithreaded_push() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            // Counts producers still running; decremented as each finishes.
            let threads = Arc::new(AtomicUsize::new(2));
            let thread1 = thread::spawn({
                let stack = stack.clone();
                let threads = threads.clone();
                move || {
                    Entry::push_all(&stack, 1, PUSHES);
                    threads.fetch_sub(1, Ordering::Relaxed);
                }
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                let threads = threads.clone();
                move || {
                    Entry::push_all(&stack, 2, PUSHES);
                    threads.fetch_sub(1, Ordering::Relaxed);
                }
            });

            let mut seen = Vec::new();

            // Drain until both producers report completion...
            loop {
                seen.extend(stack.take_all().map(|entry| entry.val));

                if threads.load(Ordering::Relaxed) == 0 {
                    break;
                }

                thread::yield_now();
            }

            // ...then drain once more, to catch any push that landed between
            // the last drain and the counter reaching zero.
            seen.extend(stack.take_all().map(|entry| entry.val));

            seen.sort();
            assert_eq!(seen, vec![10, 11, 20, 21]);

            thread1.join().unwrap();
            thread2.join().unwrap();
        })
    }

    // Two producers and two consumers (`thread3` and the main thread) racing
    // `take_all`; the union of everything consumed must equal the pushed set.
    #[test]
    fn multithreaded_pop() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            let thread3 = thread::spawn({
                let stack = stack.clone();
                move || stack.take_all().map(|entry| entry.val).collect::<Vec<_>>()
            });

            // Concurrent drain on the main thread, racing with thread3.
            let seen_thread0 = stack.take_all().map(|entry| entry.val).collect::<Vec<_>>();
            let seen_thread3 = thread3.join().unwrap();

            thread1.join().unwrap();
            thread2.join().unwrap();

            // Final drain after all producers have finished.
            let seen_thread0_final = stack.take_all().map(|entry| entry.val).collect::<Vec<_>>();

            let mut all = dbg!(seen_thread0);
            all.extend(dbg!(seen_thread3));
            all.extend(dbg!(seen_thread0_final));

            all.sort();
            assert_eq!(all, vec![10, 11, 20, 21]);
        })
    }

    // Dropping the `TransferStack` (the Arc keeps it alive until the last
    // producer finishes) must not leak any entries.
    #[test]
    fn doesnt_leak() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            tracing::info!("dropping stack");
            drop(stack);

            thread1.join().unwrap();
            thread2.join().unwrap();
        })
    }

    // `take_all` after both producers finish: neither the taken `Stack` nor
    // the original `TransferStack` may leak entries when dropped.
    #[test]
    fn take_all_doesnt_leak() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            thread1.join().unwrap();
            thread2.join().unwrap();

            let take_all = stack.take_all();

            tracing::info!("dropping stack");
            drop(stack);

            tracing::info!("dropping take_all");
            drop(take_all);
        })
    }

    // Like `take_all_doesnt_leak`, but `take_all` races the producers, so
    // entries may be split between the taken `Stack` and the original.
    #[test]
    fn take_all_doesnt_leak_racy() {
        const PUSHES: i32 = 2;
        loom::model(|| {
            let stack = Arc::new(TransferStack::new());
            let thread1 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 1, PUSHES)
            });

            let thread2 = thread::spawn({
                let stack = stack.clone();
                move || Entry::push_all(&stack, 2, PUSHES)
            });

            let take_all = stack.take_all();

            thread1.join().unwrap();
            thread2.join().unwrap();

            tracing::info!("dropping stack");
            drop(stack);

            tracing::info!("dropping take_all");
            drop(take_all);
        })
    }

    // Unsynchronized `Stack`: LIFO pop order, interleaved pushes onto the
    // original stack while draining a `take_all`.
    #[test]
    fn unsync() {
        loom::model(|| {
            let mut stack = Stack::<Entry>::new();
            stack.push(Entry::new(1));
            stack.push(Entry::new(2));
            stack.push(Entry::new(3));
            let mut take_all = stack.take_all();

            // Entries pop in reverse push order (3, 2, 1); meanwhile push
            // 13, 12, 11 onto the now-empty original stack.
            for i in (1..=3).rev() {
                assert_eq!(take_all.next().unwrap().val, i);
                stack.push(Entry::new(10 + i));
            }

            // 11 was pushed last, so it pops first.
            let mut i = 11;
            for entry in stack.take_all() {
                assert_eq!(entry.val, i);
                i += 1;
            }
        })
    }

    // A `Stack` dropped with entries still in it must free them all.
    #[test]
    fn unsync_doesnt_leak() {
        loom::model(|| {
            let mut stack = Stack::<Entry>::new();
            stack.push(Entry::new(1));
            stack.push(Entry::new(2));
            stack.push(Entry::new(3));
        })
    }
}
643
#[cfg(test)]
mod test {
    //! Compile-time auto-trait assertions for the stack types.
    use super::{test_util::Entry, *};

    // `TransferStack` must be shareable across threads.
    #[test]
    fn stack_is_send_sync() {
        crate::util::assert_send_sync::<TransferStack<Entry>>()
    }

    // `Links` must not strip `Send`/`Sync` from containing types.
    #[test]
    fn links_are_send_sync() {
        crate::util::assert_send_sync::<Links<Entry>>()
    }
}
658
#[cfg(test)]
pub(crate) mod test_util {
    //! Shared test fixture: a heap-allocated, leak-tracked `Entry` node that
    //! implements [`Linked`] for the stack types in this module.
    use super::*;
    use crate::loom::alloc;
    use core::pin::Pin;
    use core::ptr;

    #[pin_project::pin_project]
    pub(crate) struct Entry {
        // Intrusive linkage; structurally pinned.
        #[pin]
        links: Links<Entry>,
        // Payload used by tests to identify entries.
        pub(crate) val: i32,
        // Leak-tracking token, checked by loom's leak checker.
        track: alloc::Track<()>,
    }

    // ----------------------------------------------------------------------
    // Helper impls for `sorted_list`
    impl PartialEq for Entry {
        fn eq(&self, other: &Self) -> bool {
            self.val.eq(&other.val)
        }
    }

    impl Eq for Entry {}

    impl PartialOrd for Entry {
        fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Ord for Entry {
        // Entries order by `val` only.
        fn cmp(&self, other: &Self) -> core::cmp::Ordering {
            self.val.cmp(&other.val)
        }
    }
    // ----------------------------------------------------------------------

    // Safety: `Entry` is pinned behind `Pin<Box<..>>`, so its address is
    // stable for as long as it is linked into a list.
    unsafe impl Linked<Links<Self>> for Entry {
        type Handle = Pin<Box<Entry>>;

        fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Self> {
            // Leak the box; ownership is transferred to the list until
            // `from_ptr` reconstructs the handle.
            unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
        }

        unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
            // Safety: if this function is only called by the linked list
            // implementation (and it is not intended for external use), we can
            // expect that the `NonNull` was constructed from a reference which
            // was pinned.
            //
            // If other callers besides `List`'s internals were to call this on
            // some random `NonNull<Entry>`, this would not be the case, and
            // this could be constructing an erroneous `Pin` from a referent
            // that may not be pinned!
            Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
        }

        unsafe fn links(target: NonNull<Self>) -> NonNull<Links<Self>> {
            // `addr_of_mut!` avoids creating an intermediate reference to the
            // (possibly aliased) field.
            let links = ptr::addr_of_mut!((*target.as_ptr()).links);
            // Safety: it's fine to use `new_unchecked` here; if the pointer that we
            // offset to the `links` field is not null (which it shouldn't be, as we
            // received it as a `NonNull`), the offset pointer should therefore also
            // not be null.
            NonNull::new_unchecked(links)
        }
    }

    impl Entry {
        // Allocates a new pinned, tracked entry with the given payload.
        pub(crate) fn new(val: i32) -> Pin<Box<Entry>> {
            Box::pin(Entry {
                links: Links::new(),
                val,
                track: alloc::Track::new(()),
            })
        }

        // Pushes `n` entries valued `thread * 10 + i` (for `i` in `0..n`),
        // so each producer thread's values are distinguishable in tests.
        pub(super) fn push_all(stack: &TransferStack<Self>, thread: i32, n: i32) {
            for i in 0..n {
                let entry = Self::new((thread * 10) + i);
                stack.push(entry);
            }
        }
    }
}
743}