Skip to main content

rill_core/queues/
rt_queue.rs

1//! # Главная RT-safe очередь для двухпоточной архитектуры
2//!
3//! [`RtQueue`] — основная очередь для коммуникации между
4//! потоком управления и аудиопотоком. Объединяет функциональность
5//! SPSC и MPSC очередей с удобным API.
6
7use super::spsc::SpscQueue;
8use super::{QueueResult, QueueStatsSnapshot};
9
10/// Тип очереди
11#[derive(Debug, Clone, Copy)]
12pub enum QueueType {
13    /// Один производитель, один потребитель (максимальная скорость)
14    SingleProducer,
15    /// Много производителей, один потребитель
16    MultiProducer,
17}
18
19/// Главная RT-safe очередь
20///
21/// # Пример
22/// ```
23/// use rill_core::queues::RtQueue;
24///
25/// // Создаём очередь для команд
26/// let queue = RtQueue::<i32>::new(1024);
27///
28/// // Поток управления (soft RT)
29/// queue.push(42).unwrap();
30///
31/// // Аудиопоток (hard RT)
32/// if let Some(cmd) = queue.pop() {
33///     println!("Got command: {}", cmd);
34/// }
35/// ```
36pub struct RtQueue<T: Copy> {
37    /// Внутренняя реализация
38    inner: RtQueueInner<T>,
39}
40
41enum RtQueueInner<T: Copy> {
42    Spsc(SpscQueue<T, 1024>),        // Для одного производителя
43    Mpsc(super::mpsc::MpscQueue<T>), // Для многих производителей
44}
45
46impl<T: Copy + Default + Send + 'static> RtQueue<T> {
47    /// Создать новую очередь с фиксированным размером
48    pub fn new(capacity: usize) -> Self {
49        // По умолчанию используем SPSC для максимальной производительности
50        if capacity <= 1024 {
51            Self {
52                inner: RtQueueInner::Spsc(SpscQueue::new()),
53            }
54        } else {
55            Self {
56                inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
57            }
58        }
59    }
60
61    /// Создать очередь для одного производителя
62    pub fn new_spsc() -> Self {
63        Self {
64            inner: RtQueueInner::Spsc(SpscQueue::new()),
65        }
66    }
67
68    /// Создать очередь для многих производителей
69    pub fn new_mpsc(capacity: usize) -> Self {
70        Self {
71            inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
72        }
73    }
74
75    /// Добавить элемент (из потока управления)
76    pub fn push(&self, value: T) -> QueueResult<()> {
77        match &self.inner {
78            RtQueueInner::Spsc(q) => q.push(value),
79            RtQueueInner::Mpsc(q) => q.push(value),
80        }
81    }
82
83    /// Извлечь элемент (из аудиопотока)
84    pub fn pop(&self) -> Option<T> {
85        match &self.inner {
86            RtQueueInner::Spsc(q) => q.pop(),
87            RtQueueInner::Mpsc(q) => q.pop(),
88        }
89    }
90
91    /// Текущий размер
92    pub fn len(&self) -> usize {
93        match &self.inner {
94            RtQueueInner::Spsc(q) => q.len(),
95            RtQueueInner::Mpsc(q) => q.size(),
96        }
97    }
98
99    /// Вместимость
100    pub fn capacity(&self) -> usize {
101        match &self.inner {
102            RtQueueInner::Spsc(q) => q.capacity(),
103            RtQueueInner::Mpsc(q) => q.capacity(),
104        }
105    }
106
107    /// Проверить, пуста ли очередь
108    pub fn is_empty(&self) -> bool {
109        self.len() == 0
110    }
111
112    /// Получить статистику
113    pub fn stats(&self) -> QueueStatsSnapshot {
114        match &self.inner {
115            RtQueueInner::Spsc(q) => q.stats(),
116            RtQueueInner::Mpsc(_q) => {
117                // Заглушка для MPSC
118                QueueStatsSnapshot {
119                    pushes: 0,
120                    pops: 0,
121                    overflows: 0,
122                    underflows: 0,
123                    max_size: 0,
124                }
125            }
126        }
127    }
128}
129
130impl<T: Copy> Clone for RtQueue<T> {
131    fn clone(&self) -> Self {
132        // Только для MPSC очередей, SPSC не клонируются
133        match &self.inner {
134            RtQueueInner::Spsc(_) => panic!("Cannot clone SPSC queue"),
135            RtQueueInner::Mpsc(q) => Self {
136                inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(q.capacity())),
137            },
138        }
139    }
140}
141
142#[allow(unsafe_code)]
143unsafe impl<T: Copy + Send> Send for RtQueue<T> {}
144#[allow(unsafe_code)]
145unsafe impl<T: Copy + Sync> Sync for RtQueue<T> {}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    #[test]
152    fn test_rt_queue_spsc() {
153        let queue = RtQueue::<i32>::new_spsc();
154
155        queue.push(42).unwrap();
156        assert_eq!(queue.pop(), Some(42));
157        assert_eq!(queue.pop(), None);
158    }
159
160    #[test]
161    fn test_rt_queue_mpsc() {
162        let queue = RtQueue::<i32>::new_mpsc(16);
163
164        queue.push(1).unwrap();
165        queue.push(2).unwrap();
166        queue.push(3).unwrap();
167
168        assert_eq!(queue.pop(), Some(1));
169        assert_eq!(queue.pop(), Some(2));
170        assert_eq!(queue.pop(), Some(3));
171    }
172}