Skip to main content

purple_ssh/
event.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, mpsc};
3use std::thread;
4use std::time::{Duration, Instant};
5
6use anyhow::Result;
7use crossterm::event::{self, Event as CrosstermEvent, KeyEvent, KeyEventKind};
8
9/// Application events.
10pub enum AppEvent {
11    Key(KeyEvent),
12    Tick,
13    PingResult {
14        alias: String,
15        rtt_ms: Option<u32>,
16        generation: u64,
17    },
18    SyncComplete {
19        provider: String,
20        hosts: Vec<crate::providers::ProviderHost>,
21    },
22    SyncPartial {
23        provider: String,
24        hosts: Vec<crate::providers::ProviderHost>,
25        failures: usize,
26        total: usize,
27    },
28    SyncError {
29        provider: String,
30        message: String,
31    },
32    SyncProgress {
33        provider: String,
34        message: String,
35    },
36    UpdateAvailable {
37        version: String,
38        headline: Option<String>,
39    },
40    FileBrowserListing {
41        alias: String,
42        path: String,
43        entries: Result<Vec<crate::file_browser::FileEntry>, String>,
44    },
45    ScpComplete {
46        alias: String,
47        success: bool,
48        message: String,
49    },
50    SnippetHostDone {
51        run_id: u64,
52        alias: String,
53        stdout: String,
54        stderr: String,
55        exit_code: Option<i32>,
56    },
57    SnippetAllDone {
58        run_id: u64,
59    },
60    SnippetProgress {
61        run_id: u64,
62        completed: usize,
63        total: usize,
64    },
65    ContainerListing {
66        alias: String,
67        result: Result<
68            (
69                crate::containers::ContainerRuntime,
70                Vec<crate::containers::ContainerInfo>,
71            ),
72            crate::containers::ContainerError,
73        >,
74    },
75    ContainerActionComplete {
76        alias: String,
77        action: crate::containers::ContainerAction,
78        result: Result<(), String>,
79    },
80    VaultSignResult {
81        alias: String,
82        /// Snapshot of the host's `CertificateFile` directive at signing time.
83        /// Carried in the event so the main loop never has to re-look up the
84        /// host (which would be O(n) and racy under concurrent renames). Empty
85        /// when the host has no `CertificateFile` set; `should_write_certificate_file`
86        /// uses this directly to decide whether to write a default directive.
87        certificate_file: String,
88        success: bool,
89        message: String,
90    },
91    VaultSignProgress {
92        alias: String,
93        done: usize,
94        total: usize,
95    },
96    VaultSignAllDone {
97        signed: u32,
98        failed: u32,
99        skipped: u32,
100        cancelled: bool,
101        aborted_message: Option<String>,
102        first_error: Option<String>,
103    },
104    CertCheckResult {
105        alias: String,
106        status: crate::vault_ssh::CertStatus,
107    },
108    CertCheckError {
109        alias: String,
110        message: String,
111    },
112    PollError,
113}
114
115/// Polls crossterm events in a background thread.
116pub struct EventHandler {
117    tx: mpsc::Sender<AppEvent>,
118    rx: mpsc::Receiver<AppEvent>,
119    paused: Arc<AtomicBool>,
120    // Keep the thread handle alive
121    _handle: thread::JoinHandle<()>,
122}
123
124impl EventHandler {
125    pub fn new(tick_rate_ms: u64) -> Self {
126        let (tx, rx) = mpsc::channel();
127        let tick_rate = Duration::from_millis(tick_rate_ms);
128        let event_tx = tx.clone();
129        let paused = Arc::new(AtomicBool::new(false));
130        let paused_flag = paused.clone();
131
132        let handle = thread::spawn(move || {
133            let mut last_tick = Instant::now();
134            loop {
135                // When paused, sleep instead of polling stdin
136                if paused_flag.load(Ordering::Acquire) {
137                    thread::sleep(Duration::from_millis(50));
138                    continue;
139                }
140
141                // Cap poll timeout at 50ms so we notice pause flag quickly
142                let remaining = tick_rate
143                    .checked_sub(last_tick.elapsed())
144                    .unwrap_or(Duration::ZERO);
145                let timeout = remaining.min(Duration::from_millis(50));
146
147                match event::poll(timeout) {
148                    Ok(true) => {
149                        if let Ok(evt) = event::read() {
150                            match evt {
151                                CrosstermEvent::Key(key) if key.kind == KeyEventKind::Press => {
152                                    if event_tx.send(AppEvent::Key(key)).is_err() {
153                                        return;
154                                    }
155                                }
156                                CrosstermEvent::Resize(..) => {
157                                    // Trigger immediate redraw on terminal resize
158                                    if event_tx.send(AppEvent::Tick).is_err() {
159                                        return;
160                                    }
161                                }
162                                _ => {}
163                            }
164                        }
165                    }
166                    Ok(false) => {}
167                    Err(_) => {
168                        // Poll error (e.g. stdin closed). Notify main loop and exit.
169                        let _ = event_tx.send(AppEvent::PollError);
170                        return;
171                    }
172                }
173
174                if last_tick.elapsed() >= tick_rate {
175                    if event_tx.send(AppEvent::Tick).is_err() {
176                        return;
177                    }
178                    last_tick = Instant::now();
179                }
180            }
181        });
182
183        Self {
184            tx,
185            rx,
186            paused,
187            _handle: handle,
188        }
189    }
190
191    /// Get the next event (blocks until available).
192    pub fn next(&self) -> Result<AppEvent> {
193        Ok(self.rx.recv()?)
194    }
195
196    /// Try to get the next event with a timeout.
197    pub fn next_timeout(&self, timeout: Duration) -> Result<Option<AppEvent>> {
198        match self.rx.recv_timeout(timeout) {
199            Ok(event) => Ok(Some(event)),
200            Err(mpsc::RecvTimeoutError::Timeout) => Ok(None),
201            Err(mpsc::RecvTimeoutError::Disconnected) => {
202                Err(anyhow::anyhow!("event channel disconnected"))
203            }
204        }
205    }
206
207    /// Get a clone of the sender for sending events from other threads.
208    pub fn sender(&self) -> mpsc::Sender<AppEvent> {
209        self.tx.clone()
210    }
211
212    /// Pause event polling (call before spawning SSH).
213    pub fn pause(&self) {
214        self.paused.store(true, Ordering::Release);
215    }
216
217    /// Resume event polling (call after SSH exits).
218    pub fn resume(&self) {
219        // Drain stale events, but keep background result events
220        let mut preserved = Vec::new();
221        while let Ok(event) = self.rx.try_recv() {
222            match event {
223                AppEvent::PingResult { .. }
224                | AppEvent::SyncComplete { .. }
225                | AppEvent::SyncPartial { .. }
226                | AppEvent::SyncError { .. }
227                | AppEvent::SyncProgress { .. }
228                | AppEvent::UpdateAvailable { .. }
229                | AppEvent::FileBrowserListing { .. }
230                | AppEvent::ScpComplete { .. }
231                | AppEvent::SnippetHostDone { .. }
232                | AppEvent::SnippetAllDone { .. }
233                | AppEvent::SnippetProgress { .. }
234                | AppEvent::ContainerListing { .. }
235                | AppEvent::ContainerActionComplete { .. }
236                | AppEvent::VaultSignResult { .. }
237                | AppEvent::VaultSignProgress { .. }
238                | AppEvent::VaultSignAllDone { .. }
239                | AppEvent::CertCheckResult { .. }
240                | AppEvent::CertCheckError { .. } => preserved.push(event),
241                _ => {}
242            }
243        }
244        // Re-send preserved events
245        for event in preserved {
246            let _ = self.tx.send(event);
247        }
248        self.paused.store(false, Ordering::Release);
249    }
250}