use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{hint, mem, ptr};

/// A bounded single-producer single-consumer (SPSC) ring buffer. The capacity
/// is a power of two so that slot indices reduce to `index & capacity_mask`;
/// the read/write cursors grow monotonically and wrap only when indexing.
struct SpscQueue<V: Send + Sync> {
    buffer: *mut V,
    capacity: usize,
    capacity_mask: usize,
    ended: AtomicBool,
    // Live handles (producer + consumer); the last one dropped frees the queue.
    handles: AtomicUsize,
    read_next: AtomicUsize,
    write_next: AtomicUsize,
}

unsafe impl<V: Send + Sync> Send for SpscQueue<V> {}

unsafe impl<V: Send + Sync> Sync for SpscQueue<V> {}

impl<V: Send + Sync> Drop for SpscQueue<V> {
    fn drop(&mut self) {
        unsafe {
            // Drop any values that were enqueued but never consumed, then
            // rebuild the Vec with length zero so it releases the allocation.
            let read_next = self.read_next.load(Ordering::Relaxed);
            let write_next = self.write_next.load(Ordering::Relaxed);
            for i in read_next..write_next {
                ptr::drop_in_place(self.buffer.add(i & self.capacity_mask));
            }
            let _ = Vec::from_raw_parts(self.buffer, 0, self.capacity);
        }
    }
}

impl<V: Send + Sync> SpscQueue<V> {
    pub fn new(capacity_exponent: usize) -> SpscQueue<V> {
        assert!(capacity_exponent < mem::size_of::<usize>() * 8);
        let capacity = 1 << capacity_exponent;
        // Allocate the ring buffer through Vec, then take ownership of the raw
        // allocation; Drop reconstructs the Vec to free it.
        let mut vec = Vec::with_capacity(capacity);
        let ptr = vec.as_mut_ptr();
        mem::forget(vec);
        SpscQueue {
            buffer: ptr,
            capacity,
            capacity_mask: capacity - 1,
            ended: AtomicBool::new(false),
            // One producer handle plus one consumer handle per queue.
            handles: AtomicUsize::new(2),
            read_next: AtomicUsize::new(0),
            write_next: AtomicUsize::new(0),
        }
    }
}

/// The write endpoint of the queue; exactly one exists per queue.
pub struct SpscQueueProducer<V: Send + Sync> {
    queue: *mut SpscQueue<V>,
}

unsafe impl<V: Send + Sync> Send for SpscQueueProducer<V> {}

unsafe impl<V: Send + Sync> Sync for SpscQueueProducer<V> {}

impl<V: Send + Sync> Drop for SpscQueueProducer<V> {
    fn drop(&mut self) {
        // The queue allocation is shared with the consumer; free it only if
        // this producer is the last live handle.
        let last = unsafe { (*self.queue).handles.fetch_sub(1, Ordering::AcqRel) == 1 };
        if last {
            let _ = unsafe { Box::from_raw(self.queue) };
        }
    }
}

impl<V: Send + Sync> SpscQueueProducer<V> {
    pub fn enqueue(&mut self, value: V) {
        let queue = unsafe { &*self.queue };
        let write_next = queue.write_next.load(Ordering::Relaxed);
        // Spin until the consumer has freed a slot in the ring.
        while write_next >= queue.read_next.load(Ordering::Acquire) + queue.capacity {
            hint::spin_loop();
        }
        unsafe { ptr::write(queue.buffer.add(write_next & queue.capacity_mask), value) };
        // Publish the value: this Release store pairs with the consumer's
        // Acquire load of `write_next`.
        queue.write_next.store(write_next + 1, Ordering::Release);
    }

    pub fn finish(&mut self) {
        // Mark the end of the stream; the consumer reports `Ended` once it has
        // drained everything enqueued before this call.
        let queue = unsafe { &*self.queue };
        queue.ended.store(true, Ordering::Release);
    }
}

/// The result of a non-blocking dequeue attempt.
pub enum MaybeDequeued<V> {
    Ended,
    None,
    Some(V),
}

pub struct SpscQueueConsumer<V: Send + Sync> {
    queue: *mut SpscQueue<V>,
}

unsafe impl<V: Send + Sync> Send for SpscQueueConsumer<V> {}

unsafe impl<V: Send + Sync> Sync for SpscQueueConsumer<V> {}
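
// Dropping the consumer gives up its claim on the shared queue; whichever of
// the two endpoints is dropped last frees the heap allocation, mirroring the
// producer's Drop above via the `handles` counter.
impl<V: Send + Sync> Drop for SpscQueueConsumer<V> {
    fn drop(&mut self) {
        let last = unsafe { (*self.queue).handles.fetch_sub(1, Ordering::AcqRel) == 1 };
        if last {
            let _ = unsafe { Box::from_raw(self.queue) };
        }
    }
}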

impl<V: Send + Sync> SpscQueueConsumer<V> {
    #[inline(always)]
    fn queue(&self) -> &SpscQueue<V> {
        unsafe { &*self.queue }
    }

    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        let queue = self.queue();
        // The Acquire load pairs with the producer's Release store in `enqueue`.
        queue.read_next.load(Ordering::Relaxed) >= queue.write_next.load(Ordering::Acquire)
    }

    pub fn maybe_dequeue(&mut self) -> MaybeDequeued<V> {
        if self.is_empty() {
            if self.queue().ended.load(Ordering::Acquire) {
                // Re-check after observing `ended`: a value enqueued right
                // before `finish` must still be delivered, not skipped.
                if self.is_empty() {
                    return MaybeDequeued::Ended;
                }
            } else {
                return MaybeDequeued::None;
            }
        }
        let queue = self.queue();
        let read_next = queue.read_next.load(Ordering::Relaxed);
        let value = unsafe { ptr::read(queue.buffer.add(read_next & queue.capacity_mask)) };
        // Release the slot so the producer's Acquire load can reuse it.
        queue.read_next.store(read_next + 1, Ordering::Release);
        MaybeDequeued::Some(value)
    }

    /// Blocks (by spinning) until a value is available or the stream ends.
    pub fn dequeue(&mut self) -> Option<V> {
        loop {
            match self.maybe_dequeue() {
                MaybeDequeued::None => hint::spin_loop(),
                MaybeDequeued::Ended => return None,
                MaybeDequeued::Some(v) => return Some(v),
            }
        }
    }
}

/// Creates a queue holding at most `2^capacity_exponent` values and returns
/// its producer and consumer endpoints, which may move to different threads.
pub fn create_spsc_queue<V: Send + Sync>(capacity_exponent: usize) -> (SpscQueueProducer<V>, SpscQueueConsumer<V>) {
    let queue = Box::into_raw(Box::new(SpscQueue::<V>::new(capacity_exponent)));
    (SpscQueueProducer { queue }, SpscQueueConsumer { queue })
}
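
// A minimal usage sketch: one thread enqueues a stream of integers and calls
// `finish`; the other drains the queue until `dequeue` reports the end.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn passes_values_across_threads_in_order() {
        // Capacity 2^4 = 16 slots, so the producer has to wait for the
        // consumer once the ring fills up.
        let (mut producer, mut consumer) = create_spsc_queue::<u64>(4);

        let producer_thread = thread::spawn(move || {
            for i in 0..100u64 {
                producer.enqueue(i);
            }
            producer.finish();
        });

        let mut received = Vec::new();
        while let Some(value) = consumer.dequeue() {
            received.push(value);
        }

        producer_thread.join().unwrap();
        assert_eq!(received, (0..100u64).collect::<Vec<_>>());
    }
}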