use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
5
/// A node of the singly linked list backing [`Queue`].
struct Node<T> {
    /// Payload. `None` for the sentinel node and for any node whose value has
    /// already been handed to a dequeuer.
    data: Option<T>,
    /// Successor in the live list. It is written exactly once (null -> node)
    /// and never reset, which is what makes the "append if still null" CAS in
    /// `enqueue` safe even for a thread holding a stale tail pointer.
    next: AtomicPtr<Node<T>>,
    /// Link used only while the node sits on the queue's retired stack. It is
    /// deliberately separate from `next` so that retiring a node never
    /// disturbs the write-once `next` pointer.
    retired_next: *mut Node<T>,
}

impl<T> Node<T> {
    /// Heap-allocates an unlinked node and returns the owning raw pointer.
    fn alloc(data: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            data,
            next: AtomicPtr::new(ptr::null_mut()),
            retired_next: ptr::null_mut(),
        }))
    }
}

/// An unbounded multi-producer multi-consumer FIFO queue in the style of
/// Michael and Scott.
///
/// `head` always points at a sentinel node whose successor holds the oldest
/// element. Nodes unlinked by `dequeue` are not freed immediately, because
/// other threads may still hold pointers to them. Instead every operation
/// announces itself in `active`, and unlinked nodes wait on the `retired`
/// stack until a dequeuer observes that no other operation is in flight (or
/// until the queue is dropped).
///
/// All atomic accesses use `SeqCst`: the reclamation handshake ("announce,
/// then load `head`" versus "unlink, then read `active`") is a store-load
/// pattern that weaker orderings do not order.
pub struct Queue<T> {
    /// Current sentinel node.
    head: AtomicPtr<Node<T>>,
    /// Last node, or the one just before it while an append is in progress.
    tail: AtomicPtr<Node<T>>,
    /// Number of operations currently allowed to dereference list nodes.
    active: AtomicUsize,
    /// Push-only stack (linked through `retired_next`) of unlinked nodes that
    /// are waiting to be freed.
    retired: AtomicPtr<Node<T>>,
}

impl<T> Queue<T> {
    /// Creates an empty queue containing only the sentinel node.
    pub fn new() -> Self {
        let sentinel = Node::alloc(None);
        Queue {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
            active: AtomicUsize::new(0),
            retired: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Appends `data` to the back of the queue.
    pub fn enqueue(&self, data: T) {
        let new_node = Node::alloc(Some(data));

        self.enter();
        loop {
            let tail = self.tail.load(Ordering::SeqCst);
            // SAFETY: `tail` was read from `self.tail` while this operation is
            // registered in `active`, so the node cannot be freed before
            // `leave` is called below.
            let next = unsafe { (*tail).next.load(Ordering::SeqCst) };

            if tail != self.tail.load(Ordering::SeqCst) {
                // The snapshot is stale; start over.
                continue;
            }

            if !next.is_null() {
                // Another enqueuer linked its node but has not swung `tail`
                // yet; help it along and retry.
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                continue;
            }

            // SAFETY: same protection as the load above.
            let linked = unsafe {
                (*tail).next.compare_exchange_weak(
                    ptr::null_mut(),
                    new_node,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                )
            };
            if linked.is_ok() {
                // Best effort: if this fails, another thread already advanced
                // `tail` on our behalf.
                let _ = self.tail.compare_exchange(
                    tail,
                    new_node,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                break;
            }
        }
        self.leave();
    }

    /// Removes and returns the element at the front of the queue, or `None`
    /// if the queue was observed empty.
    pub fn dequeue(&self) -> Option<T> {
        self.enter();
        loop {
            let head = self.head.load(Ordering::SeqCst);
            let tail = self.tail.load(Ordering::SeqCst);
            // SAFETY: `head` was read from `self.head` while this operation is
            // registered in `active`, so the node cannot be freed before this
            // operation leaves.
            let next = unsafe { (*head).next.load(Ordering::SeqCst) };

            if head != self.head.load(Ordering::SeqCst) {
                // The snapshot is stale; start over.
                continue;
            }

            if head == tail {
                if next.is_null() {
                    self.leave();
                    return None;
                }
                // `tail` lags behind a linked node; help advance it so that
                // `head` never overtakes `tail`.
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                continue;
            }

            let won = self
                .head
                .compare_exchange(head, next, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok();
            if won {
                // The payload is taken only AFTER winning the CAS: exactly one
                // thread can make `next` the new sentinel, so exactly one
                // thread ever touches its `data`.
                //
                // SAFETY: `next` is non-null because `tail` was observed ahead
                // of `head` before `next` was loaded. It cannot be freed while
                // this operation is still registered in `active`.
                let data = unsafe { (*next).data.take() };
                self.retire_and_leave(head);
                return data;
            }
        }
    }

    /// Returns `true` if the queue held no elements at the moment it was
    /// inspected. With concurrent users the answer may be outdated by the
    /// time the caller acts on it.
    pub fn is_empty(&self) -> bool {
        self.enter();
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        // SAFETY: `head` was read while this operation is registered in
        // `active`, so the node is still allocated.
        let empty = head == tail && unsafe { (*head).next.load(Ordering::SeqCst).is_null() };
        self.leave();
        empty
    }

    /// Registers the calling operation; must precede any load of `head` or
    /// `tail` whose result will be dereferenced.
    fn enter(&self) {
        self.active.fetch_add(1, Ordering::SeqCst);
    }

    /// Unregisters the calling operation; no node pointer obtained during the
    /// operation may be dereferenced afterwards.
    fn leave(&self) {
        self.active.fetch_sub(1, Ordering::SeqCst);
    }

    /// Hands over `node` (just unlinked from the front by the caller) for
    /// reclamation and unregisters the calling operation.
    ///
    /// If the caller is the only registered operation, nobody else can still
    /// hold `node`, so it is freed right away; the retired stack is freed too
    /// if the count then drops to zero. Otherwise `node` is parked on the
    /// retired stack for a later call (or `Drop`) to free.
    fn retire_and_leave(&self, node: *mut Node<T>) {
        if self.active.load(Ordering::SeqCst) == 1 {
            // Claim everything retired so far before giving up our slot.
            let pending = self.retired.swap(ptr::null_mut(), Ordering::SeqCst);
            if self.active.fetch_sub(1, Ordering::SeqCst) == 1 {
                // SAFETY: every node in `pending` was unlinked before it was
                // pushed, and no operation is registered right now, so no
                // thread can still hold a pointer into the chain.
                unsafe { Self::free_retired(pending) };
            } else if !pending.is_null() {
                // Someone entered in the meantime and may hold a pointer to a
                // node retired after our check; put the chain back.
                self.push_retired(pending);
            }
            // SAFETY: `node` was unlinked before `active == 1` was observed,
            // so no other operation from that time is still running and later
            // ones cannot reach it. It came from `Box::into_raw`.
            unsafe { drop(Box::from_raw(node)) };
        } else {
            self.push_retired(node);
            self.leave();
        }
    }

    /// Pushes a chain of retired nodes (linked through `retired_next`, ending
    /// in null) onto the retired stack. The caller must own the whole chain.
    fn push_retired(&self, first: *mut Node<T>) {
        let mut last = first;
        // SAFETY: the chain is exclusively owned by the caller, so reading its
        // `retired_next` links cannot race with anything.
        unsafe {
            while !(*last).retired_next.is_null() {
                last = (*last).retired_next;
            }
        }

        let mut top = self.retired.load(Ordering::SeqCst);
        loop {
            // SAFETY: `last` is still exclusively owned until the CAS below
            // publishes the chain.
            unsafe { (*last).retired_next = top };
            match self.retired.compare_exchange_weak(
                top,
                first,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return,
                Err(current) => top = current,
            }
        }
    }

    /// Frees every node of a retired chain.
    ///
    /// # Safety
    ///
    /// The caller must own the chain exclusively, and no thread may still
    /// hold a pointer to any of its nodes.
    unsafe fn free_retired(mut node: *mut Node<T>) {
        while !node.is_null() {
            // SAFETY: guaranteed by the caller; every node originates from
            // `Box::into_raw`.
            let owned = unsafe { Box::from_raw(node) };
            node = owned.retired_next;
        }
    }
}

impl<T> Default for Queue<T> {
    /// Equivalent to [`Queue::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Drop for Queue<T> {
    /// Frees the sentinel, every node still in the queue (dropping the
    /// elements they hold) and every node left on the retired stack.
    fn drop(&mut self) {
        // `&mut self` proves no other thread is using the queue, so the lists
        // can be walked without any synchronisation.
        let mut cursor = *self.head.get_mut();
        while !cursor.is_null() {
            // SAFETY: every node reachable from `head` is live, came from
            // `Box::into_raw`, and is visited exactly once.
            let mut owned = unsafe { Box::from_raw(cursor) };
            cursor = *owned.next.get_mut();
        }

        // SAFETY: retired nodes were unlinked from the live list, so they are
        // disjoint from the nodes freed above, and nobody else can reach them.
        unsafe { Self::free_retired(*self.retired.get_mut()) };
    }
}

// SAFETY: the queue owns its elements and moves them between threads, so `T`
// must be `Send`; all shared state is accessed through atomics. The explicit
// impls are required: the `AtomicPtr` fields would otherwise make `Queue<T>`
// automatically `Send`/`Sync` for every `T`, including non-`Send` ones.
unsafe impl<T: Send> Send for Queue<T> {}
// SAFETY: see above. `&Queue<T>` only allows moving `T` values in and out, it
// never hands out `&T`, so `T: Sync` is not needed.
unsafe impl<T: Send> Sync for Queue<T> {}