Skip to main content

kovan_queue/
seg_queue.rs

1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::utils::CacheAligned;
7use kovan::{Atomic, RetiredNode, Shared, pin};
8
9const SEGMENT_SIZE: usize = 32;
10
11/// Slot state: Empty, ready to be written.
12const SLOT_EMPTY: usize = 0;
13/// Slot state: Currently being written to.
14const SLOT_WRITING: usize = 1;
15/// Slot state: Contains a value, ready to be read.
16const SLOT_WRITTEN: usize = 2;
17/// Slot state: Value has been read/consumed.
18const SLOT_CONSUMED: usize = 3;
19
20struct Slot<T> {
21    state: AtomicUsize,
22    value: UnsafeCell<MaybeUninit<T>>,
23}
24
25#[repr(C)]
26struct Segment<T> {
27    retired: RetiredNode,
28    slots: [Slot<T>; SEGMENT_SIZE],
29    next: Atomic<Segment<T>>,
30    id: usize,
31}
32
33impl<T> Segment<T> {
34    fn new(id: usize) -> Segment<T> {
35        let mut slots: [Slot<T>; SEGMENT_SIZE] = unsafe { MaybeUninit::zeroed().assume_init() };
36        for slot in &mut slots {
37            slot.state = AtomicUsize::new(SLOT_EMPTY);
38        }
39        Segment {
40            retired: RetiredNode::new(),
41            slots,
42            next: Atomic::null(),
43            id,
44        }
45    }
46}
47
48pub struct SegQueue<T> {
49    head: CacheAligned<Atomic<Segment<T>>>,
50    tail: CacheAligned<Atomic<Segment<T>>>,
51}
52
53unsafe impl<T: Send> Send for SegQueue<T> {}
54unsafe impl<T: Send> Sync for SegQueue<T> {}
55
56impl<T: 'static> Default for SegQueue<T> {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62impl<T: 'static> SegQueue<T> {
63    /// Creates a new unbounded queue.
64    pub fn new() -> SegQueue<T> {
65        let segment = Box::into_raw(Box::new(Segment::new(0)));
66        let head = Atomic::new(segment);
67        let tail = Atomic::new(segment);
68
69        SegQueue {
70            head: CacheAligned::new(head),
71            tail: CacheAligned::new(tail),
72        }
73    }
74
75    /// Pushes an element into the queue.
76    pub fn push(&self, value: T) {
77        let backoff = crossbeam_utils::Backoff::new();
78        let guard = pin();
79
80        loop {
81            let tail = self.tail.load(Ordering::Acquire, &guard);
82
83            if tail.is_null() {
84                continue;
85            }
86
87            let t = unsafe { tail.as_ref().unwrap() };
88            let next = t.next.load(Ordering::Acquire, &guard);
89
90            if !next.is_null() {
91                let _ = self.tail.compare_exchange(
92                    tail,
93                    next,
94                    Ordering::SeqCst,
95                    Ordering::Relaxed,
96                    &guard,
97                );
98                continue;
99            }
100
101            for i in 0..SEGMENT_SIZE {
102                let slot = &t.slots[i];
103                let state = slot.state.load(Ordering::Acquire);
104
105                if state == SLOT_EMPTY {
106                    if slot
107                        .state
108                        .compare_exchange(
109                            SLOT_EMPTY,
110                            SLOT_WRITING,
111                            Ordering::SeqCst,
112                            Ordering::Relaxed,
113                        )
114                        .is_ok()
115                    {
116                        unsafe {
117                            slot.value.get().write(MaybeUninit::new(value));
118                        }
119                        slot.state.store(SLOT_WRITTEN, Ordering::Release);
120                        return;
121                    }
122                } else if state == SLOT_WRITING {
123                    continue;
124                }
125            }
126
127            // Segment is full, allocate new one
128            let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
129            let new_shared = unsafe { Shared::from_raw(new_segment) };
130
131            // Shared::null() doesn't exist, use from_raw(null) or check logic.
132            // compare_exchange expects Shared.
133            // We want to compare against null.
134            let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };
135
136            if t.next
137                .compare_exchange(
138                    null_shared,
139                    new_shared,
140                    Ordering::SeqCst,
141                    Ordering::Relaxed,
142                    &guard,
143                )
144                .is_ok()
145            {
146                let _ = self.tail.compare_exchange(
147                    tail,
148                    new_shared,
149                    Ordering::SeqCst,
150                    Ordering::Relaxed,
151                    &guard,
152                );
153            } else {
154                unsafe { drop(Box::from_raw(new_segment)) };
155            }
156            backoff.snooze();
157        }
158    }
159
160    /// Pops an element from the queue.
161    pub fn pop(&self) -> Option<T> {
162        let backoff = crossbeam_utils::Backoff::new();
163        let guard = pin();
164
165        loop {
166            let head = self.head.load(Ordering::Acquire, &guard);
167            let h = unsafe { head.as_ref().unwrap() };
168
169            for i in 0..SEGMENT_SIZE {
170                let slot = &h.slots[i];
171                let state = slot.state.load(Ordering::Acquire);
172
173                if state == SLOT_WRITTEN {
174                    if slot
175                        .state
176                        .compare_exchange(
177                            SLOT_WRITTEN,
178                            SLOT_CONSUMED,
179                            Ordering::SeqCst,
180                            Ordering::Relaxed,
181                        )
182                        .is_ok()
183                    {
184                        let value = unsafe { slot.value.get().read().assume_init() };
185                        return Some(value);
186                    }
187                } else if state == SLOT_EMPTY {
188                    let next = h.next.load(Ordering::Acquire, &guard);
189                    if next.is_null() {
190                        return None;
191                    }
192                }
193            }
194
195            let next = h.next.load(Ordering::Acquire, &guard);
196            if !next.is_null()
197                && self
198                    .head
199                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
200                    .is_ok()
201            {
202                // Retire the old segment
203                // kovan::retire takes *mut T
204                kovan::retire(head.as_raw());
205                continue;
206            }
207
208            let current_head = self.head.load(Ordering::Acquire, &guard);
209            if current_head != head {
210                continue;
211            }
212
213            if h.next.load(Ordering::Acquire, &guard).is_null() {
214                return None;
215            }
216
217            backoff.snooze();
218        }
219    }
220}
221
222impl<T> Drop for SegQueue<T> {
223    fn drop(&mut self) {
224        let guard = pin();
225        let mut current = self.head.load(Ordering::Relaxed, &guard);
226
227        while !current.is_null() {
228            unsafe {
229                let segment_ptr = current.as_raw();
230                let segment = &*segment_ptr;
231                let next = segment.next.load(Ordering::Relaxed, &guard);
232
233                for i in 0..SEGMENT_SIZE {
234                    if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
235                        ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
236                    }
237                }
238
239                // In Drop, we have exclusive access. Segments already retired are handled by kovan.
240                // We are responsible for dropping the remaining segments in the list.
241                drop(Box::from_raw(segment_ptr));
242
243                current = next;
244            }
245        }
246    }
247}