rill_core/queues/
command.rs1use crossbeam_channel::{self, Receiver, Sender, TryRecvError, TrySendError};
8use std::fmt;
9
/// Marker trait for values that can be carried as commands.
///
/// It declares no methods; implementing it only asserts that the type is
/// `Debug`-printable, `Send`, and free of non-`'static` borrows.
///
/// NOTE(review): the queue types visible in this module bound their payload on
/// `Send + 'static` directly rather than on `Command` — confirm where this
/// trait is actually required.
pub trait Command: fmt::Debug + Send + 'static {}
14
15pub struct CommandSender<T> {
17 tx: Sender<T>,
18}
19
20impl<T> Clone for CommandSender<T> {
21 fn clone(&self) -> Self {
22 Self {
23 tx: self.tx.clone(),
24 }
25 }
26}
27
28impl<T: Send + 'static> CommandSender<T> {
29 pub fn send(&self, value: T) -> Result<(), super::QueueError> {
30 self.tx.try_send(value).map_err(|e| match e {
31 TrySendError::Full(_) => super::QueueError::QueueFull,
32 TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
33 })
34 }
35}
36
37pub struct CommandReceiver<T> {
39 rx: Receiver<T>,
40}
41
42impl<T: Send + 'static> CommandReceiver<T> {
43 pub fn try_recv(&self) -> Result<T, super::QueueError> {
44 self.rx.try_recv().map_err(|e| match e {
45 TryRecvError::Empty => super::QueueError::QueueEmpty,
46 TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
47 })
48 }
49}
50
51pub struct CommandQueue<T> {
62 tx: Sender<T>,
63 rx: Receiver<T>,
64 name: String,
65 capacity: usize,
66}
67
68impl<T: Send + 'static> CommandQueue<T> {
69 pub fn new(name: &str, capacity: usize) -> Self {
71 let (tx, rx) = crossbeam_channel::bounded(capacity);
72 Self {
73 tx,
74 rx,
75 name: name.to_string(),
76 capacity,
77 }
78 }
79
80 pub fn send(&self, value: T) -> Result<(), super::QueueError> {
82 self.tx.try_send(value).map_err(|e| match e {
83 TrySendError::Full(_) => super::QueueError::QueueFull,
84 TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
85 })
86 }
87
88 pub fn try_recv(&self) -> Result<T, super::QueueError> {
90 self.rx.try_recv().map_err(|e| match e {
91 TryRecvError::Empty => super::QueueError::QueueEmpty,
92 TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
93 })
94 }
95
96 pub fn sender(&self) -> Sender<T> {
98 self.tx.clone()
99 }
100
101 pub fn receiver(&self) -> Receiver<T> {
103 self.rx.clone()
104 }
105
106 pub fn name(&self) -> &str {
108 &self.name
109 }
110
111 pub fn capacity(&self) -> usize {
113 self.capacity
114 }
115
116 pub fn len(&self) -> usize {
118 self.rx.len()
119 }
120
121 pub fn is_empty(&self) -> bool {
123 self.len() == 0
124 }
125}
126
127impl<T: fmt::Debug + Send + 'static> fmt::Debug for CommandQueue<T> {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 f.debug_struct("CommandQueue")
130 .field("name", &self.name)
131 .field("capacity", &self.capacity)
132 .field("len", &self.len())
133 .finish()
134 }
135}
136
137impl<T> Clone for CommandQueue<T> {
138 fn clone(&self) -> Self {
139 Self {
140 tx: self.tx.clone(),
141 rx: self.rx.clone(),
142 name: self.name.clone(),
143 capacity: self.capacity,
144 }
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queues::QueueError;
    use std::sync::Arc;
    use std::thread;

    /// Items come back out in the order they were sent, after which the queue
    /// reports that it is empty.
    #[test]
    fn test_command_queue_basic() {
        let queue = CommandQueue::<i32>::new("test", 16);

        queue.send(42).unwrap();
        queue.send(43).unwrap();

        let first = queue.try_recv();
        let second = queue.try_recv();
        let third = queue.try_recv();

        assert_eq!(first, Ok(42));
        assert_eq!(second, Ok(43));
        assert_eq!(third, Err(QueueError::QueueEmpty));
    }

    /// The raw crossbeam halves handed out by the queue feed the same channel.
    #[test]
    fn test_command_queue_sender_receiver() {
        let queue = CommandQueue::<i32>::new("test", 16);
        let (sender, receiver) = (queue.sender(), queue.receiver());

        sender.send(1).unwrap();
        sender.send(2).unwrap();
        drop(sender);

        assert_eq!(receiver.try_recv().unwrap(), 1);
        assert_eq!(receiver.try_recv().unwrap(), 2);
    }

    /// Sending past the configured capacity is rejected with `QueueFull`.
    #[test]
    fn test_command_queue_full() {
        let queue = CommandQueue::<i32>::new("test", 2);

        assert!(queue.send(1).is_ok());
        assert!(queue.send(2).is_ok());
        assert!(
            matches!(queue.send(3), Err(QueueError::QueueFull)),
            "Expected QueueFull"
        );
    }

    /// One producer thread and one consumer thread share the queue through an
    /// `Arc`; the consumer must observe 0..100 in order.
    #[test]
    fn test_command_queue_threaded() {
        let queue = Arc::new(CommandQueue::<i32>::new("test", 1024));
        let q2 = Arc::clone(&queue);

        let producer = thread::spawn(move || {
            (0..100).for_each(|i| q2.send(i).unwrap());
        });

        let consumer = thread::spawn(move || {
            for expected in 0..100 {
                // Poll until the next item arrives; any error is simply retried.
                let val = loop {
                    if let Ok(item) = queue.try_recv() {
                        break item;
                    }
                };
                assert_eq!(val, expected);
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }
}