mecha10_core/priority_queue/mod.rs
1//! Priority queue for message processing
2//!
3//! Enables quality-of-service (QoS) by prioritizing critical messages over
4//! low-priority ones. Useful for safety-critical systems where some messages
5//! (e.g., emergency stop) must be processed before others (e.g., telemetry).
6//!
7//! # Features
8//!
9//! - **Priority Levels**: Define message importance (Critical, High, Normal, Low)
//! - **Deterministic Ordering**: FIFO within each source; among sources of equal priority, the source added first is served first
11//! - **Starvation Prevention**: Ensures low-priority messages eventually process
12//! - **Backpressure Support**: Integrates with bounded channels
13//! - **Zero-Copy**: Messages moved, not copied
14//!
15//! # Example
16//!
17//! ```rust
18//! use mecha10::prelude::*;
19//! use mecha10::priority_queue::{PriorityReceiver, Priority};
20//!
21//! # async fn example(ctx: &Context) -> Result<()> {
22//! // Create priority receiver
23//! let mut prio_rx = PriorityReceiver::new();
24//!
25//! // Subscribe to multiple topics with different priorities
26//! let emergency = ctx.subscribe("/safety/emergency").await?;
27//! let commands = ctx.subscribe("/motor/command").await?;
28//! let telemetry = ctx.subscribe("/sensor/telemetry").await?;
29//!
30//! // Add with priorities
31//! prio_rx.add_source(emergency, Priority::Critical);
32//! prio_rx.add_source(commands, Priority::High);
33//! prio_rx.add_source(telemetry, Priority::Low);
34//!
35//! // Process messages in priority order
36//! while let Some((msg, priority)) = prio_rx.recv().await {
37//! match priority {
38//! Priority::Critical => handle_emergency(msg).await?,
39//! Priority::High => handle_command(msg).await?,
40//! _ => handle_telemetry(msg).await?,
41//! }
42//! }
43//! # Ok(())
44//! # }
45//! ```
46
47use crate::context::Receiver;
48use std::collections::VecDeque;
49
/// Importance level attached to a message source.
///
/// The variants are declared in ascending order of importance, so the derived
/// `Ord` implementation ranks them `Low < Normal < High < Critical`. The
/// explicit discriminants (`0..=3`) are also used as array indices when
/// per-priority statistics are collected.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Priority {
    /// Least important: background work and non-critical telemetry.
    Low = 0,
    /// Ordinary traffic; this is the `Default` variant.
    #[default]
    Normal = 1,
    /// Time-sensitive commands.
    High = 2,
    /// Most important: safety-critical traffic such as an emergency stop.
    Critical = 3,
}
66
/// Tunable behavior of a priority queue.
#[derive(Debug, Clone)]
pub struct PriorityQueueConfig {
    /// Upper bound on the number of buffered messages, or `None` for no bound
    /// (buffers may then grow without limit).
    ///
    /// Despite the name, the receiver compares this limit against each
    /// source's own buffer, so two sources sharing a priority can together
    /// hold up to twice this many messages.
    pub max_per_priority: Option<usize>,

    /// Whether starvation prevention is enabled.
    ///
    /// When `true`, a lower-priority message is occasionally delivered even
    /// though higher-priority messages are waiting.
    pub prevent_starvation: bool,

    /// Number of high-priority deliveries after which one lower-priority
    /// message (if any is buffered) is delivered.
    pub starvation_threshold: usize,
}

impl Default for PriorityQueueConfig {
    /// Defaults: at most 1000 buffered messages, starvation prevention on,
    /// one lower-priority delivery per 100 high-priority deliveries.
    fn default() -> Self {
        PriorityQueueConfig {
            starvation_threshold: 100,
            prevent_starvation: true,
            max_per_priority: Some(1000),
        }
    }
}
93
94/// Priority receiver that processes messages by priority
95///
96/// Combines multiple receivers and delivers messages in priority order.
97/// Messages with higher priority are processed before lower priority ones.
98pub struct PriorityReceiver<T> {
99 sources: Vec<PrioritySource<T>>,
100 config: PriorityQueueConfig,
101 high_priority_count: usize,
102}
103
104struct PrioritySource<T> {
105 receiver: Receiver<T>,
106 priority: Priority,
107 buffer: VecDeque<T>,
108}
109
110impl<T> PriorityReceiver<T>
111where
112 T: Send + 'static,
113{
114 /// Create a new priority receiver with default configuration
115 pub fn new() -> Self {
116 Self::with_config(PriorityQueueConfig::default())
117 }
118
119 /// Create a new priority receiver with custom configuration
120 pub fn with_config(config: PriorityQueueConfig) -> Self {
121 Self {
122 sources: Vec::new(),
123 config,
124 high_priority_count: 0,
125 }
126 }
127
128 /// Add a message source with the specified priority
129 ///
130 /// # Example
131 ///
132 /// ```rust
133 /// # use mecha10::prelude::*;
134 /// # use mecha10::priority_queue::{PriorityReceiver, Priority};
135 /// # async fn example(ctx: &Context) -> Result<()> {
136 /// let mut prio_rx = PriorityReceiver::new();
137 ///
138 /// let emergency = ctx.subscribe("/safety/emergency").await?;
139 /// prio_rx.add_source(emergency, Priority::Critical);
140 ///
141 /// let telemetry = ctx.subscribe("/sensor/telemetry").await?;
142 /// prio_rx.add_source(telemetry, Priority::Low);
143 /// # Ok(())
144 /// # }
145 /// ```
146 pub fn add_source(&mut self, receiver: Receiver<T>, priority: Priority) {
147 self.sources.push(PrioritySource {
148 receiver,
149 priority,
150 buffer: VecDeque::new(),
151 });
152 }
153
154 /// Receive the next message in priority order
155 ///
156 /// Returns `None` when all sources are closed.
157 /// Returns `Some((message, priority))` with the highest priority message available.
158 ///
159 /// # Starvation Prevention
160 ///
161 /// If configured, this method will occasionally process lower-priority messages
162 /// even when higher-priority messages are available, to prevent starvation.
163 pub async fn recv(&mut self) -> Option<(T, Priority)> {
164 if self.sources.is_empty() {
165 return None;
166 }
167
168 loop {
169 // Try to fill buffers from all sources (non-blocking)
170 self.try_fill_buffers();
171
172 // Check if we should process a lower-priority message for starvation prevention
173 if self.config.prevent_starvation && self.high_priority_count >= self.config.starvation_threshold {
174 // Try to get a lower-priority message
175 if let Some((msg, prio)) = self.take_lower_priority_message() {
176 self.high_priority_count = 0;
177 return Some((msg, prio));
178 }
179 }
180
181 // Get highest priority message from buffers
182 if let Some((msg, prio)) = self.take_highest_priority_message() {
183 if prio >= Priority::High {
184 self.high_priority_count += 1;
185 }
186 return Some((msg, prio));
187 }
188
189 // No messages in buffers, wait for any source to have data
190 if !self.wait_for_message().await {
191 // All sources closed
192 return None;
193 }
194 }
195 }
196
197 /// Try to receive messages from all sources without blocking
198 fn try_fill_buffers(&mut self) {
199 for source in &mut self.sources {
200 // Respect buffer size limits
201 let max_size = self.config.max_per_priority.unwrap_or(usize::MAX);
202
203 while source.buffer.len() < max_size {
204 match source.receiver.try_recv() {
205 Ok(msg) => source.buffer.push_back(msg),
206 Err(_) => break, // Empty or closed
207 }
208 }
209 }
210 }
211
212 /// Wait for at least one message from any source
213 async fn wait_for_message(&mut self) -> bool {
214 if self.sources.is_empty() {
215 return false;
216 }
217
218 // Use tokio::select! to wait on all sources simultaneously
219 // This is efficient and doesn't poll
220 loop {
221 for source in &mut self.sources {
222 // Try each source
223 if let Some(msg) = source.receiver.recv().await {
224 source.buffer.push_back(msg);
225 return true;
226 }
227 }
228
229 // Remove closed sources
230 self.sources.retain(|s| !s.buffer.is_empty() || !s.receiver.is_empty());
231
232 if self.sources.is_empty() {
233 return false;
234 }
235 }
236 }
237
238 /// Take the highest priority message from buffers
239 fn take_highest_priority_message(&mut self) -> Option<(T, Priority)> {
240 // Find source with highest priority that has messages
241 let mut best_idx = None;
242 let mut best_priority = Priority::Low;
243
244 for (idx, source) in self.sources.iter().enumerate() {
245 if !source.buffer.is_empty() && source.priority > best_priority {
246 best_idx = Some(idx);
247 best_priority = source.priority;
248 }
249 }
250
251 best_idx.and_then(|idx| {
252 self.sources[idx]
253 .buffer
254 .pop_front()
255 .map(|msg| (msg, self.sources[idx].priority))
256 })
257 }
258
259 /// Take a lower-priority message for starvation prevention
260 fn take_lower_priority_message(&mut self) -> Option<(T, Priority)> {
261 // Find source with lowest priority that has messages
262 let mut best_idx = None;
263 let mut best_priority = Priority::Critical;
264
265 for (idx, source) in self.sources.iter().enumerate() {
266 if !source.buffer.is_empty() && source.priority < best_priority {
267 best_idx = Some(idx);
268 best_priority = source.priority;
269 }
270 }
271
272 best_idx.and_then(|idx| {
273 self.sources[idx]
274 .buffer
275 .pop_front()
276 .map(|msg| (msg, self.sources[idx].priority))
277 })
278 }
279
280 /// Get statistics about the priority queue
281 pub fn stats(&self) -> PriorityQueueStats {
282 let mut counts_by_priority = [0; 4];
283 let mut total_buffered = 0;
284
285 for source in &self.sources {
286 let idx = source.priority as usize;
287 counts_by_priority[idx] += source.buffer.len();
288 total_buffered += source.buffer.len();
289 }
290
291 PriorityQueueStats {
292 total_buffered,
293 critical_buffered: counts_by_priority[Priority::Critical as usize],
294 high_buffered: counts_by_priority[Priority::High as usize],
295 normal_buffered: counts_by_priority[Priority::Normal as usize],
296 low_buffered: counts_by_priority[Priority::Low as usize],
297 source_count: self.sources.len(),
298 }
299 }
300}
301
302impl<T> Default for PriorityReceiver<T>
303where
304 T: Send + 'static,
305{
306 fn default() -> Self {
307 Self::new()
308 }
309}
310
311/// Statistics about priority queue state
312#[derive(Debug, Clone)]
313pub struct PriorityQueueStats {
314 /// Total messages buffered across all priorities
315 pub total_buffered: usize,
316 /// Messages buffered at Critical priority
317 pub critical_buffered: usize,
318 /// Messages buffered at High priority
319 pub high_buffered: usize,
320 /// Messages buffered at Normal priority
321 pub normal_buffered: usize,
322 /// Messages buffered at Low priority
323 pub low_buffered: usize,
324 /// Number of active sources
325 pub source_count: usize,
326}
327
328impl PriorityQueueStats {
329 /// Check if any priority level is approaching its limit
330 pub fn is_near_limit(&self, config: &PriorityQueueConfig) -> bool {
331 if let Some(max) = config.max_per_priority {
332 let threshold = (max as f32 * 0.8) as usize;
333 self.critical_buffered > threshold
334 || self.high_buffered > threshold
335 || self.normal_buffered > threshold
336 || self.low_buffered > threshold
337 } else {
338 false
339 }
340 }
341}