1use std::fmt;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18pub mod command;
24pub mod error;
26pub mod mpsc;
28pub mod observer;
30pub mod ring;
32pub mod rt_queue;
34pub mod signal;
36pub mod spsc;
38pub mod telemetry;
40pub mod telemetry_block;
42
43pub use command::CommandQueue;
44pub use error::{QueueError, QueueResult};
45pub use mpsc::MpscQueue;
46pub use rt_queue::RtQueue;
47pub use spsc::SpscQueue;
48pub use telemetry_block::TelemetryBlock;
49
50pub use signal::{
52 AutomatonCommand, CalibrationKind, CommandEnum, MappingType, SensorCommand, ServoCommand,
53 SetParameter, SignalSource,
54};
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum OverflowPolicy {
63 OverwriteOldest,
65 DropNewest,
67 Panic,
69 Block,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub enum UnderflowPolicy {
76 ReturnNone,
78 Panic,
80}
81
82pub struct QueueStats {
88 pushes: AtomicUsize,
90 pops: AtomicUsize,
92 overflows: AtomicUsize,
94 underflows: AtomicUsize,
96 max_size: AtomicUsize,
98}
99
100impl QueueStats {
101 pub fn new() -> Self {
103 Self::default()
104 }
105
106 pub fn record_push(&self, current_size: usize) {
108 self.pushes.fetch_add(1, Ordering::Relaxed);
109 let prev = self.max_size.load(Ordering::Relaxed);
110 if current_size > prev {
111 let _ = self.max_size.compare_exchange(
112 prev,
113 current_size,
114 Ordering::Relaxed,
115 Ordering::Relaxed,
116 );
117 }
118 }
119
120 pub fn record_pop(&self) {
122 self.pops.fetch_add(1, Ordering::Relaxed);
123 }
124
125 pub fn record_overflow(&self) {
127 self.overflows.fetch_add(1, Ordering::Relaxed);
128 }
129
130 pub fn record_underflow(&self) {
132 self.underflows.fetch_add(1, Ordering::Relaxed);
133 }
134
135 pub fn snapshot(&self) -> QueueStatsSnapshot {
137 QueueStatsSnapshot {
138 pushes: self.pushes.load(Ordering::Relaxed),
139 pops: self.pops.load(Ordering::Relaxed),
140 overflows: self.overflows.load(Ordering::Relaxed),
141 underflows: self.underflows.load(Ordering::Relaxed),
142 max_size: self.max_size.load(Ordering::Relaxed),
143 }
144 }
145}
146
147impl Default for QueueStats {
148 fn default() -> Self {
149 Self {
150 pushes: AtomicUsize::new(0),
151 pops: AtomicUsize::new(0),
152 overflows: AtomicUsize::new(0),
153 underflows: AtomicUsize::new(0),
154 max_size: AtomicUsize::new(0),
155 }
156 }
157}
158
159#[derive(Debug, Clone, Copy, Default)]
161pub struct QueueStatsSnapshot {
162 pub pushes: usize,
164 pub pops: usize,
166 pub overflows: usize,
168 pub underflows: usize,
170 pub max_size: usize,
172}
173
174impl QueueStatsSnapshot {
175 pub fn new() -> Self {
177 Self::default()
178 }
179
180 pub fn merge(&self, other: &Self) -> Self {
182 Self {
183 pushes: self.pushes + other.pushes,
184 pops: self.pops + other.pops,
185 overflows: self.overflows + other.overflows,
186 underflows: self.underflows + other.underflows,
187 max_size: self.max_size.max(other.max_size),
188 }
189 }
190}
191
192impl fmt::Display for QueueStatsSnapshot {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 write!(
195 f,
196 "pushes: {}, pops: {}, overflows: {}, underflows: {}, max_size: {}",
197 self.pushes, self.pops, self.overflows, self.underflows, self.max_size
198 )
199 }
200}
201
202pub trait RtQueueBase<T>: Send + Sync {
212 fn push(&self, value: T) -> QueueResult<()>;
217
218 fn pop(&self) -> Option<T>;
220
221 fn len(&self) -> usize;
223
224 fn capacity(&self) -> usize;
226
227 fn is_empty(&self) -> bool {
229 self.len() == 0
230 }
231
232 fn is_full(&self) -> bool {
234 self.len() == self.capacity()
235 }
236
237 fn clear(&self);
239}
240
241#[inline]
247pub const fn is_power_of_two(n: usize) -> bool {
248 n != 0 && (n & (n - 1)) == 0
249}
250
251#[inline]
256pub const fn next_power_of_two(n: usize) -> usize {
257 let mut n = n - 1;
258 n |= n >> 1;
259 n |= n >> 2;
260 n |= n >> 4;
261 n |= n >> 8;
262 n |= n >> 16;
263 n += 1;
264 n
265}
266
267#[cfg(test)]
272mod tests {
273 use super::*;
274
275 #[test]
276 fn test_stats_snapshot() {
277 let stats1 = QueueStatsSnapshot {
278 pushes: 10,
279 pops: 5,
280 overflows: 1,
281 underflows: 0,
282 max_size: 8,
283 };
284
285 let stats2 = QueueStatsSnapshot {
286 pushes: 20,
287 pops: 15,
288 overflows: 0,
289 underflows: 2,
290 max_size: 16,
291 };
292
293 let merged = stats1.merge(&stats2);
294 assert_eq!(merged.pushes, 30);
295 assert_eq!(merged.pops, 20);
296 assert_eq!(merged.overflows, 1);
297 assert_eq!(merged.underflows, 2);
298 assert_eq!(merged.max_size, 16);
299 }
300
301 #[test]
302 fn test_power_of_two() {
303 assert!(is_power_of_two(1));
304 assert!(is_power_of_two(2));
305 assert!(is_power_of_two(4));
306 assert!(is_power_of_two(8));
307 assert!(is_power_of_two(16));
308 assert!(!is_power_of_two(3));
309 assert!(!is_power_of_two(5));
310 assert!(!is_power_of_two(6));
311 assert!(!is_power_of_two(7));
312 }
313
314 #[test]
315 fn test_next_power_of_two() {
316 assert_eq!(next_power_of_two(1), 1);
317 assert_eq!(next_power_of_two(2), 2);
318 assert_eq!(next_power_of_two(3), 4);
319 assert_eq!(next_power_of_two(4), 4);
320 assert_eq!(next_power_of_two(5), 8);
321 assert_eq!(next_power_of_two(6), 8);
322 assert_eq!(next_power_of_two(7), 8);
323 assert_eq!(next_power_of_two(8), 8);
324 assert_eq!(next_power_of_two(9), 16);
325 }
326
327 #[test]
328 fn test_queue_stats_record() {
329 let stats = QueueStats::new();
330 stats.record_push(5);
331 stats.record_push(8);
332 stats.record_overflow();
333 stats.record_pop();
334
335 let snap = stats.snapshot();
336 assert_eq!(snap.pushes, 2);
337 assert_eq!(snap.pops, 1);
338 assert_eq!(snap.overflows, 1);
339 assert_eq!(snap.max_size, 8);
340 }
341}