hojicha_runtime/program/
priority_event_processor.rs

1//! Priority-aware event processing with automatic event prioritization
2//!
3//! This module implements the default event processing with built-in prioritization
4//! to ensure UI responsiveness even under heavy event load.
5
6use crate::metrics::{AdvancedEventStats, MetricsCollector, MetricsConfig};
7use crate::priority_queue::{Priority, PriorityEventQueue, ResizeError};
8use crate::queue_scaling::{AutoScaleConfig, QueueAutoScaler, ScalingDecision};
9use crossterm::event::{Event as CrosstermEvent, KeyEventKind};
10use hojicha_core::core::Message;
11use hojicha_core::event::Event;
12use log::{debug, info, trace, warn};
13use std::sync::mpsc;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17/// Type alias for custom priority mapping function
18type PriorityMapper = Arc<dyn Fn(&Event<()>) -> Priority + Send + Sync>;
19
/// Basic counters describing how events moved through the processor.
///
/// A value produced by [`Default`] has every counter set to zero.
#[derive(Clone, Debug, Default)]
pub struct EventStats {
    /// Number of push attempts seen (incremented on every `push`, including
    /// pushes that the queue rejected)
    pub total_events: usize,
    /// Number of pushed events classified as high priority
    pub high_priority_events: usize,
    /// Number of pushed events classified as normal priority
    pub normal_priority_events: usize,
    /// Number of pushed events classified as low priority
    pub low_priority_events: usize,
    /// Number of pushed events the queue rejected (queue overflow)
    pub dropped_events: usize,
    /// Accumulated time spent popping events, in whole milliseconds
    pub processing_time_ms: u128,
    /// Largest queue length observed immediately after a push
    pub queue_size_max: usize,
}
38
39/// Configuration for priority event processing
40#[derive(Clone)]
41pub struct PriorityConfig {
42    /// Maximum queue size (default: 1000)
43    pub max_queue_size: usize,
44    /// Whether to log dropped events (default: true)
45    pub log_drops: bool,
46    /// Custom priority mapper (default: None, uses automatic detection)
47    pub priority_mapper: Option<PriorityMapper>,
48}
49
50impl std::fmt::Debug for PriorityConfig {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("PriorityConfig")
53            .field("max_queue_size", &self.max_queue_size)
54            .field("log_drops", &self.log_drops)
55            .field("priority_mapper", &self.priority_mapper.is_some())
56            .finish()
57    }
58}
59
60impl Default for PriorityConfig {
61    fn default() -> Self {
62        Self {
63            max_queue_size: 1000,
64            log_drops: true,
65            priority_mapper: None,
66        }
67    }
68}
69
70/// Priority-aware event processor
71pub struct PriorityEventProcessor<M: Message> {
72    queue: Arc<Mutex<PriorityEventQueue<M>>>,
73    stats: Arc<Mutex<EventStats>>,
74    config: PriorityConfig,
75    metrics: Arc<MetricsCollector>,
76    auto_scaler: Option<Arc<Mutex<QueueAutoScaler>>>,
77}
78
79impl<M: Message> PriorityEventProcessor<M> {
80    /// Create a new priority event processor with default configuration
81    pub fn new() -> Self {
82        Self::with_config(PriorityConfig::default())
83    }
84
85    /// Create a new priority event processor with custom configuration
86    pub fn with_config(config: PriorityConfig) -> Self {
87        debug!(
88            "Initializing PriorityEventProcessor with max_queue_size: {}",
89            config.max_queue_size
90        );
91
92        let metrics_config = MetricsConfig {
93            track_percentiles: true,
94            track_by_type: true,
95            sampling_rate: 1.0,
96            max_histogram_size: 100_000,
97            rate_window: Duration::from_secs(60),
98        };
99
100        Self {
101            queue: Arc::new(Mutex::new(PriorityEventQueue::new(config.max_queue_size))),
102            stats: Arc::new(Mutex::new(EventStats::default())),
103            config,
104            metrics: Arc::new(MetricsCollector::new(metrics_config)),
105            auto_scaler: None,
106        }
107    }
108
109    /// Get current statistics
110    pub fn stats(&self) -> EventStats {
111        self.stats.lock().unwrap().clone()
112    }
113
114    /// Reset statistics
115    pub fn reset_stats(&self) {
116        *self.stats.lock().unwrap() = EventStats::default();
117        self.metrics.reset();
118    }
119
120    /// Get advanced metrics snapshot
121    pub fn advanced_metrics(&self) -> AdvancedEventStats {
122        self.metrics.snapshot()
123    }
124
125    /// Get metrics collector for external monitoring
126    pub fn metrics_collector(&self) -> Arc<MetricsCollector> {
127        self.metrics.clone()
128    }
129
130    /// Enable auto-scaling with the given configuration
131    pub fn enable_auto_scaling(&mut self, config: AutoScaleConfig) {
132        info!("Enabling auto-scaling with config: {:?}", config);
133        self.auto_scaler = Some(Arc::new(Mutex::new(QueueAutoScaler::new(config))));
134    }
135
136    /// Disable auto-scaling
137    pub fn disable_auto_scaling(&mut self) {
138        info!("Disabling auto-scaling");
139        self.auto_scaler = None;
140    }
141
142    /// Manually resize the queue
143    pub fn resize_queue(&self, new_size: usize) -> Result<(), ResizeError> {
144        let mut queue = self.queue.lock().unwrap();
145        let old_size = queue.capacity();
146        queue.resize(new_size)?;
147        info!("Queue resized from {} to {}", old_size, new_size);
148        Ok(())
149    }
150
151    /// Get current queue capacity
152    pub fn queue_capacity(&self) -> usize {
153        self.queue.lock().unwrap().capacity()
154    }
155
156    /// Push an event to the queue
157    pub fn push(&self, event: Event<M>) -> Result<(), Event<M>> {
158        let priority = self.detect_priority(&event);
159
160        trace!(
161            "Pushing event with priority {:?}: {:?}",
162            priority,
163            std::mem::discriminant(&event)
164        );
165
166        let mut queue = self.queue.lock().unwrap();
167        let result = queue.push(event);
168
169        // Update statistics
170        let mut stats = self.stats.lock().unwrap();
171        stats.total_events += 1;
172
173        match priority {
174            Priority::High => stats.high_priority_events += 1,
175            Priority::Normal => stats.normal_priority_events += 1,
176            Priority::Low => stats.low_priority_events += 1,
177        }
178
179        let current_size = queue.len();
180        let capacity = self.config.max_queue_size;
181        if current_size > stats.queue_size_max {
182            stats.queue_size_max = current_size;
183        }
184
185        // Update metrics
186        self.metrics.update_queue_depth(current_size, capacity);
187
188        if result.is_err() {
189            stats.dropped_events += 1;
190            self.metrics.record_dropped();
191            if self.config.log_drops {
192                warn!(
193                    "Dropped event due to queue overflow (priority: {:?})",
194                    priority
195                );
196            }
197        }
198
199        // Log if queue is getting full
200        if queue.is_backpressure_active() {
201            debug!("Queue backpressure active: {} events queued", current_size);
202            self.metrics.record_backpressure();
203        }
204
205        result
206    }
207
208    /// Pop the highest priority event from the queue
209    pub fn pop(&self) -> Option<Event<M>> {
210        let start = Instant::now();
211        let event = self.queue.lock().unwrap().pop();
212
213        if let Some(ref e) = event {
214            let elapsed = start.elapsed();
215            let mut stats = self.stats.lock().unwrap();
216            stats.processing_time_ms += elapsed.as_millis();
217
218            // Record metrics for the event
219            let priority = Priority::from_event(e);
220            let event_type = match e {
221                Event::Quit => Some("quit"),
222                Event::Key(_) => Some("key"),
223                Event::Mouse(_) => Some("mouse"),
224                Event::User(_) => Some("user"),
225                Event::Resize { .. } => Some("resize"),
226                Event::Tick => Some("tick"),
227                Event::Paste(_) => Some("paste"),
228                Event::Focus => Some("focus"),
229                Event::Blur => Some("blur"),
230                Event::Suspend => Some("suspend"),
231                Event::Resume => Some("resume"),
232                Event::ExecProcess => Some("exec"),
233            };
234
235            self.metrics.record_event(priority, elapsed, event_type);
236
237            // Check if auto-scaling should be triggered
238            if let Some(ref auto_scaler) = self.auto_scaler {
239                let mut scaler = auto_scaler.lock().unwrap();
240                let mut queue = self.queue.lock().unwrap();
241
242                if let Some(decision) = scaler.on_event_processed(&mut queue) {
243                    match decision {
244                        ScalingDecision::Grow(amount) => {
245                            debug!("Auto-scaling: Growing queue by {}", amount);
246                        }
247                        ScalingDecision::Shrink(amount) => {
248                            debug!("Auto-scaling: Shrinking queue by {}", amount);
249                        }
250                        ScalingDecision::NoChange => {}
251                    }
252                }
253            }
254        }
255
256        event
257    }
258
259    /// Check if the queue is empty
260    pub fn is_empty(&self) -> bool {
261        self.queue.lock().unwrap().is_empty()
262    }
263
264    /// Get the current queue size
265    pub fn queue_size(&self) -> usize {
266        self.queue.lock().unwrap().len()
267    }
268
269    /// Detect the priority of an event
270    fn detect_priority(&self, event: &Event<M>) -> Priority {
271        // Use custom mapper if provided
272        if let Some(ref mapper) = self.config.priority_mapper {
273            // SAFETY: This transmute is safe because:
274            // 1. Event<M> and Event<()> have identical memory layouts (same discriminant, same size)
275            // 2. The mapper only inspects the event type discriminant, never accesses the data
276            // 3. We never construct or return the transmuted type, only read its discriminant
277            // TODO: Replace with safe alternative using pattern matching
278            let event_ref = unsafe { std::mem::transmute::<&Event<M>, &Event<()>>(event) };
279            return mapper(event_ref);
280        }
281
282        // Default automatic priority detection
283        Priority::from_event(event)
284    }
285
286    /// Process events from multiple sources with priority handling
287    pub fn process_events(
288        &self,
289        message_rx: &mpsc::Receiver<Event<M>>,
290        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
291        tick_rate: Duration,
292    ) -> Option<Event<M>> {
293        trace!("Processing events, queue size: {}", self.queue_size());
294
295        // First, drain all available events into the priority queue
296        self.drain_channels(message_rx, crossterm_rx);
297
298        // If we have events in the queue, return the highest priority one
299        if let Some(event) = self.pop() {
300            debug!(
301                "Returning event from queue: {:?}",
302                std::mem::discriminant(&event)
303            );
304            return Some(event);
305        }
306
307        // No events available, wait for new ones with timeout
308        match crossterm_rx.recv_timeout(tick_rate) {
309            Ok(ct_event) => self.handle_crossterm_event(ct_event, crossterm_rx),
310            Err(mpsc::RecvTimeoutError::Timeout) => {
311                // Check for messages one more time
312                if let Ok(msg) = message_rx.try_recv() {
313                    Some(msg)
314                } else {
315                    trace!("Generating tick event");
316                    Some(Event::Tick)
317                }
318            }
319            Err(_) => None,
320        }
321    }
322
323    /// Process events in headless mode (no terminal events)
324    pub fn process_events_headless(
325        &self,
326        message_rx: &mpsc::Receiver<Event<M>>,
327        tick_rate: Duration,
328    ) -> Option<Event<M>> {
329        // Drain all available messages into the priority queue
330        while let Ok(msg) = message_rx.try_recv() {
331            let _ = self.push(msg);
332        }
333
334        // Return highest priority event if available
335        if let Some(event) = self.pop() {
336            return Some(event);
337        }
338
339        // Wait for new messages with timeout
340        match message_rx.recv_timeout(tick_rate) {
341            Ok(msg) => Some(msg),
342            Err(mpsc::RecvTimeoutError::Timeout) => Some(Event::Tick),
343            Err(_) => None,
344        }
345    }
346
347    /// Drain all available events from channels into the priority queue
348    fn drain_channels(
349        &self,
350        message_rx: &mpsc::Receiver<Event<M>>,
351        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
352    ) {
353        // Drain all available user messages
354        while let Ok(msg) = message_rx.try_recv() {
355            if self.push(msg).is_err() {
356                break; // Queue is full
357            }
358        }
359
360        // Drain all available terminal events
361        while let Ok(ct_event) = crossterm_rx.try_recv() {
362            if let Some(event) = Self::convert_crossterm_event(ct_event) {
363                // Unsafe transmute to add the message type
364                // This is safe because Event<()> and Event<M> have the same layout
365                let typed_event = unsafe { std::mem::transmute_copy(&event) };
366                if self.push(typed_event).is_err() {
367                    break; // Queue is full
368                }
369            }
370        }
371    }
372
373    /// Handle a crossterm event, with special handling for resize events
374    fn handle_crossterm_event(
375        &self,
376        ct_event: CrosstermEvent,
377        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
378    ) -> Option<Event<M>> {
379        match ct_event {
380            CrosstermEvent::Resize(width, height) => {
381                // Coalesce multiple resize events
382                let (final_width, final_height) =
383                    Self::coalesce_resize_events(width, height, crossterm_rx);
384                Some(Event::Resize {
385                    width: final_width,
386                    height: final_height,
387                })
388            }
389            _ => Self::convert_crossterm_event(ct_event)
390                .map(|e| unsafe { std::mem::transmute_copy(&e) }),
391        }
392    }
393
394    /// Convert a crossterm event to a hojicha event
395    fn convert_crossterm_event(event: CrosstermEvent) -> Option<Event<()>> {
396        match event {
397            CrosstermEvent::Key(key) if key.kind == KeyEventKind::Press => {
398                Some(Event::Key(key.into()))
399            }
400            CrosstermEvent::Mouse(mouse) => Some(Event::Mouse(mouse.into())),
401            CrosstermEvent::Resize(width, height) => Some(Event::Resize { width, height }),
402            CrosstermEvent::Paste(data) => Some(Event::Paste(data)),
403            CrosstermEvent::FocusGained => Some(Event::Focus),
404            CrosstermEvent::FocusLost => Some(Event::Blur),
405            _ => None,
406        }
407    }
408
409    /// Coalesce multiple resize events into one
410    fn coalesce_resize_events(
411        initial_width: u16,
412        initial_height: u16,
413        rx: &mpsc::Receiver<CrosstermEvent>,
414    ) -> (u16, u16) {
415        let mut width = initial_width;
416        let mut height = initial_height;
417
418        // Drain any additional resize events
419        while let Ok(CrosstermEvent::Resize(w, h)) = rx.try_recv() {
420            width = w;
421            height = h;
422        }
423
424        debug!("Coalesced resize events to {}x{}", width, height);
425        (width, height)
426    }
427}
428
429impl<M: Message> Default for PriorityEventProcessor<M> {
430    fn default() -> Self {
431        Self::new()
432    }
433}
434
435/// Public API for getting event processing statistics
436pub fn get_event_stats<M: Message>(processor: &PriorityEventProcessor<M>) -> String {
437    let stats = processor.stats();
438    format!(
439        "Events: {} total ({} high, {} normal, {} low), {} dropped, max queue: {}",
440        stats.total_events,
441        stats.high_priority_events,
442        stats.normal_priority_events,
443        stats.low_priority_events,
444        stats.dropped_events,
445        stats.queue_size_max
446    )
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal user message payload used by the tests.
    #[derive(Debug, Clone, PartialEq)]
    struct TestMsg(String);

    /// Builds a user event carrying `text`.
    fn user(text: &str) -> Event<TestMsg> {
        Event::User(TestMsg(text.to_string()))
    }

    /// Creates a processor with the default configuration.
    fn default_processor() -> PriorityEventProcessor<TestMsg> {
        PriorityEventProcessor::new()
    }

    #[test]
    fn test_priority_processor_creation() {
        let processor = default_processor();

        assert_eq!(processor.queue_size(), 0);
        assert!(processor.is_empty());
    }

    #[test]
    fn test_event_prioritization() {
        let processor = default_processor();

        // Pushed lowest priority first.
        for event in vec![Event::Tick, user("normal"), Event::Quit] {
            processor.push(event).unwrap();
        }

        // Popped highest priority first.
        for expected in vec![Event::Quit, user("normal"), Event::Tick] {
            assert_eq!(processor.pop(), Some(expected));
        }
    }

    #[test]
    fn test_statistics_tracking() {
        let processor = default_processor();

        for event in vec![Event::Quit, user("test"), Event::Tick] {
            processor.push(event).unwrap();
        }

        let EventStats {
            total_events,
            high_priority_events,
            normal_priority_events,
            low_priority_events,
            ..
        } = processor.stats();
        assert_eq!(total_events, 3);
        assert_eq!(high_priority_events, 1);
        assert_eq!(normal_priority_events, 1);
        assert_eq!(low_priority_events, 1);
    }

    #[test]
    fn test_custom_priority_mapper() {
        // The mapper classifies every event as high priority.
        let processor: PriorityEventProcessor<TestMsg> =
            PriorityEventProcessor::with_config(PriorityConfig {
                max_queue_size: 100,
                log_drops: false,
                priority_mapper: Some(Arc::new(|_event| Priority::High)),
            });

        processor.push(Event::Tick).unwrap();

        let stats = processor.stats();
        assert_eq!(stats.high_priority_events, 1);
        assert_eq!(stats.low_priority_events, 0);
    }
}
519}