1use std::fmt;
15use std::sync::atomic::{AtomicUsize, Ordering};
16
17pub mod command;
23pub mod control_event;
25pub mod error;
27pub mod mpsc;
29pub mod ring;
31pub mod rt_queue;
33pub mod signal;
34pub mod spsc;
36pub mod telemetry;
38pub mod telemetry_block;
40
41pub use error::{QueueError, QueueResult};
42pub use mpsc::MpscQueue;
43pub use rt_queue::RtQueue;
44pub use spsc::SpscQueue;
45pub use telemetry_block::TelemetryBlock;
46
47pub use signal::{
49 AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
50 SetParameter, SignalOrigin,
51};
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum OverflowPolicy {
60 OverwriteOldest,
62 DropNewest,
64 Panic,
66 Block,
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum UnderflowPolicy {
73 ReturnNone,
75 Panic,
77}
78
79pub struct QueueStats {
85 pushes: AtomicUsize,
87 pops: AtomicUsize,
89 overflows: AtomicUsize,
91 underflows: AtomicUsize,
93 max_size: AtomicUsize,
95}
96
97impl QueueStats {
98 pub fn new() -> Self {
100 Self::default()
101 }
102
103 pub fn record_push(&self, current_size: usize) {
105 self.pushes.fetch_add(1, Ordering::Relaxed);
106 let prev = self.max_size.load(Ordering::Relaxed);
107 if current_size > prev {
108 let _ = self.max_size.compare_exchange(
109 prev,
110 current_size,
111 Ordering::Relaxed,
112 Ordering::Relaxed,
113 );
114 }
115 }
116
117 pub fn record_pop(&self) {
119 self.pops.fetch_add(1, Ordering::Relaxed);
120 }
121
122 pub fn record_overflow(&self) {
124 self.overflows.fetch_add(1, Ordering::Relaxed);
125 }
126
127 pub fn record_underflow(&self) {
129 self.underflows.fetch_add(1, Ordering::Relaxed);
130 }
131
132 pub fn snapshot(&self) -> QueueStatsSnapshot {
134 QueueStatsSnapshot {
135 pushes: self.pushes.load(Ordering::Relaxed),
136 pops: self.pops.load(Ordering::Relaxed),
137 overflows: self.overflows.load(Ordering::Relaxed),
138 underflows: self.underflows.load(Ordering::Relaxed),
139 max_size: self.max_size.load(Ordering::Relaxed),
140 }
141 }
142}
143
144impl Default for QueueStats {
145 fn default() -> Self {
146 Self {
147 pushes: AtomicUsize::new(0),
148 pops: AtomicUsize::new(0),
149 overflows: AtomicUsize::new(0),
150 underflows: AtomicUsize::new(0),
151 max_size: AtomicUsize::new(0),
152 }
153 }
154}
155
156#[derive(Debug, Clone, Copy, Default)]
158pub struct QueueStatsSnapshot {
159 pub pushes: usize,
161 pub pops: usize,
163 pub overflows: usize,
165 pub underflows: usize,
167 pub max_size: usize,
169}
170
171impl QueueStatsSnapshot {
172 pub fn new() -> Self {
174 Self::default()
175 }
176
177 pub fn merge(&self, other: &Self) -> Self {
179 Self {
180 pushes: self.pushes + other.pushes,
181 pops: self.pops + other.pops,
182 overflows: self.overflows + other.overflows,
183 underflows: self.underflows + other.underflows,
184 max_size: self.max_size.max(other.max_size),
185 }
186 }
187}
188
189impl fmt::Display for QueueStatsSnapshot {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 write!(
192 f,
193 "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
194 self.pushes, self.pops, self.overflows, self.underflows, self.max_size
195 )
196 }
197}
198
199pub trait RtQueueBase<T>: Send + Sync {
209 fn push(&self, value: T) -> QueueResult<()>;
214
215 fn pop(&self) -> Option<T>;
217
218 fn len(&self) -> usize;
220
221 fn capacity(&self) -> usize;
223
224 fn is_empty(&self) -> bool {
226 self.len() == 0
227 }
228
229 fn is_full(&self) -> bool {
231 self.len() == self.capacity()
232 }
233
234 fn clear(&self);
236}
237
238#[inline]
244pub const fn is_power_of_two(n: usize) -> bool {
245 n != 0 && (n & (n - 1)) == 0
246}
247
248#[inline]
253pub const fn next_power_of_two(n: usize) -> usize {
254 let mut n = n - 1;
255 n |= n >> 1;
256 n |= n >> 2;
257 n |= n >> 4;
258 n |= n >> 8;
259 n |= n >> 16;
260 n += 1;
261 n
262}
263
264#[cfg(test)]
269mod tests {
270 use super::*;
271
272 #[test]
273 fn test_stats_snapshot() {
274 let stats1 = QueueStatsSnapshot {
275 pushes: 10,
276 pops: 5,
277 overflows: 1,
278 underflows: 0,
279 max_size: 8,
280 };
281
282 let stats2 = QueueStatsSnapshot {
283 pushes: 20,
284 pops: 15,
285 overflows: 0,
286 underflows: 2,
287 max_size: 16,
288 };
289
290 let merged = stats1.merge(&stats2);
291 assert_eq!(merged.pushes, 30);
292 assert_eq!(merged.pops, 20);
293 assert_eq!(merged.overflows, 1);
294 assert_eq!(merged.underflows, 2);
295 assert_eq!(merged.max_size, 16);
296 }
297
298 #[test]
299 fn test_power_of_two() {
300 assert!(is_power_of_two(1));
301 assert!(is_power_of_two(2));
302 assert!(is_power_of_two(4));
303 assert!(is_power_of_two(8));
304 assert!(is_power_of_two(16));
305 assert!(!is_power_of_two(3));
306 assert!(!is_power_of_two(5));
307 assert!(!is_power_of_two(6));
308 assert!(!is_power_of_two(7));
309 }
310
311 #[test]
312 fn test_next_power_of_two() {
313 assert_eq!(next_power_of_two(1), 1);
314 assert_eq!(next_power_of_two(2), 2);
315 assert_eq!(next_power_of_two(3), 4);
316 assert_eq!(next_power_of_two(4), 4);
317 assert_eq!(next_power_of_two(5), 8);
318 assert_eq!(next_power_of_two(6), 8);
319 assert_eq!(next_power_of_two(7), 8);
320 assert_eq!(next_power_of_two(8), 8);
321 assert_eq!(next_power_of_two(9), 16);
322 }
323
324 #[test]
325 fn test_queue_stats_record() {
326 let stats = QueueStats::new();
327 stats.record_push(5);
328 stats.record_push(8);
329 stats.record_overflow();
330 stats.record_pop();
331
332 let snap = stats.snapshot();
333 assert_eq!(snap.pushes, 2);
334 assert_eq!(snap.pops, 1);
335 assert_eq!(snap.overflows, 1);
336 assert_eq!(snap.max_size, 8);
337 }
338}