rill_core/queues/
command.rs1use crossbeam_channel::{self, Receiver, Sender, TryRecvError, TrySendError};
7use std::fmt;
8
9pub trait Command: Send + 'static + fmt::Debug {}
13
14pub struct CommandSender<T> {
19 tx: Sender<T>,
21}
22
23impl<T> Clone for CommandSender<T> {
24 fn clone(&self) -> Self {
25 Self {
26 tx: self.tx.clone(),
27 }
28 }
29}
30
31impl<T: Send + 'static> CommandSender<T> {
32 pub fn send(&self, value: T) -> Result<(), super::QueueError> {
38 self.tx.try_send(value).map_err(|e| match e {
39 TrySendError::Full(_) => super::QueueError::QueueFull,
40 TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
41 })
42 }
43}
44
45pub struct CommandReceiver<T> {
47 rx: Receiver<T>,
49}
50
51impl<T: Send + 'static> CommandReceiver<T> {
52 pub fn try_recv(&self) -> Result<T, super::QueueError> {
58 self.rx.try_recv().map_err(|e| match e {
59 TryRecvError::Empty => super::QueueError::QueueEmpty,
60 TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
61 })
62 }
63}
64
65pub struct CommandQueue<T> {
79 tx: Sender<T>,
81 rx: Receiver<T>,
83 name: String,
85 capacity: usize,
87}
88
89impl<T: Send + 'static> CommandQueue<T> {
90 pub fn new(name: &str, capacity: usize) -> Self {
92 let (tx, rx) = crossbeam_channel::bounded(capacity);
93 Self {
94 tx,
95 rx,
96 name: name.to_string(),
97 capacity,
98 }
99 }
100
101 pub fn send(&self, value: T) -> Result<(), super::QueueError> {
107 self.tx.try_send(value).map_err(|e| match e {
108 TrySendError::Full(_) => super::QueueError::QueueFull,
109 TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
110 })
111 }
112
113 pub fn try_recv(&self) -> Result<T, super::QueueError> {
119 self.rx.try_recv().map_err(|e| match e {
120 TryRecvError::Empty => super::QueueError::QueueEmpty,
121 TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
122 })
123 }
124
125 pub fn sender(&self) -> Sender<T> {
127 self.tx.clone()
128 }
129
130 pub fn receiver(&self) -> Receiver<T> {
132 self.rx.clone()
133 }
134
135 pub fn name(&self) -> &str {
137 &self.name
138 }
139
140 pub fn capacity(&self) -> usize {
142 self.capacity
143 }
144
145 pub fn len(&self) -> usize {
147 self.rx.len()
148 }
149
150 pub fn is_empty(&self) -> bool {
152 self.len() == 0
153 }
154}
155
156impl<T: fmt::Debug + Send + 'static> fmt::Debug for CommandQueue<T> {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 f.debug_struct("CommandQueue")
159 .field("name", &self.name)
160 .field("capacity", &self.capacity)
161 .field("len", &self.len())
162 .finish()
163 }
164}
165
166impl<T> Clone for CommandQueue<T> {
167 fn clone(&self) -> Self {
168 Self {
169 tx: self.tx.clone(),
170 rx: self.rx.clone(),
171 name: self.name.clone(),
172 capacity: self.capacity,
173 }
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use super::*;
180 use crate::queues::QueueError;
181 use std::thread;
182
183 #[test]
184 fn test_command_queue_basic() {
185 let queue = CommandQueue::<i32>::new("test", 16);
186
187 queue.send(42).unwrap();
188 queue.send(43).unwrap();
189
190 assert_eq!(queue.try_recv(), Ok(42));
191 assert_eq!(queue.try_recv(), Ok(43));
192 assert_eq!(queue.try_recv(), Err(QueueError::QueueEmpty));
193 }
194
195 #[test]
196 fn test_command_queue_sender_receiver() {
197 let queue = CommandQueue::<i32>::new("test", 16);
198 let sender = queue.sender();
199 let receiver = queue.receiver();
200
201 sender.send(1).unwrap();
202 sender.send(2).unwrap();
203 drop(sender);
204
205 assert_eq!(receiver.try_recv().unwrap(), 1);
206 assert_eq!(receiver.try_recv().unwrap(), 2);
207 }
208
209 #[test]
210 fn test_command_queue_full() {
211 let queue = CommandQueue::<i32>::new("test", 2);
212
213 assert!(queue.send(1).is_ok());
214 assert!(queue.send(2).is_ok());
215 match queue.send(3) {
217 Err(QueueError::QueueFull) => {}
218 _ => panic!("Expected QueueFull"),
219 }
220 }
221
222 #[test]
223 fn test_command_queue_threaded() {
224 let queue = std::sync::Arc::new(CommandQueue::<i32>::new("test", 1024));
225 let q2 = queue.clone();
226
227 let producer = thread::spawn(move || {
228 for i in 0..100 {
229 q2.send(i).unwrap();
230 }
231 });
232
233 let consumer = thread::spawn(move || {
234 let mut received = 0;
235 while received < 100 {
236 if let Ok(val) = queue.try_recv() {
237 assert_eq!(val, received);
238 received += 1;
239 }
240 }
241 });
242
243 producer.join().unwrap();
244 consumer.join().unwrap();
245 }
246}