Skip to main content

rill_core/queues/
command.rs

1//! # CommandQueue — неблокирующая очередь команд
2//!
3//! [`CommandQueue`] обеспечивает безопасную передачу команд
4//! из потока управления (control thread) в аудиопоток (signal thread)
5//! через bounded crossbeam channel.
6
7use crossbeam_channel::{self, Receiver, Sender, TryRecvError, TrySendError};
8use std::fmt;
9
10/// Базовый трейт для всех команд
11///
12/// Любой тип, реализующий этот трейт, может передаваться через очередь.
13pub trait Command: Send + 'static + fmt::Debug {}
14
15/// Две половинки CommandQueue
16pub struct CommandSender<T> {
17    tx: Sender<T>,
18}
19
20impl<T> Clone for CommandSender<T> {
21    fn clone(&self) -> Self {
22        Self {
23            tx: self.tx.clone(),
24        }
25    }
26}
27
28impl<T: Send + 'static> CommandSender<T> {
29    pub fn send(&self, value: T) -> Result<(), super::QueueError> {
30        self.tx.try_send(value).map_err(|e| match e {
31            TrySendError::Full(_) => super::QueueError::QueueFull,
32            TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
33        })
34    }
35}
36
37/// Потребитель команд (signal thread)
38pub struct CommandReceiver<T> {
39    rx: Receiver<T>,
40}
41
42impl<T: Send + 'static> CommandReceiver<T> {
43    pub fn try_recv(&self) -> Result<T, super::QueueError> {
44        self.rx.try_recv().map_err(|e| match e {
45            TryRecvError::Empty => super::QueueError::QueueEmpty,
46            TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
47        })
48    }
49}
50
51/// Неблокирующая bounded очередь команд
52///
53/// # Пример
54/// ```
55/// use rill_core::queues::CommandQueue;
56///
57/// let queue = CommandQueue::<i32>::new("test", 16);
58/// queue.send(42).unwrap();
59/// assert_eq!(queue.try_recv(), Ok(42));
60/// ```
61pub struct CommandQueue<T> {
62    tx: Sender<T>,
63    rx: Receiver<T>,
64    name: String,
65    capacity: usize,
66}
67
68impl<T: Send + 'static> CommandQueue<T> {
69    /// Создать новую очередь с фиксированной ёмкостью
70    pub fn new(name: &str, capacity: usize) -> Self {
71        let (tx, rx) = crossbeam_channel::bounded(capacity);
72        Self {
73            tx,
74            rx,
75            name: name.to_string(),
76            capacity,
77        }
78    }
79
80    /// Отправить команду (из control thread)
81    pub fn send(&self, value: T) -> Result<(), super::QueueError> {
82        self.tx.try_send(value).map_err(|e| match e {
83            TrySendError::Full(_) => super::QueueError::QueueFull,
84            TrySendError::Disconnected(_) => super::QueueError::ChannelDisconnected,
85        })
86    }
87
88    /// Попытаться получить команду (из signal thread)
89    pub fn try_recv(&self) -> Result<T, super::QueueError> {
90        self.rx.try_recv().map_err(|e| match e {
91            TryRecvError::Empty => super::QueueError::QueueEmpty,
92            TryRecvError::Disconnected => super::QueueError::ChannelDisconnected,
93        })
94    }
95
96    /// Получить отправителя
97    pub fn sender(&self) -> Sender<T> {
98        self.tx.clone()
99    }
100
101    /// Получить получателя
102    pub fn receiver(&self) -> Receiver<T> {
103        self.rx.clone()
104    }
105
106    /// Имя очереди
107    pub fn name(&self) -> &str {
108        &self.name
109    }
110
111    /// Ёмкость
112    pub fn capacity(&self) -> usize {
113        self.capacity
114    }
115
116    /// Текущий размер
117    pub fn len(&self) -> usize {
118        self.rx.len()
119    }
120
121    /// Пуста ли
122    pub fn is_empty(&self) -> bool {
123        self.len() == 0
124    }
125}
126
127impl<T: fmt::Debug + Send + 'static> fmt::Debug for CommandQueue<T> {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        f.debug_struct("CommandQueue")
130            .field("name", &self.name)
131            .field("capacity", &self.capacity)
132            .field("len", &self.len())
133            .finish()
134    }
135}
136
137impl<T> Clone for CommandQueue<T> {
138    fn clone(&self) -> Self {
139        Self {
140            tx: self.tx.clone(),
141            rx: self.rx.clone(),
142            name: self.name.clone(),
143            capacity: self.capacity,
144        }
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151    use crate::queues::QueueError;
152    use std::thread;
153
154    #[test]
155    fn test_command_queue_basic() {
156        let queue = CommandQueue::<i32>::new("test", 16);
157
158        queue.send(42).unwrap();
159        queue.send(43).unwrap();
160
161        assert_eq!(queue.try_recv(), Ok(42));
162        assert_eq!(queue.try_recv(), Ok(43));
163        assert_eq!(queue.try_recv(), Err(QueueError::QueueEmpty));
164    }
165
166    #[test]
167    fn test_command_queue_sender_receiver() {
168        let queue = CommandQueue::<i32>::new("test", 16);
169        let sender = queue.sender();
170        let receiver = queue.receiver();
171
172        sender.send(1).unwrap();
173        sender.send(2).unwrap();
174        drop(sender);
175
176        assert_eq!(receiver.try_recv().unwrap(), 1);
177        assert_eq!(receiver.try_recv().unwrap(), 2);
178    }
179
180    #[test]
181    fn test_command_queue_full() {
182        let queue = CommandQueue::<i32>::new("test", 2);
183
184        assert!(queue.send(1).is_ok());
185        assert!(queue.send(2).is_ok());
186        // Третья отправка должна вернуть QueueFull
187        match queue.send(3) {
188            Err(QueueError::QueueFull) => {}
189            _ => panic!("Expected QueueFull"),
190        }
191    }
192
193    #[test]
194    fn test_command_queue_threaded() {
195        let queue = std::sync::Arc::new(CommandQueue::<i32>::new("test", 1024));
196        let q2 = queue.clone();
197
198        let producer = thread::spawn(move || {
199            for i in 0..100 {
200                q2.send(i).unwrap();
201            }
202        });
203
204        let consumer = thread::spawn(move || {
205            let mut received = 0;
206            while received < 100 {
207                if let Ok(val) = queue.try_recv() {
208                    assert_eq!(val, received);
209                    received += 1;
210                }
211            }
212        });
213
214        producer.join().unwrap();
215        consumer.join().unwrap();
216    }
217}