Skip to main content

purple_ssh/
event.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, mpsc};
3use std::thread;
4use std::time::{Duration, Instant};
5
6use anyhow::Result;
7use crossterm::event::{self, Event as CrosstermEvent, KeyEvent, KeyEventKind};
8
9/// Application events.
10pub enum AppEvent {
11    Key(KeyEvent),
12    Tick,
13    PingResult {
14        alias: String,
15        rtt_ms: Option<u32>,
16        generation: u64,
17    },
18    SyncComplete {
19        provider: String,
20        hosts: Vec<crate::providers::ProviderHost>,
21    },
22    SyncPartial {
23        provider: String,
24        hosts: Vec<crate::providers::ProviderHost>,
25        failures: usize,
26        total: usize,
27    },
28    SyncError {
29        provider: String,
30        message: String,
31    },
32    SyncProgress {
33        provider: String,
34        message: String,
35    },
36    UpdateAvailable {
37        version: String,
38        headline: Option<String>,
39    },
40    FileBrowserListing {
41        alias: String,
42        path: String,
43        entries: Result<Vec<crate::file_browser::FileEntry>, String>,
44    },
45    ScpComplete {
46        alias: String,
47        success: bool,
48        message: String,
49    },
50    SnippetHostDone {
51        run_id: u64,
52        alias: String,
53        stdout: String,
54        stderr: String,
55        exit_code: Option<i32>,
56    },
57    SnippetAllDone {
58        run_id: u64,
59    },
60    SnippetProgress {
61        run_id: u64,
62        completed: usize,
63        total: usize,
64    },
65    /// Result of one host in a key-push run. Aggregated in
66    /// `app.keys.push.results` by the main loop; the summary toast / sticky
67    /// error overlay fires once `results.len() == expected_count`.
68    ///
69    /// `run_id` matches `app.keys.push.run_id` at the moment the worker
70    /// was spawned. Results whose `run_id` no longer matches the current
71    /// run are stale (a previous cancelled run's tail event) and dropped
72    /// before they touch the accumulator.
73    KeyPushResult {
74        run_id: u64,
75        result: crate::key_push::KeyPushResult,
76    },
77    ContainerListing {
78        alias: String,
79        result: Result<crate::containers::ContainerListing, crate::containers::ContainerError>,
80    },
81    ContainerActionComplete {
82        alias: String,
83        action: crate::containers::ContainerAction,
84        result: Result<(), String>,
85    },
86    /// Result of a `docker inspect` (or `podman inspect`) call fired by
87    /// the containers overview detail panel. Cached per (alias,
88    /// container_id) once received so repeat fetches inside the TTL
89    /// window are skipped.
90    ContainerInspectComplete {
91        alias: String,
92        container_id: String,
93        // Boxed because `ContainerInspect` carries the full audit
94        // payload (caps, mounts, compose labels). Inlining it grows the
95        // `AppEvent` enum past clippy's `large_enum_variant` budget,
96        // bloating every queue slot for events that do not carry it.
97        result: Box<Result<crate::containers::ContainerInspect, String>>,
98    },
99    /// Result of `<runtime> logs --tail 200` over SSH for a container
100    /// the user opened with `l`. Populates `Screen::ContainerLogs.body`
101    /// (or `.error`) when received.
102    ContainerLogsComplete {
103        alias: String,
104        container_id: String,
105        container_name: String,
106        result: Result<Vec<String>, String>,
107    },
108    /// Result of a short `<runtime> logs --tail N` fetch fired by the
109    /// containers-overview detail panel to populate the LOGS card.
110    /// Distinct from `ContainerLogsComplete` so the dedicated logs
111    /// viewer (`l`) and the detail-panel card stay on separate caches.
112    ContainerLogsTailComplete {
113        alias: String,
114        container_id: String,
115        result: Box<Result<Vec<String>, String>>,
116    },
117    VaultSignResult {
118        alias: String,
119        /// Snapshot of the host's `CertificateFile` directive at signing time.
120        /// Carried in the event so the main loop never has to re-look up the
121        /// host (which would be O(n) and racy under concurrent renames). Empty
122        /// when the host has no `CertificateFile` set; `should_write_certificate_file`
123        /// uses this directly to decide whether to write a default directive.
124        certificate_file: String,
125        success: bool,
126        message: String,
127    },
128    VaultSignProgress {
129        alias: String,
130        done: usize,
131        total: usize,
132    },
133    VaultSignAllDone {
134        signed: u32,
135        failed: u32,
136        skipped: u32,
137        cancelled: bool,
138        aborted_message: Option<String>,
139        first_error: Option<String>,
140    },
141    CertCheckResult {
142        alias: String,
143        status: crate::vault_ssh::CertStatus,
144    },
145    CertCheckError {
146        alias: String,
147        message: String,
148    },
149    PollError,
150}
151
152/// Polls crossterm events in a background thread.
153pub struct EventHandler {
154    tx: mpsc::Sender<AppEvent>,
155    rx: mpsc::Receiver<AppEvent>,
156    paused: Arc<AtomicBool>,
157    // Keep the thread handle alive
158    _handle: thread::JoinHandle<()>,
159}
160
161impl EventHandler {
162    pub fn new(tick_rate_ms: u64) -> Self {
163        let (tx, rx) = mpsc::channel();
164        let tick_rate = Duration::from_millis(tick_rate_ms);
165        let event_tx = tx.clone();
166        let paused = Arc::new(AtomicBool::new(false));
167        let paused_flag = paused.clone();
168
169        let handle = thread::spawn(move || {
170            let mut last_tick = Instant::now();
171            loop {
172                // When paused, sleep instead of polling stdin
173                if paused_flag.load(Ordering::Acquire) {
174                    thread::sleep(Duration::from_millis(50));
175                    continue;
176                }
177
178                // Cap poll timeout at 50ms so we notice pause flag quickly
179                let remaining = tick_rate
180                    .checked_sub(last_tick.elapsed())
181                    .unwrap_or(Duration::ZERO);
182                let timeout = remaining.min(Duration::from_millis(50));
183
184                match event::poll(timeout) {
185                    Ok(true) => {
186                        if let Ok(evt) = event::read() {
187                            match evt {
188                                CrosstermEvent::Key(key)
189                                    if key.kind == KeyEventKind::Press
190                                        && event_tx.send(AppEvent::Key(key)).is_err() =>
191                                {
192                                    return;
193                                }
194                                // Trigger immediate redraw on terminal resize.
195                                CrosstermEvent::Resize(..)
196                                    if event_tx.send(AppEvent::Tick).is_err() =>
197                                {
198                                    return;
199                                }
200                                _ => {}
201                            }
202                        }
203                    }
204                    Ok(false) => {}
205                    Err(e) => {
206                        // Poll error (e.g. stdin closed). Notify main loop and exit.
207                        log::error!("[external] crossterm poll failed: {e}");
208                        let _ = event_tx.send(AppEvent::PollError);
209                        return;
210                    }
211                }
212
213                if last_tick.elapsed() >= tick_rate {
214                    if event_tx.send(AppEvent::Tick).is_err() {
215                        return;
216                    }
217                    last_tick = Instant::now();
218                }
219            }
220        });
221
222        Self {
223            tx,
224            rx,
225            paused,
226            _handle: handle,
227        }
228    }
229
230    /// Get the next event (blocks until available).
231    pub fn next(&self) -> Result<AppEvent> {
232        Ok(self.rx.recv()?)
233    }
234
235    /// Try to get the next event with a timeout.
236    pub fn next_timeout(&self, timeout: Duration) -> Result<Option<AppEvent>> {
237        match self.rx.recv_timeout(timeout) {
238            Ok(event) => Ok(Some(event)),
239            Err(mpsc::RecvTimeoutError::Timeout) => Ok(None),
240            Err(mpsc::RecvTimeoutError::Disconnected) => {
241                Err(anyhow::anyhow!("event channel disconnected"))
242            }
243        }
244    }
245
246    /// Get a clone of the sender for sending events from other threads.
247    pub fn sender(&self) -> mpsc::Sender<AppEvent> {
248        self.tx.clone()
249    }
250
251    /// Pause event polling (call before spawning SSH).
252    pub fn pause(&self) {
253        self.paused.store(true, Ordering::Release);
254    }
255
256    /// Resume event polling (call after SSH exits).
257    pub fn resume(&self) {
258        // Drain stale events, but keep background result events
259        let mut preserved = Vec::new();
260        while let Ok(event) = self.rx.try_recv() {
261            match event {
262                AppEvent::PingResult { .. }
263                | AppEvent::SyncComplete { .. }
264                | AppEvent::SyncPartial { .. }
265                | AppEvent::SyncError { .. }
266                | AppEvent::SyncProgress { .. }
267                | AppEvent::UpdateAvailable { .. }
268                | AppEvent::FileBrowserListing { .. }
269                | AppEvent::ScpComplete { .. }
270                | AppEvent::SnippetHostDone { .. }
271                | AppEvent::SnippetAllDone { .. }
272                | AppEvent::SnippetProgress { .. }
273                | AppEvent::KeyPushResult { .. }
274                | AppEvent::ContainerListing { .. }
275                | AppEvent::ContainerActionComplete { .. }
276                | AppEvent::ContainerInspectComplete { .. }
277                | AppEvent::ContainerLogsComplete { .. }
278                | AppEvent::ContainerLogsTailComplete { .. }
279                | AppEvent::VaultSignResult { .. }
280                | AppEvent::VaultSignProgress { .. }
281                | AppEvent::VaultSignAllDone { .. }
282                | AppEvent::CertCheckResult { .. }
283                | AppEvent::CertCheckError { .. } => preserved.push(event),
284                _ => {}
285            }
286        }
287        // Re-send preserved events
288        for event in preserved {
289            let _ = self.tx.send(event);
290        }
291        self.paused.store(false, Ordering::Release);
292    }
293}