asynchronix/util/
spsc_queue.rs

1//! Single-producer single-consumer unbounded FIFO queue that stores values in
2//! fixed-size memory segments.
3
4#![allow(unused)]
5
6use std::cell::Cell;
7use std::error::Error;
8use std::fmt;
9use std::marker::PhantomData;
10use std::mem::MaybeUninit;
11use std::panic::{RefUnwindSafe, UnwindSafe};
12use std::ptr::{self, NonNull};
13use std::sync::atomic::Ordering;
14
15use crossbeam_utils::CachePadded;
16
17use crate::loom_exports::cell::UnsafeCell;
18use crate::loom_exports::sync::atomic::{AtomicBool, AtomicPtr};
19use crate::loom_exports::sync::Arc;
20
21/// The number of slots in a single segment.
22const SEGMENT_LEN: usize = 32;
23
/// One cell of a segment, able to hold at most one value.
struct Slot<T> {
    /// Flag set to `true` by `Queue::push` after `value` has been written and
    /// checked by `Queue::pop` before `value` is read.
    has_value: AtomicBool,
    /// Storage for the value; it stays uninitialized until a push writes it.
    value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Default for Slot<T> {
    /// Creates an empty slot: the flag is cleared and the storage is left
    /// uninitialized.
    fn default() -> Self {
        let has_value = AtomicBool::new(false);
        let value = UnsafeCell::new(MaybeUninit::uninit());

        Self { has_value, value }
    }
}
38
39/// A memory segment containing `SEGMENT_LEN` slots.
40struct Segment<T> {
41    /// Address of the next segment.
42    ///
43    /// A null pointer means that the next segment is not allocated yet.
44    next_segment: AtomicPtr<Segment<T>>,
45    data: [Slot<T>; SEGMENT_LEN],
46}
47
48impl<T> Segment<T> {
49    /// Allocates a new segment.
50    fn allocate_new() -> NonNull<Self> {
51        let segment = Self {
52            next_segment: AtomicPtr::new(ptr::null_mut()),
53            data: Default::default(),
54        };
55
56        // Safety: the pointer is non-null since it comes from a box.
57        unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(segment))) }
58    }
59}
60
/// The head of the queue from which values are popped.
///
/// Within this file it is accessed by `Queue::pop` and by the `Drop`
/// implementation of `Queue`.
struct Head<T> {
    /// Pointer to the segment at the head of the queue.
    ///
    /// The pointed-to segment is released with `Box::from_raw` once the
    /// consumer moves to the next segment, or when the queue is dropped.
    segment: NonNull<Segment<T>>,
    /// Index of the next value to be read.
    ///
    /// If the index is equal to the segment length, it is necessary to move to
    /// the next segment before the next value can be read.
    next_read_idx: usize,
}
71
/// The tail of the queue to which values are pushed.
///
/// Within this file it is only accessed by `Queue::push`.
struct Tail<T> {
    /// Pointer to the segment at the tail of the queue.
    ///
    /// It is replaced by a freshly allocated segment when the current one is
    /// full.
    segment: NonNull<Segment<T>>,
    /// Index of the next value to be written.
    ///
    /// If the index is equal to the segment length, a new segment must be
    /// allocated before a new value can be written.
    next_write_idx: usize,
}
82
83/// A single-producer, single-consumer unbounded FIFO queue.
84struct Queue<T> {
85    head: CachePadded<UnsafeCell<Head<T>>>,
86    tail: CachePadded<UnsafeCell<Tail<T>>>,
87}
88
89impl<T> Queue<T> {
90    /// Creates a new queue.
91    fn new() -> Self {
92        let segment = Segment::allocate_new();
93
94        let head = Head {
95            segment,
96            next_read_idx: 0,
97        };
98        let tail = Tail {
99            segment,
100            next_write_idx: 0,
101        };
102
103        Self {
104            head: CachePadded::new(UnsafeCell::new(head)),
105            tail: CachePadded::new(UnsafeCell::new(tail)),
106        }
107    }
108
109    /// Pushes a new value.
110    ///
111    /// # Safety
112    ///
113    /// The method cannot be called from multiple threads concurrently.
114    unsafe fn push(&self, value: T) {
115        // Safety: this is the only thread accessing the tail.
116        let tail = self.tail.with_mut(|p| &mut *p);
117
118        // If the whole segment has been written, allocate a new segment.
119        if tail.next_write_idx == SEGMENT_LEN {
120            let old_segment = tail.segment;
121            tail.segment = Segment::allocate_new();
122
123            // Safety: the old segment is still allocated since the consumer
124            // cannot deallocate it before `next_segment` is set to a non-null
125            // value.
126            old_segment
127                .as_ref()
128                .next_segment
129                .store(tail.segment.as_ptr(), Ordering::Release);
130
131            tail.next_write_idx = 0;
132        }
133
134        // Safety: the tail segment is allocated since the consumer cannot
135        // deallocate it before `next_segment` is set to a non-null value.
136        let data = &tail.segment.as_ref().data[tail.next_write_idx];
137
138        // Safety: we have exclusive access to the slot value since the consumer
139        // cannot access it before `has_value` is set to true.
140        data.value.with_mut(|p| (*p).write(value));
141
142        // Ordering: this Release store synchronizes with the Acquire load in
143        // `pop` and ensures that the value is visible to the consumer once
144        // `has_value` reads `true`.
145        data.has_value.store(true, Ordering::Release);
146
147        tail.next_write_idx += 1;
148    }
149
150    /// Pops a new value.
151    ///
152    /// # Safety
153    ///
154    /// The method cannot be called from multiple threads concurrently.
155    unsafe fn pop(&self) -> Option<T> {
156        // Safety: this is the only thread accessing the head.
157        let head = self.head.with_mut(|p| &mut *p);
158
159        // If the whole segment has been read, try to move to the next segment.
160        if head.next_read_idx == SEGMENT_LEN {
161            // Read the next segment or return `None` if it is not ready yet.
162            //
163            // Safety: the head segment is still allocated since we are the only
164            // thread that can deallocate it.
165            let next_segment = head.segment.as_ref().next_segment.load(Ordering::Acquire);
166            let next_segment = NonNull::new(next_segment)?;
167
168            // Deallocate the old segment.
169            //
170            // Safety: the pointer was initialized from a box and the segment is
171            // still allocated since we are the only thread that can deallocate
172            // it.
173            let _ = Box::from_raw(head.segment.as_ptr());
174
175            // Update the segment and the next index.
176            head.segment = next_segment;
177            head.next_read_idx = 0;
178        }
179
180        let data = &head.segment.as_ref().data[head.next_read_idx];
181
182        // Ordering: this Acquire load synchronizes with the Release store in
183        // `push` and ensures that the value is visible once `has_value` reads
184        // `true`.
185        if !data.has_value.load(Ordering::Acquire) {
186            return None;
187        }
188
189        // Safety: since `has_value` is `true` then we have exclusive ownership
190        // of the value and we know that it was initialized.
191        let value = data.value.with(|p| (*p).assume_init_read());
192
193        head.next_read_idx += 1;
194
195        Some(value)
196    }
197}
198
199impl<T> Drop for Queue<T> {
200    fn drop(&mut self) {
201        unsafe {
202            // Drop all values.
203            while self.pop().is_some() {}
204
205            // All values have been dropped: the last segment can be freed.
206
207            // Safety: this is the only thread accessing the head since both the
208            // consumer and producer have been dropped.
209            let head = self.head.with_mut(|p| &mut *p);
210
211            // Safety: the pointer was initialized from a box and the segment is
212            // still allocated since we are the only thread that can deallocate
213            // it.
214            let _ = Box::from_raw(head.segment.as_ptr());
215        }
216    }
217}
218
219unsafe impl<T: Send> Send for Queue<T> {}
220unsafe impl<T: Send> Sync for Queue<T> {}
221
222impl<T> UnwindSafe for Queue<T> {}
223impl<T> RefUnwindSafe for Queue<T> {}
224
225/// A handle to a single-producer, single-consumer queue that can push values.
226pub(crate) struct Producer<T> {
227    queue: Arc<Queue<T>>,
228    _non_sync_phantom: PhantomData<Cell<()>>,
229}
230impl<T> Producer<T> {
231    /// Pushes a value to the queue.
232    pub(crate) fn push(&self, value: T) -> Result<(), PushError> {
233        if Arc::strong_count(&self.queue) == 1 {
234            return Err(PushError {});
235        }
236
237        unsafe { self.queue.push(value) };
238
239        Ok(())
240    }
241}
242
/// Error returned when a push failed due to the consumer being dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct PushError {}

impl fmt::Display for PushError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending message into a closed mailbox")
    }
}

impl Error for PushError {}
254
255/// A handle to a single-producer, single-consumer queue that can pop values.
256pub(crate) struct Consumer<T> {
257    queue: Arc<Queue<T>>,
258    _non_sync_phantom: PhantomData<Cell<()>>,
259}
260impl<T> Consumer<T> {
261    /// Pops a value from the queue.
262    pub(crate) fn pop(&self) -> Option<T> {
263        unsafe { self.queue.pop() }
264    }
265}
266
267/// Creates the producer and consumer handles of a single-producer,
268/// single-consumer queue.
269pub(crate) fn spsc_queue<T>() -> (Producer<T>, Consumer<T>) {
270    let queue = Arc::new(Queue::new());
271
272    let producer = Producer {
273        queue: queue.clone(),
274        _non_sync_phantom: PhantomData,
275    };
276    let consumer = Consumer {
277        queue,
278        _non_sync_phantom: PhantomData,
279    };
280
281    (producer, consumer)
282}
283
/// Non-Loom tests.
///
/// This module is only compiled when the `asynchronix_loom` cfg is not set.
#[cfg(all(test, not(asynchronix_loom)))]
mod tests {
    use super::*;

    use std::thread;

    /// Checks that values pushed from one thread are popped from another
    /// thread in FIFO order.
    #[test]
    fn spsc_queue_basic() {
        // Fewer values are used under Miri.
        const VALUE_COUNT: usize = if cfg!(miri) { 1000 } else { 100_000 };

        let (producer, consumer) = spsc_queue();

        let th = thread::spawn(move || {
            for i in 0..VALUE_COUNT {
                // Busy-wait until the producer has made the next value
                // available.
                let value = loop {
                    if let Some(v) = consumer.pop() {
                        break v;
                    }

                    // Signal the busy-wait to the CPU while the queue is
                    // empty.
                    std::hint::spin_loop();
                };

                assert_eq!(value, i);
            }
        });

        for i in 0..VALUE_COUNT {
            producer.push(i).unwrap();
        }

        th.join().unwrap();
    }
}
316
/// Loom tests.
#[cfg(all(test, asynchronix_loom))]
mod tests {
    use super::*;

    use loom::model::Builder;
    use loom::thread;

    /// Preemption bound applied when the builder does not already have one.
    const DEFAULT_PREEMPTION_BOUND: usize = 4;

    /// Returns a model builder, falling back to `DEFAULT_PREEMPTION_BOUND`
    /// when no preemption bound is set.
    fn model_builder() -> Builder {
        let mut builder = Builder::new();
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
        }

        builder
    }

    /// Pushes and pops concurrently within a single segment and checks that
    /// the popped values come out in order.
    #[test]
    fn loom_spsc_queue_basic() {
        const VALUE_COUNT: usize = 10;

        model_builder().check(move || {
            let (producer, consumer) = spsc_queue();

            let th = thread::spawn(move || {
                // Attempt `VALUE_COUNT` pops; the n-th successful pop must
                // yield the value `n`.
                (0..VALUE_COUNT)
                    .filter_map(|_| consumer.pop())
                    .enumerate()
                    .for_each(|(expected, popped)| assert_eq!(popped, expected));
            });

            for i in 0..VALUE_COUNT {
                let _ = producer.push(i);
            }

            th.join().unwrap();
        });
    }

    /// Pushes and pops concurrently across a segment boundary and checks that
    /// the popped values come out in order.
    #[test]
    fn loom_spsc_queue_new_segment() {
        const VALUE_COUNT_BEFORE: usize = 5;
        const VALUE_COUNT_AFTER: usize = 5;
        /// First value handled concurrently.
        const FIRST_VALUE: usize = SEGMENT_LEN - VALUE_COUNT_BEFORE;
        /// One past the last value handled concurrently.
        const END_VALUE: usize = SEGMENT_LEN + VALUE_COUNT_AFTER;

        model_builder().check(move || {
            let (producer, consumer) = spsc_queue();

            // Use up the first segment, leaving only its last
            // `VALUE_COUNT_BEFORE` slots untouched.
            for i in 0..FIRST_VALUE {
                producer.push(i).unwrap();
                consumer.pop();
            }

            let th = thread::spawn(move || {
                // Attempt one pop per remaining value; successful pops must
                // yield consecutive values starting at `FIRST_VALUE`.
                (FIRST_VALUE..END_VALUE)
                    .filter_map(|_| consumer.pop())
                    .zip(FIRST_VALUE..)
                    .for_each(|(popped, expected)| assert_eq!(popped, expected));
            });

            for i in FIRST_VALUE..END_VALUE {
                let _ = producer.push(i);
            }

            th.join().unwrap();
        });
    }
}
393}