1use crate::metrics::{AdvancedEventStats, MetricsCollector, MetricsConfig};
7use crate::priority_queue::{Priority, PriorityEventQueue, ResizeError};
8use crate::queue_scaling::{AutoScaleConfig, QueueAutoScaler, ScalingDecision};
9use crossterm::event::{Event as CrosstermEvent, KeyEventKind};
10use hojicha_core::core::Message;
11use hojicha_core::event::Event;
12use log::{debug, info, trace, warn};
13use std::sync::mpsc;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17type PriorityMapper = Arc<dyn Fn(&Event<()>) -> Priority + Send + Sync>;
19
20#[derive(Debug, Clone, Default)]
22pub struct EventStats {
23 pub total_events: usize,
25 pub high_priority_events: usize,
27 pub normal_priority_events: usize,
29 pub low_priority_events: usize,
31 pub dropped_events: usize,
33 pub processing_time_ms: u128,
35 pub queue_size_max: usize,
37}
38
39#[derive(Clone)]
41pub struct PriorityConfig {
42 pub max_queue_size: usize,
44 pub log_drops: bool,
46 pub priority_mapper: Option<PriorityMapper>,
48}
49
50impl std::fmt::Debug for PriorityConfig {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("PriorityConfig")
53 .field("max_queue_size", &self.max_queue_size)
54 .field("log_drops", &self.log_drops)
55 .field("priority_mapper", &self.priority_mapper.is_some())
56 .finish()
57 }
58}
59
60impl Default for PriorityConfig {
61 fn default() -> Self {
62 Self {
63 max_queue_size: 1000,
64 log_drops: true,
65 priority_mapper: None,
66 }
67 }
68}
69
70pub struct PriorityEventProcessor<M: Message> {
72 queue: Arc<Mutex<PriorityEventQueue<M>>>,
73 stats: Arc<Mutex<EventStats>>,
74 config: PriorityConfig,
75 metrics: Arc<MetricsCollector>,
76 auto_scaler: Option<Arc<Mutex<QueueAutoScaler>>>,
77}
78
79impl<M: Message> PriorityEventProcessor<M> {
80 pub fn new() -> Self {
82 Self::with_config(PriorityConfig::default())
83 }
84
85 pub fn with_config(config: PriorityConfig) -> Self {
87 debug!(
88 "Initializing PriorityEventProcessor with max_queue_size: {}",
89 config.max_queue_size
90 );
91
92 let metrics_config = MetricsConfig {
93 track_percentiles: true,
94 track_by_type: true,
95 sampling_rate: 1.0,
96 max_histogram_size: 100_000,
97 rate_window: Duration::from_secs(60),
98 };
99
100 Self {
101 queue: Arc::new(Mutex::new(PriorityEventQueue::new(config.max_queue_size))),
102 stats: Arc::new(Mutex::new(EventStats::default())),
103 config,
104 metrics: Arc::new(MetricsCollector::new(metrics_config)),
105 auto_scaler: None,
106 }
107 }
108
109 pub fn stats(&self) -> EventStats {
111 self.stats.lock().unwrap().clone()
112 }
113
114 pub fn reset_stats(&self) {
116 *self.stats.lock().unwrap() = EventStats::default();
117 self.metrics.reset();
118 }
119
120 pub fn advanced_metrics(&self) -> AdvancedEventStats {
122 self.metrics.snapshot()
123 }
124
125 pub fn metrics_collector(&self) -> Arc<MetricsCollector> {
127 self.metrics.clone()
128 }
129
130 pub fn enable_auto_scaling(&mut self, config: AutoScaleConfig) {
132 info!("Enabling auto-scaling with config: {:?}", config);
133 self.auto_scaler = Some(Arc::new(Mutex::new(QueueAutoScaler::new(config))));
134 }
135
136 pub fn disable_auto_scaling(&mut self) {
138 info!("Disabling auto-scaling");
139 self.auto_scaler = None;
140 }
141
142 pub fn resize_queue(&self, new_size: usize) -> Result<(), ResizeError> {
144 let mut queue = self.queue.lock().unwrap();
145 let old_size = queue.capacity();
146 queue.resize(new_size)?;
147 info!("Queue resized from {} to {}", old_size, new_size);
148 Ok(())
149 }
150
151 pub fn queue_capacity(&self) -> usize {
153 self.queue.lock().unwrap().capacity()
154 }
155
156 pub fn push(&self, event: Event<M>) -> Result<(), Event<M>> {
158 let priority = self.detect_priority(&event);
159
160 trace!(
161 "Pushing event with priority {:?}: {:?}",
162 priority,
163 std::mem::discriminant(&event)
164 );
165
166 let mut queue = self.queue.lock().unwrap();
167 let result = queue.push(event);
168
169 let mut stats = self.stats.lock().unwrap();
171 stats.total_events += 1;
172
173 match priority {
174 Priority::High => stats.high_priority_events += 1,
175 Priority::Normal => stats.normal_priority_events += 1,
176 Priority::Low => stats.low_priority_events += 1,
177 }
178
179 let current_size = queue.len();
180 let capacity = self.config.max_queue_size;
181 if current_size > stats.queue_size_max {
182 stats.queue_size_max = current_size;
183 }
184
185 self.metrics.update_queue_depth(current_size, capacity);
187
188 if result.is_err() {
189 stats.dropped_events += 1;
190 self.metrics.record_dropped();
191 if self.config.log_drops {
192 warn!(
193 "Dropped event due to queue overflow (priority: {:?})",
194 priority
195 );
196 }
197 }
198
199 if queue.is_backpressure_active() {
201 debug!("Queue backpressure active: {} events queued", current_size);
202 self.metrics.record_backpressure();
203 }
204
205 result
206 }
207
208 pub fn pop(&self) -> Option<Event<M>> {
210 let start = Instant::now();
211 let event = self.queue.lock().unwrap().pop();
212
213 if let Some(ref e) = event {
214 let elapsed = start.elapsed();
215 let mut stats = self.stats.lock().unwrap();
216 stats.processing_time_ms += elapsed.as_millis();
217
218 let priority = Priority::from_event(e);
220 let event_type = match e {
221 Event::Quit => Some("quit"),
222 Event::Key(_) => Some("key"),
223 Event::Mouse(_) => Some("mouse"),
224 Event::User(_) => Some("user"),
225 Event::Resize { .. } => Some("resize"),
226 Event::Tick => Some("tick"),
227 Event::Paste(_) => Some("paste"),
228 Event::Focus => Some("focus"),
229 Event::Blur => Some("blur"),
230 Event::Suspend => Some("suspend"),
231 Event::Resume => Some("resume"),
232 Event::ExecProcess => Some("exec"),
233 };
234
235 self.metrics.record_event(priority, elapsed, event_type);
236
237 if let Some(ref auto_scaler) = self.auto_scaler {
239 let mut scaler = auto_scaler.lock().unwrap();
240 let mut queue = self.queue.lock().unwrap();
241
242 if let Some(decision) = scaler.on_event_processed(&mut queue) {
243 match decision {
244 ScalingDecision::Grow(amount) => {
245 debug!("Auto-scaling: Growing queue by {}", amount);
246 }
247 ScalingDecision::Shrink(amount) => {
248 debug!("Auto-scaling: Shrinking queue by {}", amount);
249 }
250 ScalingDecision::NoChange => {}
251 }
252 }
253 }
254 }
255
256 event
257 }
258
259 pub fn is_empty(&self) -> bool {
261 self.queue.lock().unwrap().is_empty()
262 }
263
264 pub fn queue_size(&self) -> usize {
266 self.queue.lock().unwrap().len()
267 }
268
269 fn detect_priority(&self, event: &Event<M>) -> Priority {
271 if let Some(ref mapper) = self.config.priority_mapper {
273 let event_ref = unsafe { std::mem::transmute::<&Event<M>, &Event<()>>(event) };
276 return mapper(event_ref);
277 }
278
279 Priority::from_event(event)
281 }
282
283 pub fn process_events(
285 &self,
286 message_rx: &mpsc::Receiver<Event<M>>,
287 crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
288 tick_rate: Duration,
289 ) -> Option<Event<M>> {
290 trace!("Processing events, queue size: {}", self.queue_size());
291
292 self.drain_channels(message_rx, crossterm_rx);
294
295 if let Some(event) = self.pop() {
297 debug!(
298 "Returning event from queue: {:?}",
299 std::mem::discriminant(&event)
300 );
301 return Some(event);
302 }
303
304 match crossterm_rx.recv_timeout(tick_rate) {
306 Ok(ct_event) => self.handle_crossterm_event(ct_event, crossterm_rx),
307 Err(mpsc::RecvTimeoutError::Timeout) => {
308 if let Ok(msg) = message_rx.try_recv() {
310 Some(msg)
311 } else {
312 trace!("Generating tick event");
313 Some(Event::Tick)
314 }
315 }
316 Err(_) => None,
317 }
318 }
319
320 pub fn process_events_headless(
322 &self,
323 message_rx: &mpsc::Receiver<Event<M>>,
324 tick_rate: Duration,
325 ) -> Option<Event<M>> {
326 while let Ok(msg) = message_rx.try_recv() {
328 let _ = self.push(msg);
329 }
330
331 if let Some(event) = self.pop() {
333 return Some(event);
334 }
335
336 match message_rx.recv_timeout(tick_rate) {
338 Ok(msg) => Some(msg),
339 Err(mpsc::RecvTimeoutError::Timeout) => Some(Event::Tick),
340 Err(_) => None,
341 }
342 }
343
344 fn drain_channels(
346 &self,
347 message_rx: &mpsc::Receiver<Event<M>>,
348 crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
349 ) {
350 while let Ok(msg) = message_rx.try_recv() {
352 if self.push(msg).is_err() {
353 break; }
355 }
356
357 while let Ok(ct_event) = crossterm_rx.try_recv() {
359 if let Some(event) = Self::convert_crossterm_event(ct_event) {
360 let typed_event = unsafe { std::mem::transmute_copy(&event) };
363 if self.push(typed_event).is_err() {
364 break; }
366 }
367 }
368 }
369
370 fn handle_crossterm_event(
372 &self,
373 ct_event: CrosstermEvent,
374 crossterm_rx: &mpsc::Receiver<CrosstermEvent>,
375 ) -> Option<Event<M>> {
376 match ct_event {
377 CrosstermEvent::Resize(width, height) => {
378 let (final_width, final_height) =
380 Self::coalesce_resize_events(width, height, crossterm_rx);
381 Some(Event::Resize {
382 width: final_width,
383 height: final_height,
384 })
385 }
386 _ => Self::convert_crossterm_event(ct_event)
387 .map(|e| unsafe { std::mem::transmute_copy(&e) }),
388 }
389 }
390
391 fn convert_crossterm_event(event: CrosstermEvent) -> Option<Event<()>> {
393 match event {
394 CrosstermEvent::Key(key) if key.kind == KeyEventKind::Press => {
395 Some(Event::Key(key.into()))
396 }
397 CrosstermEvent::Mouse(mouse) => Some(Event::Mouse(mouse.into())),
398 CrosstermEvent::Resize(width, height) => Some(Event::Resize { width, height }),
399 CrosstermEvent::Paste(data) => Some(Event::Paste(data)),
400 CrosstermEvent::FocusGained => Some(Event::Focus),
401 CrosstermEvent::FocusLost => Some(Event::Blur),
402 _ => None,
403 }
404 }
405
406 fn coalesce_resize_events(
408 initial_width: u16,
409 initial_height: u16,
410 rx: &mpsc::Receiver<CrosstermEvent>,
411 ) -> (u16, u16) {
412 let mut width = initial_width;
413 let mut height = initial_height;
414
415 while let Ok(CrosstermEvent::Resize(w, h)) = rx.try_recv() {
417 width = w;
418 height = h;
419 }
420
421 debug!("Coalesced resize events to {}x{}", width, height);
422 (width, height)
423 }
424}
425
426impl<M: Message> Default for PriorityEventProcessor<M> {
427 fn default() -> Self {
428 Self::new()
429 }
430}
431
432pub fn get_event_stats<M: Message>(processor: &PriorityEventProcessor<M>) -> String {
434 let stats = processor.stats();
435 format!(
436 "Events: {} total ({} high, {} normal, {} low), {} dropped, max queue: {}",
437 stats.total_events,
438 stats.high_priority_events,
439 stats.normal_priority_events,
440 stats.low_priority_events,
441 stats.dropped_events,
442 stats.queue_size_max
443 )
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449
450 #[derive(Debug, Clone, PartialEq)]
451 struct TestMsg(String);
452
453 #[test]
454 fn test_priority_processor_creation() {
455 let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();
456 assert_eq!(processor.queue_size(), 0);
457 assert!(processor.is_empty());
458 }
459
460 #[test]
461 fn test_event_prioritization() {
462 let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();
463
464 processor.push(Event::Tick).unwrap();
466 processor
467 .push(Event::User(TestMsg("normal".to_string())))
468 .unwrap();
469 processor.push(Event::Quit).unwrap();
470
471 assert_eq!(processor.pop(), Some(Event::Quit));
473 assert_eq!(
474 processor.pop(),
475 Some(Event::User(TestMsg("normal".to_string())))
476 );
477 assert_eq!(processor.pop(), Some(Event::Tick));
478 }
479
480 #[test]
481 fn test_statistics_tracking() {
482 let processor: PriorityEventProcessor<TestMsg> = PriorityEventProcessor::new();
483
484 processor.push(Event::Quit).unwrap();
485 processor
486 .push(Event::User(TestMsg("test".to_string())))
487 .unwrap();
488 processor.push(Event::Tick).unwrap();
489
490 let stats = processor.stats();
491 assert_eq!(stats.total_events, 3);
492 assert_eq!(stats.high_priority_events, 1);
493 assert_eq!(stats.normal_priority_events, 1);
494 assert_eq!(stats.low_priority_events, 1);
495 }
496
497 #[test]
498 fn test_custom_priority_mapper() {
499 let config = PriorityConfig {
500 max_queue_size: 100,
501 log_drops: false,
502 priority_mapper: Some(Arc::new(|_event| {
503 Priority::High
505 })),
506 };
507
508 let processor: PriorityEventProcessor<TestMsg> =
509 PriorityEventProcessor::with_config(config);
510
511 processor.push(Event::Tick).unwrap();
512 let stats = processor.stats();
513 assert_eq!(stats.high_priority_events, 1);
514 assert_eq!(stats.low_priority_events, 0);
515 }
516}