#![allow(unused)]
5
6use std::cell::Cell;
7use std::error::Error;
8use std::fmt;
9use std::marker::PhantomData;
10use std::mem::MaybeUninit;
11use std::panic::{RefUnwindSafe, UnwindSafe};
12use std::ptr::{self, NonNull};
13use std::sync::atomic::Ordering;
14
15use crossbeam_utils::CachePadded;
16
17use crate::loom_exports::cell::UnsafeCell;
18use crate::loom_exports::sync::atomic::{AtomicBool, AtomicPtr};
19use crate::loom_exports::sync::Arc;
20
/// Number of slots in a single segment of the linked-list queue.
const SEGMENT_LEN: usize = 32;
23
/// A single slot holding at most one value.
struct Slot<T> {
    /// Set to `true` with Release ordering by the producer once `value` has
    /// been written, and read with Acquire ordering by the consumer.
    has_value: AtomicBool,
    /// The payload; only initialized while `has_value` is `true`.
    value: UnsafeCell<MaybeUninit<T>>,
}
29
impl<T> Default for Slot<T> {
    /// Creates an empty slot with an uninitialized payload.
    fn default() -> Self {
        Slot {
            has_value: AtomicBool::new(false),
            value: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }
}
38
/// A fixed-size array of slots, chained into a singly linked list of segments.
struct Segment<T> {
    /// Pointer to the next segment, or null if this is currently the last
    /// segment; written once with Release ordering by the producer and read
    /// with Acquire ordering by the consumer.
    next_segment: AtomicPtr<Segment<T>>,
    /// Storage for the values.
    data: [Slot<T>; SEGMENT_LEN],
}
47
48impl<T> Segment<T> {
49 fn allocate_new() -> NonNull<Self> {
51 let segment = Self {
52 next_segment: AtomicPtr::new(ptr::null_mut()),
53 data: Default::default(),
54 };
55
56 unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(segment))) }
58 }
59}
60
/// Consumer-side cursor: the segment and slot index of the next read.
///
/// Only ever accessed by the (single) consumer.
struct Head<T> {
    /// Segment currently being read from.
    segment: NonNull<Segment<T>>,
    /// Index of the next slot to read within `segment`; when it reaches
    /// `SEGMENT_LEN` the consumer must first move to the next segment.
    next_read_idx: usize,
}
71
/// Producer-side cursor: the segment and slot index of the next write.
///
/// Only ever accessed by the (single) producer.
struct Tail<T> {
    /// Segment currently being written to.
    segment: NonNull<Segment<T>>,
    /// Index of the next slot to write within `segment`; when it reaches
    /// `SEGMENT_LEN` the producer must first allocate and link a new segment.
    next_write_idx: usize,
}
82
/// An unbounded single-producer, single-consumer queue backed by a linked
/// list of fixed-size segments.
///
/// The two cursors are cache-padded to avoid false sharing between the
/// producer and consumer threads.
struct Queue<T> {
    /// Consumer cursor; only ever accessed by the consumer (see `pop`).
    head: CachePadded<UnsafeCell<Head<T>>>,
    /// Producer cursor; only ever accessed by the producer (see `push`).
    tail: CachePadded<UnsafeCell<Tail<T>>>,
}
88
impl<T> Queue<T> {
    /// Creates a new, empty queue consisting of a single segment shared by
    /// the head and tail cursors.
    fn new() -> Self {
        let segment = Segment::allocate_new();

        let head = Head {
            segment,
            next_read_idx: 0,
        };
        let tail = Tail {
            segment,
            next_write_idx: 0,
        };

        Self {
            head: CachePadded::new(UnsafeCell::new(head)),
            tail: CachePadded::new(UnsafeCell::new(tail)),
        }
    }

    /// Appends a value at the back of the queue.
    ///
    /// # Safety
    ///
    /// There must be at most one producer: this method may not be called
    /// concurrently from multiple threads.
    unsafe fn push(&self, value: T) {
        // SAFETY (caller contract): only the single producer ever accesses
        // `tail`, so this mutable reference cannot alias another one.
        let tail = self.tail.with_mut(|p| &mut *p);

        // The current segment is full: allocate a new segment and link it.
        if tail.next_write_idx == SEGMENT_LEN {
            let old_segment = tail.segment;
            tail.segment = Segment::allocate_new();

            // Publish the new segment; this Release store pairs with the
            // Acquire load of `next_segment` in `pop` so the consumer sees
            // the segment fully initialized.
            old_segment
                .as_ref()
                .next_segment
                .store(tail.segment.as_ptr(), Ordering::Release);

            tail.next_write_idx = 0;
        }

        let data = &tail.segment.as_ref().data[tail.next_write_idx];

        // Write the value before publishing it below.
        data.value.with_mut(|p| (*p).write(value));

        // This Release store pairs with the Acquire load of `has_value` in
        // `pop`, making the value written above visible to the consumer.
        data.has_value.store(true, Ordering::Release);

        tail.next_write_idx += 1;
    }

    /// Removes and returns the value at the front of the queue, or `None` if
    /// the queue is empty.
    ///
    /// # Safety
    ///
    /// There must be at most one consumer: this method may not be called
    /// concurrently from multiple threads.
    unsafe fn pop(&self) -> Option<T> {
        // SAFETY (caller contract): only the single consumer ever accesses
        // `head`, so this mutable reference cannot alias another one.
        let head = self.head.with_mut(|p| &mut *p);

        // The current segment is exhausted: move on to the next segment, if
        // the producer has already published one (otherwise the queue is
        // empty and `?` returns `None`).
        if head.next_read_idx == SEGMENT_LEN {
            let next_segment = head.segment.as_ref().next_segment.load(Ordering::Acquire);
            let next_segment = NonNull::new(next_segment)?;

            // Reclaim the exhausted segment; the producer no longer touches
            // it after publishing `next_segment`.
            let _ = Box::from_raw(head.segment.as_ptr());

            head.segment = next_segment;
            head.next_read_idx = 0;
        }

        let data = &head.segment.as_ref().data[head.next_read_idx];

        // This Acquire load pairs with the Release store in `push`; if the
        // slot is not published yet, the queue is empty.
        if !data.has_value.load(Ordering::Acquire) {
            return None;
        }

        // Move the value out; the slot is never read again because
        // `next_read_idx` is advanced past it below.
        let value = data.value.with(|p| (*p).assume_init_read());

        head.next_read_idx += 1;

        Some(value)
    }
}
198
impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // SAFETY: `&mut self` guarantees exclusive access, so the
        // single-producer/single-consumer contracts of `pop` trivially hold.
        unsafe {
            // Drop all remaining values; this also reclaims every fully
            // consumed segment along the way (see `pop`).
            while self.pop().is_some() {}

            let head = self.head.with_mut(|p| &mut *p);

            // Reclaim the last remaining segment.
            let _ = Box::from_raw(head.segment.as_ptr());
        }
    }
}
218
// SAFETY: the queue transfers values of type `T` between threads, hence the
// `T: Send` bound; the producer and consumer sides synchronize through the
// Release/Acquire atomic operations in `push` and `pop`.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
221
// NOTE(review): unwind safety is asserted here on the grounds that a value
// only becomes observable after it is fully written (the `has_value` Release
// store in `push` happens last) — confirm no half-updated state can leak.
impl<T> UnwindSafe for Queue<T> {}
impl<T> RefUnwindSafe for Queue<T> {}
224
/// The producing side of an SPSC queue.
///
/// The `PhantomData<Cell<()>>` marker makes this type `!Sync`, statically
/// enforcing the single-producer requirement of `Queue::push`.
pub(crate) struct Producer<T> {
    queue: Arc<Queue<T>>,
    _non_sync_phantom: PhantomData<Cell<()>>,
}
230impl<T> Producer<T> {
231 pub(crate) fn push(&self, value: T) -> Result<(), PushError> {
233 if Arc::strong_count(&self.queue) == 1 {
234 return Err(PushError {});
235 }
236
237 unsafe { self.queue.push(value) };
238
239 Ok(())
240 }
241}
242
/// Error returned by [`Producer::push`] when the consumer was dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct PushError {}
246
247impl fmt::Display for PushError {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 write!(f, "sending message into a closed mailbox")
250 }
251}
252
253impl Error for PushError {}
254
/// The consuming side of an SPSC queue.
///
/// The `PhantomData<Cell<()>>` marker makes this type `!Sync`, statically
/// enforcing the single-consumer requirement of `Queue::pop`.
pub(crate) struct Consumer<T> {
    queue: Arc<Queue<T>>,
    _non_sync_phantom: PhantomData<Cell<()>>,
}
impl<T> Consumer<T> {
    /// Removes and returns the value at the front of the queue, or `None` if
    /// the queue is empty.
    pub(crate) fn pop(&self) -> Option<T> {
        // SAFETY: `Consumer` is `!Sync` and `pop` takes `&self`, so at most
        // one thread can be popping at any given time.
        unsafe { self.queue.pop() }
    }
}
266
267pub(crate) fn spsc_queue<T>() -> (Producer<T>, Consumer<T>) {
270 let queue = Arc::new(Queue::new());
271
272 let producer = Producer {
273 queue: queue.clone(),
274 _non_sync_phantom: PhantomData,
275 };
276 let consumer = Consumer {
277 queue,
278 _non_sync_phantom: PhantomData,
279 };
280
281 (producer, consumer)
282}
283
#[cfg(all(test, not(asynchronix_loom)))]
mod tests {
    use super::*;

    use std::thread;

    /// Smoke test: all values pushed by the main thread are received in FIFO
    /// order by a dedicated consumer thread.
    #[test]
    fn spsc_queue_basic() {
        // Use a smaller workload under Miri, which executes much more slowly.
        const VALUE_COUNT: usize = if cfg!(miri) { 1000 } else { 100_000 };

        let (producer, consumer) = spsc_queue();

        let th = thread::spawn(move || {
            for i in 0..VALUE_COUNT {
                // Spin until the next value becomes available.
                let value = loop {
                    if let Some(v) = consumer.pop() {
                        break v;
                    }
                };

                assert_eq!(value, i);
            }
        });

        for i in 0..VALUE_COUNT {
            producer.push(i).unwrap();
        }

        th.join().unwrap();
    }
}
316
#[cfg(all(test, asynchronix_loom))]
mod tests {
    use super::*;

    use loom::model::Builder;
    use loom::thread;

    /// Loom model: concurrent push/pop within a single segment preserve FIFO
    /// order under all explored interleavings.
    #[test]
    fn loom_spsc_queue_basic() {
        const DEFAULT_PREEMPTION_BOUND: usize = 4;
        const VALUE_COUNT: usize = 10;

        let mut builder = Builder::new();
        // Bound preemptions to keep the explored state space tractable,
        // unless a bound was already configured externally.
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
        }

        builder.check(move || {
            let (producer, consumer) = spsc_queue();

            let th = thread::spawn(move || {
                let mut value = 0;
                for _ in 0..VALUE_COUNT {
                    // `pop` may return `None` when the producer has not
                    // caught up yet; only successful pops are order-checked.
                    if let Some(v) = consumer.pop() {
                        assert_eq!(v, value);
                        value += 1;
                    }
                }
            });

            for i in 0..VALUE_COUNT {
                let _ = producer.push(i);
            }

            th.join().unwrap();
        });
    }

    /// Loom model: push/pop racing across a segment boundary (allocation and
    /// publication of a new segment) preserve FIFO order.
    #[test]
    fn loom_spsc_queue_new_segment() {
        const DEFAULT_PREEMPTION_BOUND: usize = 4;
        const VALUE_COUNT_BEFORE: usize = 5;
        const VALUE_COUNT_AFTER: usize = 5;

        let mut builder = Builder::new();
        // Bound preemptions to keep the explored state space tractable,
        // unless a bound was already configured externally.
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
        }

        builder.check(move || {
            let (producer, consumer) = spsc_queue();

            // Pre-fill most of the first segment sequentially so that the
            // concurrent phase below straddles the segment boundary.
            for i in 0..(SEGMENT_LEN - VALUE_COUNT_BEFORE) {
                producer.push(i).unwrap();
                consumer.pop();
            }

            let th = thread::spawn(move || {
                let mut value = SEGMENT_LEN - VALUE_COUNT_BEFORE;
                for _ in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) {
                    if let Some(v) = consumer.pop() {
                        assert_eq!(v, value);
                        value += 1;
                    }
                }
            });

            for i in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) {
                let _ = producer.push(i);
            }

            th.join().unwrap();
        });
    }
}