cargo/util/
queue.rs

1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
4
5/// A simple, threadsafe, queue of items of type `T`
6///
7/// This is a sort of channel where any thread can push to a queue and any
8/// thread can pop from a queue.
9///
10/// This supports both bounded and unbounded operations. `push` will never block,
11/// and allows the queue to grow without bounds. `push_bounded` will block if the
12/// queue is over capacity, and will resume once there is enough capacity.
13pub struct Queue<T> {
14    state: Mutex<State<T>>,
15    popper_cv: Condvar,
16    bounded_cv: Condvar,
17    bound: usize,
18}
19
20struct State<T> {
21    items: VecDeque<T>,
22}
23
24impl<T> Queue<T> {
25    pub fn new(bound: usize) -> Queue<T> {
26        Queue {
27            state: Mutex::new(State {
28                items: VecDeque::new(),
29            }),
30            popper_cv: Condvar::new(),
31            bounded_cv: Condvar::new(),
32            bound,
33        }
34    }
35
36    pub fn push(&self, item: T) {
37        self.state.lock().unwrap().items.push_back(item);
38        self.popper_cv.notify_one();
39    }
40
41    /// Pushes an item onto the queue, blocking if the queue is full.
42    pub fn push_bounded(&self, item: T) {
43        let locked_state = self.state.lock().unwrap();
44        let mut state = self
45            .bounded_cv
46            .wait_while(locked_state, |s| s.items.len() >= self.bound)
47            .unwrap();
48        state.items.push_back(item);
49        self.popper_cv.notify_one();
50    }
51
52    pub fn pop(&self, timeout: Duration) -> Option<T> {
53        let (mut state, result) = self
54            .popper_cv
55            .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
56            .unwrap();
57        if result.timed_out() {
58            None
59        } else {
60            let value = state.items.pop_front()?;
61            if state.items.len() < self.bound {
62                // Assumes threads cannot be canceled.
63                self.bounded_cv.notify_one();
64            }
65            Some(value)
66        }
67    }
68
69    pub fn try_pop_all(&self) -> Vec<T> {
70        let mut state = self.state.lock().unwrap();
71        let result = state.items.drain(..).collect();
72        self.bounded_cv.notify_all();
73        result
74    }
75}