Skip to main content

kovan_queue/
seg_queue.rs

1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::utils::CacheAligned;
7use kovan::{Atomic, RetiredNode, Shared, pin};
8
/// Number of slots stored in every segment of the queue.
const SEGMENT_SIZE: usize = 32;

// A slot's state only ever moves forward:
// SLOT_EMPTY -> SLOT_WRITING -> SLOT_WRITTEN -> SLOT_CONSUMED.

/// Slot state: nothing stored yet; a pusher may claim the slot.
const SLOT_EMPTY: usize = 0;
/// Slot state: a pusher has claimed the slot and is storing its value.
const SLOT_WRITING: usize = 1;
/// Slot state: the slot holds an initialized value that a popper may take.
const SLOT_WRITTEN: usize = 2;
/// Slot state: the value has been moved out by a popper.
const SLOT_CONSUMED: usize = 3;
19
/// One cell of a segment: a state word plus storage for a single value.
///
/// `value` may only be treated as initialized while `state` says the slot
/// has been written and not yet consumed.
struct Slot<T> {
    /// Current lifecycle state of the slot (one of the `SLOT_*` constants).
    state: AtomicUsize,
    /// Storage for the element; uninitialized until a pusher writes into it.
    value: UnsafeCell<MaybeUninit<T>>,
}
24
25#[repr(C)]
26struct Segment<T> {
27    retired: RetiredNode,
28    slots: [Slot<T>; SEGMENT_SIZE],
29    next: Atomic<Segment<T>>,
30    id: usize,
31}
32
33impl<T> Segment<T> {
34    fn new(id: usize) -> Segment<T> {
35        // Use `core::array::from_fn` to construct each slot explicitly with known-good values.
36        let slots = core::array::from_fn(|_| Slot {
37            state: AtomicUsize::new(SLOT_EMPTY),
38            value: UnsafeCell::new(MaybeUninit::uninit()),
39        });
40        Segment {
41            retired: RetiredNode::new(),
42            slots,
43            next: Atomic::null(),
44            id,
45        }
46    }
47}
48
49pub struct SegQueue<T> {
50    head: CacheAligned<Atomic<Segment<T>>>,
51    tail: CacheAligned<Atomic<Segment<T>>>,
52}
53
54unsafe impl<T: Send> Send for SegQueue<T> {}
55unsafe impl<T: Send> Sync for SegQueue<T> {}
56
57impl<T: 'static> Default for SegQueue<T> {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl<T: 'static> SegQueue<T> {
64    /// Creates a new unbounded queue.
65    pub fn new() -> SegQueue<T> {
66        let segment = Box::into_raw(Box::new(Segment::new(0)));
67        let head = Atomic::new(segment);
68        let tail = Atomic::new(segment);
69
70        SegQueue {
71            head: CacheAligned::new(head),
72            tail: CacheAligned::new(tail),
73        }
74    }
75
76    /// Pushes an element into the queue.
77    pub fn push(&self, value: T) {
78        let backoff = crossbeam_utils::Backoff::new();
79        let guard = pin();
80
81        loop {
82            let tail = self.tail.load(Ordering::Acquire, &guard);
83
84            if tail.is_null() {
85                continue;
86            }
87
88            let t = unsafe { tail.as_ref().unwrap() };
89            let next = t.next.load(Ordering::Acquire, &guard);
90
91            if !next.is_null() {
92                let _ = self.tail.compare_exchange(
93                    tail,
94                    next,
95                    Ordering::SeqCst,
96                    Ordering::Relaxed,
97                    &guard,
98                );
99                continue;
100            }
101
102            for i in 0..SEGMENT_SIZE {
103                let slot = &t.slots[i];
104                let state = slot.state.load(Ordering::Acquire);
105
106                if state == SLOT_EMPTY {
107                    if slot
108                        .state
109                        .compare_exchange(
110                            SLOT_EMPTY,
111                            SLOT_WRITING,
112                            Ordering::SeqCst,
113                            Ordering::Relaxed,
114                        )
115                        .is_ok()
116                    {
117                        unsafe {
118                            slot.value.get().write(MaybeUninit::new(value));
119                        }
120                        slot.state.store(SLOT_WRITTEN, Ordering::Release);
121                        return;
122                    }
123                } else if state == SLOT_WRITING {
124                    continue;
125                }
126            }
127
128            // Segment is full, allocate new one
129            let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
130            let new_shared = unsafe { Shared::from_raw(new_segment) };
131
132            // Shared::null() doesn't exist, use from_raw(null) or check logic.
133            // compare_exchange expects Shared.
134            // We want to compare against null.
135            let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };
136
137            if t.next
138                .compare_exchange(
139                    null_shared,
140                    new_shared,
141                    Ordering::SeqCst,
142                    Ordering::Relaxed,
143                    &guard,
144                )
145                .is_ok()
146            {
147                let _ = self.tail.compare_exchange(
148                    tail,
149                    new_shared,
150                    Ordering::SeqCst,
151                    Ordering::Relaxed,
152                    &guard,
153                );
154            } else {
155                unsafe { drop(Box::from_raw(new_segment)) };
156            }
157            backoff.snooze();
158        }
159    }
160
161    /// Pops an element from the queue.
162    pub fn pop(&self) -> Option<T> {
163        let backoff = crossbeam_utils::Backoff::new();
164        let guard = pin();
165
166        loop {
167            let head = self.head.load(Ordering::Acquire, &guard);
168            let h = unsafe { head.as_ref().unwrap() };
169
170            let mut all_consumed = true;
171
172            for i in 0..SEGMENT_SIZE {
173                let slot = &h.slots[i];
174                let state = slot.state.load(Ordering::Acquire);
175
176                if state == SLOT_WRITTEN {
177                    if slot
178                        .state
179                        .compare_exchange(
180                            SLOT_WRITTEN,
181                            SLOT_CONSUMED,
182                            Ordering::SeqCst,
183                            Ordering::Relaxed,
184                        )
185                        .is_ok()
186                    {
187                        let value = unsafe { slot.value.get().read().assume_init() };
188                        return Some(value);
189                    }
190                } else if state == SLOT_EMPTY {
191                    let next = h.next.load(Ordering::Acquire, &guard);
192                    if next.is_null() {
193                        return None;
194                    }
195                }
196
197                if slot.state.load(Ordering::Acquire) != SLOT_CONSUMED {
198                    all_consumed = false;
199                }
200            }
201
202            let next = h.next.load(Ordering::Acquire, &guard);
203            if all_consumed
204                && !next.is_null()
205                && self
206                    .head
207                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
208                    .is_ok()
209            {
210                // Retire the old segment
211                // kovan::retire takes *mut T
212                unsafe { kovan::retire(head.as_raw()) };
213                continue;
214            }
215
216            let current_head = self.head.load(Ordering::Acquire, &guard);
217            if current_head != head {
218                continue;
219            }
220
221            if h.next.load(Ordering::Acquire, &guard).is_null() {
222                return None;
223            }
224
225            backoff.snooze();
226        }
227    }
228}
229
230impl<T> Drop for SegQueue<T> {
231    fn drop(&mut self) {
232        // `pop()` calls `kovan::retire(head.as_raw())` on a segment *only after* a
233        // successful CAS that removes it from `self.head`.  Once retired, that segment
234        // is no longer reachable via the linked list.  `drop()` receives `&mut self`
235        // (exclusive ownership), so no concurrent pops can run.  The walk below starts
236        // from the current `self.head` and only visits segments that are *still owned*
237        // by the queue (i.e., not yet retired).  Kovan will independently reclaim the
238        // previously retired segments; `drop()` never touches them.
239        //
240        // The `pin()` below keeps all retired (but not yet freed) nodes alive during
241        // the walk.  When `guard` drops, kovan may free those retired nodes — but by
242        // that point we have already finished walking the live portion of the list.
243        let guard = pin();
244        let mut current = self.head.load(Ordering::Relaxed, &guard);
245
246        while !current.is_null() {
247            unsafe {
248                let segment_ptr = current.as_raw();
249                let segment = &*segment_ptr;
250                let next = segment.next.load(Ordering::Relaxed, &guard);
251
252                for i in 0..SEGMENT_SIZE {
253                    if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
254                        // drop_in_place on MaybeUninit<T> cast to *mut T is correct:
255                        // MaybeUninit<T> has the same layout as T and we confirmed
256                        // the slot is SLOT_WRITTEN (value is initialized).
257                        ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
258                    }
259                }
260
261                // SAFETY: segment_ptr was allocated via Box::into_raw in Segment::new()
262                // and has not been retired (it is still in the live linked list).
263                drop(Box::from_raw(segment_ptr));
264
265                current = next;
266            }
267        }
268    }
269}