use std::sync::{Mutex, Arc};
use std::collections::VecDeque;
use std::thread;
use crate::sync_flag::SyncFlagRx;
/// A FIFO queue of work items that can be shared between threads.
///
/// The elements live in a `VecDeque` behind an `Arc<Mutex<..>>`, so every
/// clone of a `WorkQueue` is a handle to the same underlying queue.
pub struct WorkQueue<T: Send> {
    /// Shared, mutex-protected storage; cloning the queue clones this `Arc`.
    inner: Arc<Mutex<VecDeque<T>>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, even though cloning a handle only bumps the `Arc`
// reference count and never copies an element.
impl<T: Send> Clone for WorkQueue<T> {
    /// Returns a new handle that refers to the same underlying queue.
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
impl<T: Send> WorkQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    /// Creates an empty queue whose backing `VecDeque` is pre-allocated with
    /// `VecDeque::with_capacity(capacity)`.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
        }
    }

    /// Locks the underlying queue on behalf of the public method `caller`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned; the message names `caller` so the
    /// failing entry point can be identified.
    fn lock_queue(&self, caller: &str) -> std::sync::MutexGuard<'_, VecDeque<T>> {
        self.inner.lock().unwrap_or_else(|_| {
            panic!("WorkQueue::{}() tried to lock a poisoned mutex.", caller)
        })
    }

    /// Removes and returns the element at the front of the queue, or `None`
    /// if the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn pull_work(&mut self) -> Option<T> {
        self.lock_queue("pull_work").pop_front()
    }

    /// Appends `work_element` to the back of the queue and returns the queue
    /// length measured while the lock is still held (so it includes the new
    /// element).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn push_work(&mut self, work_element: T) -> usize {
        let mut queue = self.lock_queue("push_work");
        queue.push_back(work_element);
        queue.len()
    }

    /// Polls the queue until an element is available or `run_flag.get()`
    /// returns `false`.
    ///
    /// Returns `Some(work)` as soon as an element can be pulled, and `None`
    /// once `run_flag.get()` is observed to be `false`. Between unsuccessful
    /// polls the thread yields via `thread::yield_now()`; it does not sleep or
    /// block on a condition variable.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn wait(&mut self, run_flag: &SyncFlagRx) -> Option<T> {
        while run_flag.get() {
            if let Some(work) = self.pull_work() {
                return Some(work);
            }
            thread::yield_now();
        }
        None
    }

    /// Returns the number of elements currently in the queue.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn len(&self) -> usize {
        self.lock_queue("len").len()
    }

    /// Returns `true` if the queue currently holds no elements.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.lock_queue("is_empty").is_empty()
    }
}

impl<T: Send> Default for WorkQueue<T> {
    /// Equivalent to [`WorkQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::WorkQueue;

    /// Pushes two elements, then drains the queue, checking the reported
    /// lengths and that elements come back in the order they were pushed.
    #[test]
    fn add_and_remove() {
        let mut wq: WorkQueue<i32> = WorkQueue::new();
        assert_eq!(
            wq.len(),
            0,
            "Expected queue to be created empty, it was {} long.",
            wq.len()
        );

        // `push_work` reports the length including the element just pushed.
        let len_after_1_push = wq.push_work(0);
        assert_eq!(
            len_after_1_push,
            1,
            "Expected queue to have 1 element, it was {} long.",
            len_after_1_push,
        );
        let len_after_2_pushes = wq.push_work(1);
        assert_eq!(
            len_after_2_pushes,
            2,
            "Expected queue to have 2 elements, it was {} long.",
            len_after_2_pushes,
        );
        assert_eq!(
            wq.len(),
            2,
            "Expected len() to agree with push_work(), it was {} long.",
            wq.len()
        );

        // Elements must come back in FIFO order: 0 first, then 1.
        let first = wq.pull_work();
        assert_eq!(
            first,
            Some(0),
            "Expected to pull 0 first since 0 was pushed first. Instead got {:?}.",
            first,
        );
        let second = wq.pull_work();
        assert_eq!(
            second,
            Some(1),
            "Expected to pull 1 second since 1 was pushed second. Instead got {:?}.",
            second,
        );

        // Draining past the end yields `None` and leaves the queue empty.
        let work = wq.pull_work();
        assert_eq!(
            work,
            None,
            "Expected to get None when pulling from an empty queue; instead, got {:?}.",
            work,
        );
        assert_eq!(
            wq.len(),
            0,
            "Expected queue to be empty after draining, it was {} long.",
            wq.len()
        );
    }
}