command_executor/
crossbeam_blocking_queue.rs1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use crossbeam::queue::ArrayQueue;
5
6pub struct CrossbeamBlockingQueue<E> where E: Send + Sync {
7 elements: Arc<ArrayQueue<E>>,
8}
9
10impl<E> CrossbeamBlockingQueue<E> where E: Send + Sync {
11 pub fn new(size: usize) -> CrossbeamBlockingQueue<E> {
12 CrossbeamBlockingQueue::<E> {
13 elements: Arc::new(ArrayQueue::new(size)),
14 }
15 }
16
17 pub fn len(&self) -> usize {
18 self.elements.len()
19 }
20
21 pub fn capacity(&self) -> usize {
22 self.elements.capacity()
23 }
24
25 pub fn is_empty(&self) -> bool {
26 self.elements.is_empty()
27 }
28
29 pub fn is_full(&self) -> bool {
30 self.elements.is_full()
31 }
32
33 pub fn wait_empty(&self, timeout: Duration) -> bool {
34 let backoff = crossbeam::utils::Backoff::new();
35 let mut t = timeout;
36 let mut start = Instant::now();
37 while !self.elements.is_empty() {
38 let elapsed = start.elapsed();
39 if elapsed < t {
40 t -= elapsed;
41 start = Instant::now();
42 } else {
43 break;
44 }
45 backoff.spin();
46 }
47 self.elements.is_empty()
48 }
49
50 pub fn enqueue(&self, element: E) {
51 self.try_enqueue(element, Duration::MAX);
52 }
53
54 pub fn try_enqueue(&self, element: E, timeout: Duration) -> Option<E> {
55 let backoff = crossbeam::utils::Backoff::new();
56 let mut t = timeout;
57 let mut start = Instant::now();
58 let mut e = element;
59 loop {
60 let result = self.elements.push(e);
61 match result {
62 Ok(_) => {
63 break None;
64 }
65 Err(element) => {
66 e = element;
67 let elapsed = start.elapsed();
68 if elapsed < t {
69 t -= elapsed;
70 start = Instant::now();
71 } else {
72 break Some(e);
73 }
74 backoff.spin();
75 }
76 }
77 }
78 }
79
80 pub fn dequeue(&self) -> Option<E> {
81 self.try_dequeue(Duration::MAX)
82 }
83
84 pub fn try_dequeue(&self, timeout: Duration) -> Option<E> {
85 let backoff = crossbeam::utils::Backoff::new();
86 let mut t = timeout;
87 let mut start = Instant::now();
88 loop {
89 let element = self.elements.pop();
90 if element.is_none() {
91 let elapsed = start.elapsed();
92 if elapsed < t {
93 t -= elapsed;
94 start = Instant::now();
95 } else {
96 break None;
97 }
98 } else {
99 break element;
100 }
101 backoff.spin();
102 }
103 }
104}
105
#[cfg(test)]
mod tests {
    use std::thread::{Builder, JoinHandle};

    use super::*;

    /// Number of elements each producer thread enqueues.
    const PER_PRODUCER: i32 = 2048;

    /// Total number of elements produced by the two producers.
    const TOTAL: usize = 2 * PER_PRODUCER as usize;

    /// Element used in `test_mpmc` to tell a consumer to stop.
    const STOP: (i32, i32) = (-1, -1);

    /// Queue handle shared between the threads of the concurrent tests.
    type SharedQueue = Arc<CrossbeamBlockingQueue<(i32, i32)>>;

    /// Spawns a thread that enqueues `(id, 0)` up to `(id, PER_PRODUCER - 1)`.
    fn spawn_producer(queue: &SharedQueue, id: i32) -> JoinHandle<()> {
        let queue = Arc::clone(queue);
        Builder::new()
            .spawn(move || {
                for i in 0..PER_PRODUCER {
                    queue.enqueue((id, i));
                }
            })
            .expect("failed to spawn producer")
    }

    /// Spawns a thread that dequeues until it receives `STOP` and returns
    /// every other element it saw.
    fn spawn_stoppable_consumer(queue: &SharedQueue) -> JoinHandle<Vec<(i32, i32)>> {
        let queue = Arc::clone(queue);
        Builder::new()
            .spawn(move || {
                let mut received = Vec::new();
                loop {
                    match queue.dequeue() {
                        None => {}
                        Some(element) if element == STOP => break received,
                        Some(element) => received.push(element),
                    }
                }
            })
            .expect("failed to spawn consumer")
    }

    /// Asserts that `received` contains every element of producers 1 and 2
    /// exactly once, in any order.
    fn assert_all_delivered(mut received: Vec<(i32, i32)>) {
        // Sorting once and comparing against the expected sequence replaces a
        // per-element linear search plus `Vec::remove`.
        received.sort_unstable();
        let expected: Vec<(i32, i32)> = (1..=2)
            .flat_map(|id| (0..PER_PRODUCER).map(move |i| (id, i)))
            .collect();
        assert!(
            received == expected,
            "received elements differ from the produced ones (got {} elements, expected {})",
            received.len(),
            expected.len()
        );
    }

    #[test]
    fn test_try_dequeue() {
        let q = CrossbeamBlockingQueue::<i32>::new(128);

        let r = q.try_dequeue(Duration::from_millis(0));
        assert_eq!(r, None);
        let r = q.try_dequeue(Duration::from_millis(10));
        assert_eq!(r, None);
    }

    #[test]
    fn test_try_enqueue() {
        let q = CrossbeamBlockingQueue::<i32>::new(128);
        for i in 0..128 {
            q.enqueue(i);
        }

        // The queue is full, so the element must be handed back.
        let r = q.try_enqueue(128, Duration::from_millis(0));
        assert_eq!(r, Some(128));
        let r = q.try_enqueue(128, Duration::from_millis(10));
        assert_eq!(r, Some(128));
    }

    #[test]
    fn test_wait_empty() {
        let q = CrossbeamBlockingQueue::<i32>::new(4);
        assert!(q.wait_empty(Duration::from_millis(0)));

        q.enqueue(1);
        assert!(!q.wait_empty(Duration::from_millis(0)));
        assert!(!q.wait_empty(Duration::from_millis(10)));

        assert_eq!(q.dequeue(), Some(1));
        assert!(q.wait_empty(Duration::from_millis(10)));
    }

    #[test]
    fn test_fifo() {
        let q = CrossbeamBlockingQueue::<i32>::new(128);
        for i in 0..128 {
            q.enqueue(i);
        }

        for i in 0..128 {
            assert_eq!(q.dequeue().unwrap(), i);
        }
    }

    #[test]
    fn test_mpsc() {
        let q: SharedQueue = Arc::new(CrossbeamBlockingQueue::new(16));

        let p1 = spawn_producer(&q, 1);
        let p2 = spawn_producer(&q, 2);

        let qc1 = Arc::clone(&q);
        let c1 = Builder::new()
            .spawn(move || {
                // The single consumer knows how many elements to expect and
                // stops as soon as it has all of them.
                let mut received = Vec::with_capacity(TOTAL);
                while received.len() < TOTAL {
                    received.push(qc1.dequeue().unwrap());
                }
                received
            })
            .expect("failed to spawn consumer");

        p1.join().expect("failed to join producer");
        p2.join().expect("failed to join producer");

        let received = c1.join().expect("failed to join consumer");
        assert_all_delivered(received);
    }

    #[test]
    fn test_mpmc() {
        let q: SharedQueue = Arc::new(CrossbeamBlockingQueue::new(16));

        let p1 = spawn_producer(&q, 1);
        let p2 = spawn_producer(&q, 2);
        let c1 = spawn_stoppable_consumer(&q);
        let c2 = spawn_stoppable_consumer(&q);

        p1.join().expect("failed to join producer");
        p2.join().expect("failed to join producer");

        // One stop marker per consumer, enqueued only after all real elements.
        q.enqueue(STOP);
        q.enqueue(STOP);

        let mut received = c1.join().expect("failed to join consumer");
        received.extend(c2.join().expect("failed to join consumer"));
        assert_all_delivered(received);
    }
}