// arch-event-queues 0.1.0
//
// In-memory and RocksDB-backed event queues for Arch services.
use std::{
    collections::VecDeque,
    sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, WaitTimeoutResult},
    time::Duration,
};

use thiserror::Error;

/// Error returned by in-memory queue operations.
///
/// Both variants are created by converting a `std::sync::PoisonError`; the conversion keeps
/// only the fact that poisoning happened and drops the guard carried by the original error.
#[derive(Error, Debug)]
pub enum EventQueueError {
    /// The queue mutex was poisoned by a panicking thread.
    ///
    /// Produced when locking the queue mutex fails. Displayed as `Poisoned lock`.
    #[error("Poisoned lock")]
    PoisonedLock,
    /// The queue condition variable wait returned with a poisoned lock.
    ///
    /// Produced when a timed wait on the condition variable hands back a poisoned guard.
    /// Displayed as `Poisoned condvar`.
    #[error("Poisoned condvar")]
    PoisonedCondvar,
}

impl<T> From<PoisonError<MutexGuard<'_, VecDeque<T>>>> for EventQueueError {
    fn from(_: PoisonError<MutexGuard<'_, VecDeque<T>>>) -> Self {
        EventQueueError::PoisonedLock
    }
}

impl<T> From<PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>> for EventQueueError {
    fn from(_: PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>) -> Self {
        EventQueueError::PoisonedCondvar
    }
}

/// Re-exports [`EventQueueError`] so the error type is also reachable through the `error`
/// module path.
pub mod error {
    pub use super::EventQueueError;
}

/// Thread-safe in-memory FIFO event queue.
///
/// Events are stored in a `VecDeque` guarded by a `Mutex`; a `Condvar` is used to wake
/// consumers that are waiting for an event to arrive.
#[derive(Debug)]
pub struct EventQueue<T> {
    /// Pending events, oldest at the front.
    queue: Arc<Mutex<VecDeque<T>>>,
    /// Signalled whenever an event is added to `queue`.
    condvar: Arc<Condvar>,
}

// Implemented by hand instead of `#[derive(Default)]`: the derive would add a `T: Default`
// bound, although an empty queue never needs to construct a `T`.
impl<T> Default for EventQueue<T> {
    /// Creates an empty event queue.
    fn default() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            condvar: Arc::new(Condvar::new()),
        }
    }
}

impl<T> EventQueue<T> {
    /// Longest time `poll` blocks while waiting for an event to arrive.
    const POLL_TIMEOUT: Duration = Duration::from_millis(100);

    /// Creates an empty event queue.
    pub fn new() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            condvar: Arc::new(Condvar::new()),
        }
    }

    /// Pushes an event to the back of the queue and wakes one waiter.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn push(&self, e: T) -> Result<(), EventQueueError> {
        let mut locked_queue = self.queue.lock()?;
        locked_queue.push_back(e);
        self.condvar.notify_one();
        Ok(())
    }

    /// Pushes an event to the front of the queue and wakes one waiter.
    ///
    /// The event becomes the next one returned by `pop` or `poll`.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn push_front(&self, e: T) -> Result<(), EventQueueError> {
        let mut locked_queue = self.queue.lock()?;
        locked_queue.push_front(e);
        self.condvar.notify_one();
        Ok(())
    }

    /// Waits briefly for an event, then returns the next event if one is available.
    ///
    /// Returns immediately when the queue already holds an event. Otherwise blocks for up to
    /// 100 milliseconds in total and returns `Ok(None)` if no event arrived in that time.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned when it is
    /// first locked, or [`EventQueueError::PoisonedCondvar`] if it is found poisoned when the
    /// wait ends.
    pub fn poll(&self) -> Result<Option<T>, EventQueueError> {
        let locked_queue = self.queue.lock()?;
        // `wait_timeout_while` re-checks the predicate after every wakeup, so a spurious
        // wakeup, or a wakeup whose event was already taken by another consumer, does not end
        // the wait early. It does not block at all when the queue is already non-empty.
        let (mut locked_queue, _timed_out) = self.condvar.wait_timeout_while(
            locked_queue,
            Self::POLL_TIMEOUT,
            |pending| pending.is_empty(),
        )?;
        Ok(locked_queue.pop_front())
    }

    /// Returns the next event immediately, or `None` if the queue is empty.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn pop(&self) -> Result<Option<T>, EventQueueError> {
        let mut locked_queue = self.queue.lock()?;
        Ok(locked_queue.pop_front())
    }

    /// Returns the number of events currently in the queue.
    ///
    /// The value is a snapshot: other threads may push or pop as soon as the lock is released.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn len(&self) -> Result<usize, EventQueueError> {
        let locked_queue = self.queue.lock()?;
        Ok(locked_queue.len())
    }

    /// Returns whether the queue currently contains no events.
    ///
    /// The value is a snapshot: other threads may push or pop as soon as the lock is released.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueError::PoisonedLock`] if the queue mutex is poisoned.
    pub fn is_empty(&self) -> Result<bool, EventQueueError> {
        let locked_queue = self.queue.lock()?;
        Ok(locked_queue.is_empty())
    }
}

#[cfg(test)]
mod test {
    use super::EventQueue;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    // A freshly constructed queue holds no events.
    #[test]
    fn test_event_queue_new() {
        let fresh = EventQueue::<i32>::new();
        assert!(fresh.is_empty().unwrap());
    }

    // Pushing a single event makes the length one.
    #[test]
    fn test_event_queue_push() {
        let events = EventQueue::new();
        events.push(1).unwrap();
        let count = events.len().unwrap();
        assert_eq!(count, 1);
    }

    // Events come back in insertion order, followed by `None` once drained.
    #[test]
    fn test_event_queue_pop() {
        let events = EventQueue::new();
        for value in [1, 2] {
            events.push(value).unwrap();
        }
        for expected in [Some(1), Some(2), None] {
            assert_eq!(events.pop().unwrap(), expected);
        }
    }

    // The reported length tracks pushes.
    #[test]
    fn test_event_queue_len() {
        let events = EventQueue::new();
        let before = events.len().unwrap();
        assert_eq!(before, 0);
        events.push(1).unwrap();
        let after = events.len().unwrap();
        assert_eq!(after, 1);
    }

    // `is_empty` flips to false after the first push.
    #[test]
    fn test_event_queue_is_empty() {
        let events = EventQueue::new();
        assert!(events.is_empty().unwrap());
        events.push(1).unwrap();
        let empty_after_push = events.is_empty().unwrap();
        assert!(!empty_after_push);
    }

    // `poll` blocks until an event pushed from another thread arrives within the timeout.
    #[test]
    fn test_event_queue_poll() {
        let shared = Arc::new(EventQueue::new());

        // The producer delivers its event halfway through the poll timeout.
        let producer = {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                shared.push(1).unwrap();
            })
        };

        let received = shared.poll();

        assert_eq!(received.unwrap(), Some(1));
        assert!(shared.is_empty().unwrap());

        producer.join().unwrap();
    }
}