1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
4
/// A thread-safe FIFO queue of `T`, built on a [`Mutex`]-guarded [`VecDeque`] and two
/// [`Condvar`]s.
///
/// Producers decide per call whether to honour the bound ([`Queue::push_bounded`]) or to
/// ignore it ([`Queue::push`]). Consumers either take a single item, waiting up to a
/// timeout ([`Queue::pop`]), or drain everything without waiting ([`Queue::try_pop_all`]).
///
/// # Panics
///
/// Every method panics if the internal mutex is poisoned, i.e. if another thread panicked
/// while holding the queue's lock.
pub struct Queue<T> {
    /// The queued items; all access goes through this mutex.
    state: Mutex<State<T>>,
    /// Notified after every push; `pop` waits on it while the queue is empty.
    popper_cv: Condvar,
    /// Notified when items are removed; `push_bounded` waits on it while the queue is full.
    bounded_cv: Condvar,
    /// Queue length at or above which `push_bounded` blocks.
    bound: usize,
}

/// The mutex-protected part of a [`Queue`].
struct State<T> {
    /// Items in FIFO order: pushed at the back, popped from the front.
    items: VecDeque<T>,
}

impl<T> Queue<T> {
    /// Creates an empty queue whose bounded pushes block once it holds `bound` items.
    ///
    /// Note that with `bound == 0` the "full" condition (`len >= bound`) always holds, so
    /// [`Queue::push_bounded`] never returns; only [`Queue::push`] can add items then.
    pub fn new(bound: usize) -> Self {
        Self {
            state: Mutex::new(State {
                items: VecDeque::new(),
            }),
            popper_cv: Condvar::new(),
            bounded_cv: Condvar::new(),
            bound,
        }
    }

    /// Locks the queue state, panicking with a descriptive message if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, State<T>> {
        self.state.lock().expect("queue mutex poisoned")
    }

    /// Appends `item` to the back of the queue without ever blocking on the bound.
    ///
    /// The queue may therefore grow past `bound`. One thread waiting in [`Queue::pop`],
    /// if any, is woken.
    pub fn push(&self, item: T) {
        // The guard is a temporary, so the lock is released before the notification.
        self.lock().items.push_back(item);
        self.popper_cv.notify_one();
    }

    /// Appends `item` to the back of the queue, blocking while the queue already holds
    /// `bound` or more items.
    ///
    /// There is no timeout: the call returns only once space is available. One thread
    /// waiting in [`Queue::pop`], if any, is woken afterwards.
    pub fn push_bounded(&self, item: T) {
        let mut state = self
            .bounded_cv
            .wait_while(self.lock(), |s| s.items.len() >= self.bound)
            .expect("queue mutex poisoned");
        state.items.push_back(item);
        self.popper_cv.notify_one();
    }

    /// Removes and returns the item at the front of the queue, waiting up to `timeout`
    /// for one to arrive.
    ///
    /// Returns `None` if the queue is still empty once `timeout` has elapsed. The
    /// emptiness check happens before any waiting, so an item that is already queued is
    /// returned immediately.
    pub fn pop(&self, timeout: Duration) -> Option<T> {
        let (mut state, wait) = self
            .popper_cv
            .wait_timeout_while(self.lock(), timeout, |s| s.items.is_empty())
            .expect("queue mutex poisoned");
        if wait.timed_out() {
            return None;
        }

        let value = state.items.pop_front()?;
        // Wake one blocked `push_bounded` caller only if the queue is now below the bound;
        // it may still be at or above it after unbounded pushes.
        if state.items.len() < self.bound {
            self.bounded_cv.notify_one();
        }
        Some(value)
    }

    /// Removes every queued item without waiting and returns them in FIFO order.
    ///
    /// Returns an empty vector if the queue is empty. All threads blocked in
    /// [`Queue::push_bounded`] are woken, since the queue has just been emptied.
    pub fn try_pop_all(&self) -> Vec<T> {
        let mut state = self.lock();
        let drained: Vec<T> = state.items.drain(..).collect();
        self.bounded_cv.notify_all();
        drained
    }
}
75}