1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use anyhow::Result;
use crossterm::event::{EventStream, KeyEvent, MouseEvent};
use futures::StreamExt;
use notify::RecursiveMode;
use notify_debouncer_mini::{new_debouncer, DebounceEventResult, DebouncedEventKind, Debouncer};
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::mpsc;
#[derive(Debug)]
pub enum Event {
Tick,
Key(KeyEvent),
Mouse(MouseEvent),
Resize(u16, u16),
FileChange(PathBuf),
PtyOutput,
}
pub struct EventHandler {
rx: mpsc::UnboundedReceiver<Event>,
// Keep the debouncer alive to prevent it from being dropped
_debouncer: Option<Debouncer<notify::RecommendedWatcher>>,
}
impl EventHandler {
pub fn new(
tick_rate: u64,
watch_path: Option<PathBuf>,
pty_rx: mpsc::UnboundedReceiver<()>,
) -> Self {
let tick_rate = Duration::from_millis(tick_rate);
let (tx, rx) = mpsc::unbounded_channel();
// Spawn async event loop using EventStream + select!
let tx_clone = tx.clone();
tokio::spawn(async move {
let mut crossterm_events = EventStream::new();
let mut pty_rx = pty_rx;
let mut tick_interval = tokio::time::interval(tick_rate);
loop {
tokio::select! {
// Crossterm terminal events (key, mouse, resize)
maybe_event = crossterm_events.next() => {
match maybe_event {
Some(Ok(crossterm::event::Event::Key(key))) => {
if tx_clone.send(Event::Key(key)).is_err() {
break;
}
}
Some(Ok(crossterm::event::Event::Mouse(mouse))) => {
if tx_clone.send(Event::Mouse(mouse)).is_err() {
break;
}
}
Some(Ok(crossterm::event::Event::Resize(w, h))) => {
if tx_clone.send(Event::Resize(w, h)).is_err() {
break;
}
}
Some(Ok(_)) => {}
Some(Err(_)) => break,
None => break,
}
}
// PTY output notification — triggers immediate redraw
maybe_pty = pty_rx.recv() => {
match maybe_pty {
Some(()) => {
// Drain any additional pending notifications to coalesce redraws
while pty_rx.try_recv().is_ok() {}
if tx_clone.send(Event::PtyOutput).is_err() {
break;
}
}
None => {
// PTY channel closed (process exited), keep running for other events
}
}
}
// Periodic tick for housekeeping (process exit check, etc.)
_ = tick_interval.tick() => {
if tx_clone.send(Event::Tick).is_err() {
break;
}
}
}
}
});
// Initialize file watcher if watch_path is provided
let debouncer = watch_path.and_then(|path| {
let fs_tx = tx.clone();
let mut debouncer = new_debouncer(
Duration::from_millis(300),
move |result: DebounceEventResult| {
if let Ok(events) = result {
for fs_event in events {
if matches!(
fs_event.kind,
DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
) {
let _ = fs_tx.send(Event::FileChange(fs_event.path));
}
}
}
},
)
.ok()?;
debouncer
.watcher()
.watch(&path, RecursiveMode::Recursive)
.ok()?;
Some(debouncer)
});
Self {
rx,
_debouncer: debouncer,
}
}
pub async fn next(&mut self) -> Result<Event> {
self.rx
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("Event channel closed"))
}
}