Skip to main content

rill_core/queues/
mod.rs

1//! # Неблокирующие очереди для двухпоточной архитектуры
2//!
3//! Этот модуль предоставляет очереди для безопасного обмена
4//! данными между потоком управления (soft RT) и аудиопотоком (hard RT).
5//!
6//! ## Основные компоненты
7//!
8//! - [`SpscQueue`] — Single-producer single-consumer очередь (максимальная скорость)
9//! - [`RtQueueBase`] — базовый трейт для всех очередей
10//! - [`QueueError`] — ошибки операций с очередями (thiserror)
11//! - [`CommandQueue`] — команды из control thread в audio thread
12//! - [`OverflowPolicy`] — политики поведения при переполнении
13//! - [`UnderflowPolicy`] — политики поведения при опустошении
14
15use std::fmt;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18// =============================================================================
19// Подмодули
20// =============================================================================
21
22pub mod command;
23pub mod error;
24pub mod mpsc;
25pub mod observer;
26pub mod ring;
27pub mod rt_queue;
28pub mod signal;
29pub mod spsc;
30pub mod telemetry;
31pub mod telemetry_block;
32
33pub use command::CommandQueue;
34pub use error::{QueueError, QueueResult};
35pub use mpsc::MpscQueue;
36pub use rt_queue::RtQueue;
37pub use spsc::SpscQueue;
38pub use telemetry_block::TelemetryBlock;
39
40// Re-export key signal types
41pub use signal::{
42    AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
43    SetParameter, SignalSource,
44};
45
46// =============================================================================
47// Политики поведения
48// =============================================================================
49
50/// Политика поведения при переполнении очереди
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum OverflowPolicy {
53    /// Перезаписать самый старый элемент (кольцевой буфер)
54    OverwriteOldest,
55    /// Отбросить новый элемент
56    DropNewest,
57    /// Вызвать панику (только для отладки)
58    Panic,
59    /// Блокировать производителя (не для RT-потоков)
60    Block,
61}
62
63/// Политика поведения при пустой очереди
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum UnderflowPolicy {
66    /// Вернуть None
67    ReturnNone,
68    /// Вызвать панику (только для отладки)
69    Panic,
70}
71
72// =============================================================================
73// Статистика очереди
74// =============================================================================
75
76/// Живая статистика очереди (собирается внутри очереди)
77pub struct QueueStats {
78    pushes: AtomicUsize,
79    pops: AtomicUsize,
80    overflows: AtomicUsize,
81    underflows: AtomicUsize,
82    max_size: AtomicUsize,
83}
84
85impl QueueStats {
86    pub fn new() -> Self {
87        Self::default()
88    }
89
90    pub fn record_push(&self, current_size: usize) {
91        self.pushes.fetch_add(1, Ordering::Relaxed);
92        let prev = self.max_size.load(Ordering::Relaxed);
93        if current_size > prev {
94            let _ = self.max_size.compare_exchange(
95                prev,
96                current_size,
97                Ordering::Relaxed,
98                Ordering::Relaxed,
99            );
100        }
101    }
102
103    pub fn record_pop(&self) {
104        self.pops.fetch_add(1, Ordering::Relaxed);
105    }
106
107    pub fn record_overflow(&self) {
108        self.overflows.fetch_add(1, Ordering::Relaxed);
109    }
110
111    pub fn record_underflow(&self) {
112        self.underflows.fetch_add(1, Ordering::Relaxed);
113    }
114
115    pub fn snapshot(&self) -> QueueStatsSnapshot {
116        QueueStatsSnapshot {
117            pushes: self.pushes.load(Ordering::Relaxed),
118            pops: self.pops.load(Ordering::Relaxed),
119            overflows: self.overflows.load(Ordering::Relaxed),
120            underflows: self.underflows.load(Ordering::Relaxed),
121            max_size: self.max_size.load(Ordering::Relaxed),
122        }
123    }
124}
125
126impl Default for QueueStats {
127    fn default() -> Self {
128        Self {
129            pushes: AtomicUsize::new(0),
130            pops: AtomicUsize::new(0),
131            overflows: AtomicUsize::new(0),
132            underflows: AtomicUsize::new(0),
133            max_size: AtomicUsize::new(0),
134        }
135    }
136}
137
138/// Снимок статистики очереди
139#[derive(Debug, Clone, Copy, Default)]
140pub struct QueueStatsSnapshot {
141    /// Количество успешных push операций
142    pub pushes: usize,
143    /// Количество успешных pop операций
144    pub pops: usize,
145    /// Количество переполнений
146    pub overflows: usize,
147    /// Количество опустошений
148    pub underflows: usize,
149    /// Максимальный достигнутый размер
150    pub max_size: usize,
151}
152
153impl QueueStatsSnapshot {
154    /// Создать новую статистику
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Объединить две статистики
160    pub fn merge(&self, other: &Self) -> Self {
161        Self {
162            pushes: self.pushes + other.pushes,
163            pops: self.pops + other.pops,
164            overflows: self.overflows + other.overflows,
165            underflows: self.underflows + other.underflows,
166            max_size: self.max_size.max(other.max_size),
167        }
168    }
169}
170
171impl fmt::Display for QueueStatsSnapshot {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        write!(
174            f,
175            "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
176            self.pushes, self.pops, self.overflows, self.underflows, self.max_size
177        )
178    }
179}
180
181// =============================================================================
182// Базовый трейт для всех очередей
183// =============================================================================
184
185/// Базовый трейт для всех очередей, безопасных для реального времени
186///
187/// Все реализации должны быть:
188/// - Lock-free (никаких мьютексов)
189/// - RT-safe (без аллокаций, без блокировок)
190pub trait RtQueueBase<T>: Send + Sync {
191    /// Добавить элемент в очередь
192    fn push(&self, value: T) -> QueueResult<()>;
193
194    /// Извлечь элемент из очереди
195    fn pop(&self) -> Option<T>;
196
197    /// Текущий размер очереди
198    fn len(&self) -> usize;
199
200    /// Вместимость очереди
201    fn capacity(&self) -> usize;
202
203    /// Проверить, пуста ли очередь
204    fn is_empty(&self) -> bool {
205        self.len() == 0
206    }
207
208    /// Проверить, полна ли очередь
209    fn is_full(&self) -> bool {
210        self.len() == self.capacity()
211    }
212
213    /// Очистить очередь
214    fn clear(&self);
215}
216
217// =============================================================================
218// Вспомогательные функции
219// =============================================================================
220
221/// Проверка, является ли число степенью двойки
222#[inline]
223pub const fn is_power_of_two(n: usize) -> bool {
224    n != 0 && (n & (n - 1)) == 0
225}
226
227/// Вычислить следующую степень двойки
228#[inline]
229pub const fn next_power_of_two(n: usize) -> usize {
230    let mut n = n - 1;
231    n |= n >> 1;
232    n |= n >> 2;
233    n |= n >> 4;
234    n |= n >> 8;
235    n |= n >> 16;
236    n += 1;
237    n
238}
239
240// =============================================================================
241// Тесты
242// =============================================================================
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247
248    #[test]
249    fn test_stats_snapshot() {
250        let stats1 = QueueStatsSnapshot {
251            pushes: 10,
252            pops: 5,
253            overflows: 1,
254            underflows: 0,
255            max_size: 8,
256        };
257
258        let stats2 = QueueStatsSnapshot {
259            pushes: 20,
260            pops: 15,
261            overflows: 0,
262            underflows: 2,
263            max_size: 16,
264        };
265
266        let merged = stats1.merge(&stats2);
267        assert_eq!(merged.pushes, 30);
268        assert_eq!(merged.pops, 20);
269        assert_eq!(merged.overflows, 1);
270        assert_eq!(merged.underflows, 2);
271        assert_eq!(merged.max_size, 16);
272    }
273
274    #[test]
275    fn test_power_of_two() {
276        assert!(is_power_of_two(1));
277        assert!(is_power_of_two(2));
278        assert!(is_power_of_two(4));
279        assert!(is_power_of_two(8));
280        assert!(is_power_of_two(16));
281        assert!(!is_power_of_two(3));
282        assert!(!is_power_of_two(5));
283        assert!(!is_power_of_two(6));
284        assert!(!is_power_of_two(7));
285    }
286
287    #[test]
288    fn test_next_power_of_two() {
289        assert_eq!(next_power_of_two(1), 1);
290        assert_eq!(next_power_of_two(2), 2);
291        assert_eq!(next_power_of_two(3), 4);
292        assert_eq!(next_power_of_two(4), 4);
293        assert_eq!(next_power_of_two(5), 8);
294        assert_eq!(next_power_of_two(6), 8);
295        assert_eq!(next_power_of_two(7), 8);
296        assert_eq!(next_power_of_two(8), 8);
297        assert_eq!(next_power_of_two(9), 16);
298    }
299
300    #[test]
301    fn test_queue_stats_record() {
302        let stats = QueueStats::new();
303        stats.record_push(5);
304        stats.record_push(8);
305        stats.record_overflow();
306        stats.record_pop();
307
308        let snap = stats.snapshot();
309        assert_eq!(snap.pushes, 2);
310        assert_eq!(snap.pops, 1);
311        assert_eq!(snap.overflows, 1);
312        assert_eq!(snap.max_size, 8);
313    }
314}