1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, mpsc};
3use std::thread;
4use std::time::{Duration, Instant};
5
6use anyhow::Result;
7use crossterm::event::{self, Event as CrosstermEvent, KeyEvent, KeyEventKind};
8
/// Events delivered to the application through the [`EventHandler`] channel.
///
/// `Key`, `Tick` and `PollError` are the variants sent by the poll thread that
/// `EventHandler::new` spawns. Every other variant is classified as a
/// background result by `AppEvent::is_background_result`. The code that sends
/// those variants is not part of this file, so the notes on them below are
/// inferred from the names and should be confirmed against the producers.
pub enum AppEvent {
    /// A key event. The poll thread forwards only events whose kind is
    /// `KeyEventKind::Press`.
    Key(KeyEvent),
    /// Sent by the poll thread once per tick interval and on every terminal
    /// resize event.
    Tick,
    /// Presumably the outcome of pinging host `alias` (TODO confirm).
    PingResult {
        alias: String,
        // NOTE(review): `None` presumably means no round-trip time was measured — confirm.
        rtt_ms: Option<u32>,
        // NOTE(review): looks like a counter for discarding stale results — confirm.
        generation: u64,
    },
    /// Presumably a provider sync that finished with the full host list (TODO confirm).
    SyncComplete {
        provider: String,
        hosts: Vec<crate::providers::ProviderHost>,
    },
    /// Presumably a provider sync that finished with some failures (TODO confirm).
    SyncPartial {
        provider: String,
        hosts: Vec<crate::providers::ProviderHost>,
        failures: usize,
        total: usize,
    },
    /// Presumably a provider sync that failed with `message` (TODO confirm).
    SyncError {
        provider: String,
        message: String,
    },
    /// Presumably an intermediate status message for a running sync (TODO confirm).
    SyncProgress {
        provider: String,
        message: String,
    },
    /// Presumably signals that a newer application version exists (TODO confirm).
    UpdateAvailable {
        version: String,
        headline: Option<String>,
    },
    /// Presumably a directory listing for `path` on host `alias`; `entries`
    /// carries either the entries or an error string (TODO confirm).
    FileBrowserListing {
        alias: String,
        path: String,
        entries: Result<Vec<crate::file_browser::FileEntry>, String>,
    },
    /// Presumably the outcome of an scp transfer for host `alias` (TODO confirm).
    ScpComplete {
        alias: String,
        success: bool,
        message: String,
    },
    /// Presumably the captured output of a snippet on one host (TODO confirm).
    SnippetHostDone {
        run_id: u64,
        alias: String,
        stdout: String,
        stderr: String,
        // NOTE(review): `None` presumably means no exit code was available — confirm.
        exit_code: Option<i32>,
    },
    /// Presumably marks the end of snippet run `run_id` (TODO confirm).
    SnippetAllDone {
        run_id: u64,
    },
    /// Presumably a progress counter for snippet run `run_id` (TODO confirm).
    SnippetProgress {
        run_id: u64,
        completed: usize,
        total: usize,
    },
    /// Presumably the outcome of one key push within run `run_id` (TODO confirm).
    KeyPushResult {
        run_id: u64,
        result: crate::key_push::KeyPushResult,
    },
    /// Presumably the container listing fetched from host `alias` (TODO confirm).
    ContainerListing {
        alias: String,
        result: Result<crate::containers::ContainerListing, crate::containers::ContainerError>,
    },
    /// Presumably the outcome of `action` run against host `alias` (TODO confirm).
    ContainerActionComplete {
        alias: String,
        action: crate::containers::ContainerAction,
        result: Result<(), String>,
    },
    /// Presumably the inspect output for `container_id` on host `alias` (TODO confirm).
    ContainerInspectComplete {
        alias: String,
        container_id: String,
        // NOTE(review): boxed, presumably to keep the enum small — confirm.
        result: Box<Result<crate::containers::ContainerInspect, String>>,
    },
    /// Presumably the fetched log lines for a container (TODO confirm).
    ContainerLogsComplete {
        alias: String,
        container_id: String,
        container_name: String,
        result: Result<Vec<String>, String>,
    },
    /// Presumably the log lines from a tail request for a container (TODO confirm).
    ContainerLogsTailComplete {
        alias: String,
        container_id: String,
        result: Box<Result<Vec<String>, String>>,
    },
    /// Presumably the outcome of signing a certificate for host `alias` (TODO confirm).
    VaultSignResult {
        alias: String,
        certificate_file: String,
        success: bool,
        message: String,
    },
    /// Presumably a progress counter for a bulk signing run (TODO confirm).
    VaultSignProgress {
        alias: String,
        done: usize,
        total: usize,
    },
    /// Presumably the summary of a finished bulk signing run (TODO confirm).
    VaultSignAllDone {
        signed: u32,
        failed: u32,
        skipped: u32,
        cancelled: bool,
        aborted_message: Option<String>,
        first_error: Option<String>,
    },
    /// Presumably the certificate status determined for host `alias` (TODO confirm).
    CertCheckResult {
        alias: String,
        status: crate::vault_ssh::CertStatus,
    },
    /// Presumably a failed certificate check for host `alias` (TODO confirm).
    CertCheckError {
        alias: String,
        message: String,
    },
    /// Sent by the poll thread right before it exits because
    /// `crossterm::event::poll` returned an error.
    PollError,
}
151
152impl AppEvent {
153 fn is_background_result(&self) -> bool {
158 match self {
159 AppEvent::Key(_) | AppEvent::Tick | AppEvent::PollError => false,
160 AppEvent::PingResult { .. }
161 | AppEvent::SyncComplete { .. }
162 | AppEvent::SyncPartial { .. }
163 | AppEvent::SyncError { .. }
164 | AppEvent::SyncProgress { .. }
165 | AppEvent::UpdateAvailable { .. }
166 | AppEvent::FileBrowserListing { .. }
167 | AppEvent::ScpComplete { .. }
168 | AppEvent::SnippetHostDone { .. }
169 | AppEvent::SnippetAllDone { .. }
170 | AppEvent::SnippetProgress { .. }
171 | AppEvent::KeyPushResult { .. }
172 | AppEvent::ContainerListing { .. }
173 | AppEvent::ContainerActionComplete { .. }
174 | AppEvent::ContainerInspectComplete { .. }
175 | AppEvent::ContainerLogsComplete { .. }
176 | AppEvent::ContainerLogsTailComplete { .. }
177 | AppEvent::VaultSignResult { .. }
178 | AppEvent::VaultSignProgress { .. }
179 | AppEvent::VaultSignAllDone { .. }
180 | AppEvent::CertCheckResult { .. }
181 | AppEvent::CertCheckError { .. } => true,
182 }
183 }
184}
185
/// Owns the application's event channel together with the thread that polls
/// crossterm for terminal events.
pub struct EventHandler {
    /// Sending half of the channel. `new` clones it for the poll thread,
    /// `sender` hands out further clones, and `resume` uses it to re-queue events.
    tx: mpsc::Sender<AppEvent>,
    /// Receiving half, read by `next`, `next_timeout` and `resume`.
    rx: mpsc::Receiver<AppEvent>,
    /// Flag shared with the poll thread; while it is `true` the thread sleeps
    /// instead of polling the terminal.
    paused: Arc<AtomicBool>,
    /// Handle of the poll thread spawned in `new`. Nothing in this file joins it.
    _handle: thread::JoinHandle<()>,
}
194
195impl EventHandler {
196 pub fn new(tick_rate_ms: u64) -> Self {
197 let (tx, rx) = mpsc::channel();
198 let tick_rate = Duration::from_millis(tick_rate_ms);
199 let event_tx = tx.clone();
200 let paused = Arc::new(AtomicBool::new(false));
201 let paused_flag = paused.clone();
202
203 let handle = thread::spawn(move || {
204 let mut last_tick = Instant::now();
205 loop {
206 if paused_flag.load(Ordering::Acquire) {
208 thread::sleep(Duration::from_millis(50));
209 continue;
210 }
211
212 let remaining = tick_rate
214 .checked_sub(last_tick.elapsed())
215 .unwrap_or(Duration::ZERO);
216 let timeout = remaining.min(Duration::from_millis(50));
217
218 match event::poll(timeout) {
219 Ok(true) => {
220 if let Ok(evt) = event::read() {
221 match evt {
222 CrosstermEvent::Key(key)
223 if key.kind == KeyEventKind::Press
224 && event_tx.send(AppEvent::Key(key)).is_err() =>
225 {
226 return;
227 }
228 CrosstermEvent::Resize(..)
230 if event_tx.send(AppEvent::Tick).is_err() =>
231 {
232 return;
233 }
234 _ => {}
235 }
236 }
237 }
238 Ok(false) => {}
239 Err(e) => {
240 log::error!("[external] crossterm poll failed: {e}");
242 let _ = event_tx.send(AppEvent::PollError);
243 return;
244 }
245 }
246
247 if last_tick.elapsed() >= tick_rate {
248 if event_tx.send(AppEvent::Tick).is_err() {
249 return;
250 }
251 last_tick = Instant::now();
252 }
253 }
254 });
255
256 Self {
257 tx,
258 rx,
259 paused,
260 _handle: handle,
261 }
262 }
263
264 pub fn next(&self) -> Result<AppEvent> {
266 Ok(self.rx.recv()?)
267 }
268
269 pub fn next_timeout(&self, timeout: Duration) -> Result<Option<AppEvent>> {
271 match self.rx.recv_timeout(timeout) {
272 Ok(event) => Ok(Some(event)),
273 Err(mpsc::RecvTimeoutError::Timeout) => Ok(None),
274 Err(mpsc::RecvTimeoutError::Disconnected) => {
275 Err(anyhow::anyhow!("event channel disconnected"))
276 }
277 }
278 }
279
280 pub fn sender(&self) -> mpsc::Sender<AppEvent> {
282 self.tx.clone()
283 }
284
285 pub fn pause(&self) {
287 self.paused.store(true, Ordering::Release);
288 }
289
290 pub fn resume(&self) {
292 let mut preserved = Vec::new();
293 while let Ok(event) = self.rx.try_recv() {
294 if event.is_background_result() {
295 preserved.push(event);
296 }
297 }
298 for event in preserved {
299 let _ = self.tx.send(event);
300 }
301 self.paused.store(false, Ordering::Release);
302 }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crossterm::event::{KeyCode, KeyEvent, KeyModifiers};

    /// The three variants emitted by the poll thread are never background results.
    #[test]
    fn poll_thread_events_are_not_background_results() {
        let key_press = KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE);
        let poll_events = [
            AppEvent::Key(key_press),
            AppEvent::Tick,
            AppEvent::PollError,
        ];
        for event in &poll_events {
            assert!(!event.is_background_result());
        }
    }

    /// Every worker variant that can be built without project fixtures is a
    /// background result.
    #[test]
    fn worker_events_are_background_results() {
        let worker_events = [
            AppEvent::PingResult {
                alias: String::from("h"),
                rtt_ms: None,
                generation: 0,
            },
            AppEvent::SyncComplete {
                provider: String::from("p"),
                hosts: Vec::new(),
            },
            AppEvent::SyncPartial {
                provider: String::from("p"),
                hosts: Vec::new(),
                failures: 0,
                total: 0,
            },
            AppEvent::SyncError {
                provider: String::from("p"),
                message: String::from("x"),
            },
            AppEvent::SyncProgress {
                provider: String::from("p"),
                message: String::from("x"),
            },
            AppEvent::UpdateAvailable {
                version: String::from("1.0.0"),
                headline: None,
            },
            AppEvent::FileBrowserListing {
                alias: String::from("h"),
                path: String::from("/"),
                entries: Ok(Vec::new()),
            },
            AppEvent::ScpComplete {
                alias: String::from("h"),
                success: true,
                message: String::new(),
            },
            AppEvent::SnippetHostDone {
                run_id: 0,
                alias: String::from("h"),
                stdout: String::new(),
                stderr: String::new(),
                exit_code: Some(0),
            },
            AppEvent::SnippetAllDone { run_id: 0 },
            AppEvent::SnippetProgress {
                run_id: 0,
                completed: 0,
                total: 0,
            },
            AppEvent::VaultSignProgress {
                alias: String::from("h"),
                done: 0,
                total: 0,
            },
            AppEvent::VaultSignAllDone {
                signed: 0,
                failed: 0,
                skipped: 0,
                cancelled: false,
                aborted_message: None,
                first_error: None,
            },
            AppEvent::CertCheckError {
                alias: String::from("h"),
                message: String::from("x"),
            },
        ];

        for (index, event) in worker_events.iter().enumerate() {
            assert!(
                event.is_background_result(),
                "worker event #{index} must be a background result"
            );
        }
    }

    /// `resume()` drops queued input events and keeps background results in
    /// their original order.
    #[test]
    fn resume_drains_input_and_keeps_background_results() {
        let handler = EventHandler::new(60_000);
        handler.pause();

        let key_press = KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE);
        let queued = vec![
            AppEvent::Key(key_press),
            AppEvent::Tick,
            AppEvent::PollError,
            AppEvent::SyncProgress {
                provider: String::from("p"),
                message: String::from("x"),
            },
            AppEvent::PingResult {
                alias: String::from("h"),
                rtt_ms: Some(12),
                generation: 1,
            },
        ];
        for event in queued {
            handler.tx.send(event).unwrap();
        }

        handler.resume();

        // Collect until the channel stays quiet for 50 ms, keeping only the
        // background results; the resumed poll thread may add events of its own.
        let mut background = Vec::new();
        loop {
            match handler.next_timeout(Duration::from_millis(50)) {
                Ok(Some(event)) => {
                    if event.is_background_result() {
                        background.push(event);
                    }
                }
                _ => break,
            }
        }

        assert_eq!(
            background.len(),
            2,
            "exactly two background events survive resume()"
        );
        assert!(matches!(
            background.as_slice(),
            [AppEvent::SyncProgress { .. }, AppEvent::PingResult { .. }]
        ));
    }
}