arch_event_queues/
event_queue.rs1use std::{
2 collections::VecDeque,
3 sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, WaitTimeoutResult},
4 time::Duration,
5};
6
7use thiserror::Error;
8
9#[derive(Error, Debug)]
11pub enum EventQueueError {
12 #[error("Poisoned lock")]
14 PoisonedLock,
15 #[error("Poisoned condvar")]
17 PoisonedCondvar,
18}
19
20impl<T> From<PoisonError<MutexGuard<'_, VecDeque<T>>>> for EventQueueError {
21 fn from(_: PoisonError<MutexGuard<'_, VecDeque<T>>>) -> Self {
22 EventQueueError::PoisonedLock
23 }
24}
25
26impl<T> From<PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>> for EventQueueError {
27 fn from(_: PoisonError<(MutexGuard<'_, VecDeque<T>>, WaitTimeoutResult)>) -> Self {
28 EventQueueError::PoisonedCondvar
29 }
30}
31
32pub mod error {
33 pub use super::EventQueueError;
34}
35
36#[derive(Default, Debug)]
38pub struct EventQueue<T> {
39 queue: Arc<Mutex<VecDeque<T>>>,
40 condvar: Arc<Condvar>,
41}
42
43impl<T> EventQueue<T> {
44 pub fn new() -> Self {
46 Self {
47 queue: Arc::new(Mutex::new(VecDeque::new())),
48 condvar: Arc::new(Condvar::new()),
49 }
50 }
51
52 pub fn push(&self, e: T) -> Result<(), EventQueueError> {
54 let mut locked_queue = self.queue.lock()?;
55 locked_queue.push_back(e);
56 self.condvar.notify_one();
57 Ok(())
58 }
59
60 pub fn push_front(&self, e: T) -> Result<(), EventQueueError> {
62 let mut locked_queue = self.queue.lock()?;
63 locked_queue.push_front(e);
64 self.condvar.notify_one();
65 Ok(())
66 }
67
68 pub fn poll(&self) -> Result<Option<T>, EventQueueError> {
70 let mut locked_queue = self.queue.lock()?;
71 if locked_queue.is_empty() {
72 locked_queue = self
73 .condvar
74 .wait_timeout(locked_queue, Duration::from_millis(100))?
75 .0;
76 }
77 Ok(locked_queue.pop_front())
78 }
79
80 pub fn pop(&self) -> Result<Option<T>, EventQueueError> {
82 let mut locked_queue = self.queue.lock()?;
83 Ok(locked_queue.pop_front())
84 }
85
86 pub fn len(&self) -> Result<usize, EventQueueError> {
88 let locked_queue = self.queue.lock()?;
89 Ok(locked_queue.len())
90 }
91
92 pub fn is_empty(&self) -> Result<bool, EventQueueError> {
94 let locked_queue = self.queue.lock()?;
95 Ok(locked_queue.is_empty())
96 }
97}
98
99#[cfg(test)]
100mod test {
101 use super::EventQueue;
102 use std::sync::Arc;
103 use std::thread;
104 use std::time::Duration;
105
106 #[test]
107 fn test_event_queue_new() {
108 let queue: EventQueue<i32> = EventQueue::new();
109 assert!(queue.is_empty().unwrap());
110 }
111
112 #[test]
113 fn test_event_queue_push() {
114 let queue = EventQueue::new();
115 queue.push(1).unwrap();
116 assert_eq!(queue.len().unwrap(), 1);
117 }
118
119 #[test]
120 fn test_event_queue_pop() {
121 let queue = EventQueue::new();
122 queue.push(1).unwrap();
123 queue.push(2).unwrap();
124 assert_eq!(queue.pop().unwrap(), Some(1));
125 assert_eq!(queue.pop().unwrap(), Some(2));
126 assert_eq!(queue.pop().unwrap(), None);
127 }
128
129 #[test]
130 fn test_event_queue_len() {
131 let queue = EventQueue::new();
132 assert_eq!(queue.len().unwrap(), 0);
133 queue.push(1).unwrap();
134 assert_eq!(queue.len().unwrap(), 1);
135 }
136
137 #[test]
138 fn test_event_queue_is_empty() {
139 let queue = EventQueue::new();
140 assert!(queue.is_empty().unwrap());
141 queue.push(1).unwrap();
142 assert!(!queue.is_empty().unwrap());
143 }
144
145 #[test]
146 fn test_event_queue_poll() {
147 let queue = Arc::new(EventQueue::new());
148 let queue_clone = Arc::clone(&queue);
149
150 let handle = thread::spawn(move || {
151 thread::sleep(Duration::from_millis(50));
152 queue_clone.push(1).unwrap();
153 });
154
155 let polled_value = queue.poll();
156
157 assert_eq!(polled_value.unwrap(), Some(1));
158 assert!(queue.is_empty().unwrap());
159
160 handle.join().unwrap();
161 }
162}