1use std::fmt;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18pub mod command;
23pub mod error;
24pub mod mpsc;
25pub mod observer;
26pub mod ring;
27pub mod rt_queue;
28pub mod signal;
29pub mod spsc;
30pub mod telemetry;
31pub mod telemetry_block;
32
33pub use command::CommandQueue;
34pub use error::{QueueError, QueueResult};
35pub use mpsc::MpscQueue;
36pub use rt_queue::RtQueue;
37pub use spsc::SpscQueue;
38pub use telemetry_block::TelemetryBlock;
39
40pub use signal::{
42 AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
43 SetParameter, SignalSource,
44};
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum OverflowPolicy {
53 OverwriteOldest,
55 DropNewest,
57 Panic,
59 Block,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum UnderflowPolicy {
66 ReturnNone,
68 Panic,
70}
71
72pub struct QueueStats {
78 pushes: AtomicUsize,
79 pops: AtomicUsize,
80 overflows: AtomicUsize,
81 underflows: AtomicUsize,
82 max_size: AtomicUsize,
83}
84
85impl QueueStats {
86 pub fn new() -> Self {
87 Self::default()
88 }
89
90 pub fn record_push(&self, current_size: usize) {
91 self.pushes.fetch_add(1, Ordering::Relaxed);
92 let prev = self.max_size.load(Ordering::Relaxed);
93 if current_size > prev {
94 let _ = self.max_size.compare_exchange(
95 prev,
96 current_size,
97 Ordering::Relaxed,
98 Ordering::Relaxed,
99 );
100 }
101 }
102
103 pub fn record_pop(&self) {
104 self.pops.fetch_add(1, Ordering::Relaxed);
105 }
106
107 pub fn record_overflow(&self) {
108 self.overflows.fetch_add(1, Ordering::Relaxed);
109 }
110
111 pub fn record_underflow(&self) {
112 self.underflows.fetch_add(1, Ordering::Relaxed);
113 }
114
115 pub fn snapshot(&self) -> QueueStatsSnapshot {
116 QueueStatsSnapshot {
117 pushes: self.pushes.load(Ordering::Relaxed),
118 pops: self.pops.load(Ordering::Relaxed),
119 overflows: self.overflows.load(Ordering::Relaxed),
120 underflows: self.underflows.load(Ordering::Relaxed),
121 max_size: self.max_size.load(Ordering::Relaxed),
122 }
123 }
124}
125
126impl Default for QueueStats {
127 fn default() -> Self {
128 Self {
129 pushes: AtomicUsize::new(0),
130 pops: AtomicUsize::new(0),
131 overflows: AtomicUsize::new(0),
132 underflows: AtomicUsize::new(0),
133 max_size: AtomicUsize::new(0),
134 }
135 }
136}
137
138#[derive(Debug, Clone, Copy, Default)]
140pub struct QueueStatsSnapshot {
141 pub pushes: usize,
143 pub pops: usize,
145 pub overflows: usize,
147 pub underflows: usize,
149 pub max_size: usize,
151}
152
153impl QueueStatsSnapshot {
154 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn merge(&self, other: &Self) -> Self {
161 Self {
162 pushes: self.pushes + other.pushes,
163 pops: self.pops + other.pops,
164 overflows: self.overflows + other.overflows,
165 underflows: self.underflows + other.underflows,
166 max_size: self.max_size.max(other.max_size),
167 }
168 }
169}
170
171impl fmt::Display for QueueStatsSnapshot {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 write!(
174 f,
175 "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
176 self.pushes, self.pops, self.overflows, self.underflows, self.max_size
177 )
178 }
179}
180
181pub trait RtQueueBase<T>: Send + Sync {
191 fn push(&self, value: T) -> QueueResult<()>;
193
194 fn pop(&self) -> Option<T>;
196
197 fn len(&self) -> usize;
199
200 fn capacity(&self) -> usize;
202
203 fn is_empty(&self) -> bool {
205 self.len() == 0
206 }
207
208 fn is_full(&self) -> bool {
210 self.len() == self.capacity()
211 }
212
213 fn clear(&self);
215}
216
217#[inline]
223pub const fn is_power_of_two(n: usize) -> bool {
224 n != 0 && (n & (n - 1)) == 0
225}
226
227#[inline]
229pub const fn next_power_of_two(n: usize) -> usize {
230 let mut n = n - 1;
231 n |= n >> 1;
232 n |= n >> 2;
233 n |= n >> 4;
234 n |= n >> 8;
235 n |= n >> 16;
236 n += 1;
237 n
238}
239
240#[cfg(test)]
245mod tests {
246 use super::*;
247
248 #[test]
249 fn test_stats_snapshot() {
250 let stats1 = QueueStatsSnapshot {
251 pushes: 10,
252 pops: 5,
253 overflows: 1,
254 underflows: 0,
255 max_size: 8,
256 };
257
258 let stats2 = QueueStatsSnapshot {
259 pushes: 20,
260 pops: 15,
261 overflows: 0,
262 underflows: 2,
263 max_size: 16,
264 };
265
266 let merged = stats1.merge(&stats2);
267 assert_eq!(merged.pushes, 30);
268 assert_eq!(merged.pops, 20);
269 assert_eq!(merged.overflows, 1);
270 assert_eq!(merged.underflows, 2);
271 assert_eq!(merged.max_size, 16);
272 }
273
274 #[test]
275 fn test_power_of_two() {
276 assert!(is_power_of_two(1));
277 assert!(is_power_of_two(2));
278 assert!(is_power_of_two(4));
279 assert!(is_power_of_two(8));
280 assert!(is_power_of_two(16));
281 assert!(!is_power_of_two(3));
282 assert!(!is_power_of_two(5));
283 assert!(!is_power_of_two(6));
284 assert!(!is_power_of_two(7));
285 }
286
287 #[test]
288 fn test_next_power_of_two() {
289 assert_eq!(next_power_of_two(1), 1);
290 assert_eq!(next_power_of_two(2), 2);
291 assert_eq!(next_power_of_two(3), 4);
292 assert_eq!(next_power_of_two(4), 4);
293 assert_eq!(next_power_of_two(5), 8);
294 assert_eq!(next_power_of_two(6), 8);
295 assert_eq!(next_power_of_two(7), 8);
296 assert_eq!(next_power_of_two(8), 8);
297 assert_eq!(next_power_of_two(9), 16);
298 }
299
300 #[test]
301 fn test_queue_stats_record() {
302 let stats = QueueStats::new();
303 stats.record_push(5);
304 stats.record_push(8);
305 stats.record_overflow();
306 stats.record_pop();
307
308 let snap = stats.snapshot();
309 assert_eq!(snap.pushes, 2);
310 assert_eq!(snap.pops, 1);
311 assert_eq!(snap.overflows, 1);
312 assert_eq!(snap.max_size, 8);
313 }
314}