Skip to main content

purple_ssh/
event.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, mpsc};
3use std::thread;
4use std::time::{Duration, Instant};
5
6use anyhow::Result;
7use crossterm::event::{self, Event as CrosstermEvent, KeyEvent, KeyEventKind};
8
9/// Application events.
10pub enum AppEvent {
11    Key(KeyEvent),
12    Tick,
13    PingResult {
14        alias: String,
15        rtt_ms: Option<u32>,
16        generation: u64,
17    },
18    SyncComplete {
19        provider: String,
20        hosts: Vec<crate::providers::ProviderHost>,
21    },
22    SyncPartial {
23        provider: String,
24        hosts: Vec<crate::providers::ProviderHost>,
25        failures: usize,
26        total: usize,
27    },
28    SyncError {
29        provider: String,
30        message: String,
31    },
32    SyncProgress {
33        provider: String,
34        message: String,
35    },
36    UpdateAvailable {
37        version: String,
38        headline: Option<String>,
39    },
40    FileBrowserListing {
41        alias: String,
42        path: String,
43        entries: Result<Vec<crate::file_browser::FileEntry>, String>,
44    },
45    ScpComplete {
46        alias: String,
47        success: bool,
48        message: String,
49    },
50    SnippetHostDone {
51        run_id: u64,
52        alias: String,
53        stdout: String,
54        stderr: String,
55        exit_code: Option<i32>,
56    },
57    SnippetAllDone {
58        run_id: u64,
59    },
60    SnippetProgress {
61        run_id: u64,
62        completed: usize,
63        total: usize,
64    },
65    /// Result of one host in a key-push run. Aggregated in
66    /// `app.keys.push.results` by the main loop; the summary toast / sticky
67    /// error overlay fires once `results.len() == expected_count`.
68    ///
69    /// `run_id` matches `app.keys.push.run_id` at the moment the worker
70    /// was spawned. Results whose `run_id` no longer matches the current
71    /// run are stale (a previous cancelled run's tail event) and dropped
72    /// before they touch the accumulator.
73    KeyPushResult {
74        run_id: u64,
75        result: crate::key_push::KeyPushResult,
76    },
77    ContainerListing {
78        alias: String,
79        result: Result<crate::containers::ContainerListing, crate::containers::ContainerError>,
80    },
81    ContainerActionComplete {
82        alias: String,
83        action: crate::containers::ContainerAction,
84        result: Result<(), String>,
85    },
86    /// Result of a `docker inspect` (or `podman inspect`) call fired by
87    /// the containers overview detail panel. Cached per (alias,
88    /// container_id) once received so repeat fetches inside the TTL
89    /// window are skipped.
90    ContainerInspectComplete {
91        alias: String,
92        container_id: String,
93        // Boxed because `ContainerInspect` carries the full audit
94        // payload (caps, mounts, compose labels). Inlining it grows the
95        // `AppEvent` enum past clippy's `large_enum_variant` budget,
96        // bloating every queue slot for events that do not carry it.
97        result: Box<Result<crate::containers::ContainerInspect, String>>,
98    },
99    /// Result of `<runtime> logs --tail 200` over SSH for a container
100    /// the user opened with `l`. Populates `Screen::ContainerLogs.body`
101    /// (or `.error`) when received.
102    ContainerLogsComplete {
103        alias: String,
104        container_id: String,
105        container_name: String,
106        result: Result<Vec<String>, String>,
107    },
108    /// Result of a short `<runtime> logs --tail N` fetch fired by the
109    /// containers-overview detail panel to populate the LOGS card.
110    /// Distinct from `ContainerLogsComplete` so the dedicated logs
111    /// viewer (`l`) and the detail-panel card stay on separate caches.
112    ContainerLogsTailComplete {
113        alias: String,
114        container_id: String,
115        result: Box<Result<Vec<String>, String>>,
116    },
117    VaultSignResult {
118        alias: String,
119        /// Snapshot of the host's `CertificateFile` directive at signing time.
120        /// Carried in the event so the main loop never has to re-look up the
121        /// host (which would be O(n) and racy under concurrent renames). Empty
122        /// when the host has no `CertificateFile` set; `should_write_certificate_file`
123        /// uses this directly to decide whether to write a default directive.
124        certificate_file: String,
125        success: bool,
126        message: String,
127    },
128    VaultSignProgress {
129        alias: String,
130        done: usize,
131        total: usize,
132    },
133    VaultSignAllDone {
134        signed: u32,
135        failed: u32,
136        skipped: u32,
137        cancelled: bool,
138        aborted_message: Option<String>,
139        first_error: Option<String>,
140    },
141    CertCheckResult {
142        alias: String,
143        status: crate::vault_ssh::CertStatus,
144    },
145    CertCheckError {
146        alias: String,
147        message: String,
148    },
149    PollError,
150}
151
152impl AppEvent {
153    /// True for variants produced by background workers (sync, ping,
154    /// container ops, Vault, etc.). False for variants produced by the
155    /// crossterm poll thread (`Key`, `Tick`, `PollError`). Exhaustive
156    /// match forces a deliberate choice when adding a new variant.
157    fn is_background_result(&self) -> bool {
158        match self {
159            AppEvent::Key(_) | AppEvent::Tick | AppEvent::PollError => false,
160            AppEvent::PingResult { .. }
161            | AppEvent::SyncComplete { .. }
162            | AppEvent::SyncPartial { .. }
163            | AppEvent::SyncError { .. }
164            | AppEvent::SyncProgress { .. }
165            | AppEvent::UpdateAvailable { .. }
166            | AppEvent::FileBrowserListing { .. }
167            | AppEvent::ScpComplete { .. }
168            | AppEvent::SnippetHostDone { .. }
169            | AppEvent::SnippetAllDone { .. }
170            | AppEvent::SnippetProgress { .. }
171            | AppEvent::KeyPushResult { .. }
172            | AppEvent::ContainerListing { .. }
173            | AppEvent::ContainerActionComplete { .. }
174            | AppEvent::ContainerInspectComplete { .. }
175            | AppEvent::ContainerLogsComplete { .. }
176            | AppEvent::ContainerLogsTailComplete { .. }
177            | AppEvent::VaultSignResult { .. }
178            | AppEvent::VaultSignProgress { .. }
179            | AppEvent::VaultSignAllDone { .. }
180            | AppEvent::CertCheckResult { .. }
181            | AppEvent::CertCheckError { .. } => true,
182        }
183    }
184}
185
186/// Polls crossterm events in a background thread.
187pub struct EventHandler {
188    tx: mpsc::Sender<AppEvent>,
189    rx: mpsc::Receiver<AppEvent>,
190    paused: Arc<AtomicBool>,
191    // Keep the thread handle alive
192    _handle: thread::JoinHandle<()>,
193}
194
195impl EventHandler {
196    pub fn new(tick_rate_ms: u64) -> Self {
197        let (tx, rx) = mpsc::channel();
198        let tick_rate = Duration::from_millis(tick_rate_ms);
199        let event_tx = tx.clone();
200        let paused = Arc::new(AtomicBool::new(false));
201        let paused_flag = paused.clone();
202
203        let handle = thread::spawn(move || {
204            let mut last_tick = Instant::now();
205            loop {
206                // When paused, sleep instead of polling stdin
207                if paused_flag.load(Ordering::Acquire) {
208                    thread::sleep(Duration::from_millis(50));
209                    continue;
210                }
211
212                // Cap poll timeout at 50ms so we notice pause flag quickly
213                let remaining = tick_rate
214                    .checked_sub(last_tick.elapsed())
215                    .unwrap_or(Duration::ZERO);
216                let timeout = remaining.min(Duration::from_millis(50));
217
218                match event::poll(timeout) {
219                    Ok(true) => {
220                        if let Ok(evt) = event::read() {
221                            match evt {
222                                CrosstermEvent::Key(key)
223                                    if key.kind == KeyEventKind::Press
224                                        && event_tx.send(AppEvent::Key(key)).is_err() =>
225                                {
226                                    return;
227                                }
228                                // Trigger immediate redraw on terminal resize.
229                                CrosstermEvent::Resize(..)
230                                    if event_tx.send(AppEvent::Tick).is_err() =>
231                                {
232                                    return;
233                                }
234                                _ => {}
235                            }
236                        }
237                    }
238                    Ok(false) => {}
239                    Err(e) => {
240                        // Poll error (e.g. stdin closed). Notify main loop and exit.
241                        log::error!("[external] crossterm poll failed: {e}");
242                        let _ = event_tx.send(AppEvent::PollError);
243                        return;
244                    }
245                }
246
247                if last_tick.elapsed() >= tick_rate {
248                    if event_tx.send(AppEvent::Tick).is_err() {
249                        return;
250                    }
251                    last_tick = Instant::now();
252                }
253            }
254        });
255
256        Self {
257            tx,
258            rx,
259            paused,
260            _handle: handle,
261        }
262    }
263
264    /// Get the next event (blocks until available).
265    pub fn next(&self) -> Result<AppEvent> {
266        Ok(self.rx.recv()?)
267    }
268
269    /// Try to get the next event with a timeout.
270    pub fn next_timeout(&self, timeout: Duration) -> Result<Option<AppEvent>> {
271        match self.rx.recv_timeout(timeout) {
272            Ok(event) => Ok(Some(event)),
273            Err(mpsc::RecvTimeoutError::Timeout) => Ok(None),
274            Err(mpsc::RecvTimeoutError::Disconnected) => {
275                Err(anyhow::anyhow!("event channel disconnected"))
276            }
277        }
278    }
279
280    /// Get a clone of the sender for sending events from other threads.
281    pub fn sender(&self) -> mpsc::Sender<AppEvent> {
282        self.tx.clone()
283    }
284
285    /// Pause event polling (call before spawning SSH).
286    pub fn pause(&self) {
287        self.paused.store(true, Ordering::Release);
288    }
289
290    /// Resume event polling (call after SSH exits).
291    pub fn resume(&self) {
292        let mut preserved = Vec::new();
293        while let Ok(event) = self.rx.try_recv() {
294            if event.is_background_result() {
295                preserved.push(event);
296            }
297        }
298        for event in preserved {
299            let _ = self.tx.send(event);
300        }
301        self.paused.store(false, Ordering::Release);
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crossterm::event::{KeyCode, KeyEvent, KeyModifiers};

    /// The three variants emitted by the poll thread must be classified as
    /// non-background so `resume()` discards them.
    #[test]
    fn poll_thread_events_are_not_background_results() {
        let k = KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE);
        assert!(!AppEvent::Key(k).is_background_result());
        assert!(!AppEvent::Tick.is_background_result());
        assert!(!AppEvent::PollError.is_background_result());
    }

    /// Spot-checks worker variants that can be built from plain values;
    /// variants carrying other project payload types are not constructed
    /// here.
    #[test]
    fn worker_events_are_background_results() {
        assert!(
            AppEvent::PingResult {
                alias: "h".into(),
                rtt_ms: None,
                generation: 0,
            }
            .is_background_result()
        );
        assert!(
            AppEvent::SyncComplete {
                provider: "p".into(),
                hosts: vec![],
            }
            .is_background_result()
        );
        assert!(
            AppEvent::SyncPartial {
                provider: "p".into(),
                hosts: vec![],
                failures: 0,
                total: 0,
            }
            .is_background_result()
        );
        assert!(
            AppEvent::SyncError {
                provider: "p".into(),
                message: "x".into(),
            }
            .is_background_result()
        );
        assert!(
            AppEvent::SyncProgress {
                provider: "p".into(),
                message: "x".into(),
            }
            .is_background_result()
        );
        assert!(
            AppEvent::UpdateAvailable {
                version: "1.0.0".into(),
                headline: None,
            }
            .is_background_result()
        );
        assert!(
            AppEvent::FileBrowserListing {
                alias: "h".into(),
                path: "/".into(),
                entries: Ok(vec![]),
            }
            .is_background_result()
        );
        assert!(
            AppEvent::ScpComplete {
                alias: "h".into(),
                success: true,
                message: String::new(),
            }
            .is_background_result()
        );
        assert!(
            AppEvent::SnippetHostDone {
                run_id: 0,
                alias: "h".into(),
                stdout: String::new(),
                stderr: String::new(),
                exit_code: Some(0),
            }
            .is_background_result()
        );
        assert!(AppEvent::SnippetAllDone { run_id: 0 }.is_background_result());
        assert!(
            AppEvent::SnippetProgress {
                run_id: 0,
                completed: 0,
                total: 0,
            }
            .is_background_result()
        );
        assert!(
            AppEvent::VaultSignProgress {
                alias: "h".into(),
                done: 0,
                total: 0,
            }
            .is_background_result()
        );
        assert!(
            AppEvent::VaultSignAllDone {
                signed: 0,
                failed: 0,
                skipped: 0,
                cancelled: false,
                aborted_message: None,
                first_error: None,
            }
            .is_background_result()
        );
        assert!(
            AppEvent::CertCheckError {
                alias: "h".into(),
                message: "x".into(),
            }
            .is_background_result()
        );
    }

    /// End-to-end: pause, drop a mix of events into the channel, resume,
    /// and verify the drain rule. `Key`, `Tick`, `PollError` must be gone
    /// and the background results must reappear on the receiving end.
    /// Filters out any `Tick`/`Key` that the poll thread could in theory
    /// inject between `EventHandler::new()` and `pause()` (small race
    /// window) so the assertion stays deterministic in CI.
    #[test]
    fn resume_drains_input_and_keeps_background_results() {
        // Very long tick interval so the poll thread does not emit ticks
        // of its own during the test.
        let handler = EventHandler::new(60_000);
        handler.pause();

        let k = KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE);
        handler.tx.send(AppEvent::Key(k)).unwrap();
        handler.tx.send(AppEvent::Tick).unwrap();
        handler.tx.send(AppEvent::PollError).unwrap();
        handler
            .tx
            .send(AppEvent::SyncProgress {
                provider: "p".into(),
                message: "x".into(),
            })
            .unwrap();
        handler
            .tx
            .send(AppEvent::PingResult {
                alias: "h".into(),
                rtt_ms: Some(12),
                generation: 1,
            })
            .unwrap();

        handler.resume();

        // Collect whatever is queued; stop at the first 50ms gap.
        let mut received = Vec::new();
        while let Ok(Some(ev)) = handler.next_timeout(Duration::from_millis(50)) {
            received.push(ev);
        }
        let background: Vec<_> = received
            .into_iter()
            .filter(AppEvent::is_background_result)
            .collect();

        assert_eq!(
            background.len(),
            2,
            "exactly two background events survive resume()"
        );
        // Relative order of the preserved events must be unchanged.
        assert!(matches!(background[0], AppEvent::SyncProgress { .. }));
        assert!(matches!(background[1], AppEvent::PingResult { .. }));
    }
}