#![no_std]

extern crate alloc;

use alloc::boxed::Box;
use alloc::sync::Arc;
use core::cell::Cell;
use core::marker::PhantomData;
use core::mem;
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, Ordering};

/// An owned, heap-allocated queue element holding a value of type `T`.
///
/// Created with [`Node::new`], handed to a `Producer` via `push`, and handed
/// back out by `Consumer::pop`. Dereferences to the contained `T`.
pub struct Node<T> {
    // Points at a live heap allocation whose `data` slot is initialized for
    // the whole lifetime of this handle (`new` initializes it; `pop` copies a
    // payload in before handing a node out).
    inner: NonNull<NodeInner<T>>,
    // Tells the drop checker that a `Node<T>` logically owns a `T`.
    phantom: PhantomData<T>,
}

// SAFETY: a `Node<T>` uniquely owns its heap allocation, so moving it to
// another thread is safe whenever `T` itself is `Send`.
unsafe impl<T: Send> Send for Node<T> {}
// SAFETY: shared access to a `Node<T>` only exposes `&T` (via `Deref`), so
// sharing it is safe whenever `T` is `Sync`.
unsafe impl<T: Sync> Sync for Node<T> {}
76
/// Heap representation shared by the queue and `Node` handles: an intrusive
/// singly-linked-list link plus the payload slot.
struct NodeInner<T> {
    // Next node in the queue, or null for the last node. Written by the
    // producer with `Release` and read by the consumer with `Acquire`.
    next: AtomicPtr<NodeInner<T>>,
    // The payload. Uninitialized only in the queue's stub (sentinel) node.
    data: MaybeUninit<T>,
}
81
impl<T> Node<T> {
    /// Allocates a new node on the heap containing `data`, with no successor.
    pub fn new(data: T) -> Node<T> {
        Node {
            inner: unsafe {
                // SAFETY: `Box::into_raw` never returns null.
                NonNull::new_unchecked(Box::into_raw(Box::new(NodeInner {
                    next: AtomicPtr::new(ptr::null_mut()),
                    data: MaybeUninit::new(data),
                })))
            },
            phantom: PhantomData,
        }
    }

    /// Consumes the node, freeing its allocation and returning the payload.
    pub fn into_inner(this: Node<T>) -> T {
        unsafe {
            // Move the payload out; the slot is logically uninitialized from
            // here on.
            let data = ptr::read(this.inner.as_ref().data.as_ptr());
            // Free the allocation. Dropping the `Box<NodeInner<T>>` does NOT
            // drop the payload again, because it sits in a `MaybeUninit`.
            drop(Box::from_raw(this.inner.as_ptr()));
            // Skip `Node::drop`, which would otherwise drop the payload and
            // free the (already freed) allocation a second time.
            mem::forget(this);
            data
        }
    }
}
106
107impl<T> Deref for Node<T> {
108 type Target = T;
109
110 fn deref(&self) -> &Self::Target {
111 unsafe { &*self.inner.as_ref().data.as_ptr() }
112 }
113}
114
115impl<T> DerefMut for Node<T> {
116 fn deref_mut(&mut self) -> &mut Self::Target {
117 unsafe { &mut *self.inner.as_mut().data.as_mut_ptr() }
118 }
119}
120
impl<T> Drop for Node<T> {
    fn drop(&mut self) {
        unsafe {
            // Run the payload's destructor in place; the `MaybeUninit`
            // wrapper means the `Box` drop below will not drop it again.
            ptr::drop_in_place(self.inner.as_mut().data.as_mut_ptr());
            // Free the heap allocation itself.
            drop(Box::from_raw(self.inner.as_ptr()));
        }
    }
}
129
/// A single-producer single-consumer FIFO queue of heap-allocated nodes.
///
/// Construct with [`Queue::new`], then call [`Queue::split`] to obtain the
/// `Producer`/`Consumer` endpoint pair.
pub struct Queue<T> {
    // The current stub (sentinel) node. After `split`, only the consumer
    // side mutates this, which is why a plain `Cell` suffices.
    head: Cell<*mut NodeInner<T>>,
    // Tells the drop checker that the queue logically owns `T` values.
    phantom: PhantomData<T>,
}

// SAFETY: the queue owns its nodes outright, so it may move to another
// thread whenever the payloads themselves are `Send`.
unsafe impl<T: Send> Send for Queue<T> {}
137
138impl<T> Queue<T> {
139 pub fn new() -> Queue<T> {
141 let node = Box::into_raw(Box::new(NodeInner {
142 next: AtomicPtr::new(ptr::null_mut()),
143 data: MaybeUninit::uninit(),
144 }));
145
146 Queue { head: Cell::new(node), phantom: PhantomData }
147 }
148
149 pub fn split(self) -> (Producer<T>, Consumer<T>) {
151 let queue = Arc::new(self);
152
153 let producer = Producer { queue: queue.clone(), tail: queue.head.get() };
154 let consumer = Consumer { queue };
155
156 (producer, consumer)
157 }
158}
159
impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            let head = self.head.get();
            // `Relaxed` suffices: running `drop` requires exclusive access,
            // and the shared-ownership handoff (the `Arc` created in `split`)
            // provides the necessary synchronization before we get here.
            let mut current = (*head).next.load(Ordering::Relaxed);

            // `head` is the stub node: its payload slot is uninitialized (or
            // already moved out by `pop`), so only the allocation is freed.
            drop(Box::from_raw(head));

            // Every remaining node holds a live payload that was pushed but
            // never popped: drop the payload, then free the node.
            while !current.is_null() {
                let next = (*current).next.load(Ordering::Relaxed);
                ptr::drop_in_place((*current).data.as_mut_ptr());
                drop(Box::from_raw(current));
                current = next;
            }
        }
    }
}
177
/// The receiving endpoint of a [`Queue`]; obtained from `Queue::split`.
pub struct Consumer<T> {
    // Shared ownership keeps the queue (and its nodes) alive while this
    // endpoint exists.
    queue: Arc<Queue<T>>,
}

// SAFETY: the consumer is the only endpoint that mutates `queue.head`, so
// it may move to another thread whenever the payloads are `Send`.
unsafe impl<T: Send> Send for Consumer<T> {}
186
impl<T> Consumer<T> {
    /// Pops the oldest node from the queue, or returns `None` if the queue
    /// is currently empty.
    pub fn pop(&mut self) -> Option<Node<T>> {
        unsafe {
            let head = self.queue.head.get();
            // `Acquire` pairs with the producer's `Release` store in `push`,
            // making the pushed payload visible before we touch it.
            let next = (*head).next.load(Ordering::Acquire);

            if !next.is_null() {
                // Stub-swap trick: move the payload from `next` into the old
                // stub (`head`), hand `head` out as the popped node, and let
                // `next` become the new stub.
                ptr::copy_nonoverlapping((*next).data.as_ptr(), (*head).data.as_mut_ptr(), 1);
                // Clear the link so the returned node can later be pushed
                // onto a queue again (see the `multiple_queues` test).
                (*head).next.store(ptr::null_mut(), Ordering::Relaxed);

                self.queue.head.set(next);

                // SAFETY: `head` came from `Box::into_raw`, so it is non-null.
                return Some(Node { inner: NonNull::new_unchecked(head), phantom: PhantomData });
            }

            None
        }
    }
}
208
/// The sending endpoint of a [`Queue`]; obtained from `Queue::split`.
pub struct Producer<T> {
    // Never read after `split`; kept only so the queue (and its nodes)
    // stays alive while this endpoint exists.
    #[allow(unused)]
    queue: Arc<Queue<T>>,
    // The most recently pushed node (initially the queue's stub node).
    tail: *mut NodeInner<T>,
}

// SAFETY: the producer only appends through `tail.next`; it may move to
// another thread whenever the payloads are `Send`.
unsafe impl<T: Send> Send for Producer<T> {}
219
impl<T> Producer<T> {
    /// Appends `node` to the back of the queue, transferring ownership of
    /// its allocation to the queue.
    pub fn push(&mut self, node: Node<T>) {
        unsafe {
            let node_ptr = node.inner.as_ptr();
            // The queue now owns the allocation; skip `Node::drop`.
            mem::forget(node);

            let tail = &*self.tail;
            // `Release` pairs with the consumer's `Acquire` load in `pop`,
            // publishing the node's payload together with the link.
            tail.next.store(node_ptr, Ordering::Release);

            self.tail = node_ptr;
        }
    }
}
234
#[cfg(test)]
mod tests {
    use super::*;

    extern crate std;

    /// One thread pushes 10000 `false` values plus a terminating `true`;
    /// a second thread pops and counts until it sees the terminator.
    #[test]
    fn multithreaded() {
        let (mut producer, mut consumer) = Queue::new().split();

        let pusher = std::thread::spawn(move || {
            for _ in 0..10000 {
                producer.push(Node::new(false));
            }
            producer.push(Node::new(true));
        });

        let popper = std::thread::spawn(move || {
            let mut seen = 0;

            loop {
                match consumer.pop() {
                    Some(node) if *node => break,
                    Some(_) => seen += 1,
                    None => {}
                }
            }

            assert_eq!(seen, 10000);
        });

        pusher.join().unwrap();
        popper.join().unwrap();
    }

    /// Nodes popped from one queue can be pushed onto another one.
    #[test]
    fn multiple_queues() {
        let (mut producer1, mut consumer1) = Queue::new().split();
        let (mut producer2, mut consumer2) = Queue::new().split();

        for _ in 0..10000 {
            producer1.push(Node::new(()));
        }

        let mut moved = 0;
        while let Some(node) = consumer1.pop() {
            producer2.push(node);
            moved += 1;
        }
        assert_eq!(moved, 10000);

        let mut drained = 0;
        while consumer2.pop().is_some() {
            drained += 1;
        }
        assert_eq!(drained, 10000);
    }

    /// Every pushed value must be dropped exactly once.
    #[test]
    fn drop_occurs() {
        struct S(Arc<Cell<usize>>);

        impl Drop for S {
            fn drop(&mut self) {
                self.0.set(self.0.get() + 1);
            }
        }

        let (mut producer, mut consumer) = Queue::new().split();

        let drops = Arc::new(Cell::new(0));

        for _ in 0..10000 {
            producer.push(Node::new(S(drops.clone())));
        }

        // Each popped `Node<S>` is dropped at the end of the condition,
        // running `S::drop`.
        while consumer.pop().is_some() {}

        assert_eq!(drops.get(), 10000);
    }
}