lock_free/
queue.rs

1//! Simple lock-free queue using Michael & Scott algorithm.
2
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
5
/// A singly linked node of the queue.
///
/// The node referenced by `Queue::head` is a sentinel: its payload has either
/// never existed (the initial node) or has already been handed out by the
/// dequeuer that made it the head. Live items start at `head.next`.
struct Node<T> {
    /// Payload of the node; `None` once it has been handed to a dequeuer.
    data: Option<T>,
    /// Successor in the list, or null for the last node.
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    /// Heap-allocates a node without a successor and returns it as a raw
    /// pointer. Ownership passes to the queue, which frees it again with
    /// `Box::from_raw`.
    fn alloc(data: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            data,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

/// Marks the current thread as being inside a queue operation for as long as
/// the guard lives; dropping it decrements the queue's `active` counter.
struct ActiveGuard<'a>(&'a AtomicUsize);

impl<'a> Drop for ActiveGuard<'a> {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

/// A lock-free unbounded FIFO queue (Michael & Scott linked list).
///
/// # Memory reclamation
///
/// A node that has been unlinked by [`Queue::dequeue`] may still be referenced
/// by other threads that loaded `head`/`tail` earlier, so it is not freed
/// immediately. Every operation registers itself in `active`; unlinked nodes
/// are freed by a dequeuer only when it observes that it is the sole
/// registered thread, and unconditionally when the queue is dropped. Under
/// uninterrupted contention unlinked nodes therefore accumulate until the
/// next moment at which a dequeuer runs alone.
///
/// All atomic accesses use `Ordering::SeqCst`: the reclamation check relies on
/// a single total order between "register in `active`, then load `head`/`tail`"
/// and "swing `head`, then read `active`".
pub struct Queue<T> {
    /// Current sentinel node; the first live item is `head.next`.
    head: AtomicPtr<Node<T>>,
    /// Last node, or its predecessor while an enqueue is half finished.
    tail: AtomicPtr<Node<T>>,
    /// Oldest node that is still allocated. The nodes from `oldest` up to (but
    /// excluding) `head` are unlinked and waiting to be freed. Only written by
    /// the thread that holds the exclusive reclamation right (see
    /// `try_reclaim`) or through `&mut self`.
    oldest: AtomicPtr<Node<T>>,
    /// Number of threads currently inside `enqueue`, `dequeue` or `is_empty`.
    active: AtomicUsize,
}

impl<T> Queue<T> {
    /// Creates a new empty queue consisting of a single sentinel node.
    pub fn new() -> Self {
        let sentinel = Node::alloc(None);
        Queue {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
            oldest: AtomicPtr::new(sentinel),
            active: AtomicUsize::new(0),
        }
    }

    /// Registers the calling thread as active. While the returned guard is
    /// alive, no node that this thread can reach from `head` or `tail` is
    /// freed by another thread.
    fn pin(&self) -> ActiveGuard<'_> {
        self.active.fetch_add(1, Ordering::SeqCst);
        ActiveGuard(&self.active)
    }

    /// Frees the unlinked nodes in front of `new_head` if the calling thread
    /// is the only one inside a queue operation.
    ///
    /// Must be called while pinned, by the thread whose CAS just installed
    /// `new_head` as `head`.
    ///
    /// Why this is sound: the caller itself accounts for one unit of `active`,
    /// so reading `1` means no other thread is registered. A thread that
    /// registers later does so after our head CAS in the `SeqCst` order and can
    /// only load `head`/`tail` values at or after `new_head`; a thread that was
    /// registered earlier has already left and holds no node pointers. Two
    /// threads can never both be reclaiming, because each would count the
    /// other in `active`.
    fn try_reclaim(&self, new_head: *mut Node<T>) {
        if self.active.load(Ordering::SeqCst) != 1 {
            return;
        }
        let mut node = self.oldest.load(Ordering::SeqCst);
        while node != new_head {
            debug_assert!(!node.is_null());
            // SAFETY: `node` lies strictly before `new_head` in the list, was
            // allocated by `Node::alloc`, has not been freed yet (`oldest`
            // tracks the first unfreed node) and, per the argument above, is
            // not referenced by any other thread.
            let retired = unsafe { Box::from_raw(node) };
            node = retired.next.load(Ordering::SeqCst);
        }
        self.oldest.store(new_head, Ordering::SeqCst);
    }

    /// Appends `data` to the back of the queue.
    pub fn enqueue(&self, data: T) {
        // Allocate before pinning so the pinned section stays short.
        let new_node = Node::alloc(Some(data));
        let _guard = self.pin();

        loop {
            let tail = self.tail.load(Ordering::SeqCst);
            // SAFETY: `tail` was loaded while pinned, so the node is alive.
            let next = unsafe { (*tail).next.load(Ordering::SeqCst) };

            if tail != self.tail.load(Ordering::SeqCst) {
                // `tail` moved while we were looking at it; start over.
                continue;
            }

            if !next.is_null() {
                // Another enqueue linked its node but has not swung `tail`
                // yet; help it along and retry.
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                continue;
            }

            // SAFETY: as above, `tail` is alive while we are pinned.
            let linked = unsafe {
                (*tail).next.compare_exchange_weak(
                    ptr::null_mut(),
                    new_node,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                )
            };
            if linked.is_ok() {
                // Best effort: if this fails, another thread already advanced
                // `tail` on our behalf.
                let _ = self.tail.compare_exchange(
                    tail,
                    new_node,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                return;
            }
        }
    }

    /// Removes and returns the item at the front of the queue, or `None` if
    /// the queue is empty.
    pub fn dequeue(&self) -> Option<T> {
        let _guard = self.pin();

        loop {
            let head = self.head.load(Ordering::SeqCst);
            let tail = self.tail.load(Ordering::SeqCst);
            // SAFETY: `head` was loaded while pinned, so the node is alive.
            let next = unsafe { (*head).next.load(Ordering::SeqCst) };

            if head != self.head.load(Ordering::SeqCst) {
                // `head` moved while we were looking at it; start over.
                continue;
            }

            if next.is_null() {
                // The sentinel has no successor: the queue is empty.
                return None;
            }

            if head == tail {
                // An enqueue linked a node but `tail` still lags behind; help
                // it forward so `head` never overtakes `tail`.
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                continue;
            }

            let swung = self.head.compare_exchange(
                head,
                next,
                Ordering::SeqCst,
                Ordering::SeqCst,
            );
            if swung.is_ok() {
                // Take the payload only AFTER winning the CAS: the winner is
                // the single thread entitled to `next`'s data, so no item can
                // be taken (and dropped) by a thread that then loses the race.
                //
                // SAFETY: `next` is alive because we are pinned. Only the
                // `data` field is borrowed mutably; other threads touch just
                // the `next` field of this node.
                let data = unsafe { (*next).data.take() };
                self.try_reclaim(next);
                return data;
            }
        }
    }

    /// Returns true if the queue is empty.
    ///
    /// With concurrent producers or consumers the answer may be outdated by
    /// the time the caller looks at it.
    pub fn is_empty(&self) -> bool {
        let _guard = self.pin();
        let head = self.head.load(Ordering::SeqCst);
        // SAFETY: `head` was loaded while pinned, so the node is alive.
        // The queue is empty exactly when the sentinel has no successor.
        unsafe { (*head).next.load(Ordering::SeqCst).is_null() }
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Drop for Queue<T> {
    /// Frees every node that is still allocated, dropping the remaining items
    /// in FIFO order.
    fn drop(&mut self) {
        let mut node = *self.oldest.get_mut();
        while !node.is_null() {
            // SAFETY: `&mut self` guarantees exclusive access. Every node from
            // `oldest` to the end of the list was created by `Node::alloc`
            // and is freed exactly once here.
            let mut owned = unsafe { Box::from_raw(node) };
            node = *owned.next.get_mut();
        }
    }
}

// SAFETY: the queue owns its items and hands each one to exactly one thread,
// so moving or sharing the queue only ever moves `T` values between threads;
// `T: Send` is therefore sufficient. These explicit impls also replace the
// auto impls, which would otherwise hold for every `T` because `AtomicPtr` is
// unconditionally `Send + Sync`.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}