mecha10_core/priority_queue/
mod.rs

//! Priority queue for message processing
//!
//! Enables quality-of-service (QoS) by prioritizing critical messages over
//! low-priority ones. Useful for safety-critical systems where some messages
//! (e.g., emergency stop) must be processed before others (e.g., telemetry).
//!
//! # Features
//!
//! - **Priority Levels**: Define message importance (Critical, High, Normal, Low)
//! - **Per-Source FIFO**: Messages from each source are delivered in arrival order
//! - **Starvation Prevention**: Ensures low-priority messages eventually process
//! - **Backpressure Support**: Integrates with bounded channels
//! - **Zero-Copy**: Messages are moved, not copied
//!
//! # Example
//!
//! ```rust,ignore
//! use mecha10::prelude::*;
//! use mecha10::priority_queue::{PriorityReceiver, Priority};
//!
//! # async fn example(ctx: &Context) -> Result<()> {
//! // Create priority receiver
//! let mut prio_rx = PriorityReceiver::new();
//!
//! // Subscribe to multiple topics with different priorities
//! let emergency = ctx.subscribe("/safety/emergency").await?;
//! let commands = ctx.subscribe("/motor/command").await?;
//! let telemetry = ctx.subscribe("/sensor/telemetry").await?;
//!
//! // Add with priorities
//! prio_rx.add_source(emergency, Priority::Critical);
//! prio_rx.add_source(commands, Priority::High);
//! prio_rx.add_source(telemetry, Priority::Low);
//!
//! // Process messages in priority order (handlers are application-defined)
//! while let Some((msg, priority)) = prio_rx.recv().await {
//!     match priority {
//!         Priority::Critical => handle_emergency(msg).await?,
//!         Priority::High => handle_command(msg).await?,
//!         _ => handle_telemetry(msg).await?,
//!     }
//! }
//! # Ok(())
//! # }
//! ```

use crate::context::Receiver;
use std::collections::VecDeque;

/// Message priority levels
///
/// Higher priority messages are processed before lower priority ones.
/// Within the same priority level, messages are processed FIFO.
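///
/// # Example
///
/// A quick sketch of the derived ordering and default (the import path mirrors
/// the module-level example above):
///
/// ```rust,ignore
/// use mecha10::priority_queue::Priority;
///
/// assert!(Priority::Critical > Priority::High);
/// assert!(Priority::High > Priority::Normal);
/// assert!(Priority::Normal > Priority::Low);
/// assert_eq!(Priority::default(), Priority::Normal);
/// ```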
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum Priority {
    /// Lowest priority - background tasks, non-critical telemetry
    Low = 0,
    /// Normal priority - standard operations
    #[default]
    Normal = 1,
    /// High priority - time-sensitive commands
    High = 2,
    /// Highest priority - safety-critical, emergency stop
    Critical = 3,
}

/// Configuration for priority queue behavior
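///
/// # Example
///
/// A minimal sketch of a bounded, starvation-aware configuration (the values
/// are illustrative, not recommendations; the import path mirrors the
/// module-level example above):
///
/// ```rust,ignore
/// use mecha10::priority_queue::{PriorityQueueConfig, PriorityReceiver};
///
/// let config = PriorityQueueConfig {
///     max_per_priority: Some(256),   // bound each source's buffer
///     prevent_starvation: true,      // interleave low-priority traffic
///     starvation_threshold: 50,      // after 50 High/Critical messages
/// };
/// let prio_rx: PriorityReceiver<Vec<u8>> = PriorityReceiver::with_config(config);
/// ```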
#[derive(Debug, Clone)]
pub struct PriorityQueueConfig {
    /// Maximum messages to buffer per source (i.e., per added receiver)
    /// None = unbounded (risk of memory growth)
    pub max_per_priority: Option<usize>,

    /// Enable starvation prevention
    /// If true, occasionally process lower priority messages even when
    /// higher priority messages are available
    pub prevent_starvation: bool,

    /// Starvation threshold: after processing N high-priority messages,
    /// process one lower-priority message (if available)
    pub starvation_threshold: usize,
}

impl Default for PriorityQueueConfig {
    fn default() -> Self {
        Self {
            max_per_priority: Some(1000),
            prevent_starvation: true,
            starvation_threshold: 100,
        }
    }
}

/// Priority receiver that processes messages by priority
///
/// Combines multiple receivers and delivers messages in priority order.
/// Messages with higher priority are processed before lower priority ones.
pub struct PriorityReceiver<T> {
    sources: Vec<PrioritySource<T>>,
    config: PriorityQueueConfig,
    high_priority_count: usize,
}

struct PrioritySource<T> {
    receiver: Receiver<T>,
    priority: Priority,
    buffer: VecDeque<T>,
}

impl<T> PriorityReceiver<T>
where
    T: Send + 'static,
{
    /// Create a new priority receiver with default configuration
    pub fn new() -> Self {
        Self::with_config(PriorityQueueConfig::default())
    }

    /// Create a new priority receiver with custom configuration
    pub fn with_config(config: PriorityQueueConfig) -> Self {
        Self {
            sources: Vec::new(),
            config,
            high_priority_count: 0,
        }
    }

    /// Add a message source with the specified priority
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # use mecha10::priority_queue::{PriorityReceiver, Priority};
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// let mut prio_rx = PriorityReceiver::new();
    ///
    /// let emergency = ctx.subscribe("/safety/emergency").await?;
    /// prio_rx.add_source(emergency, Priority::Critical);
    ///
    /// let telemetry = ctx.subscribe("/sensor/telemetry").await?;
    /// prio_rx.add_source(telemetry, Priority::Low);
    /// # Ok(())
    /// # }
    /// ```
    pub fn add_source(&mut self, receiver: Receiver<T>, priority: Priority) {
        self.sources.push(PrioritySource {
            receiver,
            priority,
            buffer: VecDeque::new(),
        });
    }

    /// Receive the next message in priority order
    ///
    /// Returns `None` when all sources are closed.
    /// Returns `Some((message, priority))` with the highest priority message available.
    ///
    /// # Starvation Prevention
    ///
    /// If configured, this method will occasionally process lower-priority messages
    /// even when higher-priority messages are available, to prevent starvation.
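    ///
    /// For example, with `prevent_starvation` enabled and `starvation_threshold`
    /// set to 2, the delivery order while Critical and Low messages are both
    /// continuously available would be: Critical, Critical, Low, Critical,
    /// Critical, Low, and so on.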
    pub async fn recv(&mut self) -> Option<(T, Priority)> {
        if self.sources.is_empty() {
            return None;
        }

        loop {
            // Try to fill buffers from all sources (non-blocking)
            self.try_fill_buffers();

            // Check if we should process a lower-priority message for starvation prevention
            if self.config.prevent_starvation && self.high_priority_count >= self.config.starvation_threshold {
                // Try to get a lower-priority message
                if let Some((msg, prio)) = self.take_lower_priority_message() {
                    self.high_priority_count = 0;
                    return Some((msg, prio));
                }
            }

            // Get highest priority message from buffers
            if let Some((msg, prio)) = self.take_highest_priority_message() {
                if prio >= Priority::High {
                    self.high_priority_count += 1;
                }
                return Some((msg, prio));
            }

            // No messages in buffers, wait for any source to have data
            if !self.wait_for_message().await {
                // All sources closed
                return None;
            }
        }
    }

    /// Try to receive messages from all sources without blocking
    fn try_fill_buffers(&mut self) {
        for source in &mut self.sources {
            // Respect buffer size limits
            let max_size = self.config.max_per_priority.unwrap_or(usize::MAX);

            while source.buffer.len() < max_size {
                match source.receiver.try_recv() {
                    Ok(msg) => source.buffer.push_back(msg),
                    Err(_) => break, // Empty or closed
                }
            }
        }
    }

    /// Wait for at least one message from any source
    ///
    /// Returns `false` once every source is closed and drained.
    async fn wait_for_message(&mut self) -> bool {
        // Wait on all sources at once so a quiet source cannot block delivery
        // from the others. NOTE: this assumes the `futures` crate is available
        // for `select_all`; `tokio::select!` alone cannot wait on a dynamic
        // number of receivers.
        use futures::future::select_all;

        loop {
            if self.sources.is_empty() {
                return false;
            }

            // Race one `recv` future per source; the first to yield wins.
            let recv_futures = self
                .sources
                .iter_mut()
                .map(|source| Box::pin(source.receiver.recv()));

            let (result, idx, _) = select_all(recv_futures).await;

            match result {
                Some(msg) => {
                    self.sources[idx].buffer.push_back(msg);
                    return true;
                }
                None => {
                    // This source is closed and drained; if nothing is left in
                    // its buffer, drop it and keep waiting on the rest.
                    if self.sources[idx].buffer.is_empty() {
                        self.sources.remove(idx);
                    } else {
                        return true;
                    }
                }
            }
        }
    }

    /// Take the highest priority message from buffers
    fn take_highest_priority_message(&mut self) -> Option<(T, Priority)> {
        // Find the source with the highest priority that has buffered messages.
        // The `best_idx.is_none()` check ensures a Low-priority source can still
        // be selected when it is the only one with messages.
        let mut best_idx = None;
        let mut best_priority = Priority::Low;

        for (idx, source) in self.sources.iter().enumerate() {
            if !source.buffer.is_empty() && (best_idx.is_none() || source.priority > best_priority) {
                best_idx = Some(idx);
                best_priority = source.priority;
            }
        }

        best_idx.and_then(|idx| {
            self.sources[idx]
                .buffer
                .pop_front()
                .map(|msg| (msg, self.sources[idx].priority))
        })
    }

    /// Take a lower-priority message for starvation prevention
    fn take_lower_priority_message(&mut self) -> Option<(T, Priority)> {
        // Find the source with the lowest priority below High that has buffered
        // messages; High/Critical sources are skipped, matching the counter in `recv`.
        let mut best_idx = None;
        let mut best_priority = Priority::High;

        for (idx, source) in self.sources.iter().enumerate() {
            if !source.buffer.is_empty() && source.priority < best_priority {
                best_idx = Some(idx);
                best_priority = source.priority;
            }
        }

        best_idx.and_then(|idx| {
            self.sources[idx]
                .buffer
                .pop_front()
                .map(|msg| (msg, self.sources[idx].priority))
        })
    }

    /// Get statistics about the priority queue
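    ///
    /// # Example
    ///
    /// A minimal sketch (no sources added yet, so everything reads zero; the
    /// import path mirrors the module-level example above):
    ///
    /// ```rust,ignore
    /// use mecha10::priority_queue::PriorityReceiver;
    ///
    /// let prio_rx: PriorityReceiver<Vec<u8>> = PriorityReceiver::new();
    /// let stats = prio_rx.stats();
    /// assert_eq!(stats.total_buffered, 0);
    /// assert_eq!(stats.source_count, 0);
    /// ```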
    pub fn stats(&self) -> PriorityQueueStats {
        let mut counts_by_priority = [0; 4];
        let mut total_buffered = 0;

        for source in &self.sources {
            let idx = source.priority as usize;
            counts_by_priority[idx] += source.buffer.len();
            total_buffered += source.buffer.len();
        }

        PriorityQueueStats {
            total_buffered,
            critical_buffered: counts_by_priority[Priority::Critical as usize],
            high_buffered: counts_by_priority[Priority::High as usize],
            normal_buffered: counts_by_priority[Priority::Normal as usize],
            low_buffered: counts_by_priority[Priority::Low as usize],
            source_count: self.sources.len(),
        }
    }
}

impl<T> Default for PriorityReceiver<T>
where
    T: Send + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}

/// Statistics about priority queue state
#[derive(Debug, Clone)]
pub struct PriorityQueueStats {
    /// Total messages buffered across all priorities
    pub total_buffered: usize,
    /// Messages buffered at Critical priority
    pub critical_buffered: usize,
    /// Messages buffered at High priority
    pub high_buffered: usize,
    /// Messages buffered at Normal priority
    pub normal_buffered: usize,
    /// Messages buffered at Low priority
    pub low_buffered: usize,
    /// Number of active sources
    pub source_count: usize,
}

impl PriorityQueueStats {
    /// Check if any priority level is approaching its configured limit
    /// (i.e., is above 80% of `max_per_priority`)
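    ///
    /// # Example
    ///
    /// A sketch of using this as a backpressure signal inside a processing loop
    /// (`prio_rx` is a `PriorityReceiver` as in the module example;
    /// `throttle_publishers` is a hypothetical application hook):
    ///
    /// ```rust,ignore
    /// let config = PriorityQueueConfig::default();
    /// if prio_rx.stats().is_near_limit(&config) {
    ///     // Some priority level is over 80% full; slow producers down.
    ///     throttle_publishers().await;
    /// }
    /// ```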
    pub fn is_near_limit(&self, config: &PriorityQueueConfig) -> bool {
        if let Some(max) = config.max_per_priority {
            let threshold = (max as f32 * 0.8) as usize;
            self.critical_buffered > threshold
                || self.high_buffered > threshold
                || self.normal_buffered > threshold
                || self.low_buffered > threshold
        } else {
            false
        }
    }
}