Skip to main content

arch_event_queues/
event_queue.rs

1use std::{
2    collections::VecDeque,
3    sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, WaitTimeoutResult},
4    time::Duration,
5};
6
7use thiserror::Error;
8
9/// Error returned by in-memory queue operations.
10#[derive(Error, Debug)]
11pub enum EventQueueError {
12    /// The queue mutex was poisoned by a panicking thread.
13    #[error("Poisoned lock")]
14    PoisonedLock,
15    /// The queue condition variable wait returned with a poisoned lock.
16    #[error("Poisoned condvar")]
17    PoisonedCondvar,
18}
19
20impl<T> From<PoisonError<MutexGuard<'_, VecDeque<T>>>> for EventQueueError {
21    fn from(_: PoisonError<MutexGuard<'_, VecDeque<T>>>) -> Self {
22        EventQueueError::PoisonedLock
23    }
24}
25
26impl<T> From<PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>> for EventQueueError {
27    fn from(_: PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>) -> Self {
28        EventQueueError::PoisonedCondvar
29    }
30}
31
32pub mod error {
33    pub use super::EventQueueError;
34}
35
36/// Thread-safe in-memory FIFO event queue.
37#[derive(Default, Debug)]
38pub struct EventQueue<T> {
39    queue: Arc<Mutex<VecDeque<T>>>,
40    condvar: Arc<Condvar>,
41}
42
43impl<T> EventQueue<T> {
44    /// Creates an empty event queue.
45    pub fn new() -> Self {
46        Self {
47            queue: Arc::new(Mutex::new(VecDeque::new())),
48            condvar: Arc::new(Condvar::new()),
49        }
50    }
51
52    /// Pushes an event to the back of the queue and wakes one waiter.
53    pub fn push(&self, e: T) -> Result<(), EventQueueError> {
54        let mut locked_queue = self.queue.lock()?;
55        locked_queue.push_back(e);
56        self.condvar.notify_one();
57        Ok(())
58    }
59
60    /// Pushes an event to the front of the queue and wakes one waiter.
61    pub fn push_front(&self, e: T) -> Result<(), EventQueueError> {
62        let mut locked_queue = self.queue.lock()?;
63        locked_queue.push_front(e);
64        self.condvar.notify_one();
65        Ok(())
66    }
67
68    /// Waits briefly for an event, then returns the next event if one is available.
69    pub fn poll(&self) -> Result<Option<T>, EventQueueError> {
70        let mut locked_queue = self.queue.lock()?;
71        if locked_queue.is_empty() {
72            locked_queue = self
73                .condvar
74                .wait_timeout(locked_queue, Duration::from_millis(100))?
75                .0;
76        }
77        Ok(locked_queue.pop_front())
78    }
79
80    /// Returns the next event immediately, or `None` if the queue is empty.
81    pub fn pop(&self) -> Result<Option<T>, EventQueueError> {
82        let mut locked_queue = self.queue.lock()?;
83        Ok(locked_queue.pop_front())
84    }
85
86    /// Returns the number of events currently in the queue.
87    pub fn len(&self) -> Result<usize, EventQueueError> {
88        let locked_queue = self.queue.lock()?;
89        Ok(locked_queue.len())
90    }
91
92    /// Returns whether the queue currently contains no events.
93    pub fn is_empty(&self) -> Result<bool, EventQueueError> {
94        let locked_queue = self.queue.lock()?;
95        Ok(locked_queue.is_empty())
96    }
97}
98
99#[cfg(test)]
100mod test {
101    use super::EventQueue;
102    use std::sync::Arc;
103    use std::thread;
104    use std::time::Duration;
105
106    #[test]
107    fn test_event_queue_new() {
108        let queue: EventQueue<i32> = EventQueue::new();
109        assert!(queue.is_empty().unwrap());
110    }
111
112    #[test]
113    fn test_event_queue_push() {
114        let queue = EventQueue::new();
115        queue.push(1).unwrap();
116        assert_eq!(queue.len().unwrap(), 1);
117    }
118
119    #[test]
120    fn test_event_queue_pop() {
121        let queue = EventQueue::new();
122        queue.push(1).unwrap();
123        queue.push(2).unwrap();
124        assert_eq!(queue.pop().unwrap(), Some(1));
125        assert_eq!(queue.pop().unwrap(), Some(2));
126        assert_eq!(queue.pop().unwrap(), None);
127    }
128
129    #[test]
130    fn test_event_queue_len() {
131        let queue = EventQueue::new();
132        assert_eq!(queue.len().unwrap(), 0);
133        queue.push(1).unwrap();
134        assert_eq!(queue.len().unwrap(), 1);
135    }
136
137    #[test]
138    fn test_event_queue_is_empty() {
139        let queue = EventQueue::new();
140        assert!(queue.is_empty().unwrap());
141        queue.push(1).unwrap();
142        assert!(!queue.is_empty().unwrap());
143    }
144
145    #[test]
146    fn test_event_queue_poll() {
147        let queue = Arc::new(EventQueue::new());
148        let queue_clone = Arc::clone(&queue);
149
150        let handle = thread::spawn(move || {
151            thread::sleep(Duration::from_millis(50));
152            queue_clone.push(1).unwrap();
153        });
154
155        let polled_value = queue.poll();
156
157        assert_eq!(polled_value.unwrap(), Some(1));
158        assert!(queue.is_empty().unwrap());
159
160        handle.join().unwrap();
161    }
162}