use std::{
collections::VecDeque,
sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, WaitTimeoutResult},
time::Duration,
};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum EventQueueError {
#[error("Poisoned lock")]
PoisonedLock,
#[error("Poisoned condvar")]
PoisonedCondvar,
}
impl<T> From<PoisonError<MutexGuard<'_, VecDeque<T>>>> for EventQueueError {
fn from(_: PoisonError<MutexGuard<'_, VecDeque<T>>>) -> Self {
EventQueueError::PoisonedLock
}
}
impl<T> From<PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>> for EventQueueError {
fn from(_: PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>) -> Self {
EventQueueError::PoisonedCondvar
}
}
/// Re-exports [`EventQueueError`] so it is also reachable through the `error`
/// module path.
pub mod error {
pub use super::EventQueueError;
}
/// A queue of events guarded by a mutex, paired with a condition variable
/// that is signalled whenever an event is pushed.
///
/// The queue and the condition variable are each stored behind an [`Arc`].
#[derive(Debug)]
pub struct EventQueue<T> {
    /// Pending events; the front of the deque is the next one to be taken.
    queue: Arc<Mutex<VecDeque<T>>>,
    /// Notified after every push so a waiting consumer can wake up.
    condvar: Arc<Condvar>,
}

/// Written by hand rather than derived: `#[derive(Default)]` would add a
/// `T: Default` bound, although an empty queue never needs a `T` value.
impl<T> Default for EventQueue<T> {
    /// Creates an empty queue.
    fn default() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            condvar: Arc::new(Condvar::new()),
        }
    }
}
impl<T> EventQueue<T> {
    /// How long [`poll`](Self::poll) waits for an event before giving up.
    const POLL_TIMEOUT: Duration = Duration::from_millis(100);

    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            condvar: Arc::new(Condvar::new()),
        }
    }

    /// Appends `e` to the back of the queue and notifies one waiter.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn push(&self, e: T) -> Result<(), EventQueueError> {
        let mut locked_queue = self.queue.lock()?;
        locked_queue.push_back(e);
        self.condvar.notify_one();
        Ok(())
    }

    /// Inserts `e` at the front of the queue, so it is the next event taken,
    /// and notifies one waiter.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn push_front(&self, e: T) -> Result<(), EventQueueError> {
        let mut locked_queue = self.queue.lock()?;
        locked_queue.push_front(e);
        self.condvar.notify_one();
        Ok(())
    }

    /// Takes the front event, waiting up to 100 ms for one to arrive if the
    /// queue is empty.
    ///
    /// Returns `Ok(None)` if the queue is still empty once the wait is over.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is
    /// poisoned, or [`EventQueueError::PoisonedCondvar`] if the poisoning is
    /// reported by the wait.
    pub fn poll(&self) -> Result<Option<T>, EventQueueError> {
        self.poll_timeout(Self::POLL_TIMEOUT)
    }

    /// Takes the front event, waiting up to `timeout` for one to arrive if the
    /// queue is empty.
    ///
    /// Returns immediately when an event is already queued. Returns
    /// `Ok(None)` if the queue is still empty once `timeout` has elapsed.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is
    /// poisoned, or [`EventQueueError::PoisonedCondvar`] if the poisoning is
    /// reported by the wait.
    pub fn poll_timeout(&self, timeout: Duration) -> Result<Option<T>, EventQueueError> {
        let locked_queue = self.queue.lock()?;
        // A bare `wait_timeout` may return early on a spurious wake-up, or
        // after another consumer has already taken the event that triggered
        // the notification. `wait_timeout_while` re-checks the predicate and
        // keeps waiting for whatever is left of `timeout`.
        let (mut locked_queue, _timed_out) = self
            .condvar
            .wait_timeout_while(locked_queue, timeout, |queue| queue.is_empty())?;
        Ok(locked_queue.pop_front())
    }

    /// Takes the front event without waiting; `Ok(None)` if the queue is empty.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn pop(&self) -> Result<Option<T>, EventQueueError> {
        let mut locked_queue = self.queue.lock()?;
        Ok(locked_queue.pop_front())
    }

    /// Returns the number of events currently queued.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn len(&self) -> Result<usize, EventQueueError> {
        let locked_queue = self.queue.lock()?;
        Ok(locked_queue.len())
    }

    /// Returns `true` if no events are currently queued.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn is_empty(&self) -> Result<bool, EventQueueError> {
        let locked_queue = self.queue.lock()?;
        Ok(locked_queue.is_empty())
    }
}
#[cfg(test)]
mod test {
    use super::EventQueue;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// A freshly constructed queue holds no events.
    #[test]
    fn test_event_queue_new() {
        let queue: EventQueue<i32> = EventQueue::new();
        assert!(queue.is_empty().unwrap());
    }

    /// Pushing one event makes the length one.
    #[test]
    fn test_event_queue_push() {
        let queue = EventQueue::new();
        queue.push(1).unwrap();
        assert_eq!(queue.len().unwrap(), 1);
    }

    /// `push_front` places the event ahead of those already queued.
    #[test]
    fn test_event_queue_push_front() {
        let queue = EventQueue::new();
        queue.push(1).unwrap();
        queue.push_front(0).unwrap();
        assert_eq!(queue.pop().unwrap(), Some(0));
        assert_eq!(queue.pop().unwrap(), Some(1));
        assert_eq!(queue.pop().unwrap(), None);
    }

    /// Events come out in insertion order, then `None` once drained.
    #[test]
    fn test_event_queue_pop() {
        let queue = EventQueue::new();
        queue.push(1).unwrap();
        queue.push(2).unwrap();
        assert_eq!(queue.pop().unwrap(), Some(1));
        assert_eq!(queue.pop().unwrap(), Some(2));
        assert_eq!(queue.pop().unwrap(), None);
    }

    /// `len` tracks the number of queued events.
    #[test]
    fn test_event_queue_len() {
        let queue = EventQueue::new();
        assert_eq!(queue.len().unwrap(), 0);
        queue.push(1).unwrap();
        assert_eq!(queue.len().unwrap(), 1);
    }

    /// `is_empty` flips to `false` after a push.
    #[test]
    fn test_event_queue_is_empty() {
        let queue = EventQueue::new();
        assert!(queue.is_empty().unwrap());
        queue.push(1).unwrap();
        assert!(!queue.is_empty().unwrap());
    }

    /// `poll` hands out an event that is already queued.
    #[test]
    fn test_event_queue_poll_ready_event() {
        let queue = EventQueue::new();
        queue.push(7).unwrap();
        assert_eq!(queue.poll().unwrap(), Some(7));
        assert!(queue.is_empty().unwrap());
    }

    /// `poll` on a queue nobody pushes to yields `None`.
    #[test]
    fn test_event_queue_poll_empty() {
        let queue: EventQueue<i32> = EventQueue::new();
        assert_eq!(queue.poll().unwrap(), None);
    }

    /// `poll` picks up an event pushed by another thread while it is waiting.
    #[test]
    fn test_event_queue_poll() {
        let queue = Arc::new(EventQueue::new());
        let queue_clone = Arc::clone(&queue);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            queue_clone.push(1).unwrap();
        });
        // Retry a bounded number of times rather than relying on the push
        // landing inside a single poll window; a slow or heavily loaded
        // machine could otherwise make this test fail.
        let polled_value = (0..50).find_map(|_| queue.poll().unwrap());
        assert_eq!(polled_value, Some(1));
        assert!(queue.is_empty().unwrap());
        handle.join().unwrap();
    }
}