// hojicha_runtime/program/priority_event_processor.rs

//! Priority-aware event processing with automatic event prioritization
//!
//! This module implements the default event processing with built-in prioritization
//! to ensure UI responsiveness even under heavy event load.
5
6use crate::metrics::{AdvancedEventStats, MetricsCollector, MetricsConfig};
7use crate::priority_queue::{Priority, PriorityEventQueue, ResizeError};
8use crate::queue_scaling::{AutoScaleConfig, QueueAutoScaler, ScalingDecision};
9use crossterm::event::{Event as CrosstermEvent, KeyEventKind};
10use hojicha_core::core::Message;
11use hojicha_core::event::Event;
12use log::{debug, info, trace, warn};
13use std::sync::mpsc;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
/// Type alias for a user-supplied function mapping events to priorities.
///
/// The mapper receives a payload-free `Event<()>` so one mapper type can serve
/// any message type `M`; it must be `Send + Sync` because it is shared via `Arc`.
type PriorityMapper = Arc<dyn Fn(&Event<()>) -> Priority + Send + Sync>;
19
/// Statistics for monitoring event processing behavior
///
/// Counters are cumulative since construction (or the last `reset_stats`)
/// and are updated by the processor's `push` and `pop` methods.
#[derive(Debug, Clone, Default)]
pub struct EventStats {
    /// Total number of events pushed, including ones subsequently dropped
    pub total_events: usize,
    /// Number of high priority events pushed
    pub high_priority_events: usize,
    /// Number of normal priority events pushed
    pub normal_priority_events: usize,
    /// Number of low priority events pushed
    pub low_priority_events: usize,
    /// Number of events dropped due to queue overflow
    pub dropped_events: usize,
    /// Cumulative time spent dequeuing events in `pop`, in milliseconds
    pub processing_time_ms: u128,
    /// Maximum queue size reached during processing
    pub queue_size_max: usize,
}
38
/// Configuration for priority event processing
#[derive(Clone)]
pub struct PriorityConfig {
    /// Maximum queue size (default: 1000)
    pub max_queue_size: usize,
    /// Whether to log dropped events at `warn` level (default: true)
    pub log_drops: bool,
    /// Custom priority mapper (default: None, uses automatic detection via
    /// `Priority::from_event`).
    ///
    /// NOTE(review): in `push` this mapper only influences statistics and
    /// logging; the queue orders events internally — confirm this is intended.
    pub priority_mapper: Option<PriorityMapper>,
}
49
50impl std::fmt::Debug for PriorityConfig {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("PriorityConfig")
53            .field("max_queue_size", &self.max_queue_size)
54            .field("log_drops", &self.log_drops)
55            .field("priority_mapper", &self.priority_mapper.is_some())
56            .finish()
57    }
58}
59
60impl Default for PriorityConfig {
61    fn default() -> Self {
62        Self {
63            max_queue_size: 1000,
64            log_drops: true,
65            priority_mapper: None,
66        }
67    }
68}
69
/// Priority-aware event processor
///
/// Wraps a bounded [`PriorityEventQueue`] with statistics, metrics and
/// optional auto-scaling. Shared state sits behind `Arc<Mutex<_>>`, so the
/// processor can be driven through `&self` from multiple threads.
pub struct PriorityEventProcessor<M: Message> {
    /// Bounded priority queue holding pending events.
    queue: Arc<Mutex<PriorityEventQueue<M>>>,
    /// Cumulative counters exposed via `stats()`.
    stats: Arc<Mutex<EventStats>>,
    /// Configuration captured at construction.
    config: PriorityConfig,
    /// Metrics collector fed from `push`/`pop` (drops, backpressure, depth, latency).
    metrics: Arc<MetricsCollector>,
    /// Present only while auto-scaling is enabled.
    auto_scaler: Option<Arc<Mutex<QueueAutoScaler>>>,
}
78
79impl<M: Message> PriorityEventProcessor<M> {
80    /// Create a new priority event processor with default configuration
81    pub fn new() -> Self {
82        Self::with_config(PriorityConfig::default())
83    }
84
85    /// Create a new priority event processor with custom configuration
86    pub fn with_config(config: PriorityConfig) -> Self {
87        debug!(
88            "Initializing PriorityEventProcessor with max_queue_size: {}",
89            config.max_queue_size
90        );
91
92        let metrics_config = MetricsConfig {
93            track_percentiles: true,
94            track_by_type: true,
95            sampling_rate: 1.0,
96            max_histogram_size: 100_000,
97            rate_window: Duration::from_secs(60),
98        };
99
100        Self {
101            queue: Arc::new(Mutex::new(PriorityEventQueue::new(config.max_queue_size))),
102            stats: Arc::new(Mutex::new(EventStats::default())),
103            config,
104            metrics: Arc::new(MetricsCollector::new(metrics_config)),
105            auto_scaler: None,
106        }
107    }
108
109    /// Get current statistics
110    pub fn stats(&self) -> EventStats {
111        self.stats.lock().unwrap().clone()
112    }
113
114    /// Reset statistics
115    pub fn reset_stats(&self) {
116        *self.stats.lock().unwrap() = EventStats::default();
117        self.metrics.reset();
118    }
119
120    /// Get advanced metrics snapshot
121    pub fn advanced_metrics(&self) -> AdvancedEventStats {
122        self.metrics.snapshot()
123    }
124
125    /// Get metrics collector for external monitoring
126    pub fn metrics_collector(&self) -> Arc<MetricsCollector> {
127        self.metrics.clone()
128    }
129
130    /// Enable auto-scaling with the given configuration
131    pub fn enable_auto_scaling(&mut self, config: AutoScaleConfig) {
132        info!("Enabling auto-scaling with config: {:?}", config);
133        self.auto_scaler = Some(Arc::new(Mutex::new(QueueAutoScaler::new(config))));
134    }
135
136    /// Disable auto-scaling
137    pub fn disable_auto_scaling(&mut self) {
138        info!("Disabling auto-scaling");
139        self.auto_scaler = None;
140    }
141
142    /// Manually resize the queue
143    pub fn resize_queue(&self, new_size: usize) -> Result<(), ResizeError> {
144        let mut queue = self.queue.lock().unwrap();
145        let old_size = queue.capacity();
146        queue.resize(new_size)?;
147        info!("Queue resized from {} to {}", old_size, new_size);
148        Ok(())
149    }
150
151    /// Get current queue capacity
152    pub fn queue_capacity(&self) -> usize {
153        self.queue.lock().unwrap().capacity()
154    }
155
156    /// Push an event to the queue
157    pub fn push(&self, event: Event<M>) -> Result<(), Event<M>> {
158        let priority = self.detect_priority(&event);
159
160        trace!(
161            "Pushing event with priority {:?}: {:?}",
162            priority,
163            std::mem::discriminant(&event)
164        );
165
166        let mut queue = self.queue.lock().unwrap();
167        let result = queue.push(event);
168
169        // Update statistics
170        let mut stats = self.stats.lock().unwrap();
171        stats.total_events += 1;
172
173        match priority {
174            Priority::High => stats.high_priority_events += 1,
175            Priority::Normal => stats.normal_priority_events += 1,
176            Priority::Low => stats.low_priority_events += 1,
177        }
178
179        let current_size = queue.len();
180        let capacity = self.config.max_queue_size;
181        if current_size > stats.queue_size_max {
182            stats.queue_size_max = current_size;
183        }
184
185        // Update metrics
186        self.metrics.update_queue_depth(current_size, capacity);
187
188        if result.is_err() {
189            stats.dropped_events += 1;
190            self.metrics.record_dropped();
191            if self.config.log_drops {
192                warn!(
193                    "Dropped event due to queue overflow (priority: {:?})",
194                    priority
195                );
196            }
197        }
198
199        // Log if queue is getting full
200        if queue.is_backpressure_active() {
201            debug!("Queue backpressure active: {} events queued", current_size);
202            self.metrics.record_backpressure();
203        }
204
205        result
206    }
207
208    /// Pop the highest priority event from the queue
209    pub fn pop(&self) -> Option<Event<M>> {
210        let start = Instant::now();
211        let event = self.queue.lock().unwrap().pop();
212
213        if let Some(ref e) = event {
214            let elapsed = start.elapsed();
215            let mut stats = self.stats.lock().unwrap();
216            stats.processing_time_ms += elapsed.as_millis();
217
218            // Record metrics for the event
219            let priority = Priority::from_event(e);
220            let event_type = match e {
221                Event::Quit => Some("quit"),
222                Event::Key(_) => Some("key"),
223                Event::Mouse(_) => Some("mouse"),
224                Event::User(_) => Some("user"),
225                Event::Resize { .. } => Some("resize"),
226                Event::Tick => Some("tick"),
227                Event::Paste(_) => Some("paste"),
228                Event::Focus => Some("focus"),
229                Event::Blur => Some("blur"),
230                Event::Suspend => Some("suspend"),
231                Event::Resume => Some("resume"),
232                Event::ExecProcess => Some("exec"),
233            };
234
235            self.metrics.record_event(priority, elapsed, event_type);
236
237            // Check if auto-scaling should be triggered
238            if let Some(ref auto_scaler) = self.auto_scaler {
239                let mut scaler = auto_scaler.lock().unwrap();
240                let mut queue = self.queue.lock().unwrap();
241
242                if let Some(decision) = scaler.on_event_processed(&mut queue) {
243                    match decision {
244                        ScalingDecision::Grow(amount) => {
245                            debug!("Auto-scaling: Growing queue by {}", amount);
246                        }
247                        ScalingDecision::Shrink(amount) => {
248                            debug!("Auto-scaling: Shrinking queue by {}", amount);
249                        }
250                        ScalingDecision::NoChange => {}
251                    }
252                }
253            }
254        }
255
256        event
257    }
258
259    /// Check if the queue is empty
260    pub fn is_empty(&self) -> bool {
261        self.queue.lock().unwrap().is_empty()
262    }
263
264    /// Get the current queue size
265    pub fn queue_size(&self) -> usize {
266        self.queue.lock().unwrap().len()
267    }
268
    /// Detect the priority of an event.
    ///
    /// Uses the configured `priority_mapper` when one is set, otherwise falls
    /// back to automatic detection via [`Priority::from_event`].
    fn detect_priority(&self, event: &Event<M>) -> Priority {
        // Use custom mapper if provided
        if let Some(ref mapper) = self.config.priority_mapper {
            // We need to transmute to Event<()> for the mapper
            // This is safe because we only look at the discriminant, not the data
            //
            // NOTE(review): that claim is not language-guaranteed. `Event<M>`
            // and `Event<()>` are distinct instantiations of a generic enum
            // and repr(Rust) does not promise they share discriminant encoding
            // or niche layout, so this transmute is unsound in principle even
            // if the mapper never touches payloads. Consider a safe
            // variant-by-variant conversion or an explicit repr guarantee on
            // `Event` — TODO confirm with hojicha_core.
            let event_ref = unsafe { std::mem::transmute::<&Event<M>, &Event<()>>(event) };
            return mapper(event_ref);
        }

        // Default automatic priority detection
        Priority::from_event(event)
    }
282
283    /// Process events from multiple sources with priority handling
284    pub fn process_events(
285        &self,
286        message_rx: &mpsc::Receiver<Event<M>>,
287        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
288        tick_rate: Duration,
289    ) -> Option<Event<M>> {
290        trace!("Processing events, queue size: {}", self.queue_size());
291
292        // First, drain all available events into the priority queue
293        self.drain_channels(message_rx, crossterm_rx);
294
295        // If we have events in the queue, return the highest priority one
296        if let Some(event) = self.pop() {
297            debug!(
298                "Returning event from queue: {:?}",
299                std::mem::discriminant(&event)
300            );
301            return Some(event);
302        }
303
304        // No events available, wait for new ones with timeout
305        match crossterm_rx.recv_timeout(tick_rate) {
306            Ok(ct_event) => self.handle_crossterm_event(ct_event, crossterm_rx),
307            Err(mpsc::RecvTimeoutError::Timeout) => {
308                // Check for messages one more time
309                if let Ok(msg) = message_rx.try_recv() {
310                    Some(msg)
311                } else {
312                    trace!("Generating tick event");
313                    Some(Event::Tick)
314                }
315            }
316            Err(_) => None,
317        }
318    }
319
320    /// Process events in headless mode (no terminal events)
321    pub fn process_events_headless(
322        &self,
323        message_rx: &mpsc::Receiver<Event<M>>,
324        tick_rate: Duration,
325    ) -> Option<Event<M>> {
326        // Drain all available messages into the priority queue
327        while let Ok(msg) = message_rx.try_recv() {
328            let _ = self.push(msg);
329        }
330
331        // Return highest priority event if available
332        if let Some(event) = self.pop() {
333            return Some(event);
334        }
335
336        // Wait for new messages with timeout
337        match message_rx.recv_timeout(tick_rate) {
338            Ok(msg) => Some(msg),
339            Err(mpsc::RecvTimeoutError::Timeout) => Some(Event::Tick),
340            Err(_) => None,
341        }
342    }
343
344    /// Drain all available events from channels into the priority queue
345    fn drain_channels(
346        &self,
347        message_rx: &mpsc::Receiver<Event<M>>,
348        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
349    ) {
350        // Drain all available user messages
351        while let Ok(msg) = message_rx.try_recv() {
352            if self.push(msg).is_err() {
353                break; // Queue is full
354            }
355        }
356
357        // Drain all available terminal events
358        while let Ok(ct_event) = crossterm_rx.try_recv() {
359            if let Some(event) = Self::convert_crossterm_event(ct_event) {
360                // Unsafe transmute to add the message type
361                // This is safe because Event<()> and Event<M> have the same layout
362                let typed_event = unsafe { std::mem::transmute_copy(&event) };
363                if self.push(typed_event).is_err() {
364                    break; // Queue is full
365                }
366            }
367        }
368    }
369
370    /// Handle a crossterm event, with special handling for resize events
371    fn handle_crossterm_event(
372        &self,
373        ct_event: CrosstermEvent,
374        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
375    ) -> Option<Event<M>> {
376        match ct_event {
377            CrosstermEvent::Resize(width, height) => {
378                // Coalesce multiple resize events
379                let (final_width, final_height) =
380                    Self::coalesce_resize_events(width, height, crossterm_rx);
381                Some(Event::Resize {
382                    width: final_width,
383                    height: final_height,
384                })
385            }
386            _ => Self::convert_crossterm_event(ct_event)
387                .map(|e| unsafe { std::mem::transmute_copy(&e) }),
388        }
389    }
390
391    /// Convert a crossterm event to a hojicha event
392    fn convert_crossterm_event(event: CrosstermEvent) -> Option<Event<()>> {
393        match event {
394            CrosstermEvent::Key(key) if key.kind == KeyEventKind::Press => {
395                Some(Event::Key(key.into()))
396            }
397            CrosstermEvent::Mouse(mouse) => Some(Event::Mouse(mouse.into())),
398            CrosstermEvent::Resize(width, height) => Some(Event::Resize { width, height }),
399            CrosstermEvent::Paste(data) => Some(Event::Paste(data)),
400            CrosstermEvent::FocusGained => Some(Event::Focus),
401            CrosstermEvent::FocusLost => Some(Event::Blur),
402            _ => None,
403        }
404    }
405
    /// Coalesce a burst of resize events into a single final geometry.
    ///
    /// Starts from (`initial_width`, `initial_height`) and keeps consuming
    /// resize events from the channel until none are immediately available,
    /// returning the last size seen.
    fn coalesce_resize_events(
        initial_width: u16,
        initial_height: u16,
        rx: &mpsc::Receiver<CrosstermEvent>,
    ) -> (u16, u16) {
        let mut width = initial_width;
        let mut height = initial_height;

        // Drain any additional resize events
        //
        // NOTE(review): `try_recv` removes the next event regardless of kind.
        // If a non-resize event immediately follows the resize burst, it is
        // consumed here and silently discarded (the pattern fails and the loop
        // exits). Fixing this needs a peekable channel or a signature change
        // that returns the stray event to the caller — TODO confirm impact.
        while let Ok(CrosstermEvent::Resize(w, h)) = rx.try_recv() {
            width = w;
            height = h;
        }

        debug!("Coalesced resize events to {}x{}", width, height);
        (width, height)
    }
424}
425
426impl<M: Message> Default for PriorityEventProcessor<M> {
427    fn default() -> Self {
428        Self::new()
429    }
430}
431
432/// Public API for getting event processing statistics
433pub fn get_event_stats<M: Message>(processor: &PriorityEventProcessor<M>) -> String {
434    let stats = processor.stats();
435    format!(
436        "Events: {} total ({} high, {} normal, {} low), {} dropped, max queue: {}",
437        stats.total_events,
438        stats.high_priority_events,
439        stats.normal_priority_events,
440        stats.low_priority_events,
441        stats.dropped_events,
442        stats.queue_size_max
443    )
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal user message type for exercising `Event::User`.
    #[derive(Debug, Clone, PartialEq)]
    struct TestMsg(String);

    /// A freshly built processor starts out empty.
    #[test]
    fn test_priority_processor_creation() {
        let p: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();
        assert!(p.is_empty());
        assert_eq!(p.queue_size(), 0);
    }

    /// Events come back ordered by priority, not by insertion order.
    #[test]
    fn test_event_prioritization() {
        let p: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();

        // Insert in reverse priority order: low, normal, high.
        p.push(Event::Tick).unwrap();
        p.push(Event::User(TestMsg("normal".to_string()))).unwrap();
        p.push(Event::Quit).unwrap();

        // Pop order is high -> normal -> low.
        assert_eq!(p.pop(), Some(Event::Quit));
        assert_eq!(p.pop(), Some(Event::User(TestMsg("normal".to_string()))));
        assert_eq!(p.pop(), Some(Event::Tick));
    }

    /// Per-priority counters track each pushed event.
    #[test]
    fn test_statistics_tracking() {
        let p: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();

        p.push(Event::Quit).unwrap();
        p.push(Event::User(TestMsg("test".to_string()))).unwrap();
        p.push(Event::Tick).unwrap();

        let stats = p.stats();
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.high_priority_events, 1);
        assert_eq!(stats.normal_priority_events, 1);
        assert_eq!(stats.low_priority_events, 1);
    }

    /// A custom mapper overrides automatic detection for the stats counters.
    #[test]
    fn test_custom_priority_mapper() {
        let config = PriorityConfig {
            max_queue_size: 100,
            log_drops: false,
            // Classify every event as high priority.
            priority_mapper: Some(Arc::new(|_| Priority::High)),
        };
        let p: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::with_config(config);

        p.push(Event::Tick).unwrap();

        let stats = p.stats();
        assert_eq!(stats.high_priority_events, 1);
        assert_eq!(stats.low_priority_events, 0);
    }
}