command_executor/
blocking_queue.rs1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::{Duration, Instant};
4
/// State flags describing the queue, kept behind a single mutex so that
/// threads can wait on them with a condition variable.
struct QueueFlags {
    /// `true` while the queue holds no elements.
    empty: bool,
    /// `true` while the queue holds as many elements as its capacity.
    full: bool,
}
9
10impl QueueFlags {
11 fn new() -> QueueFlags {
12 QueueFlags {
13 empty: true,
14 full: false,
15 }
16 }
17}
18
19pub struct BlockingQueue<E> where E: Send + Sync {
25 flags: Mutex<QueueFlags>,
26 empty: Condvar,
27 full: Condvar,
28 elements: Mutex<VecDeque<E>>,
29 capacity: usize,
30}
31
32impl<E> BlockingQueue<E> where E: Send + Sync {
33 pub fn new(capacity: usize) -> BlockingQueue<E> {
39 BlockingQueue::<E> {
40 flags: Mutex::new(QueueFlags::new()),
41 empty: Condvar::new(),
42 full: Condvar::new(),
43 elements: Mutex::new(VecDeque::with_capacity(capacity)),
44 capacity,
45 }
46 }
47
48 pub fn len(&self) -> usize {
58 self.elements.lock().unwrap().len()
59 }
60
61 pub fn capacity(&self) -> usize {
64 self.capacity
65 }
66
67 pub fn is_empty(&self) -> bool {
70 self.elements.lock().unwrap().is_empty()
71 }
72
73 pub fn is_full(&self) -> bool {
76 self.len() == self.capacity()
77 }
78
79 pub fn wait_empty(&self, timeout: Duration) -> bool {
84 let flags_lock = &self.flags;
85 let empty = &self.empty;
86 let mut flags = flags_lock.lock().unwrap();
87 let mut t = timeout;
88 let mut start = Instant::now();
89 while !flags.empty {
90 let (f, timeout_result) = empty.wait_timeout(flags, t).unwrap();
91 {
92 flags = f;
93 if timeout_result.timed_out() {
94 break;
95 } else {
96 let elapsed = start.elapsed();
97 if elapsed < t {
98 t -= elapsed;
99 start = Instant::now();
100 } else {
101 break;
102 }
103 }
104 }
105 }
106 flags.empty
107 }
108
109 pub fn enqueue(&self, element: E) {
111 self.try_enqueue(element, Duration::MAX);
112 }
113
114 pub fn try_enqueue(&self, element: E, timeout: Duration) -> Option<E> {
116 let flags_lock = &self.flags;
117 let empty = &self.empty;
118 let full = &self.full;
119 let mut flags = flags_lock.lock().unwrap();
120 let mut timed_out = false;
121 let mut t = timeout;
122 let mut start = Instant::now();
123 while flags.full {
124 let (f, timeout_result) = full.wait_timeout(flags, t).unwrap();
125 {
126 flags = f;
127 if timeout_result.timed_out() {
128 timed_out = true;
129 break;
130 } else {
131 let elapsed = start.elapsed();
132 if elapsed < t {
133 t -= elapsed;
134 start = Instant::now();
135 } else {
136 timed_out = true;
137 break;
138 }
139 }
140 }
141 }
142
143 if timed_out {
144 Some(element)
145 } else {
146 let mut elements = self.elements.lock().unwrap();
147 elements.push_back(element);
148 flags.empty = false;
149 empty.notify_one();
150 if elements.len() == self.capacity() {
151 flags.full = true;
152 full.notify_all()
153 }
154 None
155 }
156 }
157
158 pub fn dequeue(&self) -> Option<E> {
161 self.try_dequeue(Duration::MAX)
162 }
163
164 pub fn try_dequeue(&self, timeout: Duration) -> Option<E> {
166 let flags_lock = &self.flags;
167 let empty = &self.empty;
168 let full = &self.full;
169 let mut flags = flags_lock.lock().unwrap();
170 let mut timed_out = false;
171 let mut t = timeout;
172 let mut start = Instant::now();
173 while flags.empty {
174 let (f, timeout_result) = empty.wait_timeout(flags, t).unwrap();
175 {
176 flags = f;
177 if timeout_result.timed_out() {
178 timed_out = true;
179 break;
180 } else {
181 let elapsed = start.elapsed();
182 if elapsed < t {
183 t -= elapsed;
184 start = Instant::now();
185 } else {
186 timed_out = true;
187 break;
188 }
189 }
190 }
191 }
192
193 if timed_out {
194 None
195 } else {
196 let mut elements = self.elements.lock().unwrap();
197 let element = elements.pop_front();
198 flags.full = false;
199 full.notify_one();
200 if elements.len() == 0 {
201 flags.empty = true;
202 empty.notify_all();
203 }
204 element
205 }
206 }
207}
208
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread::Builder;

    use super::*;

    /// Element type used by the multi-threaded tests: `(producer id, sequence number)`.
    type Pair = (i32, i32);

    /// Number of elements every producer thread enqueues.
    const PER_PRODUCER: i32 = 2048;

    /// Marker element telling a consumer of `test_mpmc` to stop.
    const STOP: Pair = (-1, -1);

    /// Spawns a thread that enqueues `(id, 0)`, `(id, 1)`, ... into `queue`.
    fn spawn_producer(
        queue: Arc<BlockingQueue<Pair>>,
        id: i32,
    ) -> std::io::Result<std::thread::JoinHandle<()>> {
        Builder::new().spawn(move || {
            for i in 0..PER_PRODUCER {
                queue.enqueue((id, i));
            }
        })
    }

    /// Spawns a thread that collects elements from `queue` until it sees `STOP`.
    fn spawn_consumer_until_stop(
        queue: Arc<BlockingQueue<Pair>>,
    ) -> std::io::Result<std::thread::JoinHandle<Vec<Pair>>> {
        Builder::new().spawn(move || {
            let mut received = Vec::<Pair>::new();
            loop {
                if let Some(item) = queue.dequeue() {
                    if item == STOP {
                        break received;
                    }
                    received.push(item);
                }
            }
        })
    }

    /// Asserts that `received` holds every element of producers 1 and 2
    /// exactly once and nothing else.
    fn assert_each_pair_once(mut received: Vec<Pair>) {
        for i in 0..PER_PRODUCER {
            for id in 1..=2 {
                let at = received.iter().position(|e| *e == (id, i)).unwrap();
                received.remove(at);
            }
        }
        assert!(received.is_empty());
    }

    #[test]
    fn test_try_dequeue() {
        let q = BlockingQueue::<i32>::new(128);

        for &millis in &[0, 10] {
            let r = q.try_dequeue(Duration::from_millis(millis));
            assert_eq!(r, None);
        }
    }

    #[test]
    fn test_try_enqueue() {
        let q = BlockingQueue::<i32>::new(128);
        (0..128).for_each(|i| q.enqueue(i));

        for &millis in &[0, 10] {
            let r = q.try_enqueue(128, Duration::from_millis(millis));
            assert_eq!(r, Some(128));
        }
    }

    #[test]
    fn test_fifo() {
        let q = BlockingQueue::<i32>::new(128);
        (0..128).for_each(|i| q.enqueue(i));

        let mut expected = 0;
        while expected < 128 {
            assert_eq!(q.dequeue().unwrap(), expected);
            expected += 1;
        }
    }

    #[test]
    fn test_mpsc() {
        let q = Arc::new(BlockingQueue::<Pair>::new(16));

        let p1 = spawn_producer(q.clone(), 1);
        let p2 = spawn_producer(q.clone(), 2);

        let qc1 = q.clone();
        let c1 = Builder::new().spawn(move || {
            // The single consumer knows how many elements to expect in total.
            let mut received = Vec::<Pair>::new();
            while received.len() != 4096 {
                received.push(qc1.dequeue().unwrap());
            }
            received
        });

        p1.unwrap().join().expect("failed to join producer");
        p2.unwrap().join().expect("failed to join producer");

        let received = c1.unwrap().join().expect("failed to join consumer");
        assert_each_pair_once(received);
    }

    #[test]
    fn test_mpmc() {
        let q = Arc::new(BlockingQueue::<Pair>::new(16));

        let p1 = spawn_producer(q.clone(), 1);
        let p2 = spawn_producer(q.clone(), 2);
        let c1 = spawn_consumer_until_stop(q.clone());
        let c2 = spawn_consumer_until_stop(q.clone());

        p1.unwrap().join().expect("failed to join producer");
        p2.unwrap().join().expect("failed to join producer");

        // One stop marker per consumer, queued after all real elements.
        q.enqueue(STOP);
        q.enqueue(STOP);

        let mut received = c1.unwrap().join().expect("failed to join consumer");
        let mut from_second = c2.unwrap().join().expect("failed to join consumer");
        received.append(&mut from_second);

        assert_each_pair_once(received);
    }
}