hojicha_runtime/priority_queue.rs
1//! Priority-based event queue with backpressure support
2//!
3//! This module provides a priority queue for events that ensures important events
4//! (like user input and quit commands) are processed before less critical events
5//! (like ticks and resize events). It also implements backpressure to prevent
6//! memory exhaustion under high load.
7//!
8//! # Priority Levels
9//!
10//! Events are automatically assigned priorities:
11//! - **High**: Quit, Key events, Suspend/Resume, Process execution
12//! - **Normal**: Mouse events, User messages, Paste events
13//! - **Low**: Tick, Resize, Focus/Blur events
14//!
15//! # Backpressure
16//!
17//! When the queue reaches 80% capacity, backpressure is activated. If the queue
18//! fills completely, lower priority events are dropped in favor of higher priority
19//! ones.
20//!
21//! # Example
22//!
23//! ```ignore
24//! use hojicha::priority_queue::PriorityEventQueue;
25//!
26//! let mut queue = PriorityEventQueue::new(1000);
27//!
28//! // High priority events are processed first
29//! queue.push(Event::Tick)?; // Low priority
30//! queue.push(Event::User(msg))?; // Normal priority
31//! queue.push(Event::Quit)?; // High priority
32//!
33//! assert_eq!(queue.pop(), Some(Event::Quit)); // High first
34//! assert_eq!(queue.pop(), Some(Event::User(msg))); // Then normal
35//! assert_eq!(queue.pop(), Some(Event::Tick)); // Low last
36//! ```
37
38use hojicha_core::core::Message;
39use hojicha_core::event::Event;
40use std::cmp::Ordering;
41use std::collections::BinaryHeap;
42
43/// Priority levels for events in the queue
44///
45/// Lower numeric values indicate higher priority. Events are processed
46/// in priority order, with high priority events processed before normal
47/// and low priority events.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
49pub enum Priority {
50 /// High priority for critical events (Quit, Key events, Suspend/Resume)
51 High = 0,
52 /// Normal priority for user interactions (Mouse, User messages, Paste)
53 Normal = 1,
54 /// Low priority for background events (Tick, Resize, Focus/Blur)
55 Low = 2,
56}
57
58impl Priority {
59 /// Determine the priority level for a given event
60 ///
61 /// This method automatically assigns priority levels based on event type:
62 /// - High: Quit, Key events, Suspend/Resume, Process execution
63 /// - Normal: Mouse events, User messages, Paste events
64 /// - Low: Tick, Resize, Focus/Blur events
65 pub fn from_event<M: Message>(event: &Event<M>) -> Self {
66 match event {
67 Event::Quit => Priority::High,
68 Event::Key(_) => Priority::High,
69 Event::Mouse(_) => Priority::Normal,
70 Event::User(_) => Priority::Normal,
71 Event::Resize { .. } => Priority::Low,
72 Event::Tick => Priority::Low,
73 Event::Paste(_) => Priority::Normal,
74 Event::Focus | Event::Blur => Priority::Low,
75 Event::Suspend | Event::Resume | Event::ExecProcess => Priority::High,
76 }
77 }
78}
79
80#[derive(Debug)]
81struct PriorityEvent<M: Message> {
82 priority: Priority,
83 sequence: usize,
84 event: Event<M>,
85}
86
87impl<M: Message> PartialEq for PriorityEvent<M> {
88 fn eq(&self, other: &Self) -> bool {
89 self.priority == other.priority && self.sequence == other.sequence
90 }
91}
92
93impl<M: Message> Eq for PriorityEvent<M> {}
94
95impl<M: Message> Ord for PriorityEvent<M> {
96 fn cmp(&self, other: &Self) -> Ordering {
97 // BinaryHeap is a max-heap, so we want High (0) to be greater than Low (2)
98 // Therefore we reverse the comparison
99 match other.priority.cmp(&self.priority) {
100 Ordering::Equal => self.sequence.cmp(&other.sequence),
101 other => other,
102 }
103 }
104}
105
106impl<M: Message> PartialOrd for PriorityEvent<M> {
107 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
108 Some(self.cmp(other))
109 }
110}
111
112/// A priority queue for events with automatic backpressure handling
113///
114/// This queue ensures that high priority events (like user input and quit commands)
115/// are processed before lower priority events (like ticks and resize events).
116/// It implements backpressure to prevent memory exhaustion under high load.
117///
118/// When the queue reaches 80% capacity, backpressure is activated. If the queue
119/// fills completely, lower priority events are dropped in favor of higher priority ones.
120///
121/// # Example
122///
123/// ```ignore
124/// let mut queue = PriorityEventQueue::new(1000);
125///
126/// queue.push(Event::Tick)?; // Low priority
127/// queue.push(Event::Quit)?; // High priority
128///
129/// // High priority events are processed first
130/// assert_eq!(queue.pop(), Some(Event::Quit));
131/// assert_eq!(queue.pop(), Some(Event::Tick));
132/// ```
133pub struct PriorityEventQueue<M: Message> {
134 heap: BinaryHeap<PriorityEvent<M>>,
135 sequence_counter: usize,
136 max_size: usize,
137 backpressure_threshold: usize,
138 backpressure_active: bool,
139 dropped_events: usize,
140}
141
142impl<M: Message> PriorityEventQueue<M> {
143 /// Create a new priority event queue with the specified maximum size
144 ///
145 /// # Arguments
146 /// * `max_size` - Maximum number of events the queue can hold
147 ///
148 /// # Example
149 /// ```ignore
150 /// let queue = PriorityEventQueue::new(1000);
151 /// ```
152 pub fn new(max_size: usize) -> Self {
153 Self {
154 heap: BinaryHeap::new(),
155 sequence_counter: 0,
156 max_size,
157 backpressure_threshold: (max_size as f64 * 0.8) as usize,
158 backpressure_active: false,
159 dropped_events: 0,
160 }
161 }
162
163 /// Push an event into the priority queue
164 ///
165 /// Events are automatically prioritized based on their type. If the queue is full,
166 /// lower priority events may be dropped to make room for higher priority ones.
167 ///
168 /// # Arguments
169 /// * `event` - The event to add to the queue
170 ///
171 /// # Returns
172 /// * `Ok(())` if the event was successfully added
173 /// * `Err(event)` if the event was dropped due to queue overflow
174 ///
175 /// # Example
176 /// ```ignore
177 /// if let Err(dropped) = queue.push(Event::Tick) {
178 /// println!("Event was dropped: {:?}", dropped);
179 /// }
180 /// ```
181 pub fn push(&mut self, event: Event<M>) -> Result<(), Event<M>> {
182 if self.heap.len() >= self.max_size {
183 let priority = Priority::from_event(&event);
184
185 if priority == Priority::High {
186 if let Some(lowest) = self.find_lowest_priority_event() {
187 self.heap.retain(|e| e.sequence != lowest);
188 self.dropped_events += 1;
189 } else {
190 self.dropped_events += 1;
191 return Err(event);
192 }
193 } else {
194 self.dropped_events += 1;
195 return Err(event);
196 }
197 }
198
199 let priority = Priority::from_event(&event);
200 let priority_event = PriorityEvent {
201 priority,
202 sequence: self.sequence_counter,
203 event,
204 };
205
206 self.sequence_counter += 1;
207 self.heap.push(priority_event);
208
209 if self.heap.len() >= self.backpressure_threshold {
210 self.backpressure_active = true;
211 }
212
213 Ok(())
214 }
215
216 /// Remove and return the highest priority event from the queue
217 ///
218 /// Events are returned in priority order, with high priority events
219 /// returned before normal and low priority events.
220 ///
221 /// # Returns
222 /// * `Some(event)` if there are events in the queue
223 /// * `None` if the queue is empty
224 ///
225 /// # Example
226 /// ```ignore
227 /// while let Some(event) = queue.pop() {
228 /// process_event(event);
229 /// }
230 /// ```
231 pub fn pop(&mut self) -> Option<Event<M>> {
232 let result = self.heap.pop().map(|pe| pe.event);
233
234 if self.heap.len() < self.backpressure_threshold {
235 self.backpressure_active = false;
236 }
237
238 result
239 }
240
241 /// Check if the queue is empty
242 ///
243 /// # Returns
244 /// * `true` if the queue contains no events
245 /// * `false` if the queue contains one or more events
246 pub fn is_empty(&self) -> bool {
247 self.heap.is_empty()
248 }
249
250 /// Get the current number of events in the queue
251 ///
252 /// # Returns
253 /// The number of events currently in the queue
254 pub fn len(&self) -> usize {
255 self.heap.len()
256 }
257
258 /// Check if backpressure is currently active
259 ///
260 /// Backpressure is activated when the queue reaches 80% of its capacity.
261 ///
262 /// # Returns
263 /// * `true` if backpressure is active
264 /// * `false` if the queue is below the backpressure threshold
265 pub fn is_backpressure_active(&self) -> bool {
266 self.backpressure_active
267 }
268
269 /// Get the total number of events that have been dropped
270 ///
271 /// Events are dropped when the queue is full and lower priority
272 /// events are evicted to make room for higher priority ones.
273 ///
274 /// # Returns
275 /// The total number of events dropped since queue creation
276 pub fn dropped_events(&self) -> usize {
277 self.dropped_events
278 }
279
280 /// Clear all events from the queue
281 ///
282 /// This removes all events and resets the backpressure state,
283 /// but preserves the dropped event counter and capacity settings.
284 pub fn clear(&mut self) {
285 self.heap.clear();
286 self.backpressure_active = false;
287 }
288
289 fn find_lowest_priority_event(&self) -> Option<usize> {
290 self.heap
291 .iter()
292 .filter(|e| e.priority == Priority::Low)
293 .map(|e| e.sequence)
294 .min()
295 }
296
297 /// Get the current capacity of the queue
298 pub fn capacity(&self) -> usize {
299 self.max_size
300 }
301
302 /// Resize the queue to a new capacity
303 ///
304 /// # Arguments
305 /// * `new_size` - The new maximum size for the queue
306 ///
307 /// # Returns
308 /// * `Ok(())` if resize succeeded
309 /// * `Err(ResizeError)` if the new size is invalid or would cause data loss
310 pub fn resize(&mut self, new_size: usize) -> Result<(), ResizeError> {
311 if new_size == 0 {
312 return Err(ResizeError::InvalidSize("Queue size cannot be zero".into()));
313 }
314
315 let current_len = self.heap.len();
316
317 // If shrinking below current usage, we need to drop events
318 if new_size < current_len {
319 // Calculate how many events to drop
320 let to_drop = current_len - new_size;
321
322 // Collect events sorted by priority (lowest priority first)
323 let mut events: Vec<_> = self.heap.iter().collect();
324 events.sort_by(|a, b| {
325 b.priority
326 .cmp(&a.priority)
327 .then(b.sequence.cmp(&a.sequence))
328 });
329
330 // Get sequences of events to drop (lowest priority ones)
331 let drop_sequences: Vec<usize> =
332 events.iter().take(to_drop).map(|e| e.sequence).collect();
333
334 // Drop the events
335 self.heap.retain(|e| !drop_sequences.contains(&e.sequence));
336 self.dropped_events += to_drop;
337 }
338
339 // Update size and thresholds
340 self.max_size = new_size;
341 self.backpressure_threshold = (new_size as f64 * 0.8) as usize;
342
343 // Update backpressure status
344 self.backpressure_active = self.heap.len() >= self.backpressure_threshold;
345
346 Ok(())
347 }
348
349 /// Try to grow the queue by a specified amount
350 pub fn try_grow(&mut self, additional: usize) -> Result<usize, ResizeError> {
351 let new_size = self.max_size.saturating_add(additional);
352 self.resize(new_size)?;
353 Ok(new_size)
354 }
355
356 /// Try to shrink the queue by a specified amount
357 pub fn try_shrink(&mut self, reduction: usize) -> Result<usize, ResizeError> {
358 let new_size = self.max_size.saturating_sub(reduction).max(1);
359 self.resize(new_size)?;
360 Ok(new_size)
361 }
362
363 /// Get current queue statistics for scaling decisions
364 pub fn stats(&self) -> QueueStats {
365 QueueStats {
366 current_size: self.heap.len(),
367 max_size: self.max_size,
368 utilization: self.heap.len() as f64 / self.max_size as f64,
369 backpressure_active: self.backpressure_active,
370 dropped_events: self.dropped_events,
371 }
372 }
373}
374
375/// Error type for resize operations
376#[derive(Debug, Clone)]
377pub enum ResizeError {
378 /// The requested size is invalid (e.g., zero or negative)
379 InvalidSize(String),
380 /// The resize operation would cause high priority events to be dropped
381 WouldDropHighPriorityEvents,
382}
383
384impl std::fmt::Display for ResizeError {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 ResizeError::InvalidSize(msg) => write!(f, "Invalid size: {msg}"),
388 ResizeError::WouldDropHighPriorityEvents => {
389 write!(f, "Resize would drop high priority events")
390 }
391 }
392 }
393}
394
395impl std::error::Error for ResizeError {}
396
397/// Queue statistics for monitoring and scaling decisions
398#[derive(Debug, Clone)]
399pub struct QueueStats {
400 /// Current number of events in the queue
401 pub current_size: usize,
402 /// Maximum capacity of the queue
403 pub max_size: usize,
404 /// Current utilization as a percentage (0.0 to 1.0)
405 pub utilization: f64,
406 /// Whether backpressure is currently active
407 pub backpressure_active: bool,
408 /// Total number of events dropped
409 pub dropped_events: usize,
410}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use hojicha_core::event::{Key, KeyEvent};
416
417 #[derive(Debug, Clone, PartialEq)]
418 struct TestMsg(usize);
419
420 #[test]
421 fn test_priority_ordering() {
422 let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(10);
423
424 queue.push(Event::Tick).unwrap();
425 queue.push(Event::User(TestMsg(1))).unwrap();
426 queue.push(Event::Quit).unwrap();
427 queue.push(Event::User(TestMsg(2))).unwrap();
428 queue
429 .push(Event::Key(KeyEvent {
430 key: Key::Char('a'),
431 modifiers: crossterm::event::KeyModifiers::empty(),
432 }))
433 .unwrap();
434
435 // Both Quit and Key have High priority, order between them is not guaranteed
436 let first = queue.pop();
437 let second = queue.pop();
438
439 // Check that we got both high priority events first
440 let got_quit = matches!(first, Some(Event::Quit)) || matches!(second, Some(Event::Quit));
441 let got_key = matches!(first, Some(Event::Key(_))) || matches!(second, Some(Event::Key(_)));
442
443 assert!(got_quit, "Expected Quit event in first two pops");
444 assert!(got_key, "Expected Key event in first two pops");
445
446 // Normal priority events - order may vary due to heap implementation
447 let third = queue.pop();
448 let fourth = queue.pop();
449
450 let got_user1 = matches!(third, Some(Event::User(TestMsg(1))))
451 || matches!(fourth, Some(Event::User(TestMsg(1))));
452 let got_user2 = matches!(third, Some(Event::User(TestMsg(2))))
453 || matches!(fourth, Some(Event::User(TestMsg(2))));
454
455 assert!(got_user1, "Expected User(TestMsg(1))");
456 assert!(got_user2, "Expected User(TestMsg(2))");
457 assert_eq!(queue.pop(), Some(Event::Tick));
458 assert_eq!(queue.pop(), None);
459 }
460
461 #[test]
462 fn test_backpressure() {
463 let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(5);
464
465 for i in 0..4 {
466 queue.push(Event::User(TestMsg(i))).unwrap();
467 }
468
469 assert!(queue.is_backpressure_active());
470
471 queue.pop();
472 queue.pop();
473
474 assert!(!queue.is_backpressure_active());
475 }
476
477 #[test]
478 fn test_event_dropping() {
479 let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(3);
480
481 queue.push(Event::Tick).unwrap();
482 queue.push(Event::User(TestMsg(1))).unwrap();
483 queue.push(Event::User(TestMsg(2))).unwrap();
484
485 let result = queue.push(Event::Tick);
486 assert!(result.is_err());
487 assert_eq!(queue.dropped_events(), 1);
488
489 queue.push(Event::Quit).unwrap();
490 assert_eq!(queue.dropped_events(), 2);
491 }
492}