hojicha_runtime/
priority_queue.rs

1//! Priority-based event queue with backpressure support
2//!
3//! This module provides a priority queue for events that ensures important events
4//! (like user input and quit commands) are processed before less critical events
5//! (like ticks and resize events). It also implements backpressure to prevent
6//! memory exhaustion under high load.
7//!
8//! # Priority Levels
9//!
10//! Events are automatically assigned priorities:
11//! - **High**: Quit, Key events, Suspend/Resume, Process execution
12//! - **Normal**: Mouse events, User messages, Paste events
13//! - **Low**: Tick, Resize, Focus/Blur events
14//!
15//! # Backpressure
16//!
17//! When the queue reaches 80% capacity, backpressure is activated. If the queue
18//! fills completely, lower priority events are dropped in favor of higher priority
19//! ones.
20//!
21//! # Example
22//!
23//! ```ignore
24//! use hojicha::priority_queue::PriorityEventQueue;
25//!
26//! let mut queue = PriorityEventQueue::new(1000);
27//!
28//! // High priority events are processed first
29//! queue.push(Event::Tick)?;           // Low priority
30//! queue.push(Event::User(msg))?;      // Normal priority  
31//! queue.push(Event::Quit)?;           // High priority
32//!
33//! assert_eq!(queue.pop(), Some(Event::Quit));  // High first
34//! assert_eq!(queue.pop(), Some(Event::User(msg))); // Then normal
35//! assert_eq!(queue.pop(), Some(Event::Tick));  // Low last
36//! ```
37
38use hojicha_core::core::Message;
39use hojicha_core::event::Event;
40use std::cmp::Ordering;
41use std::collections::BinaryHeap;
42
43/// Priority levels for events in the queue
44///
45/// Lower numeric values indicate higher priority. Events are processed
46/// in priority order, with high priority events processed before normal
47/// and low priority events.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
49pub enum Priority {
50    /// High priority for critical events (Quit, Key events, Suspend/Resume)
51    High = 0,
52    /// Normal priority for user interactions (Mouse, User messages, Paste)
53    Normal = 1,
54    /// Low priority for background events (Tick, Resize, Focus/Blur)
55    Low = 2,
56}
57
58impl Priority {
59    /// Determine the priority level for a given event
60    ///
61    /// This method automatically assigns priority levels based on event type:
62    /// - High: Quit, Key events, Suspend/Resume, Process execution
63    /// - Normal: Mouse events, User messages, Paste events  
64    /// - Low: Tick, Resize, Focus/Blur events
65    pub fn from_event<M: Message>(event: &Event<M>) -> Self {
66        match event {
67            Event::Quit => Priority::High,
68            Event::Key(_) => Priority::High,
69            Event::Mouse(_) => Priority::Normal,
70            Event::User(_) => Priority::Normal,
71            Event::Resize { .. } => Priority::Low,
72            Event::Tick => Priority::Low,
73            Event::Paste(_) => Priority::Normal,
74            Event::Focus | Event::Blur => Priority::Low,
75            Event::Suspend | Event::Resume | Event::ExecProcess => Priority::High,
76        }
77    }
78}
79
80#[derive(Debug)]
81struct PriorityEvent<M: Message> {
82    priority: Priority,
83    sequence: usize,
84    event: Event<M>,
85}
86
87impl<M: Message> PartialEq for PriorityEvent<M> {
88    fn eq(&self, other: &Self) -> bool {
89        self.priority == other.priority && self.sequence == other.sequence
90    }
91}
92
93impl<M: Message> Eq for PriorityEvent<M> {}
94
95impl<M: Message> Ord for PriorityEvent<M> {
96    fn cmp(&self, other: &Self) -> Ordering {
97        // BinaryHeap is a max-heap, so we want High (0) to be greater than Low (2)
98        // Therefore we reverse the comparison
99        match other.priority.cmp(&self.priority) {
100            Ordering::Equal => self.sequence.cmp(&other.sequence),
101            other => other,
102        }
103    }
104}
105
106impl<M: Message> PartialOrd for PriorityEvent<M> {
107    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
108        Some(self.cmp(other))
109    }
110}
111
112/// A priority queue for events with automatic backpressure handling
113///
114/// This queue ensures that high priority events (like user input and quit commands)
115/// are processed before lower priority events (like ticks and resize events).
116/// It implements backpressure to prevent memory exhaustion under high load.
117///
118/// When the queue reaches 80% capacity, backpressure is activated. If the queue
119/// fills completely, lower priority events are dropped in favor of higher priority ones.
120///
121/// # Example
122///
123/// ```ignore
124/// let mut queue = PriorityEventQueue::new(1000);
125///
126/// queue.push(Event::Tick)?;       // Low priority
127/// queue.push(Event::Quit)?;       // High priority
128///
129/// // High priority events are processed first
130/// assert_eq!(queue.pop(), Some(Event::Quit));
131/// assert_eq!(queue.pop(), Some(Event::Tick));
132/// ```
133pub struct PriorityEventQueue<M: Message> {
134    heap: BinaryHeap<PriorityEvent<M>>,
135    sequence_counter: usize,
136    max_size: usize,
137    backpressure_threshold: usize,
138    backpressure_active: bool,
139    dropped_events: usize,
140}
141
142impl<M: Message> PriorityEventQueue<M> {
143    /// Create a new priority event queue with the specified maximum size
144    ///
145    /// # Arguments
146    /// * `max_size` - Maximum number of events the queue can hold
147    ///
148    /// # Example
149    /// ```ignore
150    /// let queue = PriorityEventQueue::new(1000);
151    /// ```
152    pub fn new(max_size: usize) -> Self {
153        Self {
154            heap: BinaryHeap::new(),
155            sequence_counter: 0,
156            max_size,
157            backpressure_threshold: (max_size as f64 * 0.8) as usize,
158            backpressure_active: false,
159            dropped_events: 0,
160        }
161    }
162
163    /// Push an event into the priority queue
164    ///
165    /// Events are automatically prioritized based on their type. If the queue is full,
166    /// lower priority events may be dropped to make room for higher priority ones.
167    ///
168    /// # Arguments
169    /// * `event` - The event to add to the queue
170    ///
171    /// # Returns
172    /// * `Ok(())` if the event was successfully added
173    /// * `Err(event)` if the event was dropped due to queue overflow
174    ///
175    /// # Example
176    /// ```ignore
177    /// if let Err(dropped) = queue.push(Event::Tick) {
178    ///     println!("Event was dropped: {:?}", dropped);
179    /// }
180    /// ```
181    pub fn push(&mut self, event: Event<M>) -> Result<(), Event<M>> {
182        if self.heap.len() >= self.max_size {
183            let priority = Priority::from_event(&event);
184
185            if priority == Priority::High {
186                if let Some(lowest) = self.find_lowest_priority_event() {
187                    self.heap.retain(|e| e.sequence != lowest);
188                    self.dropped_events += 1;
189                } else {
190                    self.dropped_events += 1;
191                    return Err(event);
192                }
193            } else {
194                self.dropped_events += 1;
195                return Err(event);
196            }
197        }
198
199        let priority = Priority::from_event(&event);
200        let priority_event = PriorityEvent {
201            priority,
202            sequence: self.sequence_counter,
203            event,
204        };
205
206        self.sequence_counter += 1;
207        self.heap.push(priority_event);
208
209        if self.heap.len() >= self.backpressure_threshold {
210            self.backpressure_active = true;
211        }
212
213        Ok(())
214    }
215
216    /// Remove and return the highest priority event from the queue
217    ///
218    /// Events are returned in priority order, with high priority events
219    /// returned before normal and low priority events.
220    ///
221    /// # Returns
222    /// * `Some(event)` if there are events in the queue
223    /// * `None` if the queue is empty
224    ///
225    /// # Example
226    /// ```ignore
227    /// while let Some(event) = queue.pop() {
228    ///     process_event(event);
229    /// }
230    /// ```
231    pub fn pop(&mut self) -> Option<Event<M>> {
232        let result = self.heap.pop().map(|pe| pe.event);
233
234        if self.heap.len() < self.backpressure_threshold {
235            self.backpressure_active = false;
236        }
237
238        result
239    }
240
241    /// Check if the queue is empty
242    ///
243    /// # Returns
244    /// * `true` if the queue contains no events
245    /// * `false` if the queue contains one or more events
246    pub fn is_empty(&self) -> bool {
247        self.heap.is_empty()
248    }
249
250    /// Get the current number of events in the queue
251    ///
252    /// # Returns
253    /// The number of events currently in the queue
254    pub fn len(&self) -> usize {
255        self.heap.len()
256    }
257
258    /// Check if backpressure is currently active
259    ///
260    /// Backpressure is activated when the queue reaches 80% of its capacity.
261    ///
262    /// # Returns
263    /// * `true` if backpressure is active
264    /// * `false` if the queue is below the backpressure threshold
265    pub fn is_backpressure_active(&self) -> bool {
266        self.backpressure_active
267    }
268
269    /// Get the total number of events that have been dropped
270    ///
271    /// Events are dropped when the queue is full and lower priority
272    /// events are evicted to make room for higher priority ones.
273    ///
274    /// # Returns
275    /// The total number of events dropped since queue creation
276    pub fn dropped_events(&self) -> usize {
277        self.dropped_events
278    }
279
280    /// Clear all events from the queue
281    ///
282    /// This removes all events and resets the backpressure state,
283    /// but preserves the dropped event counter and capacity settings.
284    pub fn clear(&mut self) {
285        self.heap.clear();
286        self.backpressure_active = false;
287    }
288
289    fn find_lowest_priority_event(&self) -> Option<usize> {
290        self.heap
291            .iter()
292            .filter(|e| e.priority == Priority::Low)
293            .map(|e| e.sequence)
294            .min()
295    }
296
297    /// Get the current capacity of the queue
298    pub fn capacity(&self) -> usize {
299        self.max_size
300    }
301
302    /// Resize the queue to a new capacity
303    ///
304    /// # Arguments
305    /// * `new_size` - The new maximum size for the queue
306    ///
307    /// # Returns
308    /// * `Ok(())` if resize succeeded
309    /// * `Err(ResizeError)` if the new size is invalid or would cause data loss
310    pub fn resize(&mut self, new_size: usize) -> Result<(), ResizeError> {
311        if new_size == 0 {
312            return Err(ResizeError::InvalidSize("Queue size cannot be zero".into()));
313        }
314
315        let current_len = self.heap.len();
316
317        // If shrinking below current usage, we need to drop events
318        if new_size < current_len {
319            // Calculate how many events to drop
320            let to_drop = current_len - new_size;
321
322            // Collect events sorted by priority (lowest priority first)
323            let mut events: Vec<_> = self.heap.iter().collect();
324            events.sort_by(|a, b| {
325                b.priority
326                    .cmp(&a.priority)
327                    .then(b.sequence.cmp(&a.sequence))
328            });
329
330            // Get sequences of events to drop (lowest priority ones)
331            let drop_sequences: Vec<usize> =
332                events.iter().take(to_drop).map(|e| e.sequence).collect();
333
334            // Drop the events
335            self.heap.retain(|e| !drop_sequences.contains(&e.sequence));
336            self.dropped_events += to_drop;
337        }
338
339        // Update size and thresholds
340        self.max_size = new_size;
341        self.backpressure_threshold = (new_size as f64 * 0.8) as usize;
342
343        // Update backpressure status
344        self.backpressure_active = self.heap.len() >= self.backpressure_threshold;
345
346        Ok(())
347    }
348
349    /// Try to grow the queue by a specified amount
350    pub fn try_grow(&mut self, additional: usize) -> Result<usize, ResizeError> {
351        let new_size = self.max_size.saturating_add(additional);
352        self.resize(new_size)?;
353        Ok(new_size)
354    }
355
356    /// Try to shrink the queue by a specified amount
357    pub fn try_shrink(&mut self, reduction: usize) -> Result<usize, ResizeError> {
358        let new_size = self.max_size.saturating_sub(reduction).max(1);
359        self.resize(new_size)?;
360        Ok(new_size)
361    }
362
363    /// Get current queue statistics for scaling decisions
364    pub fn stats(&self) -> QueueStats {
365        QueueStats {
366            current_size: self.heap.len(),
367            max_size: self.max_size,
368            utilization: self.heap.len() as f64 / self.max_size as f64,
369            backpressure_active: self.backpressure_active,
370            dropped_events: self.dropped_events,
371        }
372    }
373}
374
375/// Error type for resize operations
376#[derive(Debug, Clone)]
377pub enum ResizeError {
378    /// The requested size is invalid (e.g., zero or negative)
379    InvalidSize(String),
380    /// The resize operation would cause high priority events to be dropped
381    WouldDropHighPriorityEvents,
382}
383
384impl std::fmt::Display for ResizeError {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            ResizeError::InvalidSize(msg) => write!(f, "Invalid size: {msg}"),
388            ResizeError::WouldDropHighPriorityEvents => {
389                write!(f, "Resize would drop high priority events")
390            }
391        }
392    }
393}
394
395impl std::error::Error for ResizeError {}
396
397/// Queue statistics for monitoring and scaling decisions
398#[derive(Debug, Clone)]
399pub struct QueueStats {
400    /// Current number of events in the queue
401    pub current_size: usize,
402    /// Maximum capacity of the queue
403    pub max_size: usize,
404    /// Current utilization as a percentage (0.0 to 1.0)
405    pub utilization: f64,
406    /// Whether backpressure is currently active
407    pub backpressure_active: bool,
408    /// Total number of events dropped
409    pub dropped_events: usize,
410}
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415    use hojicha_core::event::{Key, KeyEvent};
416
417    #[derive(Debug, Clone, PartialEq)]
418    struct TestMsg(usize);
419
420    #[test]
421    fn test_priority_ordering() {
422        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(10);
423
424        queue.push(Event::Tick).unwrap();
425        queue.push(Event::User(TestMsg(1))).unwrap();
426        queue.push(Event::Quit).unwrap();
427        queue.push(Event::User(TestMsg(2))).unwrap();
428        queue
429            .push(Event::Key(KeyEvent {
430                key: Key::Char('a'),
431                modifiers: crossterm::event::KeyModifiers::empty(),
432            }))
433            .unwrap();
434
435        // Both Quit and Key have High priority, order between them is not guaranteed
436        let first = queue.pop();
437        let second = queue.pop();
438
439        // Check that we got both high priority events first
440        let got_quit = matches!(first, Some(Event::Quit)) || matches!(second, Some(Event::Quit));
441        let got_key = matches!(first, Some(Event::Key(_))) || matches!(second, Some(Event::Key(_)));
442
443        assert!(got_quit, "Expected Quit event in first two pops");
444        assert!(got_key, "Expected Key event in first two pops");
445
446        // Normal priority events - order may vary due to heap implementation
447        let third = queue.pop();
448        let fourth = queue.pop();
449
450        let got_user1 = matches!(third, Some(Event::User(TestMsg(1))))
451            || matches!(fourth, Some(Event::User(TestMsg(1))));
452        let got_user2 = matches!(third, Some(Event::User(TestMsg(2))))
453            || matches!(fourth, Some(Event::User(TestMsg(2))));
454
455        assert!(got_user1, "Expected User(TestMsg(1))");
456        assert!(got_user2, "Expected User(TestMsg(2))");
457        assert_eq!(queue.pop(), Some(Event::Tick));
458        assert_eq!(queue.pop(), None);
459    }
460
461    #[test]
462    fn test_backpressure() {
463        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(5);
464
465        for i in 0..4 {
466            queue.push(Event::User(TestMsg(i))).unwrap();
467        }
468
469        assert!(queue.is_backpressure_active());
470
471        queue.pop();
472        queue.pop();
473
474        assert!(!queue.is_backpressure_active());
475    }
476
477    #[test]
478    fn test_event_dropping() {
479        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(3);
480
481        queue.push(Event::Tick).unwrap();
482        queue.push(Event::User(TestMsg(1))).unwrap();
483        queue.push(Event::User(TestMsg(2))).unwrap();
484
485        let result = queue.push(Event::Tick);
486        assert!(result.is_err());
487        assert_eq!(queue.dropped_events(), 1);
488
489        queue.push(Event::Quit).unwrap();
490        assert_eq!(queue.dropped_events(), 2);
491    }
492}