Skip to main content

rill_core/queues/
mod.rs

1//! # Non-blocking queues for the dual-thread architecture
2//!
3//! This module provides queues for safe data exchange between the
4//! control thread (soft RT) and the signal processing thread (hard RT).
5//!
6//! ## Components
7//!
8//! - [`SpscQueue`](crate::queues::SpscQueue) — Single-producer single-consumer queue (maximum throughput)
9//! - [`RtQueueBase`](crate::queues::RtQueueBase) — Base trait for all queues
10//! - [`QueueError`](crate::queues::QueueError) — Queue operation error type
11//! - [`OverflowPolicy`](crate::queues::OverflowPolicy) — Overflow behaviour policies
12//! - [`UnderflowPolicy`](crate::queues::UnderflowPolicy) — Underflow behaviour policies
13
14use std::fmt;
15use std::sync::atomic::{AtomicUsize, Ordering};
16
17// =============================================================================
18// Submodules
19// =============================================================================
20
21/// Command trait for actor message types.
22pub mod command;
23/// Queue error types.
24pub mod error;
25/// Multi-producer single-consumer queue for automation.
26pub mod mpsc;
27/// Lock-free ring buffer for real-time use.
28pub mod ring;
29/// Base real-time queue implementation.
30pub mod rt_queue;
31/// Signal and command types for automation.
32pub mod signal;
33/// Lock-free single-producer single-consumer queue.
34pub mod spsc;
35/// Telemetry data types (future functionality).
36pub mod telemetry;
37/// Telemetry block batching utilities.
38pub mod telemetry_block;
39
40pub use error::{QueueError, QueueResult};
41pub use mpsc::MpscQueue;
42pub use rt_queue::RtQueue;
43pub use spsc::SpscQueue;
44pub use telemetry_block::TelemetryBlock;
45
46// Re-export key signal types
47pub use signal::{
48    AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
49    SetParameter, SignalOrigin,
50};
51
52// =============================================================================
53// Behaviour policies
54// =============================================================================
55
/// Strategy a bounded queue applies when a push arrives while it is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Replace the oldest stored element with the new one (ring-buffer behaviour).
    OverwriteOldest,
    /// Throw away the incoming element (drop on full).
    DropNewest,
    /// Panic when the queue overflows (debug only).
    Panic,
    /// Make the producer block (not safe for RT threads).
    Block,
}
68
/// Strategy a bounded queue applies when a pop is attempted while it is empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnderflowPolicy {
    /// Hand back `None` when there is nothing to pop.
    ReturnNone,
    /// Panic when the queue underflows (debug only).
    Panic,
}
77
78// =============================================================================
79// Queue statistics
80// =============================================================================
81
/// Live queue statistics collected inside the queue.
///
/// Every counter is an [`AtomicUsize`], so the values can be updated through
/// a shared reference. `Debug` is derived so the type can be printed with
/// `{:?}` and embedded in other `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct QueueStats {
    /// Total number of successful push operations.
    pushes: AtomicUsize,
    /// Total number of successful pop operations.
    pops: AtomicUsize,
    /// Total number of overflow events.
    overflows: AtomicUsize,
    /// Total number of underflow events.
    underflows: AtomicUsize,
    /// Maximum observed queue size.
    max_size: AtomicUsize,
}
95
96impl QueueStats {
97    /// Create a new empty statistics counter.
98    pub fn new() -> Self {
99        Self::default()
100    }
101
102    /// Record a push operation and update the max size if needed.
103    pub fn record_push(&self, current_size: usize) {
104        self.pushes.fetch_add(1, Ordering::Relaxed);
105        let prev = self.max_size.load(Ordering::Relaxed);
106        if current_size > prev {
107            let _ = self.max_size.compare_exchange(
108                prev,
109                current_size,
110                Ordering::Relaxed,
111                Ordering::Relaxed,
112            );
113        }
114    }
115
116    /// Record a pop operation.
117    pub fn record_pop(&self) {
118        self.pops.fetch_add(1, Ordering::Relaxed);
119    }
120
121    /// Record an overflow event.
122    pub fn record_overflow(&self) {
123        self.overflows.fetch_add(1, Ordering::Relaxed);
124    }
125
126    /// Record an underflow event.
127    pub fn record_underflow(&self) {
128        self.underflows.fetch_add(1, Ordering::Relaxed);
129    }
130
131    /// Take an atomic snapshot of the current statistics.
132    pub fn snapshot(&self) -> QueueStatsSnapshot {
133        QueueStatsSnapshot {
134            pushes: self.pushes.load(Ordering::Relaxed),
135            pops: self.pops.load(Ordering::Relaxed),
136            overflows: self.overflows.load(Ordering::Relaxed),
137            underflows: self.underflows.load(Ordering::Relaxed),
138            max_size: self.max_size.load(Ordering::Relaxed),
139        }
140    }
141}
142
143impl Default for QueueStats {
144    fn default() -> Self {
145        Self {
146            pushes: AtomicUsize::new(0),
147            pops: AtomicUsize::new(0),
148            overflows: AtomicUsize::new(0),
149            underflows: AtomicUsize::new(0),
150            max_size: AtomicUsize::new(0),
151        }
152    }
153}
154
/// Point-in-time snapshot of queue statistics.
///
/// A plain value type: it is `Copy`, defaults to all zeros, and two snapshots
/// compare equal when every field matches.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct QueueStatsSnapshot {
    /// Number of successful push operations.
    pub pushes: usize,
    /// Number of successful pop operations.
    pub pops: usize,
    /// Number of overflow events.
    pub overflows: usize,
    /// Number of underflow events.
    pub underflows: usize,
    /// Maximum observed queue size.
    pub max_size: usize,
}
169
170impl QueueStatsSnapshot {
171    /// Create a new empty snapshot.
172    pub fn new() -> Self {
173        Self::default()
174    }
175
176    /// Merge two snapshots by summing counts and taking the max size.
177    pub fn merge(&self, other: &Self) -> Self {
178        Self {
179            pushes: self.pushes + other.pushes,
180            pops: self.pops + other.pops,
181            overflows: self.overflows + other.overflows,
182            underflows: self.underflows + other.underflows,
183            max_size: self.max_size.max(other.max_size),
184        }
185    }
186}
187
188impl fmt::Display for QueueStatsSnapshot {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        write!(
191            f,
192            "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
193            self.pushes, self.pops, self.overflows, self.underflows, self.max_size
194        )
195    }
196}
197
198// =============================================================================
199// Base trait for all queues
200// =============================================================================
201
202/// Base trait for all real-time safe queues.
203///
204/// Implementations must be:
205/// - Lock-free (no mutexes)
206/// - RT-safe (no allocations, no blocking)
207pub trait RtQueueBase<T>: Send + Sync {
208    /// Push a value into the queue.
209    ///
210    /// # Errors
211    /// Returns `QueueFull` if the queue is at capacity.
212    fn push(&self, value: T) -> QueueResult<()>;
213
214    /// Pop a value from the queue, or `None` if empty.
215    fn pop(&self) -> Option<T>;
216
217    /// Current number of elements in the queue.
218    fn len(&self) -> usize;
219
220    /// Maximum capacity of the queue.
221    fn capacity(&self) -> usize;
222
223    /// Return true if the queue is empty.
224    fn is_empty(&self) -> bool {
225        self.len() == 0
226    }
227
228    /// Return true if the queue is full.
229    fn is_full(&self) -> bool {
230        self.len() == self.capacity()
231    }
232
233    /// Clear all elements from the queue.
234    fn clear(&self);
235}
236
237// =============================================================================
238// Helper functions
239// =============================================================================
240
/// Report whether `n` is an exact power of two.
///
/// Zero is not a power of two, so `is_power_of_two(0)` is `false`.
#[inline]
pub const fn is_power_of_two(n: usize) -> bool {
    // A power of two has exactly one bit set; zero has none.
    n.count_ones() == 1
}
246
/// Compute the next power of two greater than or equal to `n`.
///
/// Works across the full `usize` range. A hand-rolled bit smear that stops at
/// a 16-bit shift only covers 32 bits and yields non-power-of-two results for
/// inputs above `2^32` on 64-bit targets, so the standard-library routine is
/// used instead.
///
/// # Panics
/// Panics when `n` is 0, and when the result would not fit in a `usize`
/// (that is, when `n` is greater than the largest power of two representable
/// in `usize`). Both checks are active in release builds as well.
#[inline]
pub const fn next_power_of_two(n: usize) -> usize {
    // Explicit check: relying on `n - 1` underflowing only panics in debug
    // builds and silently wraps in release builds.
    assert!(n != 0, "next_power_of_two: n must be non-zero");
    match n.checked_next_power_of_two() {
        Some(power) => power,
        None => panic!("next_power_of_two: result does not fit in usize"),
    }
}
262
263// =============================================================================
264// Tests
265// =============================================================================
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging sums the four counters and keeps the larger `max_size`.
    #[test]
    fn test_stats_snapshot() {
        let first = QueueStatsSnapshot {
            pushes: 10,
            pops: 5,
            overflows: 1,
            underflows: 0,
            max_size: 8,
        };
        let second = QueueStatsSnapshot {
            pushes: 20,
            pops: 15,
            overflows: 0,
            underflows: 2,
            max_size: 16,
        };

        let QueueStatsSnapshot {
            pushes,
            pops,
            overflows,
            underflows,
            max_size,
        } = first.merge(&second);

        assert_eq!(pushes, 30);
        assert_eq!(pops, 20);
        assert_eq!(overflows, 1);
        assert_eq!(underflows, 2);
        assert_eq!(max_size, 16);
    }

    /// Table of inputs paired with the expected `is_power_of_two` answer.
    #[test]
    fn test_power_of_two() {
        let cases: [(usize, bool); 9] = [
            (1, true),
            (2, true),
            (4, true),
            (8, true),
            (16, true),
            (3, false),
            (5, false),
            (6, false),
            (7, false),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(is_power_of_two(input), expected);
        }
    }

    /// Table of inputs paired with the expected rounded-up power of two.
    #[test]
    fn test_next_power_of_two() {
        let cases: [(usize, usize); 9] = [
            (1, 1),
            (2, 2),
            (3, 4),
            (4, 4),
            (5, 8),
            (6, 8),
            (7, 8),
            (8, 8),
            (9, 16),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(next_power_of_two(input), expected);
        }
    }

    /// Recording events is reflected in the snapshot taken afterwards.
    #[test]
    fn test_queue_stats_record() {
        let stats = QueueStats::new();
        for &size in [5usize, 8].iter() {
            stats.record_push(size);
        }
        stats.record_overflow();
        stats.record_pop();

        let observed = stats.snapshot();
        assert_eq!(observed.pushes, 2);
        assert_eq!(observed.pops, 1);
        assert_eq!(observed.overflows, 1);
        assert_eq!(observed.max_size, 8);
    }
}