faa_array_queue/lib.rs
1/******************************************************************************
2 * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
3 * All rights reserved.
4 *
5 * Rust Version by Junker Jörg
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Concurrency Freaks nor the
15 * names of its contributors may be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 ******************************************************************************
29 */
30
31use array_macro::array;
32use peril::{HazardPointer, HazardRecord, HazardRegistry, HazardValue, Ordering};
33use std::sync::atomic::{AtomicPtr, AtomicU32};
34
35#[cfg(test)]
36mod tests;
37/**
38 * <h1> Fetch-And-Add Array Queue </h1>
39 *
40 * Each node has one array but we don't search for a vacant entry. Instead, we
41 * use FAA to obtain an index in the array, for enqueueing or dequeuing.
42 *
43 * There are some similarities between this queue and the basic queue in YMC:
44 * http://chaoran.me/assets/pdf/wfq-ppopp16.pdf
45 * but it's not the same because the queue in listing 1 is obstruction-free, while
46 * our algorithm is lock-free.
47 * In FAAArrayQueue eventually a new node will be inserted (using Michael-Scott's
48 * algorithm) and it will have an item pre-filled in the first position, which means
49 * that at most, after BUFFER_SIZE steps, one item will be enqueued (and it can then
50 * be dequeued). This kind of progress is lock-free.
51 *
52 * Each entry in the array may contain one of three possible values:
53 * - A valid item that has been enqueued;
54 * - nullptr, which means no item has yet been enqueued in that position;
55 * - taken, a special value that means there was an item but it has been dequeued;
56 *
57 * Enqueue algorithm: FAA + CAS(null,item)
58 * Dequeue algorithm: FAA + CAS(item,taken)
59 * Consistency: Linearizable
60 * enqueue() progress: lock-free
61 * dequeue() progress: lock-free
62 * Memory Reclamation: Hazard Pointers (lock-free)
63 * Uncontended enqueue: 1 FAA + 1 CAS + 1 HP
64 * Uncontended dequeue: 1 FAA + 1 CAS + 1 HP
65 *
66 *
67 * <p>
68 * Lock-Free Linked List as described in Maged Michael and Michael Scott's paper:
69 * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf}
70 * <a href="http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf">
71 * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms</a>
72 * <p>
73 * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory
74 * Reclamation for Lock-Free objects" and it is available here:
75 * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf
76 *
77 * @author Pedro Ramalhete
78 * @author Andreia Correia
79 */
80
// Number of item slots in each linked-list node's array.
const BUFFER_SIZE: u32 = 1024;
// Sentinel slot value meaning "an item was here but has been dequeued".
// Address 1 is never a valid heap allocation, so it cannot be confused
// with a real `Box::into_raw` item pointer.
const TAKEN: *mut () = 1 as *mut ();
83
// One segment of the queue's linked list: a fixed-size slot array plus
// FAA-incremented enqueue/dequeue cursors and a link to the next segment.
struct Node<T> {
    // Index of the next slot to dequeue from; fetch_add may push it past
    // BUFFER_SIZE, which signals "this node is drained".
    deqidx: AtomicU32,
    // Item slots. Each holds null (not yet enqueued), TAKEN (already
    // dequeued), or a `Box::into_raw` pointer to a live item.
    items: [AtomicPtr<T>; BUFFER_SIZE as usize],
    // Index of the next slot to enqueue into; fetch_add may push it past
    // BUFFER_SIZE, which signals "this node is full".
    enqidx: AtomicU32,
    // Next node in the list; a dummy value means this is the last node.
    next: HazardPointer<Node<T>>,
}
90
91impl<T: Send> Node<T> {
92 fn new(item: *mut T) -> Self {
93 let items: [AtomicPtr<T>; BUFFER_SIZE as usize] =
94 array![_ => Default::default(); BUFFER_SIZE as usize];
95 items[0].store(item, Ordering::Relaxed);
96 Node {
97 deqidx: AtomicU32::new(0),
98 items,
99 enqidx: AtomicU32::new(1),
100 next: HazardPointer::new(HazardValue::dummy(0)),
101 }
102 }
103
104 fn cas_next<'registry>(
105 &self,
106 registry: &'registry HazardRegistry<Node<T>>,
107 cmp: HazardValue<Node<T>>,
108 val: HazardValue<Node<T>>,
109 ) -> bool {
110 self.next
111 .compare_exchange(registry, cmp, val, Ordering::AcqRel, Ordering::Relaxed)
112 .is_ok()
113 }
114}
// Aligns the wrapped hazard pointer to a 128-byte boundary so the queue's
// `head` and `tail` land on separate cache lines, avoiding false sharing
// between concurrent enqueuers and dequeuers.
#[repr(align(128))]
struct AlignedHazardPtr<T: Send>(HazardPointer<T>);
117
/// a lock free mpmc queue
///
/// # Examples
///
/// ```
/// use faa_array_queue::FaaArrayQueue;
///
/// let queue = FaaArrayQueue::<usize>::default();
/// ```
pub struct FaaArrayQueue<T: Send> {
    // Hazard-pointer registry used to protect nodes from reclamation
    // while threads operate on them.
    registry: HazardRegistry<Node<T>>,
    // Oldest node; `dequeue` works here. Cache-line aligned (see
    // AlignedHazardPtr) to keep it off the same line as `tail`.
    head: AlignedHazardPtr<Node<T>>,
    // Newest node; `enqueue` works here. Cache-line aligned likewise.
    tail: AlignedHazardPtr<Node<T>>,
}
132
133impl<T: Send> Drop for FaaArrayQueue<T> {
134 fn drop(&mut self) {
135 loop {
136 if self.dequeue().is_none() {
137 break;
138 }
139 }
140 }
141}
142
143impl<T: Send> Default for FaaArrayQueue<T> {
144 fn default() -> Self {
145 let registry = HazardRegistry::default();
146 let sentinel = HazardValue::boxed(Node::new(std::ptr::null_mut()));
147 let head = AlignedHazardPtr(HazardPointer::new(sentinel.clone()));
148 let tail = AlignedHazardPtr(HazardPointer::new(sentinel));
149 FaaArrayQueue {
150 registry,
151 head,
152 tail,
153 }
154 }
155}
156
157impl<T: Send> FaaArrayQueue<T> {
    /// add an item to the tail of the queue
    ///
    /// Algorithm: fetch-and-add on the tail node's `enqidx` reserves a
    /// slot, then CAS(null -> item) publishes the item there. When the
    /// node is full, a fresh node carrying the item in slot 0 is appended
    /// Michael-Scott style (CAS on `next`, then advance the tail).
    ///
    /// # Arguments
    ///
    /// * `item` - the item to add
    ///
    /// # Examples
    ///
    /// ```
    /// use faa_array_queue::FaaArrayQueue;
    ///
    /// let queue = FaaArrayQueue::<usize>::default();
    /// queue.enqueue(1337);
    /// assert!(queue.dequeue().unwrap() == 1337);
    /// ```
    pub fn enqueue(&self, item: T) {
        // The queue owns the item from here on: it travels as a raw
        // pointer until `dequeue` rebuilds the Box.
        let item = Box::new(item);
        let item = Box::into_raw(item);
        let mut record = HazardRecord::default();
        loop {
            // Protect the current tail node from reclamation while we use it.
            let scope = self.tail.0.protect(&self.registry, &mut record);
            let ltail = scope.as_ref().unwrap();
            // Reserve a slot; an index past the array means the node is full.
            let idx = ltail.enqidx.fetch_add(1, Ordering::AcqRel);
            if idx > (BUFFER_SIZE - 1) {
                // This node is full
                // If the shared tail already moved on, restart on the new tail.
                if scope.changed(Ordering::Acquire) {
                    drop(scope);
                    continue;
                }

                // Snapshot the full node's successor link.
                let lnext = {
                    let mut record2 = HazardRecord::default();
                    let scope2 = ltail.next.protect(&self.registry, &mut record2);
                    scope2.clone_value()
                };

                if lnext.is_dummy() {
                    // No successor yet: try to append a fresh node that
                    // already carries `item` in slot 0, so a successful
                    // link completes this enqueue.
                    let new_node = HazardValue::boxed(Node::new(item));
                    let cloned_node = new_node.clone();
                    if ltail.cas_next(&self.registry, HazardValue::dummy(0), new_node) {
                        // Swing the shared tail to the node we linked; losing
                        // this CAS is fine (another thread advanced it for us).
                        let _ = scope.compare_exchange(
                            cloned_node,
                            Ordering::AcqRel,
                            Ordering::Relaxed,
                        );
                        return;
                    }
                    // Lost the cas_next race; retry with `item` still pending.
                } else {
                    // Help advance the tail past the full node, then retry.
                    let _ = scope.compare_exchange(lnext, Ordering::AcqRel, Ordering::Relaxed);
                }
                continue;
            }

            // Publish the item into the reserved slot. Failure means a
            // dequeuer already poisoned the slot with TAKEN; reserve anew.
            if ltail.items[idx as usize]
                .compare_exchange(
                    std::ptr::null_mut(),
                    item,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
    }
224
    /// remove an item from the head of the queue
    ///
    /// Algorithm: fetch-and-add on the head node's `deqidx` claims a slot,
    /// then swap(slot, TAKEN) extracts the item. A drained node is skipped
    /// by advancing the head pointer to its successor.
    ///
    /// # Arguments
    ///
    /// * `return` - Some item removed or None if the queue is empty
    ///
    /// # Examples
    ///
    /// ```
    /// use faa_array_queue::FaaArrayQueue;
    ///
    /// let queue = FaaArrayQueue::<usize>::default();
    /// assert!(queue.dequeue().is_none());
    /// ```
    pub fn dequeue(&self) -> Option<T> {
        let mut record = HazardRecord::default();
        loop {
            // Protect the current head node from reclamation while we use it.
            let scope = self.head.0.protect(&self.registry, &mut record);
            let lhead = scope.as_ref().unwrap();

            // Empty check: dequeuers caught up with enqueuers on this node
            // and there is no successor node to move on to.
            // NOTE(review): assumes `get_dummy(...).is_none()` holds exactly
            // when `next` still carries the dummy link set in `Node::new` —
            // confirm against the peril crate docs; this branch is what lets
            // an empty queue return None without claiming slots.
            if lhead.deqidx.load(Ordering::Acquire) >= lhead.enqidx.load(Ordering::Acquire)
                && lhead.next.get_dummy(Ordering::Acquire).is_none()
            {
                break;
            }
            // Claim a slot; an index past the array means the node is drained.
            let idx = lhead.deqidx.fetch_add(1, Ordering::AcqRel);
            if idx > (BUFFER_SIZE - 1) {
                // This node has been drained, check if there is another one
                let lnext = {
                    let mut record2 = HazardRecord::default();
                    let scope2 = lhead.next.protect(&self.registry, &mut record2);
                    scope2.clone_value()
                };
                if lnext.is_dummy() {
                    break; // No more nodes in the queue
                }

                // Swing the head to the successor; losing this CAS is fine
                // (another dequeuer advanced it first).
                let _ = scope.compare_exchange(lnext, Ordering::AcqRel, Ordering::Relaxed);
                continue;
            }

            // Take ownership of the claimed slot. Writing TAKEN poisons it,
            // so the matching enqueuer's CAS(null -> item) can no longer win.
            let item = lhead.items[idx as usize].swap(TAKEN as *mut T, Ordering::AcqRel);
            if item == std::ptr::null_mut() {
                // Slot was still empty: the enqueuer that reserved it has not
                // published yet; our TAKEN forces it to retry elsewhere.
                continue;
            }
            // Rebuild the Box created in `enqueue` and hand the item back.
            let item = unsafe { Box::from_raw(item) };
            return Some(*item);
        }
        None
    }
275}