//! may_queue/mpsc_list_v1.rs — an intrusive multi-producer, single-consumer
//! linked-list queue whose entries can be removed out of order via `Entry`.

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

use crossbeam_utils::{Backoff, CachePadded};
// A heap node of the intrusive list. Each node is shared between the list
// itself and the producer's `Entry` handle, so it carries a manual reference
// count with a "still linked" flag packed into the same `refs` word.
struct Node<T> {
    // previous (older) node; only touched by the single consumer and by
    // `Entry::remove`
    prev: *mut Node<T>,
    // next (newer) node; written by producers with Release, read with Acquire
    next: AtomicPtr<Node<T>>,
    // the payload; `None` for the stub node and after the value is taken
    value: Option<T>,
    // low 28 bits: reference count; bit 0x1000_0000: "linked" flag
    refs: usize,
}
// the linked bit is 0x1000_0000; the initial ref count is 2 — one share for
// the `Entry` handle and one for the list
const REF_INIT: usize = 0x1000_0002;
const REF_COUNT_MASK: usize = 0x0FFF_FFFF;

impl<T> Node<T> {
    /// Allocate a new node holding `v` and return it as a raw pointer,
    /// with `refs` preset to `REF_INIT` (linked + 2 shares).
    ///
    /// # Safety
    ///
    /// The returned pointer owns the allocation; it must eventually be
    /// released exactly once via `Box::from_raw` (normally driven by the
    /// `refs` count reaching 0), otherwise the node leaks.
    unsafe fn new(v: Option<T>) -> *mut Node<T> {
        Box::into_raw(Box::new(Node {
            prev: ptr::null_mut(),
            next: AtomicPtr::new(ptr::null_mut()),
            value: v,
            refs: REF_INIT,
        }))
    }
}

/// A handle to a value pushed onto the queue; while the node is still linked
/// it can be used to mutate the value in place or to remove it from the list.
pub struct Entry<T>(ptr::NonNull<Node<T>>);

// SAFETY: an `Entry` only exposes the node's payload under the unsafe,
// caller-checked contracts below, so sharing the handle is sound whenever the
// payload itself is `Sync`.
unsafe impl<T: Sync> Sync for Entry<T> {}

32impl<T> Entry<T> {
33    /// get the internal data mut ref
34    /// # Safety
35    ///
36    /// must make sure it's not popped by the consumer
37    #[inline]
38    pub unsafe fn with_mut_data<F>(&self, f: F)
39    where
40        F: FnOnce(&mut T),
41    {
42        let node = &mut *self.0.as_ptr();
43        let data = node.value.as_mut().expect("Node value is None");
44        f(data);
45    }
46
47    /// judge if the node is still linked in the list
48    #[inline]
49    pub fn is_link(&self) -> bool {
50        let node = unsafe { &mut *self.0.as_ptr() };
51        node.refs & !REF_COUNT_MASK != 0
52    }
53
54    #[inline]
55    pub fn into_ptr(self) -> *mut Self {
56        let ret = self.0.as_ptr() as *mut Self;
57        ::std::mem::forget(self);
58        ret
59    }
60
61    #[inline]
62    /// # Safety
63    ///
64    /// Must use the ptr that from `Entry::into_ptr`
65    pub unsafe fn from_ptr(ptr: *mut Self) -> Self {
66        Entry(ptr::NonNull::new_unchecked(ptr as *mut Node<T>))
67    }
68
69    // remove the entry from it's list and return the contained value
70    // it's only safe for the consumer that call pop()
71    pub fn remove(mut self) -> Option<T> {
72        unsafe {
73            let node = self.0.as_mut();
74
75            // when the link bit is cleared, next and prev is no longer valid
76            if node.refs & !REF_COUNT_MASK == 0 {
77                // already removed
78                return None;
79            }
80
81            // this is a new tail just return
82            if node.prev.is_null() {
83                return None;
84            }
85
86            let next = node.next.load(Ordering::Acquire);
87            let prev = &mut *node.prev;
88
89            // here we must make sure the next is not equal to null
90            // other thread may modify the next value if it's null
91            // it's safe to remove the node that between tail and head
92            // but not safe to remove the last node since it's volatile
93            // when next is null, the remove takes no action
94            // and expect pop() would eventually consume the data
95            // this is mainly used in the timer list, so it's rarely
96            // the next is not contention for that we have wait some time already
97            // leave the last node not removed also persist the queue for a while
98            // that prevent frequent queue create and destroy
99            if !next.is_null() {
100                // clear the link bit
101                node.refs &= REF_COUNT_MASK;
102
103                // this is not the last node, just unlink it
104                (*next).prev = prev;
105                prev.next.store(next, Ordering::Release);
106
107                let ret = node.value.take();
108
109                // since self is not dropped, below is always false
110                node.refs -= 1;
111                if node.refs == 0 {
112                    // release the node only when the ref count becomes 0
113                    let _: Box<Node<T>> = Box::from_raw(node);
114                }
115
116                return ret;
117            }
118        }
119
120        None
121    }
122}
123
impl<T> Drop for Entry<T> {
    // only call this drop in the same thread, or you must make sure it happens with no contention
    // running in a coroutine is a kind of sequential operation, so it can safely drop there after
    // returning from "kernel"
    fn drop(&mut self) {
        let node = unsafe { self.0.as_mut() };
        // dec the ref count of node; the other share belongs to the list
        // (cleared by pop/pop_if/remove) — refs is NOT atomic, hence the
        // no-contention requirement above
        node.refs -= 1;
        if node.refs == 0 {
            // last share gone: release the node
            let _: Box<Node<T>> = unsafe { Box::from_raw(node) };
        }
    }
}

// SAFETY: the handle owns a ref-counted share of a heap node; moving it to
// another thread is sound when the payload itself is Send (subject to the
// drop caveat documented above)
unsafe impl<T: Send> Send for Entry<T> {}

/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
pub struct Queue<T> {
    // most recently pushed node; cache-padded so producer writes don't
    // false-share with the consumer's tail
    head: CachePadded<AtomicPtr<Node<T>>>,
    // oldest node (always a consumed "stub"); only the single consumer
    // reads/writes this cell
    tail: UnsafeCell<*mut Node<T>>,
}

// SAFETY: producers only touch `head` (atomic); `tail` is accessed by the
// single consumer only, which is exactly the contract stated in the docs
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// Creates a new queue that is safe to share among multiple producers and
    /// one consumer.
    pub fn new() -> Queue<T> {
        // the stub keeps head/tail non-null while the queue is empty
        let stub = unsafe { Node::new(None) };
        // there is no handle for the stub node, so its ref count should be 1
        unsafe { &mut *stub }.refs = 1;
        Queue {
            head: AtomicPtr::new(stub).into(),
            tail: UnsafeCell::new(stub),
        }
    }

    /// Pushes a new value onto this queue.
    /// if the new node is head, indicate a true
    /// this is used to update the BH if it's a new head
    pub fn push(&self, t: T) -> (Entry<T>, bool) {
        unsafe {
            let node = Node::new(Some(t));
            // swap in the new head first, then link the old head to it; in
            // the window between the swap and the store, the old head's
            // `next` is still null — consumers spin on that (see pop/peek)
            let prev = self.head.swap(node, Ordering::AcqRel);
            (*node).prev = prev;
            (*prev).next.store(node, Ordering::Release);
            let tail = *self.tail.get();
            let is_head = tail == prev;
            (Entry(ptr::NonNull::new_unchecked(node)), is_head)
        }
    }

    /// if the queue is empty
    #[inline]
    pub fn is_empty(&self) -> bool {
        let tail = unsafe { *self.tail.get() };
        // the list is empty when head still equals the consumed tail/stub
        self.head.load(Ordering::Acquire) == tail
    }

    /// get the head ref
    /// # Safety
    /// if you then pop the head, it's unsafe to keep holding the returned ref
    #[inline]
    pub unsafe fn peek(&self) -> Option<&T> {
        let tail = *self.tail.get();
        // the list is empty
        if self.head.load(Ordering::Acquire) == tail {
            return None;
        }
        // spin until tail next becomes non-null (a producer may have swapped
        // the head but not yet linked it — see push)
        let mut next;
        let backoff = Backoff::new();
        loop {
            next = (*tail).next.load(Ordering::Acquire);
            if !next.is_null() {
                break;
            }
            backoff.snooze();
        }

        // tail is always a consumed stub; next always still holds its value
        assert!((*tail).value.is_none());
        assert!((*next).value.is_some());

        (*next).value.as_ref()
    }

    /// Pops the front value only if the predicate `f` accepts it.
    pub fn pop_if<F>(&self, f: &F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        unsafe {
            let tail = *self.tail.get();
            // the list is empty
            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }

            // spin until tail next becomes non-null (see push)
            let mut next;
            let backoff = Backoff::new();
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() {
                    break;
                }
                backoff.snooze();
            }

            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());

            let v = (*next).value.as_ref().unwrap();
            if !f(v) {
                // predicate rejected the front value: no pop
                return None;
            }

            // clear the link bit (the old tail leaves the list)
            assert!((*tail).refs & REF_COUNT_MASK != 0);
            (*tail).refs &= REF_COUNT_MASK;

            // clear the prev pointer to indicate a new end point
            (*next).prev = ptr::null_mut();
            // move the tail to next
            *self.tail.get() = next;

            // we take the next value, this is why we use Option to host the value
            let ret = (*next).value.take().unwrap();
            // drop the list's share of the old tail
            (*tail).refs -= 1;
            if (*tail).refs == 0 {
                // release the node only when the ref count becomes 0
                let _: Box<Node<T>> = Box::from_raw(tail);
            }

            Some(ret)
        }
    }

    /// Pops some data from this queue.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            let tail = *self.tail.get();

            // the list is empty
            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }

            // clear the link bit (the old tail leaves the list)
            assert!((*tail).refs & REF_COUNT_MASK != 0);
            (*tail).refs &= REF_COUNT_MASK;

            // spin until tail next becomes non-null (see push)
            let mut next;
            let backoff = Backoff::new();
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() {
                    break;
                }
                backoff.snooze();
            }
            // `next` becomes the new stub: detach its prev pointer
            (*next).prev = ptr::null_mut();
            // move the tail to next
            *self.tail.get() = next;

            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());
            // we take the next value, this is why we use Option to host the value
            let ret = (*next).value.take().unwrap();
            // drop the list's share of the old tail
            (*tail).refs -= 1;
            if (*tail).refs == 0 {
                // release the node only when the ref count becomes 0
                let _: Box<Node<T>> = Box::from_raw(tail);
            }

            Some(ret)
        }
    }
}

310impl<T> Default for Queue<T> {
311    fn default() -> Self {
312        Queue::new()
313    }
314}
315
impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // drain remaining values; each pop releases the list's share of the
        // consumed node
        while self.pop().is_some() {}
        // release the stub (the final tail), which has no Entry handle
        let _: Box<Node<T>> = unsafe { Box::from_raw(*self.tail.get()) };
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;

    // single-threaded exercise of push/pop, Entry::remove, peek and pop_if
    #[test]
    fn test_queue() {
        let q: Queue<usize> = Queue::new();
        assert_eq!(q.pop(), None);
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert!(q.is_empty());
        let a = q.push(3);
        let b = q.push(4);
        // first push into an empty queue reports is_head == true
        assert!(a.1);
        assert_eq!(a.0.remove(), Some(3));
        assert!(!b.1);
        // b is the last node: remove takes no action, pop consumes it below
        assert_eq!(b.0.remove(), None);
        assert_eq!(q.pop(), Some(4));
        assert!(q.is_empty());

        q.push(5);
        q.push(6);
        q.push(7);
        let co = |v: &usize| *v < 7;
        assert_eq!(unsafe { q.peek() }, Some(&5));
        assert_eq!(q.pop_if(&co), Some(5));
        assert_eq!(q.pop_if(&co), Some(6));
        // predicate rejects 7, so pop_if leaves it queued
        assert_eq!(q.pop_if(&co), None);
        assert_eq!(q.pop(), Some(7));
    }

    // multi-producer stress test: 8 threads push 1000 items each while the
    // main thread pops until all messages are accounted for
    #[test]
    fn test() {
        let nthreads = 8;
        let nmsgs = 1000;
        let q = Queue::new();
        match q.pop() {
            None => {}
            Some(..) => panic!(),
        }
        let (tx, rx) = channel();
        let q = Arc::new(q);

        for _ in 0..nthreads {
            let tx = tx.clone();
            let q = q.clone();
            thread::spawn(move || {
                for i in 0..nmsgs {
                    q.push(i);
                }
                tx.send(()).unwrap();
            });
        }

        let mut i = 0;
        while i < nthreads * nmsgs {
            match q.pop() {
                None => {}
                Some(_) => i += 1,
            }
        }
        drop(tx);
        for _ in 0..nthreads {
            rx.recv().unwrap();
        }
    }
}