command_executor/
blocking_queue.rs

1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::{Duration, Instant};
4
/// Boundary-state flags of the queue, kept behind their own mutex so that
/// condition variables can be paired with them.
struct QueueFlags {
    /// `true` while the queue holds no elements.
    empty: bool,
    /// `true` once the queue has reached its declared capacity.
    full: bool,
}

impl QueueFlags {
    /// Flags describing a freshly created queue: empty and not full.
    fn new() -> Self {
        let (empty, full) = (true, false);
        Self { empty, full }
    }
}
18
19/// Blocking bounded queue
20///
21/// `E: Send + Sync` - the element type
22/// This is a multiple producers / multiple consumers blocking bounded queue.
23/// Reference: [Producer-Consumer](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)
24pub struct BlockingQueue<E> where E: Send + Sync {
25    flags: Mutex<QueueFlags>,
26    empty: Condvar,
27    full: Condvar,
28    elements: Mutex<VecDeque<E>>,
29    capacity: usize,
30}
31
32impl<E> BlockingQueue<E> where E: Send + Sync {
33    /// Create a new queue with `size` capacity
34    /// ```
35    /// use command_executor::blocking_queue::BlockingQueue;
36    /// let q: BlockingQueue<i32> = BlockingQueue::new(4);
37    /// ```
38    pub fn new(capacity: usize) -> BlockingQueue<E> {
39        BlockingQueue::<E> {
40            flags: Mutex::new(QueueFlags::new()),
41            empty: Condvar::new(),
42            full: Condvar::new(),
43            elements: Mutex::new(VecDeque::with_capacity(capacity)),
44            capacity,
45        }
46    }
47
48    /// The current length of the queue. Note that the reported length is correct at the time
49    /// of checking, the actual length may change between the call and the access to the result
50    /// value. Should be used for diagnostic and monitoring only.
51    /// ```
52    /// use command_executor::blocking_queue::BlockingQueue;
53    /// let q: BlockingQueue<i32> = BlockingQueue::new(4);
54    /// q.enqueue(11);
55    /// assert_eq!(q.len(), 1);
56    /// ```
57    pub fn len(&self) -> usize {
58        self.elements.lock().unwrap().len()
59    }
60
61    /// The declared capacity of the queue. May be smaller than the actual capacity of the actual
62    /// storage
63    pub fn capacity(&self) -> usize {
64        self.capacity
65    }
66
67    /// Indication if the queue is empty in this point of time. Should be used for diagnostic
68    /// and monitoring only.
69    pub fn is_empty(&self) -> bool {
70        self.elements.lock().unwrap().is_empty()
71    }
72
73    /// Indication if the queue is full in this point of time. Should be used for diagnostic
74    /// and monitoring only.
75    pub fn is_full(&self) -> bool {
76        self.len() == self.capacity()
77    }
78
79    /// Wait until the queue is empty.
80    ///
81    /// Note that the empty state is temporary. This method is mostly useful when we know that no
82    /// elements are to be enqueued and we want an indication of completion.
83    pub fn wait_empty(&self, timeout: Duration) -> bool {
84        let flags_lock = &self.flags;
85        let empty = &self.empty;
86        let mut flags = flags_lock.lock().unwrap();
87        let mut t = timeout;
88        let mut start = Instant::now();
89        while !flags.empty {
90            let (f, timeout_result) = empty.wait_timeout(flags, t).unwrap();
91            {
92                flags = f;
93                if timeout_result.timed_out() {
94                    break;
95                } else {
96                    let elapsed = start.elapsed();
97                    if elapsed < t {
98                        t -= elapsed;
99                        start = Instant::now();
100                    } else {
101                        break;
102                    }
103                }
104            }
105        }
106        flags.empty
107    }
108
109    /// Enqueue an element. When the queue is full will block until space available.
110    pub fn enqueue(&self, element: E) {
111        self.try_enqueue(element, Duration::MAX);
112    }
113
114    /// Enqueue an element with timeout. When timeout is exceeded return the element to caller.
115    pub fn try_enqueue(&self, element: E, timeout: Duration) -> Option<E> {
116        let flags_lock = &self.flags;
117        let empty = &self.empty;
118        let full = &self.full;
119        let mut flags = flags_lock.lock().unwrap();
120        let mut timed_out = false;
121        let mut t = timeout;
122        let mut start = Instant::now();
123        while flags.full {
124            let (f, timeout_result) = full.wait_timeout(flags, t).unwrap();
125            {
126                flags = f;
127                if timeout_result.timed_out() {
128                    timed_out = true;
129                    break;
130                } else {
131                    let elapsed = start.elapsed();
132                    if elapsed < t {
133                        t -= elapsed;
134                        start = Instant::now();
135                    } else {
136                        timed_out = true;
137                        break;
138                    }
139                }
140            }
141        }
142
143        if timed_out {
144            Some(element)
145        } else {
146            let mut elements = self.elements.lock().unwrap();
147            elements.push_back(element);
148            flags.empty = false;
149            empty.notify_one();
150            if elements.len() == self.capacity() {
151                flags.full = true;
152                full.notify_all()
153            }
154            None
155        }
156    }
157
158    /// Dequeue an element from the queue. When the queue is empty will block until an element is
159    /// available
160    pub fn dequeue(&self) -> Option<E> {
161        self.try_dequeue(Duration::MAX)
162    }
163
164    /// Dequeue and element from the queue with timeout.
165    pub fn try_dequeue(&self, timeout: Duration) -> Option<E> {
166        let flags_lock = &self.flags;
167        let empty = &self.empty;
168        let full = &self.full;
169        let mut flags = flags_lock.lock().unwrap();
170        let mut timed_out = false;
171        let mut t = timeout;
172        let mut start = Instant::now();
173        while flags.empty {
174            let (f, timeout_result) = empty.wait_timeout(flags, t).unwrap();
175            {
176                flags = f;
177                if timeout_result.timed_out() {
178                    timed_out = true;
179                    break;
180                } else {
181                    let elapsed = start.elapsed();
182                    if elapsed < t {
183                        t -= elapsed;
184                        start = Instant::now();
185                    } else {
186                        timed_out = true;
187                        break;
188                    }
189                }
190            }
191        }
192
193        if timed_out {
194            None
195        } else {
196            let mut elements = self.elements.lock().unwrap();
197            let element = elements.pop_front();
198            flags.full = false;
199            full.notify_one();
200            if elements.len() == 0 {
201                flags.empty = true;
202                empty.notify_all();
203            }
204            element
205        }
206    }
207}
208
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread::{Builder, JoinHandle};

    use super::*;

    /// Element type used by the threaded tests: `(producer id, sequence number)`.
    type Item = (i32, i32);

    /// Ids of the producer threads started by the threaded tests.
    const PRODUCER_IDS: [i32; 2] = [1, 2];
    /// Number of elements each producer enqueues.
    const PER_PRODUCER: i32 = 2048;
    /// Total number of elements produced in a threaded test.
    const TOTAL: usize = PRODUCER_IDS.len() * PER_PRODUCER as usize;
    /// Sentinel element telling a consumer to stop.
    const STOP: Item = (-1, -1);

    /// Start a thread that enqueues `(id, 0)`, `(id, 1)`, ... `(id, PER_PRODUCER - 1)`.
    fn spawn_producer(queue: &Arc<BlockingQueue<Item>>, id: i32) -> JoinHandle<()> {
        let queue = Arc::clone(queue);
        Builder::new()
            .spawn(move || {
                for i in 0..PER_PRODUCER {
                    queue.enqueue((id, i));
                }
            })
            .expect("failed to spawn producer")
    }

    /// Start a thread that collects elements until it dequeues the `STOP` sentinel.
    fn spawn_stoppable_consumer(queue: &Arc<BlockingQueue<Item>>) -> JoinHandle<Vec<Item>> {
        let queue = Arc::clone(queue);
        Builder::new()
            .spawn(move || {
                let mut collector = Vec::<Item>::new();
                loop {
                    match queue.dequeue() {
                        None => {}
                        Some(STOP) => break collector,
                        Some(e) => collector.push(e),
                    }
                }
            })
            .expect("failed to spawn consumer")
    }

    /// Assert that `received` holds every produced element exactly once, in any order.
    fn assert_all_delivered(mut received: Vec<Item>) {
        // Sorting once and comparing replaces a quadratic search-and-remove per element.
        received.sort_unstable();
        let expected: Vec<Item> = PRODUCER_IDS
            .iter()
            .flat_map(|&id| (0..PER_PRODUCER).map(move |i| (id, i)))
            .collect();
        assert_eq!(received, expected);
    }

    #[test]
    fn test_try_dequeue() {
        let q = BlockingQueue::<i32>::new(128);

        let r = q.try_dequeue(Duration::from_millis(0));
        assert_eq!(r, None);
        let r = q.try_dequeue(Duration::from_millis(10));
        assert_eq!(r, None);
    }

    #[test]
    fn test_try_enqueue() {
        let q = BlockingQueue::<i32>::new(128);
        for i in 0..128 {
            q.enqueue(i);
        }

        let r = q.try_enqueue(128, Duration::from_millis(0));
        assert_eq!(r, Some(128));
        let r = q.try_enqueue(128, Duration::from_millis(10));
        assert_eq!(r, Some(128));
    }

    #[test]
    fn test_fifo() {
        let q = BlockingQueue::<i32>::new(128);
        for i in 0..128 {
            q.enqueue(i);
        }

        for i in 0..128 {
            assert_eq!(q.dequeue().unwrap(), i);
        }
    }

    #[test]
    fn test_mpsc() {
        let q = Arc::new(BlockingQueue::<Item>::new(16));

        let producers: Vec<_> = PRODUCER_IDS
            .iter()
            .map(|&id| spawn_producer(&q, id))
            .collect();

        // The single consumer stops by itself once it has seen every produced element.
        let qc = Arc::clone(&q);
        let consumer = Builder::new()
            .spawn(move || {
                let mut collector = Vec::<Item>::with_capacity(TOTAL);
                while collector.len() < TOTAL {
                    collector.push(qc.dequeue().unwrap());
                }
                collector
            })
            .expect("failed to spawn consumer");

        for producer in producers {
            producer.join().expect("failed to join producer");
        }

        let collector = consumer.join().expect("failed to join consumer");
        assert_all_delivered(collector);
    }

    #[test]
    fn test_mpmc() {
        let q = Arc::new(BlockingQueue::<Item>::new(16));

        let producers: Vec<_> = PRODUCER_IDS
            .iter()
            .map(|&id| spawn_producer(&q, id))
            .collect();
        let consumers = vec![spawn_stoppable_consumer(&q), spawn_stoppable_consumer(&q)];

        for producer in producers {
            producer.join().expect("failed to join producer");
        }

        // One sentinel per consumer, enqueued only after all real elements.
        for _ in 0..consumers.len() {
            q.enqueue(STOP);
        }

        let mut collector = Vec::<Item>::with_capacity(TOTAL);
        for consumer in consumers {
            collector.append(&mut consumer.join().expect("failed to join consumer"));
        }
        assert_all_delivered(collector);
    }
}