parcoll/
consumer.rs

1//! This module provides the [`Consumer`] and the [`LockFreeConsumer`] traits.
2use orengine_utils::hints::{unreachable_hint, unwrap_or_bug_hint};
3use orengine_utils::hints::unlikely;
4use crate::single_producer::{SingleLockFreeProducer, SingleProducer};
5use crate::LockFreePopErr;
6use std::mem::MaybeUninit;
7
8/// A consumer of a queue.
9pub trait Consumer<T> {
10    /// Returns the capacity of the queue.
11    fn capacity(&self) -> usize;
12
13    /// Returns the length of the queue.
14    fn len(&self) -> usize;
15
16    /// Returns whether the queue is empty.
17    #[inline]
18    fn is_empty(&self) -> bool {
19        self.len() == 0
20    }
21
22    /// Pops many values from the queue and returns the number of read values.
23    ///
24    /// It may be non-lock-free.
25    fn pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize;
26
27    /// Pops a value from the queue and returns it.
28    ///
29    /// It may be non-lock-free.
30    fn pop(&self) -> Option<T> {
31        let mut uninit_item = MaybeUninit::uninit();
32        let n = self.pop_many(unsafe { &mut *(&raw mut uninit_item).cast::<[_; 1]>() });
33
34        if n == 1 {
35            Some(unsafe { uninit_item.assume_init() })
36        } else {
37            debug_assert_eq!(n, 0, "pop_many returned more than one value for [T; 1]");
38
39            None
40        }
41    }
42
43    /// Steals some values from the consumer and places them into `dst`.
44    /// Returns the number of stolen values.
45    ///
46    /// It requires that the other queue to be empty.
47    /// Expected to steal the half of the queue,
48    /// but other implementations may steal another number of values.
49    ///
50    /// It may be non-lock-free.
51    ///
52    /// # Panics
53    ///
54    /// Panics if the other queue is not empty.
55    fn steal_into(&self, dst: &impl SingleProducer<T>) -> usize {
56        debug_assert!(
57            dst.is_empty(),
58            "steal_into requires the other queue to be empty"
59        );
60
61        let max_stolen = self.len() / 2;
62        if !cfg!(feature = "always_steal") && max_stolen < 4 || max_stolen == 0 {
63            // we don't steal less than 4 by default
64            // because else we may lose more because of cache locality and NUMA awareness
65            return 0;
66        }
67
68        let mut stolen = 0;
69        let dst_capacity = dst.capacity();
70
71        while stolen < max_stolen && stolen < dst_capacity {
72            if let Some(item) = self.pop() {
73                unwrap_or_bug_hint(dst.maybe_push(item));
74
75                stolen += 1;
76            } else {
77                break;
78            }
79        }
80
81        stolen
82    }
83}
84
85/// A lock-free consumer of a queue.
86pub trait LockFreeConsumer<T>: Consumer<T> {
87    /// Pops many values from the queue.
88    /// Returns the number of popped values and whether the operation failed
89    /// because it should wait.
90    ///
91    /// It is lock-free.
92    /// If you can lock, you can look at the [`Consumer::pop_many`] method
93    /// because if it is implemented not as lock-free, it should have better performance.
94    fn lock_free_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> (usize, bool);
95
96    /// Pops a value from the queue. On failure, returns <code>Err([`LockFreePopErr`])</code>.
97    ///
98    /// It is lock-free.
99    /// If you can lock, you can look at the [`Consumer::pop`] method
100    /// because if it is implemented not as lock-free, it should have better performance.
101    fn lock_free_pop(&self) -> Result<T, LockFreePopErr> {
102        let mut uninit_item = MaybeUninit::uninit();
103        let (n, should_wait) =
104            self.lock_free_pop_many(unsafe { &mut *(&raw mut uninit_item).cast::<[_; 1]>() });
105
106        if unlikely(should_wait) {
107            return Err(LockFreePopErr::ShouldWait);
108        }
109
110        if n == 1 {
111            Ok(unsafe { uninit_item.assume_init() })
112        } else {
113            debug_assert_eq!(
114                n, 0,
115                "lock_free_pop_many returned more than one value for [T; 1]"
116            );
117
118            Err(LockFreePopErr::Empty)
119        }
120    }
121
122    /// Steals some values from the consumer and places them into `dst`.
123    /// Returns the number of stolen values and whether the operation failed
124    /// because it should wait.
125    ///
126    /// It requires that the other queue to be empty.
127    /// Expected to steal the half of the queue,
128    /// but other implementations may steal another number of values.
129    ///
130    /// It is lock-free.
131    /// If you can lock, you can look at the [`Consumer::steal_into`] method
132    /// because if it is implemented not as lock-free, it should have better performance.
133    ///
134    /// # Panics
135    ///
136    /// Panics if the other queue is not empty.
137    fn lock_free_steal_into(&self, dst: &impl SingleLockFreeProducer<T>) -> (usize, bool) {
138        debug_assert!(
139            dst.is_empty(),
140            "steal_into requires the other queue to be empty"
141        );
142
143        let max_stolen = self.len() / 2;
144        if !cfg!(feature = "always_steal") && max_stolen < 4 || max_stolen == 0 {
145            // we don't steal less than 4 by default
146            // because else we may lose more because of cache locality and NUMA awareness
147            return (0, false);
148        }
149
150        let mut stolen = 0;
151        let dst_capacity = dst.capacity();
152
153        while stolen < max_stolen && stolen < dst_capacity {
154            match self.lock_free_pop() {
155                Ok(item) => {
156                    unwrap_or_bug_hint(dst.lock_free_maybe_push(item));
157
158                    stolen += 1;
159                }
160                Err(LockFreePopErr::Empty) => return (stolen, false),
161                Err(LockFreePopErr::ShouldWait) => return (stolen, true),
162            }
163        }
164
165        unreachable_hint()
166    }
167}