hojicha_runtime/priority_queue.rs
1//! Priority-based event queue with backpressure support
2//!
3//! This module provides a priority queue for events that ensures important events
4//! (like user input and quit commands) are processed before less critical events
5//! (like ticks and resize events). It also implements backpressure to prevent
6//! memory exhaustion under high load.
7//!
8//! # Priority Levels
9//!
10//! Events are automatically assigned priorities:
11//! - **High**: Quit, Key events, Suspend/Resume, Process execution
12//! - **Normal**: Mouse events, User messages, Paste events
13//! - **Low**: Tick, Resize, Focus/Blur events
14//!
15//! # Backpressure
16//!
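//! Backpressure is flagged once the queue length reaches 80% of its capacity
//! (rounded down to a whole number of events) and is cleared again when the
//! queue drains below that level. When the queue is completely full, a new
//! event is normally rejected and handed back to the caller; only a High
//! priority event may instead evict the oldest queued Low priority event to
//! make room for itself.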
20//!
21//! # Example
22//!
23//! ```
24//! use hojicha_runtime::priority_queue::PriorityEventQueue;
25//! use hojicha_core::event::Event;
26//!
27//! #[derive(Debug, Clone, PartialEq)]
28//! struct TestMsg(u32);
29//!
30//! let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
31//!
32//! // High priority events are processed first
33//! queue.push(Event::Tick).unwrap(); // Low priority
34//! queue.push(Event::User(TestMsg(42))).unwrap(); // Normal priority
35//! queue.push(Event::Quit).unwrap(); // High priority
36//!
37//! assert_eq!(queue.pop(), Some(Event::Quit)); // High first
38//! assert_eq!(queue.pop(), Some(Event::User(TestMsg(42)))); // Then normal
39//! assert_eq!(queue.pop(), Some(Event::Tick)); // Low last
40//! ```
41
42use hojicha_core::core::Message;
43use hojicha_core::event::Event;
44use std::cmp::Ordering;
45use std::collections::BinaryHeap;
46
/// Scheduling class attached to every event held in the queue.
///
/// The discriminants are chosen so that a *smaller* number means a *more
/// urgent* event. The derived `Ord` consequently sorts `High < Normal < Low`;
/// code that needs "most urgent is greatest" must reverse the comparison.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Critical events: Quit, Key, Suspend/Resume and process execution.
    High = 0,
    /// Regular user interaction: Mouse, User messages and Paste.
    Normal = 1,
    /// Background events: Tick, Resize and Focus/Blur.
    Low = 2,
}
61
62impl Priority {
63 /// Determine the priority level for a given event
64 ///
65 /// This method automatically assigns priority levels based on event type:
66 /// - High: Quit, Key events, Suspend/Resume, Process execution
67 /// - Normal: Mouse events, User messages, Paste events
68 /// - Low: Tick, Resize, Focus/Blur events
69 pub fn from_event<M: Message>(event: &Event<M>) -> Self {
70 match event {
71 Event::Quit => Priority::High,
72 Event::Key(_) => Priority::High,
73 Event::Mouse(_) => Priority::Normal,
74 Event::User(_) => Priority::Normal,
75 Event::Resize { .. } => Priority::Low,
76 Event::Tick => Priority::Low,
77 Event::Paste(_) => Priority::Normal,
78 Event::Focus | Event::Blur => Priority::Low,
79 Event::Suspend | Event::Resume | Event::ExecProcess => Priority::High,
80 }
81 }
82}
83
84#[derive(Debug)]
85struct PriorityEvent<M: Message> {
86 priority: Priority,
87 sequence: usize,
88 event: Event<M>,
89}
90
91impl<M: Message> PartialEq for PriorityEvent<M> {
92 fn eq(&self, other: &Self) -> bool {
93 self.priority == other.priority && self.sequence == other.sequence
94 }
95}
96
97impl<M: Message> Eq for PriorityEvent<M> {}
98
99impl<M: Message> Ord for PriorityEvent<M> {
100 fn cmp(&self, other: &Self) -> Ordering {
101 // BinaryHeap is a max-heap, so we want High (0) to be greater than Low (2)
102 // Therefore we reverse the comparison
103 match other.priority.cmp(&self.priority) {
104 Ordering::Equal => self.sequence.cmp(&other.sequence),
105 other => other,
106 }
107 }
108}
109
110impl<M: Message> PartialOrd for PriorityEvent<M> {
111 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
112 Some(self.cmp(other))
113 }
114}
115
116/// A priority queue for events with automatic backpressure handling
117///
118/// This queue ensures that high priority events (like user input and quit commands)
119/// are processed before lower priority events (like ticks and resize events).
120/// It implements backpressure to prevent memory exhaustion under high load.
121///
122/// When the queue reaches 80% capacity, backpressure is activated. If the queue
123/// fills completely, lower priority events are dropped in favor of higher priority ones.
124///
125/// # Example
126///
127/// ```ignore
128/// let mut queue = PriorityEventQueue::new(1000);
129///
130/// queue.push(Event::Tick)?; // Low priority
131/// queue.push(Event::Quit)?; // High priority
132///
133/// // High priority events are processed first
134/// assert_eq!(queue.pop(), Some(Event::Quit));
135/// assert_eq!(queue.pop(), Some(Event::Tick));
136/// ```
137pub struct PriorityEventQueue<M: Message> {
138 heap: BinaryHeap<PriorityEvent<M>>,
139 sequence_counter: usize,
140 max_size: usize,
141 backpressure_threshold: usize,
142 backpressure_active: bool,
143 dropped_events: usize,
144}
145
146impl<M: Message> PriorityEventQueue<M> {
147 /// Create a new priority event queue with the specified maximum size
148 ///
149 /// # Arguments
150 /// * `max_size` - Maximum number of events the queue can hold
151 ///
152 /// # Example
153 /// ```
154 /// use hojicha_runtime::priority_queue::PriorityEventQueue;
155 ///
156 /// #[derive(Debug, Clone, PartialEq)]
157 /// struct TestMsg;
158 ///
159 /// let queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
160 /// assert_eq!(queue.len(), 0);
161 /// assert_eq!(queue.capacity(), 1000);
162 /// ```
163 pub fn new(max_size: usize) -> Self {
164 Self {
165 heap: BinaryHeap::new(),
166 sequence_counter: 0,
167 max_size,
168 backpressure_threshold: (max_size as f64 * 0.8) as usize,
169 backpressure_active: false,
170 dropped_events: 0,
171 }
172 }
173
174 /// Push an event into the priority queue
175 ///
176 /// Events are automatically prioritized based on their type. If the queue is full,
177 /// lower priority events may be dropped to make room for higher priority ones.
178 ///
179 /// # Arguments
180 /// * `event` - The event to add to the queue
181 ///
182 /// # Returns
183 /// * `Ok(())` if the event was successfully added
184 /// * `Err(event)` if the event was dropped due to queue overflow
185 ///
186 /// # Example
187 /// ```
188 /// use hojicha_runtime::priority_queue::PriorityEventQueue;
189 /// use hojicha_core::event::Event;
190 ///
191 /// #[derive(Debug, Clone, PartialEq)]
192 /// struct TestMsg;
193 ///
194 /// let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
195 /// let result = queue.push(Event::Tick);
196 /// assert!(result.is_ok());
197 /// assert_eq!(queue.len(), 1);
198 /// ```
199 pub fn push(&mut self, event: Event<M>) -> Result<(), Event<M>> {
200 if self.heap.len() >= self.max_size {
201 let priority = Priority::from_event(&event);
202
203 if priority == Priority::High {
204 if let Some(lowest) = self.find_lowest_priority_event() {
205 self.heap.retain(|e| e.sequence != lowest);
206 self.dropped_events += 1;
207 } else {
208 self.dropped_events += 1;
209 return Err(event);
210 }
211 } else {
212 self.dropped_events += 1;
213 return Err(event);
214 }
215 }
216
217 let priority = Priority::from_event(&event);
218 let priority_event = PriorityEvent {
219 priority,
220 sequence: self.sequence_counter,
221 event,
222 };
223
224 self.sequence_counter += 1;
225 self.heap.push(priority_event);
226
227 if self.heap.len() >= self.backpressure_threshold {
228 self.backpressure_active = true;
229 }
230
231 Ok(())
232 }
233
234 /// Remove and return the highest priority event from the queue
235 ///
236 /// Events are returned in priority order, with high priority events
237 /// returned before normal and low priority events.
238 ///
239 /// # Returns
240 /// * `Some(event)` if there are events in the queue
241 /// * `None` if the queue is empty
242 ///
243 /// # Example
244 /// ```
245 /// use hojicha_runtime::priority_queue::PriorityEventQueue;
246 /// use hojicha_core::event::Event;
247 ///
248 /// #[derive(Debug, Clone, PartialEq)]
249 /// struct TestMsg;
250 ///
251 /// let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(1000);
252 /// queue.push(Event::Tick).unwrap();
253 /// queue.push(Event::Quit).unwrap(); // Higher priority
254 ///
255 /// // High priority event comes first
256 /// assert_eq!(queue.pop(), Some(Event::Quit));
257 /// assert_eq!(queue.pop(), Some(Event::Tick));
258 /// assert_eq!(queue.pop(), None);
259 /// ```
260 pub fn pop(&mut self) -> Option<Event<M>> {
261 let result = self.heap.pop().map(|pe| pe.event);
262
263 if self.heap.len() < self.backpressure_threshold {
264 self.backpressure_active = false;
265 }
266
267 result
268 }
269
270 /// Check if the queue is empty
271 ///
272 /// # Returns
273 /// * `true` if the queue contains no events
274 /// * `false` if the queue contains one or more events
275 pub fn is_empty(&self) -> bool {
276 self.heap.is_empty()
277 }
278
279 /// Get the current number of events in the queue
280 ///
281 /// # Returns
282 /// The number of events currently in the queue
283 pub fn len(&self) -> usize {
284 self.heap.len()
285 }
286
287 /// Check if backpressure is currently active
288 ///
289 /// Backpressure is activated when the queue reaches 80% of its capacity.
290 ///
291 /// # Returns
292 /// * `true` if backpressure is active
293 /// * `false` if the queue is below the backpressure threshold
294 pub fn is_backpressure_active(&self) -> bool {
295 self.backpressure_active
296 }
297
298 /// Get the total number of events that have been dropped
299 ///
300 /// Events are dropped when the queue is full and lower priority
301 /// events are evicted to make room for higher priority ones.
302 ///
303 /// # Returns
304 /// The total number of events dropped since queue creation
305 pub fn dropped_events(&self) -> usize {
306 self.dropped_events
307 }
308
309 /// Clear all events from the queue
310 ///
311 /// This removes all events and resets the backpressure state,
312 /// but preserves the dropped event counter and capacity settings.
313 pub fn clear(&mut self) {
314 self.heap.clear();
315 self.backpressure_active = false;
316 }
317
318 fn find_lowest_priority_event(&self) -> Option<usize> {
319 self.heap
320 .iter()
321 .filter(|e| e.priority == Priority::Low)
322 .map(|e| e.sequence)
323 .min()
324 }
325
326 /// Get the current capacity of the queue
327 pub fn capacity(&self) -> usize {
328 self.max_size
329 }
330
331 /// Resize the queue to a new capacity
332 ///
333 /// # Arguments
334 /// * `new_size` - The new maximum size for the queue
335 ///
336 /// # Returns
337 /// * `Ok(())` if resize succeeded
338 /// * `Err(ResizeError)` if the new size is invalid or would cause data loss
339 pub fn resize(&mut self, new_size: usize) -> Result<(), ResizeError> {
340 if new_size == 0 {
341 return Err(ResizeError::InvalidSize("Queue size cannot be zero".into()));
342 }
343
344 let current_len = self.heap.len();
345
346 // If shrinking below current usage, we need to drop events
347 if new_size < current_len {
348 // Calculate how many events to drop
349 let to_drop = current_len - new_size;
350
351 // Collect events sorted by priority (lowest priority first)
352 let mut events: Vec<_> = self.heap.iter().collect();
353 events.sort_by(|a, b| {
354 b.priority
355 .cmp(&a.priority)
356 .then(b.sequence.cmp(&a.sequence))
357 });
358
359 // Get sequences of events to drop (lowest priority ones)
360 let drop_sequences: Vec<usize> =
361 events.iter().take(to_drop).map(|e| e.sequence).collect();
362
363 // Drop the events
364 self.heap.retain(|e| !drop_sequences.contains(&e.sequence));
365 self.dropped_events += to_drop;
366 }
367
368 // Update size and thresholds
369 self.max_size = new_size;
370 self.backpressure_threshold = (new_size as f64 * 0.8) as usize;
371
372 // Update backpressure status
373 self.backpressure_active = self.heap.len() >= self.backpressure_threshold;
374
375 Ok(())
376 }
377
378 /// Try to grow the queue by a specified amount
379 pub fn try_grow(&mut self, additional: usize) -> Result<usize, ResizeError> {
380 let new_size = self.max_size.saturating_add(additional);
381 self.resize(new_size)?;
382 Ok(new_size)
383 }
384
385 /// Try to shrink the queue by a specified amount
386 pub fn try_shrink(&mut self, reduction: usize) -> Result<usize, ResizeError> {
387 let new_size = self.max_size.saturating_sub(reduction).max(1);
388 self.resize(new_size)?;
389 Ok(new_size)
390 }
391
392 /// Get current queue statistics for scaling decisions
393 pub fn stats(&self) -> QueueStats {
394 QueueStats {
395 current_size: self.heap.len(),
396 max_size: self.max_size,
397 utilization: self.heap.len() as f64 / self.max_size as f64,
398 backpressure_active: self.backpressure_active,
399 dropped_events: self.dropped_events,
400 }
401 }
402}
403
/// Reasons a request to change the queue capacity can be refused.
#[derive(Clone, Debug)]
pub enum ResizeError {
    /// The requested capacity cannot be used (for example, zero); the
    /// payload is a human-readable explanation.
    InvalidSize(String),
    /// Shrinking would force high priority events out of the queue.
    ///
    /// NOTE(review): nothing visible in this file constructs this variant —
    /// confirm whether callers elsewhere rely on it.
    WouldDropHighPriorityEvents,
}

impl std::fmt::Display for ResizeError {
    /// Writes the user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::WouldDropHighPriorityEvents => {
                f.write_str("Resize would drop high priority events")
            }
            Self::InvalidSize(reason) => write!(f, "Invalid size: {reason}"),
        }
    }
}

// No underlying cause is exposed, so the default `Error` methods suffice.
impl std::error::Error for ResizeError {}
425
/// Point-in-time snapshot of a queue's load, intended for monitoring and for
/// deciding when to grow or shrink the queue.
#[derive(Clone, Debug)]
pub struct QueueStats {
    /// Number of events that were queued when the snapshot was taken.
    pub current_size: usize,
    /// Capacity of the queue when the snapshot was taken.
    pub max_size: usize,
    /// Fill level expressed as the ratio `current_size / max_size`
    /// (a fraction such as `0.25`, not a percentage).
    pub utilization: f64,
    /// Whether the queue was signalling backpressure.
    pub backpressure_active: bool,
    /// Total number of events dropped up to the snapshot.
    pub dropped_events: usize,
}
440
#[cfg(test)]
mod tests {
    use super::*;
    use hojicha_core::event::{Key, KeyEvent};

    /// Minimal user message carrying an id so individual events can be told apart.
    #[derive(Debug, Clone, PartialEq)]
    struct TestMsg(usize);

    /// High priority events leave the queue first, then Normal, then Low.
    #[test]
    fn test_priority_ordering() {
        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(10);

        let key_press = Event::Key(KeyEvent {
            key: Key::Char('a'),
            modifiers: crossterm::event::KeyModifiers::empty(),
        });
        let arrivals = [
            Event::Tick,
            Event::User(TestMsg(1)),
            Event::Quit,
            Event::User(TestMsg(2)),
            key_press,
        ];
        for event in arrivals {
            queue.push(event).unwrap();
        }

        // Quit and Key share the High priority; this test only requires that
        // both appear before anything else, not a particular order between them.
        let high = [queue.pop(), queue.pop()];
        assert!(
            high.iter().any(|popped| matches!(popped, Some(Event::Quit))),
            "Expected Quit event in first two pops"
        );
        assert!(
            high.iter().any(|popped| matches!(popped, Some(Event::Key(_)))),
            "Expected Key event in first two pops"
        );

        // Likewise for the two Normal priority user messages.
        let normal = [queue.pop(), queue.pop()];
        assert!(
            normal.contains(&Some(Event::User(TestMsg(1)))),
            "Expected User(TestMsg(1))"
        );
        assert!(
            normal.contains(&Some(Event::User(TestMsg(2)))),
            "Expected User(TestMsg(2))"
        );

        // The single Low priority event is last, after which the queue is empty.
        assert_eq!(queue.pop(), Some(Event::Tick));
        assert_eq!(queue.pop(), None);
    }

    /// Backpressure turns on at 4 of 5 slots and off again once below that.
    #[test]
    fn test_backpressure() {
        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(5);

        (0..4).for_each(|id| queue.push(Event::User(TestMsg(id))).unwrap());
        assert!(queue.is_backpressure_active());

        let _ = queue.pop();
        let _ = queue.pop();
        assert!(!queue.is_backpressure_active());
    }

    /// A full queue rejects a Low priority arrival but lets a High priority
    /// arrival evict a queued Low priority event; both count as drops.
    #[test]
    fn test_event_dropping() {
        let mut queue: PriorityEventQueue<TestMsg> = PriorityEventQueue::new(3);

        for event in [
            Event::Tick,
            Event::User(TestMsg(1)),
            Event::User(TestMsg(2)),
        ] {
            queue.push(event).unwrap();
        }

        let rejected = queue.push(Event::Tick);
        assert!(rejected.is_err());
        assert_eq!(queue.dropped_events(), 1);

        queue.push(Event::Quit).unwrap();
        assert_eq!(queue.dropped_events(), 2);
    }
}
521}