faa_array_queue/
lib.rs

1/******************************************************************************
2 * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
3 * All rights reserved.
4 *
5 * Rust Version by Junker Jörg
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *     * Redistributions of source code must retain the above copyright
10 *       notice, this list of conditions and the following disclaimer.
11 *     * Redistributions in binary form must reproduce the above copyright
12 *       notice, this list of conditions and the following disclaimer in the
13 *       documentation and/or other materials provided with the distribution.
14 *     * Neither the name of Concurrency Freaks nor the
15 *       names of its contributors may be used to endorse or promote products
16 *       derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 ******************************************************************************
29 */
30
31use array_macro::array;
32use peril::{HazardPointer, HazardRecord, HazardRegistry, HazardValue, Ordering};
33use std::sync::atomic::{AtomicPtr, AtomicU32};
34
35#[cfg(test)]
36mod tests;
37/**
38 * <h1> Fetch-And-Add Array Queue </h1>
39 *
40 * Each node has one array but we don't search for a vacant entry. Instead, we
41 * use FAA to obtain an index in the array, for enqueueing or dequeuing.
42 *
43 * There are some similarities between this queue and the basic queue in YMC:
44 * http://chaoran.me/assets/pdf/wfq-ppopp16.pdf
45 * but it's not the same because the queue in listing 1 is obstruction-free, while
46 * our algorithm is lock-free.
47 * In FAAArrayQueue eventually a new node will be inserted (using Michael-Scott's
48 * algorithm) and it will have an item pre-filled in the first position, which means
49 * that at most, after BUFFER_SIZE steps, one item will be enqueued (and it can then
50 * be dequeued). This kind of progress is lock-free.
51 *
52 * Each entry in the array may contain one of three possible values:
53 * - A valid item that has been enqueued;
54 * - nullptr, which means no item has yet been enqueued in that position;
55 * - taken, a special value that means there was an item but it has been dequeued;
56 *
57 * Enqueue algorithm: FAA + CAS(null,item)
58 * Dequeue algorithm: FAA + CAS(item,taken)
59 * Consistency: Linearizable
60 * enqueue() progress: lock-free
61 * dequeue() progress: lock-free
62 * Memory Reclamation: Hazard Pointers (lock-free)
63 * Uncontended enqueue: 1 FAA + 1 CAS + 1 HP
64 * Uncontended dequeue: 1 FAA + 1 CAS + 1 HP
65 *
66 *
67 * <p>
68 * Lock-Free Linked List as described in Maged Michael and Michael Scott's paper:
69 * {@link http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf}
70 * <a href="http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf">
71 * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms</a>
72 * <p>
73 * The paper on Hazard Pointers is named "Hazard Pointers: Safe Memory
74 * Reclamation for Lock-Free objects" and it is available here:
75 * http://web.cecs.pdx.edu/~walpole/class/cs510/papers/11.pdf
76 *
77 * @author Pedro Ramalhete
78 * @author Andreia Correia
79 */
80
81const BUFFER_SIZE: u32 = 1024;
82const TAKEN: *mut () = 1 as *mut ();
83
84struct Node<T> {
85    deqidx: AtomicU32,
86    items: [AtomicPtr<T>; BUFFER_SIZE as usize],
87    enqidx: AtomicU32,
88    next: HazardPointer<Node<T>>,
89}
90
91impl<T: Send> Node<T> {
92    fn new(item: *mut T) -> Self {
93        let items: [AtomicPtr<T>; BUFFER_SIZE as usize] =
94            array![_ => Default::default(); BUFFER_SIZE as usize];
95        items[0].store(item, Ordering::Relaxed);
96        Node {
97            deqidx: AtomicU32::new(0),
98            items,
99            enqidx: AtomicU32::new(1),
100            next: HazardPointer::new(HazardValue::dummy(0)),
101        }
102    }
103
104    fn cas_next<'registry>(
105        &self,
106        registry: &'registry HazardRegistry<Node<T>>,
107        cmp: HazardValue<Node<T>>,
108        val: HazardValue<Node<T>>,
109    ) -> bool {
110        self.next
111            .compare_exchange(registry, cmp, val, Ordering::AcqRel, Ordering::Relaxed)
112            .is_ok()
113    }
114}
115#[repr(align(128))]
116struct AlignedHazardPtr<T: Send>(HazardPointer<T>);
117
118/// a lock free mpmc queue
119///
120/// # Examples
121///
122/// ```
123/// use faa_array_queue::FaaArrayQueue;
124///
125/// let queue = FaaArrayQueue::<usize>::default();
126/// ```
127pub struct FaaArrayQueue<T: Send> {
128    registry: HazardRegistry<Node<T>>,
129    head: AlignedHazardPtr<Node<T>>,
130    tail: AlignedHazardPtr<Node<T>>,
131}
132
133impl<T: Send> Drop for FaaArrayQueue<T> {
134    fn drop(&mut self) {
135        loop {
136            if self.dequeue().is_none() {
137                break;
138            }
139        }
140    }
141}
142
143impl<T: Send> Default for FaaArrayQueue<T> {
144    fn default() -> Self {
145        let registry = HazardRegistry::default();
146        let sentinel = HazardValue::boxed(Node::new(std::ptr::null_mut()));
147        let head = AlignedHazardPtr(HazardPointer::new(sentinel.clone()));
148        let tail = AlignedHazardPtr(HazardPointer::new(sentinel));
149        FaaArrayQueue {
150            registry,
151            head,
152            tail,
153        }
154    }
155}
156
157impl<T: Send> FaaArrayQueue<T> {
158    /// add an item to the tail of the queue
159    ///
160    /// # Arguments
161    ///
162    /// * `item` - the item to add
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use faa_array_queue::FaaArrayQueue;
168    ///
169    /// let queue = FaaArrayQueue::<usize>::default();
170    /// queue.enqueue(1337);
171    /// assert!(queue.dequeue().unwrap() == 1337);
172    /// ```
173    pub fn enqueue(&self, item: T) {
174        let item = Box::new(item);
175        let item = Box::into_raw(item);
176        let mut record = HazardRecord::default();
177        loop {
178            let scope = self.tail.0.protect(&self.registry, &mut record);
179            let ltail = scope.as_ref().unwrap();
180            let idx = ltail.enqidx.fetch_add(1, Ordering::AcqRel);
181            if idx > (BUFFER_SIZE - 1) {
182                // This node is full
183                if scope.changed(Ordering::Acquire) {
184                    drop(scope);
185                    continue;
186                }
187
188                let lnext = {
189                    let mut record2 = HazardRecord::default();
190                    let scope2 = ltail.next.protect(&self.registry, &mut record2);
191                    scope2.clone_value()
192                };
193
194                if lnext.is_dummy() {
195                    let new_node = HazardValue::boxed(Node::new(item));
196                    let cloned_node = new_node.clone();
197                    if ltail.cas_next(&self.registry, HazardValue::dummy(0), new_node) {
198                        let _ = scope.compare_exchange(
199                            cloned_node,
200                            Ordering::AcqRel,
201                            Ordering::Relaxed,
202                        );
203                        return;
204                    }
205                } else {
206                    let _ = scope.compare_exchange(lnext, Ordering::AcqRel, Ordering::Relaxed);
207                }
208                continue;
209            }
210
211            if ltail.items[idx as usize]
212                .compare_exchange(
213                    std::ptr::null_mut(),
214                    item,
215                    Ordering::AcqRel,
216                    Ordering::Relaxed,
217                )
218                .is_ok()
219            {
220                return;
221            }
222        }
223    }
224
225    /// remove an item from the head of the queue
226    ///
227    /// # Arguments
228    ///
229    /// * `return` - Some item removed or None if the queue is empty
230    ///
231    /// # Examples
232    ///
233    /// ```
234    /// use faa_array_queue::FaaArrayQueue;
235    ///
236    /// let queue = FaaArrayQueue::<usize>::default();
237    /// assert!(queue.dequeue().is_none());
238    /// ```
239    pub fn dequeue(&self) -> Option<T> {
240        let mut record = HazardRecord::default();
241        loop {
242            let scope = self.head.0.protect(&self.registry, &mut record);
243            let lhead = scope.as_ref().unwrap();
244
245            if lhead.deqidx.load(Ordering::Acquire) >= lhead.enqidx.load(Ordering::Acquire)
246                && lhead.next.get_dummy(Ordering::Acquire).is_none()
247            {
248                break;
249            }
250            let idx = lhead.deqidx.fetch_add(1, Ordering::AcqRel);
251            if idx > (BUFFER_SIZE - 1) {
252                // This node has been drained, check if there is another one
253                let lnext = {
254                    let mut record2 = HazardRecord::default();
255                    let scope2 = lhead.next.protect(&self.registry, &mut record2);
256                    scope2.clone_value()
257                };
258                if lnext.is_dummy() {
259                    break; // No more nodes in the queue
260                }
261
262                let _ = scope.compare_exchange(lnext, Ordering::AcqRel, Ordering::Relaxed);
263                continue;
264            }
265
266            let item = lhead.items[idx as usize].swap(TAKEN as *mut T, Ordering::AcqRel);
267            if item == std::ptr::null_mut() {
268                continue;
269            }
270            let item = unsafe { Box::from_raw(item) };
271            return Some(*item);
272        }
273        None
274    }
275}