parcoll/consumer.rs
1//! This module provides the [`Consumer`] and the [`LockFreeConsumer`] traits.
2use orengine_utils::hints::{unreachable_hint, unwrap_or_bug_hint};
3use orengine_utils::hints::unlikely;
4use crate::single_producer::{SingleLockFreeProducer, SingleProducer};
5use crate::LockFreePopErr;
6use std::mem::MaybeUninit;
7
8/// A consumer of a queue.
9pub trait Consumer<T> {
10 /// Returns the capacity of the queue.
11 fn capacity(&self) -> usize;
12
13 /// Returns the length of the queue.
14 fn len(&self) -> usize;
15
16 /// Returns whether the queue is empty.
17 #[inline]
18 fn is_empty(&self) -> bool {
19 self.len() == 0
20 }
21
22 /// Pops many values from the queue and returns the number of read values.
23 ///
24 /// It may be non-lock-free.
25 fn pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize;
26
27 /// Pops a value from the queue and returns it.
28 ///
29 /// It may be non-lock-free.
30 fn pop(&self) -> Option<T> {
31 let mut uninit_item = MaybeUninit::uninit();
32 let n = self.pop_many(unsafe { &mut *(&raw mut uninit_item).cast::<[_; 1]>() });
33
34 if n == 1 {
35 Some(unsafe { uninit_item.assume_init() })
36 } else {
37 debug_assert_eq!(n, 0, "pop_many returned more than one value for [T; 1]");
38
39 None
40 }
41 }
42
43 /// Steals some values from the consumer and places them into `dst`.
44 /// Returns the number of stolen values.
45 ///
46 /// It requires that the other queue to be empty.
47 /// Expected to steal the half of the queue,
48 /// but other implementations may steal another number of values.
49 ///
50 /// It may be non-lock-free.
51 ///
52 /// # Panics
53 ///
54 /// Panics if the other queue is not empty.
55 fn steal_into(&self, dst: &impl SingleProducer<T>) -> usize {
56 debug_assert!(
57 dst.is_empty(),
58 "steal_into requires the other queue to be empty"
59 );
60
61 let max_stolen = self.len() / 2;
62 if !cfg!(feature = "always_steal") && max_stolen < 4 || max_stolen == 0 {
63 // we don't steal less than 4 by default
64 // because else we may lose more because of cache locality and NUMA awareness
65 return 0;
66 }
67
68 let mut stolen = 0;
69 let dst_capacity = dst.capacity();
70
71 while stolen < max_stolen && stolen < dst_capacity {
72 if let Some(item) = self.pop() {
73 unwrap_or_bug_hint(dst.maybe_push(item));
74
75 stolen += 1;
76 } else {
77 break;
78 }
79 }
80
81 stolen
82 }
83}
84
85/// A lock-free consumer of a queue.
86pub trait LockFreeConsumer<T>: Consumer<T> {
87 /// Pops many values from the queue.
88 /// Returns the number of popped values and whether the operation failed
89 /// because it should wait.
90 ///
91 /// It is lock-free.
92 /// If you can lock, you can look at the [`Consumer::pop_many`] method
93 /// because if it is implemented not as lock-free, it should have better performance.
94 fn lock_free_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> (usize, bool);
95
96 /// Pops a value from the queue. On failure, returns <code>Err([`LockFreePopErr`])</code>.
97 ///
98 /// It is lock-free.
99 /// If you can lock, you can look at the [`Consumer::pop`] method
100 /// because if it is implemented not as lock-free, it should have better performance.
101 fn lock_free_pop(&self) -> Result<T, LockFreePopErr> {
102 let mut uninit_item = MaybeUninit::uninit();
103 let (n, should_wait) =
104 self.lock_free_pop_many(unsafe { &mut *(&raw mut uninit_item).cast::<[_; 1]>() });
105
106 if unlikely(should_wait) {
107 return Err(LockFreePopErr::ShouldWait);
108 }
109
110 if n == 1 {
111 Ok(unsafe { uninit_item.assume_init() })
112 } else {
113 debug_assert_eq!(
114 n, 0,
115 "lock_free_pop_many returned more than one value for [T; 1]"
116 );
117
118 Err(LockFreePopErr::Empty)
119 }
120 }
121
122 /// Steals some values from the consumer and places them into `dst`.
123 /// Returns the number of stolen values and whether the operation failed
124 /// because it should wait.
125 ///
126 /// It requires that the other queue to be empty.
127 /// Expected to steal the half of the queue,
128 /// but other implementations may steal another number of values.
129 ///
130 /// It is lock-free.
131 /// If you can lock, you can look at the [`Consumer::steal_into`] method
132 /// because if it is implemented not as lock-free, it should have better performance.
133 ///
134 /// # Panics
135 ///
136 /// Panics if the other queue is not empty.
137 fn lock_free_steal_into(&self, dst: &impl SingleLockFreeProducer<T>) -> (usize, bool) {
138 debug_assert!(
139 dst.is_empty(),
140 "steal_into requires the other queue to be empty"
141 );
142
143 let max_stolen = self.len() / 2;
144 if !cfg!(feature = "always_steal") && max_stolen < 4 || max_stolen == 0 {
145 // we don't steal less than 4 by default
146 // because else we may lose more because of cache locality and NUMA awareness
147 return (0, false);
148 }
149
150 let mut stolen = 0;
151 let dst_capacity = dst.capacity();
152
153 while stolen < max_stolen && stolen < dst_capacity {
154 match self.lock_free_pop() {
155 Ok(item) => {
156 unwrap_or_bug_hint(dst.lock_free_maybe_push(item));
157
158 stolen += 1;
159 }
160 Err(LockFreePopErr::Empty) => return (stolen, false),
161 Err(LockFreePopErr::ShouldWait) => return (stolen, true),
162 }
163 }
164
165 unreachable_hint()
166 }
167}