Skip to main content

rill_core/queues/
command.rs

1//! # CommandQueue — non-blocking command queue
2//!
3//! [`CommandQueue`] provides safe command transfer from the control thread
4//! to the audio signal thread via a bounded crossbeam channel.
5
6use crossbeam_channel::{self, Receiver, Sender, TryRecvError, TrySendError};
7use std::fmt;
8
/// Marker trait shared by every command type.
///
/// It declares no methods. Implementing it only asserts that the type is
/// `Send`, `'static` and `Debug`, which are the supertraits listed below.
pub trait Command: fmt::Debug + Send + 'static {}
13
14/// Sender half of a command queue.
15///
16/// Cloned to share among multiple producer threads. Each clone references
17/// the same underlying crossbeam channel.
18pub struct CommandSender<T> {
19    /// The underlying crossbeam channel sender.
20    tx: Sender<T>,
21}
22
23impl<T> Clone for CommandSender<T> {
24    fn clone(&self) -> Self {
25        Self {
26            tx: self.tx.clone(),
27        }
28    }
29}
30
31impl<T: Send + 'static> CommandSender<T> {
32    /// Try to send a value into the queue.
33    ///
34    /// # Errors
35    /// Returns `QueueFull` if the queue is at capacity, or
36    /// `ChannelDisconnected` if the receiver has been dropped.
37    pub fn send(&self, value: T) -> Result<(), super::QueueError> {
38        self.tx.try_send(value).map_err(|e| match e {
39            TrySendError::Full(_) => super::QueueError::QueueFull,
40            TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
41        })
42    }
43}
44
45/// Receiver half of a command queue (consumed by the signal thread).
46pub struct CommandReceiver<T> {
47    /// The underlying crossbeam channel receiver.
48    rx: Receiver<T>,
49}
50
51impl<T: Send + 'static> CommandReceiver<T> {
52    /// Try to receive a value from the queue without blocking.
53    ///
54    /// # Errors
55    /// Returns `QueueEmpty` if no value is available, or
56    /// `ChannelDisconnected` if the sender has been dropped.
57    pub fn try_recv(&self) -> Result<T, super::QueueError> {
58        self.rx.try_recv().map_err(|e| match e {
59            TryRecvError::Empty => super::QueueError::QueueEmpty,
60            TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
61        })
62    }
63}
64
65/// Non-blocking bounded command queue.
66///
67/// Provides safe, lock-free transfer of commands from the control thread
68/// to the audio signal thread via a bounded crossbeam channel.
69///
70/// # Example
71/// ```
72/// use rill_core::queues::CommandQueue;
73///
74/// let queue = CommandQueue::<i32>::new("test", 16);
75/// queue.send(42).unwrap();
76/// assert_eq!(queue.try_recv(), Ok(42));
77/// ```
78pub struct CommandQueue<T> {
79    /// Inner crossbeam sender.
80    tx: Sender<T>,
81    /// Inner crossbeam receiver.
82    rx: Receiver<T>,
83    /// Human-readable queue name for debugging.
84    name: String,
85    /// Fixed capacity of the bounded channel.
86    capacity: usize,
87}
88
89impl<T: Send + 'static> CommandQueue<T> {
90    /// Create a new bounded queue with the given capacity.
91    pub fn new(name: &str, capacity: usize) -> Self {
92        let (tx, rx) = crossbeam_channel::bounded(capacity);
93        Self {
94            tx,
95            rx,
96            name: name.to_string(),
97            capacity,
98        }
99    }
100
101    /// Try to send a value into the queue (from the control thread).
102    ///
103    /// # Errors
104    /// Returns `QueueFull` if the queue is at capacity, or
105    /// `ChannelDisconnected` if the receiver has been dropped.
106    pub fn send(&self, value: T) -> Result<(), super::QueueError> {
107        self.tx.try_send(value).map_err(|e| match e {
108            TrySendError::Full(_) => super::QueueError::QueueFull,
109            TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
110        })
111    }
112
113    /// Try to receive a value from the queue (from the signal thread).
114    ///
115    /// # Errors
116    /// Returns `QueueEmpty` if no value is available, or
117    /// `ChannelDisconnected` if the sender has been dropped.
118    pub fn try_recv(&self) -> Result<T, super::QueueError> {
119        self.rx.try_recv().map_err(|e| match e {
120            TryRecvError::Empty => super::QueueError::QueueEmpty,
121            TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
122        })
123    }
124
125    /// Get a clone of the inner crossbeam sender.
126    pub fn sender(&self) -> Sender<T> {
127        self.tx.clone()
128    }
129
130    /// Get a clone of the inner crossbeam receiver.
131    pub fn receiver(&self) -> Receiver<T> {
132        self.rx.clone()
133    }
134
135    /// Return the human-readable queue name.
136    pub fn name(&self) -> &str {
137        &self.name
138    }
139
140    /// Return the fixed capacity of the queue.
141    pub fn capacity(&self) -> usize {
142        self.capacity
143    }
144
145    /// Return the number of elements currently in the queue.
146    pub fn len(&self) -> usize {
147        self.rx.len()
148    }
149
150    /// Return true if the queue is currently empty.
151    pub fn is_empty(&self) -> bool {
152        self.len() == 0
153    }
154}
155
156impl<T: fmt::Debug + Send + 'static> fmt::Debug for CommandQueue<T> {
157    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158        f.debug_struct("CommandQueue")
159            .field("name", &self.name)
160            .field("capacity", &self.capacity)
161            .field("len", &self.len())
162            .finish()
163    }
164}
165
166impl<T> Clone for CommandQueue<T> {
167    fn clone(&self) -> Self {
168        Self {
169            tx: self.tx.clone(),
170            rx: self.rx.clone(),
171            name: self.name.clone(),
172            capacity: self.capacity,
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queues::QueueError;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Values come out in the order they were sent, then the queue is empty.
    #[test]
    fn test_command_queue_basic() {
        let queue = CommandQueue::<i32>::new("test", 16);

        queue.send(42).unwrap();
        queue.send(43).unwrap();

        assert_eq!(queue.try_recv(), Ok(42));
        assert_eq!(queue.try_recv(), Ok(43));
        assert_eq!(queue.try_recv(), Err(QueueError::QueueEmpty));
    }

    /// The raw crossbeam handles returned by the queue share its channel.
    #[test]
    fn test_command_queue_sender_receiver() {
        let queue = CommandQueue::<i32>::new("test", 16);
        let sender = queue.sender();
        let receiver = queue.receiver();

        sender.send(1).unwrap();
        sender.send(2).unwrap();
        drop(sender);

        assert_eq!(receiver.try_recv(), Ok(1));
        assert_eq!(receiver.try_recv(), Ok(2));
        // `queue` still owns a sender, so dropping the cloned handle must not
        // disconnect the channel: the drained receiver reports `Empty`.
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    }

    /// Sending into a full queue fails with `QueueFull` instead of blocking.
    #[test]
    fn test_command_queue_full() {
        let queue = CommandQueue::<i32>::new("test", 2);

        assert!(queue.send(1).is_ok());
        assert!(queue.send(2).is_ok());
        assert_eq!(queue.send(3), Err(QueueError::QueueFull));
    }

    /// Name, capacity and length accessors reflect the queue state.
    #[test]
    fn test_command_queue_metadata() {
        let queue = CommandQueue::<i32>::new("meta", 4);

        assert_eq!(queue.name(), "meta");
        assert_eq!(queue.capacity(), 4);
        assert!(queue.is_empty());

        queue.send(7).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
    }

    /// One producer thread and one consumer thread exchange values in order.
    #[test]
    fn test_command_queue_threaded() {
        const COUNT: i32 = 100;

        let queue = std::sync::Arc::new(CommandQueue::<i32>::new("test", 1024));
        let producer_queue = std::sync::Arc::clone(&queue);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                producer_queue.send(i).unwrap();
            }
        });

        let consumer = thread::spawn(move || {
            // Fail instead of spinning forever if a command never arrives.
            let deadline = Instant::now() + Duration::from_secs(5);
            let mut expected = 0;
            while expected < COUNT {
                match queue.try_recv() {
                    Ok(val) => {
                        assert_eq!(val, expected);
                        expected += 1;
                    }
                    Err(_) => {
                        assert!(
                            Instant::now() < deadline,
                            "timed out after receiving {} commands",
                            expected
                        );
                        thread::yield_now();
                    }
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }
}