Skip to main content

rill_core/queues/
mod.rs

1//! # Non-blocking queues for the dual-thread architecture
2//!
3//! This module provides queues for safe data exchange between the
4//! control thread (soft RT) and the signal processing thread (hard RT).
5//!
6//! ## Components
7//!
8//! - [`SpscQueue`](crate::queues::SpscQueue) — Single-producer single-consumer queue (maximum throughput)
9//! - [`RtQueueBase`](crate::queues::RtQueueBase) — Base trait for all queues
10//! - [`QueueError`](crate::queues::QueueError) — Queue operation error type
11//! - [`OverflowPolicy`](crate::queues::OverflowPolicy) — Overflow behaviour policies
12//! - [`UnderflowPolicy`](crate::queues::UnderflowPolicy) — Underflow behaviour policies
13
14use std::fmt;
15use std::sync::atomic::{AtomicUsize, Ordering};
16
17// =============================================================================
18// Submodules
19// =============================================================================
20
21/// Command trait for actor message types.
22pub mod command;
23/// Signal and command types for automation.
24pub mod control_event;
25/// Queue error types.
26pub mod error;
27/// Multi-producer single-consumer queue for automation.
28pub mod mpsc;
29/// Lock-free ring buffer for real-time use.
30pub mod ring;
31/// Base real-time queue implementation.
32pub mod rt_queue;
33pub mod signal;
34/// Lock-free single-producer single-consumer queue.
35pub mod spsc;
36/// Telemetry data types (future functionality).
37pub mod telemetry;
38/// Telemetry block batching utilities.
39pub mod telemetry_block;
40
41pub use error::{QueueError, QueueResult};
42pub use mpsc::MpscQueue;
43pub use rt_queue::RtQueue;
44pub use spsc::SpscQueue;
45pub use telemetry_block::TelemetryBlock;
46
47// Re-export key signal types
48pub use signal::{
49    AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
50    SetParameter, SignalOrigin,
51};
52
53// =============================================================================
54// Behaviour policies
55// =============================================================================
56
57/// Overflow behaviour policy for bounded queues.
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum OverflowPolicy {
60    /// Overwrite the oldest element (ring-buffer behaviour).
61    OverwriteOldest,
62    /// Discard the newest element (drop on full).
63    DropNewest,
64    /// Panic on overflow (debug only).
65    Panic,
66    /// Block the producer (not safe for RT threads).
67    Block,
68}
69
70/// Underflow behaviour policy for bounded queues.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum UnderflowPolicy {
73    /// Return `None` on empty.
74    ReturnNone,
75    /// Panic on underflow (debug only).
76    Panic,
77}
78
79// =============================================================================
80// Queue statistics
81// =============================================================================
82
83/// Live queue statistics collected inside the queue.
84pub struct QueueStats {
85    /// Total number of successful push operations.
86    pushes: AtomicUsize,
87    /// Total number of successful pop operations.
88    pops: AtomicUsize,
89    /// Total number of overflow events.
90    overflows: AtomicUsize,
91    /// Total number of underflow events.
92    underflows: AtomicUsize,
93    /// Maximum observed queue size.
94    max_size: AtomicUsize,
95}
96
97impl QueueStats {
98    /// Create a new empty statistics counter.
99    pub fn new() -> Self {
100        Self::default()
101    }
102
103    /// Record a push operation and update the max size if needed.
104    pub fn record_push(&self, current_size: usize) {
105        self.pushes.fetch_add(1, Ordering::Relaxed);
106        let prev = self.max_size.load(Ordering::Relaxed);
107        if current_size > prev {
108            let _ = self.max_size.compare_exchange(
109                prev,
110                current_size,
111                Ordering::Relaxed,
112                Ordering::Relaxed,
113            );
114        }
115    }
116
117    /// Record a pop operation.
118    pub fn record_pop(&self) {
119        self.pops.fetch_add(1, Ordering::Relaxed);
120    }
121
122    /// Record an overflow event.
123    pub fn record_overflow(&self) {
124        self.overflows.fetch_add(1, Ordering::Relaxed);
125    }
126
127    /// Record an underflow event.
128    pub fn record_underflow(&self) {
129        self.underflows.fetch_add(1, Ordering::Relaxed);
130    }
131
132    /// Take an atomic snapshot of the current statistics.
133    pub fn snapshot(&self) -> QueueStatsSnapshot {
134        QueueStatsSnapshot {
135            pushes: self.pushes.load(Ordering::Relaxed),
136            pops: self.pops.load(Ordering::Relaxed),
137            overflows: self.overflows.load(Ordering::Relaxed),
138            underflows: self.underflows.load(Ordering::Relaxed),
139            max_size: self.max_size.load(Ordering::Relaxed),
140        }
141    }
142}
143
144impl Default for QueueStats {
145    fn default() -> Self {
146        Self {
147            pushes: AtomicUsize::new(0),
148            pops: AtomicUsize::new(0),
149            overflows: AtomicUsize::new(0),
150            underflows: AtomicUsize::new(0),
151            max_size: AtomicUsize::new(0),
152        }
153    }
154}
155
156/// Point-in-time snapshot of queue statistics.
157#[derive(Debug, Clone, Copy, Default)]
158pub struct QueueStatsSnapshot {
159    /// Number of successful push operations.
160    pub pushes: usize,
161    /// Number of successful pop operations.
162    pub pops: usize,
163    /// Number of overflow events.
164    pub overflows: usize,
165    /// Number of underflow events.
166    pub underflows: usize,
167    /// Maximum observed queue size.
168    pub max_size: usize,
169}
170
171impl QueueStatsSnapshot {
172    /// Create a new empty snapshot.
173    pub fn new() -> Self {
174        Self::default()
175    }
176
177    /// Merge two snapshots by summing counts and taking the max size.
178    pub fn merge(&self, other: &Self) -> Self {
179        Self {
180            pushes: self.pushes + other.pushes,
181            pops: self.pops + other.pops,
182            overflows: self.overflows + other.overflows,
183            underflows: self.underflows + other.underflows,
184            max_size: self.max_size.max(other.max_size),
185        }
186    }
187}
188
189impl fmt::Display for QueueStatsSnapshot {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        write!(
192            f,
193            "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
194            self.pushes, self.pops, self.overflows, self.underflows, self.max_size
195        )
196    }
197}
198
199// =============================================================================
200// Base trait for all queues
201// =============================================================================
202
203/// Base trait for all real-time safe queues.
204///
205/// Implementations must be:
206/// - Lock-free (no mutexes)
207/// - RT-safe (no allocations, no blocking)
208pub trait RtQueueBase<T>: Send + Sync {
209    /// Push a value into the queue.
210    ///
211    /// # Errors
212    /// Returns `QueueFull` if the queue is at capacity.
213    fn push(&self, value: T) -> QueueResult<()>;
214
215    /// Pop a value from the queue, or `None` if empty.
216    fn pop(&self) -> Option<T>;
217
218    /// Current number of elements in the queue.
219    fn len(&self) -> usize;
220
221    /// Maximum capacity of the queue.
222    fn capacity(&self) -> usize;
223
224    /// Return true if the queue is empty.
225    fn is_empty(&self) -> bool {
226        self.len() == 0
227    }
228
229    /// Return true if the queue is full.
230    fn is_full(&self) -> bool {
231        self.len() == self.capacity()
232    }
233
234    /// Clear all elements from the queue.
235    fn clear(&self);
236}
237
238// =============================================================================
239// Helper functions
240// =============================================================================
241
242/// Return true if `n` is a power of two.
243#[inline]
244pub const fn is_power_of_two(n: usize) -> bool {
245    n != 0 && (n & (n - 1)) == 0
246}
247
248/// Compute the next power of two greater than or equal to `n`.
249///
250/// # Panics
251/// Panics when `n` is 0.
252#[inline]
253pub const fn next_power_of_two(n: usize) -> usize {
254    let mut n = n - 1;
255    n |= n >> 1;
256    n |= n >> 2;
257    n |= n >> 4;
258    n |= n >> 8;
259    n |= n >> 16;
260    n += 1;
261    n
262}
263
264// =============================================================================
265// Tests
266// =============================================================================
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    #[test]
273    fn test_stats_snapshot() {
274        let stats1 = QueueStatsSnapshot {
275            pushes: 10,
276            pops: 5,
277            overflows: 1,
278            underflows: 0,
279            max_size: 8,
280        };
281
282        let stats2 = QueueStatsSnapshot {
283            pushes: 20,
284            pops: 15,
285            overflows: 0,
286            underflows: 2,
287            max_size: 16,
288        };
289
290        let merged = stats1.merge(&stats2);
291        assert_eq!(merged.pushes, 30);
292        assert_eq!(merged.pops, 20);
293        assert_eq!(merged.overflows, 1);
294        assert_eq!(merged.underflows, 2);
295        assert_eq!(merged.max_size, 16);
296    }
297
298    #[test]
299    fn test_power_of_two() {
300        assert!(is_power_of_two(1));
301        assert!(is_power_of_two(2));
302        assert!(is_power_of_two(4));
303        assert!(is_power_of_two(8));
304        assert!(is_power_of_two(16));
305        assert!(!is_power_of_two(3));
306        assert!(!is_power_of_two(5));
307        assert!(!is_power_of_two(6));
308        assert!(!is_power_of_two(7));
309    }
310
311    #[test]
312    fn test_next_power_of_two() {
313        assert_eq!(next_power_of_two(1), 1);
314        assert_eq!(next_power_of_two(2), 2);
315        assert_eq!(next_power_of_two(3), 4);
316        assert_eq!(next_power_of_two(4), 4);
317        assert_eq!(next_power_of_two(5), 8);
318        assert_eq!(next_power_of_two(6), 8);
319        assert_eq!(next_power_of_two(7), 8);
320        assert_eq!(next_power_of_two(8), 8);
321        assert_eq!(next_power_of_two(9), 16);
322    }
323
324    #[test]
325    fn test_queue_stats_record() {
326        let stats = QueueStats::new();
327        stats.record_push(5);
328        stats.record_push(8);
329        stats.record_overflow();
330        stats.record_pop();
331
332        let snap = stats.snapshot();
333        assert_eq!(snap.pushes, 2);
334        assert_eq!(snap.pops, 1);
335        assert_eq!(snap.overflows, 1);
336        assert_eq!(snap.max_size, 8);
337    }
338}