//! `kovan_queue/seg_queue.rs` — an unbounded, segmented lock-free MPMC queue.

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};

// Project-local cache-line alignment wrapper and the kovan SMR primitives.
use crate::utils::CacheAligned;
use kovan::{Atomic, RetiredNode, Shared, pin};

/// Number of value slots in each segment of the linked list.
const SEGMENT_SIZE: usize = 32;

/// Slot state: Empty, ready to be written.
const SLOT_EMPTY: usize = 0;
/// Slot state: Currently being written to.
const SLOT_WRITING: usize = 1;
/// Slot state: Contains a value, ready to be read.
const SLOT_WRITTEN: usize = 2;
/// Slot state: Value has been read/consumed.
const SLOT_CONSUMED: usize = 3;

/// One queue cell: a small state machine guarding possibly-uninitialized
/// storage.  `value` is only initialized while `state` is `SLOT_WRITTEN`
/// (the producer writes it under `SLOT_WRITING`, then publishes with a
/// Release store).
struct Slot<T> {
    // One of the SLOT_* constants; transitions
    // EMPTY -> WRITING -> WRITTEN -> CONSUMED, each step via CAS/store.
    state: AtomicUsize,
    // Element storage. The UnsafeCell is only accessed by the single thread
    // that won the state CAS for this slot, so access is exclusive.
    value: UnsafeCell<MaybeUninit<T>>,
}

/// A fixed-size block of slots plus a link to the next segment.
///
/// NOTE(review): `#[repr(C)]` pins `retired` as the first field — presumably
/// so kovan can treat a `*mut Segment<T>` as a pointer to its intrusive
/// `RetiredNode` during reclamation (see `kovan::retire` in `pop`).
/// TODO confirm against kovan's retire contract.
#[repr(C)]
struct Segment<T> {
    // Intrusive node used by kovan for deferred reclamation.
    retired: RetiredNode,
    slots: [Slot<T>; SEGMENT_SIZE],
    // Next segment in the list; null while this segment is the tail.
    next: Atomic<Segment<T>>,
    // Monotonically increasing segment id (the initial segment has id 0).
    id: usize,
}

33impl<T> Segment<T> {
34    fn new(id: usize) -> Segment<T> {
35        // Use `core::array::from_fn` to construct each slot explicitly with known-good values.
36        let slots = core::array::from_fn(|_| Slot {
37            state: AtomicUsize::new(SLOT_EMPTY),
38            value: UnsafeCell::new(MaybeUninit::uninit()),
39        });
40        Segment {
41            retired: RetiredNode::new(),
42            slots,
43            next: Atomic::null(),
44            id,
45        }
46    }
47}
48
/// An unbounded MPMC queue built from a linked list of fixed-size segments.
pub struct SegQueue<T> {
    // Oldest segment; `pop` scans from here. Cache-aligned so head and tail
    // do not share a cache line (avoids producer/consumer false sharing).
    head: CacheAligned<Atomic<Segment<T>>>,
    // Newest segment; `push` scans from here.
    tail: CacheAligned<Atomic<Segment<T>>>,
    // Approximate element count, maintained with Relaxed increments/decrements.
    len: AtomicUsize,
}

// SAFETY: each pushed T is handed to exactly one popper — ownership moves
// through the slot state machine (WRITTEN -> CONSUMED CAS), and no two
// threads ever observe a shared `&T` through the queue. Hence `T: Send`
// suffices for both Send and Sync; `T: Sync` is not required.
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

58impl<T: 'static> Default for SegQueue<T> {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
impl<T: 'static> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// Both `head` and `tail` start out pointing at a single empty segment
    /// (id 0).
    pub fn new() -> SegQueue<T> {
        let segment = Box::into_raw(Box::new(Segment::new(0)));
        let head = Atomic::new(segment);
        let tail = Atomic::new(segment);

        SegQueue {
            head: CacheAligned::new(head),
            tail: CacheAligned::new(tail),
            len: AtomicUsize::new(0),
        }
    }

    /// Pushes an element into the queue.
    ///
    /// Scans the tail segment for the first `SLOT_EMPTY` slot and claims it
    /// with a CAS; if the segment is full, links a fresh segment after it
    /// and retries.
    pub fn push(&self, value: T) {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = pin();

        loop {
            let tail = self.tail.load(Ordering::Acquire, &guard);

            // `tail` is set at construction and only ever CAS'd to other
            // non-null segments, so this is a purely defensive retry.
            if tail.is_null() {
                continue;
            }

            let t = unsafe { tail.as_ref().unwrap() };
            let next = t.next.load(Ordering::Acquire, &guard);

            // The tail pointer is lagging behind the real last segment:
            // help advance it, then retry. A failed CAS just means another
            // thread advanced it first.
            if !next.is_null() {
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
                continue;
            }

            for i in 0..SEGMENT_SIZE {
                let slot = &t.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_EMPTY {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_EMPTY,
                            SLOT_WRITING,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // We own this slot now. Write the value, then
                        // publish with a Release store so a consumer's
                        // later Acquire load/CAS observes the value as
                        // fully initialized.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.state.store(SLOT_WRITTEN, Ordering::Release);
                        self.len.fetch_add(1, Ordering::Relaxed);
                        return;
                    }
                } else if state == SLOT_WRITING {
                    // Another producer is mid-write here; try the next slot.
                    continue;
                }
            }

            // Every slot was taken: allocate a successor segment and try to
            // link it after the current tail.
            let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
            let new_shared = unsafe { Shared::from_raw(new_segment) };

            // kovan's `Shared` exposes no null constructor here, so the
            // CAS's expected "no successor" value is built from a raw null
            // pointer.
            let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };

            if t.next
                .compare_exchange(
                    null_shared,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                )
                .is_ok()
            {
                // Linked successfully; also try to swing `tail` forward.
                // Failure is benign — another thread already advanced it.
                let _ = self.tail.compare_exchange(
                    tail,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
            } else {
                // Lost the race to extend the list. Our segment was never
                // published, so it can be freed immediately.
                unsafe { drop(Box::from_raw(new_segment)) };
            }
            backoff.snooze();
        }
    }

    /// Pops an element from the queue.
    ///
    /// Returns `None` when the queue looks empty: an unwritten slot was
    /// reached (producers claim slots in index order, so nothing exists at
    /// or beyond it) and the head segment has no successor.
    pub fn pop(&self) -> Option<T> {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);
            let h = unsafe { head.as_ref().unwrap() };

            // Tracks whether every slot in this segment is CONSUMED, which
            // makes the segment eligible for unlinking below.
            let mut all_consumed = true;

            for i in 0..SEGMENT_SIZE {
                let slot = &h.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_WRITTEN {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_WRITTEN,
                            SLOT_CONSUMED,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // Winning the CAS grants exclusive ownership of the
                        // value; the producer's Release store of
                        // SLOT_WRITTEN guarantees it is initialized.
                        let value = unsafe { slot.value.get().read().assume_init() };
                        self.len.fetch_sub(1, Ordering::Relaxed);
                        return Some(value);
                    }
                } else if state == SLOT_EMPTY {
                    // Producers take the first EMPTY slot, so nothing was
                    // written at or past index `i` in this segment. With no
                    // successor segment, the queue is empty right now.
                    let next = h.next.load(Ordering::Acquire, &guard);
                    if next.is_null() {
                        return None;
                    }
                }

                // Fresh re-read: a slot consumed by another thread since the
                // first load is still counted correctly.
                if slot.state.load(Ordering::Acquire) != SLOT_CONSUMED {
                    all_consumed = false;
                }
            }

            let next = h.next.load(Ordering::Acquire, &guard);
            if all_consumed
                && !next.is_null()
                && self
                    .head
                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
                    .is_ok()
            {
                // The old head is now unreachable from the queue; hand it to
                // kovan for deferred reclamation (freed once no pinned guard
                // can still reference it). `kovan::retire` takes *mut T.
                unsafe { kovan::retire(head.as_raw()) };
                continue;
            }

            // Another thread moved `head` while we scanned — rescan from the
            // new head rather than spinning on stale data.
            let current_head = self.head.load(Ordering::Acquire, &guard);
            if current_head != head {
                continue;
            }

            // Single fully-scanned segment with no successor: nothing left.
            if h.next.load(Ordering::Acquire, &guard).is_null() {
                return None;
            }

            backoff.snooze();
        }
    }

    /// Returns the number of elements in the queue.
    ///
    /// This is an approximation in concurrent scenarios: elements may be pushed
    /// or popped by other threads between the moment the length is read and the
    /// moment the caller acts on the returned value.
    #[inline]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Returns `true` if the queue is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

250impl<T> Drop for SegQueue<T> {
251    fn drop(&mut self) {
252        // `pop()` calls `kovan::retire(head.as_raw())` on a segment *only after* a
253        // successful CAS that removes it from `self.head`.  Once retired, that segment
254        // is no longer reachable via the linked list.  `drop()` receives `&mut self`
255        // (exclusive ownership), so no concurrent pops can run.  The walk below starts
256        // from the current `self.head` and only visits segments that are *still owned*
257        // by the queue (i.e., not yet retired).  Kovan will independently reclaim the
258        // previously retired segments; `drop()` never touches them.
259        //
260        // The `pin()` below keeps all retired (but not yet freed) nodes alive during
261        // the walk.  When `guard` drops, kovan may free those retired nodes — but by
262        // that point we have already finished walking the live portion of the list.
263        let guard = pin();
264        let mut current = self.head.load(Ordering::Relaxed, &guard);
265
266        while !current.is_null() {
267            unsafe {
268                let segment_ptr = current.as_raw();
269                let segment = &*segment_ptr;
270                let next = segment.next.load(Ordering::Relaxed, &guard);
271
272                for i in 0..SEGMENT_SIZE {
273                    if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
274                        // drop_in_place on MaybeUninit<T> cast to *mut T is correct:
275                        // MaybeUninit<T> has the same layout as T and we confirmed
276                        // the slot is SLOT_WRITTEN (value is initialized).
277                        ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
278                    }
279                }
280
281                // SAFETY: segment_ptr was allocated via Box::into_raw in Segment::new()
282                // and has not been retired (it is still in the live linked list).
283                drop(Box::from_raw(segment_ptr));
284
285                current = next;
286            }
287        }
288    }
289}