Skip to main content

rill_core/queues/
mod.rs

1//! # Non-blocking queues for the dual-thread architecture
2//!
3//! This module provides queues for safe data exchange between the
4//! control thread (soft RT) and the audio signal thread (hard RT).
5//!
6//! ## Components
7//!
8//! - [`SpscQueue`](crate::queues::SpscQueue) — Single-producer single-consumer queue (maximum throughput)
9//! - [`RtQueueBase`](crate::queues::RtQueueBase) — Base trait for all queues
10//! - [`QueueError`](crate::queues::QueueError) — Queue operation error type
11//! - [`CommandQueue`](crate::queues::CommandQueue) — Commands from control thread to signal thread
12//! - [`OverflowPolicy`](crate::queues::OverflowPolicy) — Overflow behaviour policies
13//! - [`UnderflowPolicy`](crate::queues::UnderflowPolicy) — Underflow behaviour policies
14
15use std::fmt;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18// =============================================================================
19// Подмодули
20// =============================================================================
21
22/// Bounded command queue using a crossbeam channel.
23pub mod command;
24/// Queue error types.
25pub mod error;
26/// Multi-producer single-consumer queue for automation.
27pub mod mpsc;
28/// Observer pattern helpers for queue monitoring.
29pub mod observer;
30/// Lock-free ring buffer for real-time use.
31pub mod ring;
32/// Base real-time queue implementation.
33pub mod rt_queue;
34/// Signal and command types for automation.
35pub mod signal;
36/// Lock-free single-producer single-consumer queue.
37pub mod spsc;
38/// Telemetry data types and senders.
39pub mod telemetry;
40/// Telemetry block batching utilities.
41pub mod telemetry_block;
42
43pub use command::CommandQueue;
44pub use error::{QueueError, QueueResult};
45pub use mpsc::MpscQueue;
46pub use rt_queue::RtQueue;
47pub use spsc::SpscQueue;
48pub use telemetry_block::TelemetryBlock;
49
50// Re-export key signal types
51pub use signal::{
52    AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
53    SetParameter, SignalSource,
54};
55
56// =============================================================================
57// Политики поведения
58// =============================================================================
59
60/// Overflow behaviour policy for bounded queues.
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum OverflowPolicy {
63    /// Overwrite the oldest element (ring-buffer behaviour).
64    OverwriteOldest,
65    /// Discard the newest element (drop on full).
66    DropNewest,
67    /// Panic on overflow (debug only).
68    Panic,
69    /// Block the producer (not safe for RT threads).
70    Block,
71}
72
73/// Underflow behaviour policy for bounded queues.
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub enum UnderflowPolicy {
76    /// Return `None` on empty.
77    ReturnNone,
78    /// Panic on underflow (debug only).
79    Panic,
80}
81
82// =============================================================================
83// Статистика очереди
84// =============================================================================
85
86/// Live queue statistics collected inside the queue.
87pub struct QueueStats {
88    /// Total number of successful push operations.
89    pushes: AtomicUsize,
90    /// Total number of successful pop operations.
91    pops: AtomicUsize,
92    /// Total number of overflow events.
93    overflows: AtomicUsize,
94    /// Total number of underflow events.
95    underflows: AtomicUsize,
96    /// Maximum observed queue size.
97    max_size: AtomicUsize,
98}
99
100impl QueueStats {
101    /// Create a new empty statistics counter.
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    /// Record a push operation and update the max size if needed.
107    pub fn record_push(&self, current_size: usize) {
108        self.pushes.fetch_add(1, Ordering::Relaxed);
109        let prev = self.max_size.load(Ordering::Relaxed);
110        if current_size > prev {
111            let _ = self.max_size.compare_exchange(
112                prev,
113                current_size,
114                Ordering::Relaxed,
115                Ordering::Relaxed,
116            );
117        }
118    }
119
120    /// Record a pop operation.
121    pub fn record_pop(&self) {
122        self.pops.fetch_add(1, Ordering::Relaxed);
123    }
124
125    /// Record an overflow event.
126    pub fn record_overflow(&self) {
127        self.overflows.fetch_add(1, Ordering::Relaxed);
128    }
129
130    /// Record an underflow event.
131    pub fn record_underflow(&self) {
132        self.underflows.fetch_add(1, Ordering::Relaxed);
133    }
134
135    /// Take an atomic snapshot of the current statistics.
136    pub fn snapshot(&self) -> QueueStatsSnapshot {
137        QueueStatsSnapshot {
138            pushes: self.pushes.load(Ordering::Relaxed),
139            pops: self.pops.load(Ordering::Relaxed),
140            overflows: self.overflows.load(Ordering::Relaxed),
141            underflows: self.underflows.load(Ordering::Relaxed),
142            max_size: self.max_size.load(Ordering::Relaxed),
143        }
144    }
145}
146
147impl Default for QueueStats {
148    fn default() -> Self {
149        Self {
150            pushes: AtomicUsize::new(0),
151            pops: AtomicUsize::new(0),
152            overflows: AtomicUsize::new(0),
153            underflows: AtomicUsize::new(0),
154            max_size: AtomicUsize::new(0),
155        }
156    }
157}
158
159/// Point-in-time snapshot of queue statistics.
160#[derive(Debug, Clone, Copy, Default)]
161pub struct QueueStatsSnapshot {
162    /// Number of successful push operations.
163    pub pushes: usize,
164    /// Number of successful pop operations.
165    pub pops: usize,
166    /// Number of overflow events.
167    pub overflows: usize,
168    /// Number of underflow events.
169    pub underflows: usize,
170    /// Maximum observed queue size.
171    pub max_size: usize,
172}
173
174impl QueueStatsSnapshot {
175    /// Create a new empty snapshot.
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    /// Merge two snapshots by summing counts and taking the max size.
181    pub fn merge(&self, other: &Self) -> Self {
182        Self {
183            pushes: self.pushes + other.pushes,
184            pops: self.pops + other.pops,
185            overflows: self.overflows + other.overflows,
186            underflows: self.underflows + other.underflows,
187            max_size: self.max_size.max(other.max_size),
188        }
189    }
190}
191
192impl fmt::Display for QueueStatsSnapshot {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        write!(
195            f,
196            "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
197            self.pushes, self.pops, self.overflows, self.underflows, self.max_size
198        )
199    }
200}
201
202// =============================================================================
203// Базовый трейт для всех очередей
204// =============================================================================
205
206/// Base trait for all real-time safe queues.
207///
208/// Implementations must be:
209/// - Lock-free (no mutexes)
210/// - RT-safe (no allocations, no blocking)
211pub trait RtQueueBase<T>: Send + Sync {
212    /// Push a value into the queue.
213    ///
214    /// # Errors
215    /// Returns `QueueFull` if the queue is at capacity.
216    fn push(&self, value: T) -> QueueResult<()>;
217
218    /// Pop a value from the queue, or `None` if empty.
219    fn pop(&self) -> Option<T>;
220
221    /// Current number of elements in the queue.
222    fn len(&self) -> usize;
223
224    /// Maximum capacity of the queue.
225    fn capacity(&self) -> usize;
226
227    /// Return true if the queue is empty.
228    fn is_empty(&self) -> bool {
229        self.len() == 0
230    }
231
232    /// Return true if the queue is full.
233    fn is_full(&self) -> bool {
234        self.len() == self.capacity()
235    }
236
237    /// Clear all elements from the queue.
238    fn clear(&self);
239}
240
241// =============================================================================
242// Вспомогательные функции
243// =============================================================================
244
245/// Return true if `n` is a power of two.
246#[inline]
247pub const fn is_power_of_two(n: usize) -> bool {
248    n != 0 && (n & (n - 1)) == 0
249}
250
251/// Compute the next power of two greater than or equal to `n`.
252///
253/// # Panics
254/// Panics when `n` is 0.
255#[inline]
256pub const fn next_power_of_two(n: usize) -> usize {
257    let mut n = n - 1;
258    n |= n >> 1;
259    n |= n >> 2;
260    n |= n >> 4;
261    n |= n >> 8;
262    n |= n >> 16;
263    n += 1;
264    n
265}
266
267// =============================================================================
268// Тесты
269// =============================================================================
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274
275    #[test]
276    fn test_stats_snapshot() {
277        let stats1 = QueueStatsSnapshot {
278            pushes: 10,
279            pops: 5,
280            overflows: 1,
281            underflows: 0,
282            max_size: 8,
283        };
284
285        let stats2 = QueueStatsSnapshot {
286            pushes: 20,
287            pops: 15,
288            overflows: 0,
289            underflows: 2,
290            max_size: 16,
291        };
292
293        let merged = stats1.merge(&stats2);
294        assert_eq!(merged.pushes, 30);
295        assert_eq!(merged.pops, 20);
296        assert_eq!(merged.overflows, 1);
297        assert_eq!(merged.underflows, 2);
298        assert_eq!(merged.max_size, 16);
299    }
300
301    #[test]
302    fn test_power_of_two() {
303        assert!(is_power_of_two(1));
304        assert!(is_power_of_two(2));
305        assert!(is_power_of_two(4));
306        assert!(is_power_of_two(8));
307        assert!(is_power_of_two(16));
308        assert!(!is_power_of_two(3));
309        assert!(!is_power_of_two(5));
310        assert!(!is_power_of_two(6));
311        assert!(!is_power_of_two(7));
312    }
313
314    #[test]
315    fn test_next_power_of_two() {
316        assert_eq!(next_power_of_two(1), 1);
317        assert_eq!(next_power_of_two(2), 2);
318        assert_eq!(next_power_of_two(3), 4);
319        assert_eq!(next_power_of_two(4), 4);
320        assert_eq!(next_power_of_two(5), 8);
321        assert_eq!(next_power_of_two(6), 8);
322        assert_eq!(next_power_of_two(7), 8);
323        assert_eq!(next_power_of_two(8), 8);
324        assert_eq!(next_power_of_two(9), 16);
325    }
326
327    #[test]
328    fn test_queue_stats_record() {
329        let stats = QueueStats::new();
330        stats.record_push(5);
331        stats.record_push(8);
332        stats.record_overflow();
333        stats.record_pop();
334
335        let snap = stats.snapshot();
336        assert_eq!(snap.pushes, 2);
337        assert_eq!(snap.pops, 1);
338        assert_eq!(snap.overflows, 1);
339        assert_eq!(snap.max_size, 8);
340    }
341}