use crate::metrics::{AdvancedEventStats, MetricsCollector, MetricsConfig};
use crate::priority_queue::{Priority, PriorityEventQueue, ResizeError};
use crate::queue_scaling::{AutoScaleConfig, QueueAutoScaler, ScalingDecision};
use crossterm::event::{Event as CrosstermEvent, KeyEventKind};
use hojicha_core::core::Message;
use hojicha_core::event::Event;
use log::{debug, info, trace, warn};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Shared, thread-safe callback that maps an event to a [`Priority`].
type PriorityMapper = Arc<dyn Fn(&Event<()>) -> Priority + Send + Sync>;

/// Statistics gathered while events flow through the processor.
#[derive(Debug, Clone, Default)]
pub struct EventStats {
    /// Total number of events pushed.
    pub total_events: usize,
    /// Events classified as high priority.
    pub high_priority_events: usize,
    /// Events classified as normal priority.
    pub normal_priority_events: usize,
    /// Events classified as low priority.
    pub low_priority_events: usize,
    /// Events dropped because the queue was full.
    pub dropped_events: usize,
    /// Cumulative time spent popping events, in milliseconds.
    pub processing_time_ms: u128,
    /// Largest queue depth observed.
    pub queue_size_max: usize,
}

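/// Configuration for priority-based event processing.
///
/// A minimal sketch of a custom configuration; the mapper closure is purely
/// illustrative (it promotes key events above everything else):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let config = PriorityConfig {
///     max_queue_size: 500,
///     log_drops: false,
///     priority_mapper: Some(Arc::new(|event| match event {
///         Event::Key(_) => Priority::High,
///         _ => Priority::Normal,
///     })),
/// };
/// ```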
#[derive(Clone)]
pub struct PriorityConfig {
    /// Maximum number of events the queue may hold before dropping.
    pub max_queue_size: usize,
    /// Whether dropped events are logged as warnings.
    pub log_drops: bool,
    /// Optional user-provided callback overriding the default priority mapping.
    pub priority_mapper: Option<PriorityMapper>,
}

impl std::fmt::Debug for PriorityConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PriorityConfig")
            .field("max_queue_size", &self.max_queue_size)
            .field("log_drops", &self.log_drops)
            .field("priority_mapper", &self.priority_mapper.is_some())
            .finish()
    }
}

impl Default for PriorityConfig {
    fn default() -> Self {
        Self {
            max_queue_size: 1000,
            log_drops: true,
            priority_mapper: None,
        }
    }
}

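/// Processes events through a bounded priority queue, tracking basic
/// statistics, detailed metrics, and (optionally) auto-scaling the queue.
///
/// A minimal usage sketch, assuming an application message type `Msg` that
/// implements `Message`:
///
/// ```ignore
/// let processor: PriorityEventProcessor<Msg> = PriorityEventProcessor::new();
/// processor.push(Event::Tick).ok();
/// while let Some(event) = processor.pop() {
///     // Events come back highest-priority first.
/// }
/// ```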
pub struct PriorityEventProcessor<M: Message> {
    /// Bounded priority queue holding pending events.
    queue: Arc<Mutex<PriorityEventQueue<M>>>,
    /// Basic counters exposed via `stats()`.
    stats: Arc<Mutex<EventStats>>,
    /// Configuration supplied at construction time.
    config: PriorityConfig,
    /// Detailed metrics collector (latencies, rates, queue depth).
    metrics: Arc<MetricsCollector>,
    /// Optional auto-scaler that grows or shrinks the queue under load.
    auto_scaler: Option<Arc<Mutex<QueueAutoScaler>>>,
}

impl<M: Message> PriorityEventProcessor<M> {
    /// Creates a processor with the default configuration.
    pub fn new() -> Self {
        Self::with_config(PriorityConfig::default())
    }

    /// Creates a processor with a custom configuration.
    pub fn with_config(config: PriorityConfig) -> Self {
        debug!(
            "Initializing PriorityEventProcessor with max_queue_size: {}",
            config.max_queue_size
        );

        let metrics_config = MetricsConfig {
            track_percentiles: true,
            track_by_type: true,
            sampling_rate: 1.0,
            max_histogram_size: 100_000,
            rate_window: Duration::from_secs(60),
        };

        Self {
            queue: Arc::new(Mutex::new(PriorityEventQueue::new(config.max_queue_size))),
            stats: Arc::new(Mutex::new(EventStats::default())),
            config,
            metrics: Arc::new(MetricsCollector::new(metrics_config)),
            auto_scaler: None,
        }
    }

    /// Returns a snapshot of the basic event statistics.
    pub fn stats(&self) -> EventStats {
        self.stats.lock().unwrap().clone()
    }

    /// Resets both the basic statistics and the detailed metrics.
    pub fn reset_stats(&self) {
        *self.stats.lock().unwrap() = EventStats::default();
        self.metrics.reset();
    }

    /// Returns a snapshot of the detailed metrics.
    pub fn advanced_metrics(&self) -> AdvancedEventStats {
        self.metrics.snapshot()
    }

    /// Returns a handle to the underlying metrics collector.
    pub fn metrics_collector(&self) -> Arc<MetricsCollector> {
        self.metrics.clone()
    }

    /// Enables automatic queue resizing based on observed load.
    pub fn enable_auto_scaling(&mut self, config: AutoScaleConfig) {
        info!("Enabling auto-scaling with config: {:?}", config);
        self.auto_scaler = Some(Arc::new(Mutex::new(QueueAutoScaler::new(config))));
    }

    /// Disables automatic queue resizing.
    pub fn disable_auto_scaling(&mut self) {
        info!("Disabling auto-scaling");
        self.auto_scaler = None;
    }

    /// Manually resizes the queue, returning an error if the new size is rejected.
    pub fn resize_queue(&self, new_size: usize) -> Result<(), ResizeError> {
        let mut queue = self.queue.lock().unwrap();
        let old_size = queue.capacity();
        queue.resize(new_size)?;
        info!("Queue resized from {} to {}", old_size, new_size);
        Ok(())
    }

    /// Returns the queue's current capacity.
    pub fn queue_capacity(&self) -> usize {
        self.queue.lock().unwrap().capacity()
    }

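    /// Pushes an event, classifying its priority first; when the queue is full
    /// the rejected event is handed back in the `Err` variant.
    ///
    /// A small sketch of caller-side handling (illustrative only; drops are
    /// already counted and optionally logged internally):
    ///
    /// ```ignore
    /// if let Err(rejected) = processor.push(Event::Tick) {
    ///     // Decide whether to retry, coalesce, or discard `rejected`.
    /// }
    /// ```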
    pub fn push(&self, event: Event<M>) -> Result<(), Event<M>> {
        let priority = self.detect_priority(&event);

        trace!(
            "Pushing event with priority {:?}: {:?}",
            priority,
            std::mem::discriminant(&event)
        );

        let mut queue = self.queue.lock().unwrap();
        let result = queue.push(event);

        let mut stats = self.stats.lock().unwrap();
        stats.total_events += 1;

        match priority {
            Priority::High => stats.high_priority_events += 1,
            Priority::Normal => stats.normal_priority_events += 1,
            Priority::Low => stats.low_priority_events += 1,
        }

        // Report the queue's live capacity rather than the configured maximum,
        // since resizing or auto-scaling may have changed it.
        let current_size = queue.len();
        let capacity = queue.capacity();
        if current_size > stats.queue_size_max {
            stats.queue_size_max = current_size;
        }

        self.metrics.update_queue_depth(current_size, capacity);

        if result.is_err() {
            stats.dropped_events += 1;
            self.metrics.record_dropped();
            if self.config.log_drops {
                warn!(
                    "Dropped event due to queue overflow (priority: {:?})",
                    priority
                );
            }
        }

        if queue.is_backpressure_active() {
            debug!("Queue backpressure active: {} events queued", current_size);
            self.metrics.record_backpressure();
        }

        result
    }

    /// Pops the highest-priority event, recording latency and feeding the
    /// auto-scaler if one is enabled.
    pub fn pop(&self) -> Option<Event<M>> {
        let start = Instant::now();
        let event = self.queue.lock().unwrap().pop();

        if let Some(ref e) = event {
            let elapsed = start.elapsed();

            // Record processing latency in a narrow scope so the stats lock is
            // released before the queue is locked again below; `push` takes the
            // queue lock first, so holding stats across a queue lock would risk
            // a lock-order inversion.
            {
                let mut stats = self.stats.lock().unwrap();
                stats.processing_time_ms += elapsed.as_millis();
            }

            let priority = Priority::from_event(e);
            let event_type = match e {
                Event::Quit => Some("quit"),
                Event::Key(_) => Some("key"),
                Event::Mouse(_) => Some("mouse"),
                Event::User(_) => Some("user"),
                Event::Resize { .. } => Some("resize"),
                Event::Tick => Some("tick"),
                Event::Paste(_) => Some("paste"),
                Event::Focus => Some("focus"),
                Event::Blur => Some("blur"),
                Event::Suspend => Some("suspend"),
                Event::Resume => Some("resume"),
                Event::ExecProcess => Some("exec"),
            };

            self.metrics.record_event(priority, elapsed, event_type);

            if let Some(ref auto_scaler) = self.auto_scaler {
                let mut scaler = auto_scaler.lock().unwrap();
                let mut queue = self.queue.lock().unwrap();

                if let Some(decision) = scaler.on_event_processed(&mut queue) {
                    match decision {
                        ScalingDecision::Grow(amount) => {
                            debug!("Auto-scaling: Growing queue by {}", amount);
                        }
                        ScalingDecision::Shrink(amount) => {
                            debug!("Auto-scaling: Shrinking queue by {}", amount);
                        }
                        ScalingDecision::NoChange => {}
                    }
                }
            }
        }

        event
    }

    /// Returns `true` if no events are queued.
    pub fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Returns the number of queued events.
    pub fn queue_size(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Determines the priority of an event, preferring the user-supplied mapper.
    fn detect_priority(&self, event: &Event<M>) -> Priority {
        if let Some(ref mapper) = self.config.priority_mapper {
            // SAFETY: assumes `Event<M>` and `Event<()>` agree on everything the
            // mapper can observe; mappers receive `Event<()>` and must not read
            // a `User` payload.
            let event_ref = unsafe { std::mem::transmute::<&Event<M>, &Event<()>>(event) };
            return mapper(event_ref);
        }

        Priority::from_event(event)
    }

    /// Processes events from the message and terminal channels, returning the
    /// next event to handle, or a tick if nothing arrives within `tick_rate`.
    pub fn process_events(
        &self,
        message_rx: &mpsc::Receiver<Event<M>>,
        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
        tick_rate: Duration,
    ) -> Option<Event<M>> {
        trace!("Processing events, queue size: {}", self.queue_size());

        // Move anything already waiting on the channels into the priority queue.
        self.drain_channels(message_rx, crossterm_rx);

        // Serve queued events first, in priority order.
        if let Some(event) = self.pop() {
            debug!(
                "Returning event from queue: {:?}",
                std::mem::discriminant(&event)
            );
            return Some(event);
        }

        // Otherwise block on the terminal channel for up to one tick interval.
        match crossterm_rx.recv_timeout(tick_rate) {
            Ok(ct_event) => self.handle_crossterm_event(ct_event, crossterm_rx),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                if let Ok(msg) = message_rx.try_recv() {
                    Some(msg)
                } else {
                    trace!("Generating tick event");
                    Some(Event::Tick)
                }
            }
            Err(_) => None,
        }
    }

    /// Like `process_events`, but for headless operation: only the message
    /// channel is consulted, and timeouts produce ticks.
    pub fn process_events_headless(
        &self,
        message_rx: &mpsc::Receiver<Event<M>>,
        tick_rate: Duration,
    ) -> Option<Event<M>> {
        // Drain pending messages into the priority queue.
        while let Ok(msg) = message_rx.try_recv() {
            let _ = self.push(msg);
        }

        if let Some(event) = self.pop() {
            return Some(event);
        }

        match message_rx.recv_timeout(tick_rate) {
            Ok(msg) => Some(msg),
            Err(mpsc::RecvTimeoutError::Timeout) => Some(Event::Tick),
            Err(_) => None,
        }
    }

    /// Drains both channels into the priority queue, stopping early if the
    /// queue rejects an event.
    fn drain_channels(
        &self,
        message_rx: &mpsc::Receiver<Event<M>>,
        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
    ) {
        while let Ok(msg) = message_rx.try_recv() {
            if self.push(msg).is_err() {
                break;
            }
        }

        while let Ok(ct_event) = crossterm_rx.try_recv() {
            if let Some(event) = Self::convert_crossterm_event(ct_event) {
                if self.push(Self::retype_event(event)).is_err() {
                    break;
                }
            }
        }
    }

    /// Converts a terminal event into an application event, coalescing any
    /// burst of pending resize events into a single one.
    fn handle_crossterm_event(
        &self,
        ct_event: CrosstermEvent,
        crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
    ) -> Option<Event<M>> {
        match ct_event {
            CrosstermEvent::Resize(width, height) => {
                let (final_width, final_height) =
                    Self::coalesce_resize_events(width, height, crossterm_rx);
                Some(Event::Resize {
                    width: final_width,
                    height: final_height,
                })
            }
            _ => Self::convert_crossterm_event(ct_event).map(Self::retype_event),
        }
    }

    /// Converts a crossterm event into the library's `Event<()>` type,
    /// filtering out key releases/repeats and events that are not handled.
    fn convert_crossterm_event(event: CrosstermEvent) -> Option<Event<()>> {
        match event {
            CrosstermEvent::Key(key) if key.kind == KeyEventKind::Press => {
                Some(Event::Key(key.into()))
            }
            CrosstermEvent::Mouse(mouse) => Some(Event::Mouse(mouse.into())),
            CrosstermEvent::Resize(width, height) => Some(Event::Resize { width, height }),
            CrosstermEvent::Paste(data) => Some(Event::Paste(data)),
            CrosstermEvent::FocusGained => Some(Event::Focus),
            CrosstermEvent::FocusLost => Some(Event::Blur),
            _ => None,
        }
    }

    /// Re-types an `Event<()>` produced by `convert_crossterm_event` as an
    /// `Event<M>` by rebuilding each variant explicitly. This avoids the
    /// unsafe transmute; terminal conversion never yields `Event::User`.
    fn retype_event(event: Event<()>) -> Event<M> {
        match event {
            Event::Key(key) => Event::Key(key),
            Event::Mouse(mouse) => Event::Mouse(mouse),
            Event::Resize { width, height } => Event::Resize { width, height },
            Event::Paste(data) => Event::Paste(data),
            Event::Focus => Event::Focus,
            Event::Blur => Event::Blur,
            Event::Tick => Event::Tick,
            Event::Quit => Event::Quit,
            Event::Suspend => Event::Suspend,
            Event::Resume => Event::Resume,
            Event::ExecProcess => Event::ExecProcess,
            Event::User(_) => unreachable!("terminal events never carry a user message"),
        }
    }

    /// Folds a run of pending resize events into the most recent dimensions,
    /// so a window drag does not flood the queue.
    fn coalesce_resize_events(
        initial_width: u16,
        initial_height: u16,
        rx: &mpsc::Receiver<CrosstermEvent>,
    ) -> (u16, u16) {
        let mut width = initial_width;
        let mut height = initial_height;

        while let Ok(CrosstermEvent::Resize(w, h)) = rx.try_recv() {
            width = w;
            height = h;
        }

        debug!("Coalesced resize events to {}x{}", width, height);
        (width, height)
    }
}

impl<M: Message> Default for PriorityEventProcessor<M> {
    fn default() -> Self {
        Self::new()
    }
}

/// Formats a one-line, human-readable summary of a processor's statistics.
pub fn get_event_stats<M: Message>(processor: &PriorityEventProcessor<M>) -> String {
    let stats = processor.stats();
    format!(
        "Events: {} total ({} high, {} normal, {} low), {} dropped, max queue: {}",
        stats.total_events,
        stats.high_priority_events,
        stats.normal_priority_events,
        stats.low_priority_events,
        stats.dropped_events,
        stats.queue_size_max
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone, PartialEq)]
    struct TestMsg(String);

    #[test]
    fn test_priority_processor_creation() {
        let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();
        assert_eq!(processor.queue_size(), 0);
        assert!(processor.is_empty());
    }

    #[test]
    fn test_event_prioritization() {
        let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();

        processor.push(Event::Tick).unwrap();
        processor
            .push(Event::User(TestMsg("normal".to_string())))
            .unwrap();
        processor.push(Event::Quit).unwrap();

        assert_eq!(processor.pop(), Some(Event::Quit));
        assert_eq!(
            processor.pop(),
            Some(Event::User(TestMsg("normal".to_string())))
        );
        assert_eq!(processor.pop(), Some(Event::Tick));
    }

    #[test]
    fn test_statistics_tracking() {
        let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();

        processor.push(Event::Quit).unwrap();
        processor
            .push(Event::User(TestMsg("test".to_string())))
            .unwrap();
        processor.push(Event::Tick).unwrap();

        let stats = processor.stats();
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.high_priority_events, 1);
        assert_eq!(stats.normal_priority_events, 1);
        assert_eq!(stats.low_priority_events, 1);
    }

    #[test]
    fn test_custom_priority_mapper() {
        let config = PriorityConfig {
            max_queue_size: 100,
            log_drops: false,
            priority_mapper: Some(Arc::new(|_event| Priority::High)),
        };

        let processor: PriorityEventProcessor<TestMsg> =
            PriorityEventProcessor::with_config(config);

        processor.push(Event::Tick).unwrap();
        let stats = processor.stats();
        assert_eq!(stats.high_priority_events, 1);
        assert_eq!(stats.low_priority_events, 0);
    }
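
    // The tests below are additional sketches covering the stats helpers and
    // the headless processing path; they only rely on APIs exercised above.

    #[test]
    fn test_reset_stats() {
        let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();

        processor.push(Event::Quit).unwrap();
        processor.push(Event::Tick).unwrap();
        assert_eq!(processor.stats().total_events, 2);

        processor.reset_stats();
        assert_eq!(processor.stats().total_events, 0);
        assert_eq!(processor.stats().dropped_events, 0);
    }

    #[test]
    fn test_stats_summary_formatting() {
        let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();

        processor.push(Event::Quit).unwrap();
        processor.push(Event::Tick).unwrap();

        let summary = get_event_stats(&processor);
        assert!(summary.contains("2 total"));
        assert!(summary.contains("0 dropped"));
    }

    #[test]
    fn test_headless_processing() {
        let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();
        let (tx, rx) = mpsc::channel();

        tx.send(Event::User(TestMsg("hello".to_string()))).unwrap();

        // The queued message should come back before any synthetic tick.
        assert_eq!(
            processor.process_events_headless(&rx, Duration::from_millis(10)),
            Some(Event::User(TestMsg("hello".to_string())))
        );

        // With nothing queued, a timeout produces a tick.
        assert_eq!(
            processor.process_events_headless(&rx, Duration::from_millis(10)),
            Some(Event::Tick)
        );
    }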
}