hojicha_runtime/
priority_queue.rs

1//! Priority-based event queue with backpressure support
2//!
3//! This module provides a priority queue for events that ensures important events
4//! (like user input and quit commands) are processed before less critical events
5//! (like ticks and resize events). It also implements backpressure to prevent
6//! memory exhaustion under high load.
7//!
8//! # Priority Levels
9//!
10//! Events are automatically assigned priorities:
11//! - **High**: Quit, Key events, Suspend/Resume, Process execution
12//! - **Normal**: Mouse events, User messages, Paste events
13//! - **Low**: Tick, Resize, Focus/Blur events
14//!
15//! # Backpressure
16//!
17//! When the queue reaches 80% capacity, backpressure is activated. If the queue
18//! fills completely, lower priority events are dropped in favor of higher priority
19//! ones.
20//!
21//! # Example
22//!
23//! ```
24//! use hojicha_runtime::priority_queue::PriorityEventQueue;
25//! use hojicha_core::event::Event;
26//!
27//! #[derive(Debug, Clone, PartialEq)]
28//! struct TestMsg(u32);
29//!
30//! let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
31//!
32//! // High priority events are processed first
33//! queue.push(Event::Tick).unwrap();                  // Low priority
34//! queue.push(Event::User(TestMsg(42))).unwrap();     // Normal priority  
35//! queue.push(Event::Quit).unwrap();                  // High priority
36//!
37//! assert_eq!(queue.pop(), Some(Event::Quit));                // High first
38//! assert_eq!(queue.pop(), Some(Event::User(TestMsg(42))));   // Then normal
39//! assert_eq!(queue.pop(), Some(Event::Tick));                // Low last
40//! ```
41
42use hojicha_core::core::Message;
43use hojicha_core::event::Event;
44use std::cmp::Ordering;
45use std::collections::BinaryHeap;
46
/// Scheduling class attached to every queued event.
///
/// The discriminants are chosen so that a *smaller* number means a *more
/// urgent* event; the derived `Ord` therefore sorts `High < Normal < Low`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Critical events: Quit, Key events, Suspend/Resume, process execution.
    High = 0,
    /// Interactive events: Mouse, User messages, Paste.
    Normal = 1,
    /// Background events: Tick, Resize, Focus/Blur.
    Low = 2,
}
61
62impl Priority {
63    /// Determine the priority level for a given event
64    ///
65    /// This method automatically assigns priority levels based on event type:
66    /// - High: Quit, Key events, Suspend/Resume, Process execution
67    /// - Normal: Mouse events, User messages, Paste events  
68    /// - Low: Tick, Resize, Focus/Blur events
69    pub fn from_event<M: Message>(event: &Event<M>) -> Self {
70        match event {
71            Event::Quit => Priority::High,
72            Event::Key(_) => Priority::High,
73            Event::Mouse(_) => Priority::Normal,
74            Event::User(_) => Priority::Normal,
75            Event::Resize { .. } => Priority::Low,
76            Event::Tick => Priority::Low,
77            Event::Paste(_) => Priority::Normal,
78            Event::Focus | Event::Blur => Priority::Low,
79            Event::Suspend | Event::Resume | Event::ExecProcess => Priority::High,
80        }
81    }
82}
83
84#[derive(Debug)]
85struct PriorityEvent<M: Message> {
86    priority: Priority,
87    sequence: usize,
88    event: Event<M>,
89}
90
91impl<M: Message> PartialEq for PriorityEvent<M> {
92    fn eq(&self, other: &Self) -> bool {
93        self.priority == other.priority && self.sequence == other.sequence
94    }
95}
96
// Marker impl: the (priority, sequence) equality above is a full equivalence
// relation, and `Ord` requires `Eq`.
impl<M: Message> Eq for PriorityEvent<M> {}
98
99impl<M: Message> Ord for PriorityEvent<M> {
100    fn cmp(&self, other: &Self) -> Ordering {
101        // BinaryHeap is a max-heap, so we want High (0) to be greater than Low (2)
102        // Therefore we reverse the comparison
103        match other.priority.cmp(&self.priority) {
104            Ordering::Equal => self.sequence.cmp(&other.sequence),
105            other => other,
106        }
107    }
108}
109
impl<M: Message> PartialOrd for PriorityEvent<M> {
    /// Delegates to `Ord::cmp`, so the partial and total orders always agree
    /// and the result is never `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
115
116/// A priority queue for events with automatic backpressure handling
117///
118/// This queue ensures that high priority events (like user input and quit commands)
119/// are processed before lower priority events (like ticks and resize events).
120/// It implements backpressure to prevent memory exhaustion under high load.
121///
122/// When the queue reaches 80% capacity, backpressure is activated. If the queue
123/// fills completely, lower priority events are dropped in favor of higher priority ones.
124///
125/// # Example
126///
127/// ```ignore
128/// let mut queue = PriorityEventQueue::new(1000);
129///
130/// queue.push(Event::Tick)?;       // Low priority
131/// queue.push(Event::Quit)?;       // High priority
132///
133/// // High priority events are processed first
134/// assert_eq!(queue.pop(), Some(Event::Quit));
135/// assert_eq!(queue.pop(), Some(Event::Tick));
136/// ```
137pub struct PriorityEventQueue<M: Message> {
138    heap: BinaryHeap<PriorityEvent<M>>,
139    sequence_counter: usize,
140    max_size: usize,
141    backpressure_threshold: usize,
142    backpressure_active: bool,
143    dropped_events: usize,
144}
145
146impl<M: Message> PriorityEventQueue<M> {
147    /// Create a new priority event queue with the specified maximum size
148    ///
149    /// # Arguments
150    /// * `max_size` - Maximum number of events the queue can hold
151    ///
152    /// # Example
153    /// ```
154    /// use hojicha_runtime::priority_queue::PriorityEventQueue;
155    ///
156    /// #[derive(Debug, Clone, PartialEq)]
157    /// struct TestMsg;
158    ///
159    /// let queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
160    /// assert_eq!(queue.len(), 0);
161    /// assert_eq!(queue.capacity(), 1000);
162    /// ```
163    pub fn new(max_size: usize) -> Self {
164        Self {
165            heap: BinaryHeap::new(),
166            sequence_counter: 0,
167            max_size,
168            backpressure_threshold: (max_size as f64 * 0.8) as usize,
169            backpressure_active: false,
170            dropped_events: 0,
171        }
172    }
173
174    /// Push an event into the priority queue
175    ///
176    /// Events are automatically prioritized based on their type. If the queue is full,
177    /// lower priority events may be dropped to make room for higher priority ones.
178    ///
179    /// # Arguments
180    /// * `event` - The event to add to the queue
181    ///
182    /// # Returns
183    /// * `Ok(())` if the event was successfully added
184    /// * `Err(event)` if the event was dropped due to queue overflow
185    ///
186    /// # Example
187    /// ```
188    /// use hojicha_runtime::priority_queue::PriorityEventQueue;
189    /// use hojicha_core::event::Event;
190    ///
191    /// #[derive(Debug, Clone, PartialEq)]
192    /// struct TestMsg;
193    ///
194    /// let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
195    /// let result = queue.push(Event::Tick);
196    /// assert!(result.is_ok());
197    /// assert_eq!(queue.len(), 1);
198    /// ```
199    pub fn push(&mut self, event: Event<M>) -> Result<(), Event<M>> {
200        if self.heap.len() >= self.max_size {
201            let priority = Priority::from_event(&event);
202
203            if priority == Priority::High {
204                if let Some(lowest) = self.find_lowest_priority_event() {
205                    self.heap.retain(|e| e.sequence != lowest);
206                    self.dropped_events += 1;
207                } else {
208                    self.dropped_events += 1;
209                    return Err(event);
210                }
211            } else {
212                self.dropped_events += 1;
213                return Err(event);
214            }
215        }
216
217        let priority = Priority::from_event(&event);
218        let priority_event = PriorityEvent {
219            priority,
220            sequence: self.sequence_counter,
221            event,
222        };
223
224        self.sequence_counter += 1;
225        self.heap.push(priority_event);
226
227        if self.heap.len() >= self.backpressure_threshold {
228            self.backpressure_active = true;
229        }
230
231        Ok(())
232    }
233
234    /// Remove and return the highest priority event from the queue
235    ///
236    /// Events are returned in priority order, with high priority events
237    /// returned before normal and low priority events.
238    ///
239    /// # Returns
240    /// * `Some(event)` if there are events in the queue
241    /// * `None` if the queue is empty
242    ///
243    /// # Example
244    /// ```
245    /// use hojicha_runtime::priority_queue::PriorityEventQueue;
246    /// use hojicha_core::event::Event;
247    ///
248    /// #[derive(Debug, Clone, PartialEq)]
249    /// struct TestMsg;
250    ///
251    /// let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
252    /// queue.push(Event::Tick).unwrap();
253    /// queue.push(Event::Quit).unwrap(); // Higher priority
254    ///
255    /// // High priority event comes first
256    /// assert_eq!(queue.pop(), Some(Event::Quit));
257    /// assert_eq!(queue.pop(), Some(Event::Tick));
258    /// assert_eq!(queue.pop(), None);
259    /// ```
260    pub fn pop(&mut self) -> Option<Event<M>> {
261        let result = self.heap.pop().map(|pe| pe.event);
262
263        if self.heap.len() < self.backpressure_threshold {
264            self.backpressure_active = false;
265        }
266
267        result
268    }
269
270    /// Check if the queue is empty
271    ///
272    /// # Returns
273    /// * `true` if the queue contains no events
274    /// * `false` if the queue contains one or more events
275    pub fn is_empty(&self) -> bool {
276        self.heap.is_empty()
277    }
278
279    /// Get the current number of events in the queue
280    ///
281    /// # Returns
282    /// The number of events currently in the queue
283    pub fn len(&self) -> usize {
284        self.heap.len()
285    }
286
287    /// Check if backpressure is currently active
288    ///
289    /// Backpressure is activated when the queue reaches 80% of its capacity.
290    ///
291    /// # Returns
292    /// * `true` if backpressure is active
293    /// * `false` if the queue is below the backpressure threshold
294    pub fn is_backpressure_active(&self) -> bool {
295        self.backpressure_active
296    }
297
298    /// Get the total number of events that have been dropped
299    ///
300    /// Events are dropped when the queue is full and lower priority
301    /// events are evicted to make room for higher priority ones.
302    ///
303    /// # Returns
304    /// The total number of events dropped since queue creation
305    pub fn dropped_events(&self) -> usize {
306        self.dropped_events
307    }
308
309    /// Clear all events from the queue
310    ///
311    /// This removes all events and resets the backpressure state,
312    /// but preserves the dropped event counter and capacity settings.
313    pub fn clear(&mut self) {
314        self.heap.clear();
315        self.backpressure_active = false;
316    }
317
318    fn find_lowest_priority_event(&self) -> Option<usize> {
319        self.heap
320            .iter()
321            .filter(|e| e.priority == Priority::Low)
322            .map(|e| e.sequence)
323            .min()
324    }
325
326    /// Get the current capacity of the queue
327    pub fn capacity(&self) -> usize {
328        self.max_size
329    }
330
331    /// Resize the queue to a new capacity
332    ///
333    /// # Arguments
334    /// * `new_size` - The new maximum size for the queue
335    ///
336    /// # Returns
337    /// * `Ok(())` if resize succeeded
338    /// * `Err(ResizeError)` if the new size is invalid or would cause data loss
339    pub fn resize(&mut self, new_size: usize) -> Result<(), ResizeError> {
340        if new_size == 0 {
341            return Err(ResizeError::InvalidSize("Queue size cannot be zero".into()));
342        }
343
344        let current_len = self.heap.len();
345
346        // If shrinking below current usage, we need to drop events
347        if new_size < current_len {
348            // Calculate how many events to drop
349            let to_drop = current_len - new_size;
350
351            // Collect events sorted by priority (lowest priority first)
352            let mut events: Vec<_> = self.heap.iter().collect();
353            events.sort_by(|a, b| {
354                b.priority
355                    .cmp(&a.priority)
356                    .then(b.sequence.cmp(&a.sequence))
357            });
358
359            // Get sequences of events to drop (lowest priority ones)
360            let drop_sequences: Vec<usize> =
361                events.iter().take(to_drop).map(|e| e.sequence).collect();
362
363            // Drop the events
364            self.heap.retain(|e| !drop_sequences.contains(&e.sequence));
365            self.dropped_events += to_drop;
366        }
367
368        // Update size and thresholds
369        self.max_size = new_size;
370        self.backpressure_threshold = (new_size as f64 * 0.8) as usize;
371
372        // Update backpressure status
373        self.backpressure_active = self.heap.len() >= self.backpressure_threshold;
374
375        Ok(())
376    }
377
378    /// Try to grow the queue by a specified amount
379    pub fn try_grow(&mut self, additional: usize) -> Result<usize, ResizeError> {
380        let new_size = self.max_size.saturating_add(additional);
381        self.resize(new_size)?;
382        Ok(new_size)
383    }
384
385    /// Try to shrink the queue by a specified amount
386    pub fn try_shrink(&mut self, reduction: usize) -> Result<usize, ResizeError> {
387        let new_size = self.max_size.saturating_sub(reduction).max(1);
388        self.resize(new_size)?;
389        Ok(new_size)
390    }
391
392    /// Get current queue statistics for scaling decisions
393    pub fn stats(&self) -> QueueStats {
394        QueueStats {
395            current_size: self.heap.len(),
396            max_size: self.max_size,
397            utilization: self.heap.len() as f64 / self.max_size as f64,
398            backpressure_active: self.backpressure_active,
399            dropped_events: self.dropped_events,
400        }
401    }
402}
403
/// Reasons a queue resize request can be refused.
#[derive(Debug, Clone)]
pub enum ResizeError {
    /// The requested size is not usable (for example, zero); the payload is
    /// a human-readable explanation.
    InvalidSize(String),
    /// Resizing would force high priority events out of the queue.
    ///
    /// NOTE(review): nothing visible in this file constructs this variant.
    WouldDropHighPriorityEvents,
}
412
413impl std::fmt::Display for ResizeError {
414    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415        match self {
416            ResizeError::InvalidSize(msg) => write!(f, "Invalid size: {msg}"),
417            ResizeError::WouldDropHighPriorityEvents => {
418                write!(f, "Resize would drop high priority events")
419            }
420        }
421    }
422}
423
// Uses the trait's default methods; together with the `Debug` and `Display`
// impls this lets `ResizeError` be used as a standard error value.
impl std::error::Error for ResizeError {}
425
/// Point-in-time snapshot of a queue's load, for monitoring and scaling.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Number of events held when the snapshot was taken
    pub current_size: usize,
    /// Capacity of the queue when the snapshot was taken
    pub max_size: usize,
    /// Fill level as a fraction of capacity (`current_size / max_size`),
    /// i.e. 1.0 means full; this is a ratio, not a 0-100 percentage
    pub utilization: f64,
    /// Whether backpressure was being signalled
    pub backpressure_active: bool,
    /// Number of events dropped so far
    pub dropped_events: usize,
}
440
#[cfg(test)]
mod tests {
    use super::*;
    use hojicha_core::event::{Key, KeyEvent};

    #[derive(Debug, Clone, PartialEq)]
    struct TestMsg(usize);

    /// Events must leave the queue grouped by priority class: both high
    /// priority events first, then both user messages, then the tick.
    #[test]
    fn test_priority_ordering() {
        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(10);

        let key_press = Event::Key(KeyEvent {
            key: Key::Char('a'),
            modifiers: crossterm::event::KeyModifiers::empty(),
        });
        let pushes = vec![
            Event::Tick,
            Event::User(TestMsg(1)),
            Event::Quit,
            Event::User(TestMsg(2)),
            key_press,
        ];
        for event in pushes {
            queue.push(event).unwrap();
        }

        // Quit and Key share the High class; this test deliberately does not
        // pin their relative order, only that both come out first.
        let high = vec![queue.pop(), queue.pop()];
        assert!(
            high.iter().any(|e| matches!(e, Some(Event::Quit))),
            "Expected Quit event in first two pops"
        );
        assert!(
            high.iter().any(|e| matches!(e, Some(Event::Key(_)))),
            "Expected Key event in first two pops"
        );

        // Same for the two Normal-class user messages.
        let normal = vec![queue.pop(), queue.pop()];
        assert!(
            normal
                .iter()
                .any(|e| matches!(e, Some(Event::User(TestMsg(1))))),
            "Expected User(TestMsg(1))"
        );
        assert!(
            normal
                .iter()
                .any(|e| matches!(e, Some(Event::User(TestMsg(2))))),
            "Expected User(TestMsg(2))"
        );

        assert_eq!(queue.pop(), Some(Event::Tick));
        assert_eq!(queue.pop(), None);
    }

    /// With capacity 5 the threshold is 4 events: backpressure switches on at
    /// four queued events and off again once the length drops below that.
    #[test]
    fn test_backpressure() {
        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(5);

        (0..4).for_each(|i| queue.push(Event::User(TestMsg(i))).unwrap());
        assert!(queue.is_backpressure_active());

        for _ in 0..2 {
            queue.pop();
        }
        assert!(!queue.is_backpressure_active());
    }

    /// A full queue rejects a low priority event but accepts a high priority
    /// one by evicting a queued event; both cases count as a drop.
    #[test]
    fn test_event_dropping() {
        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(3);

        let fill = vec![
            Event::Tick,
            Event::User(TestMsg(1)),
            Event::User(TestMsg(2)),
        ];
        for event in fill {
            queue.push(event).unwrap();
        }

        assert!(queue.push(Event::Tick).is_err());
        assert_eq!(queue.dropped_events(), 1);

        queue.push(Event::Quit).unwrap();
        assert_eq!(queue.dropped_events(), 2);
    }
}