purple-ssh 3.0.0

Open-source terminal SSH manager and SSH config editor. Search hundreds of hosts, sync from 16 clouds, transfer files, manage Docker and Podman over SSH, sign short-lived Vault SSH certs and expose an MCP server for AI agents. Rust TUI, MIT licensed.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::app::TunnelFormBaseline;
use crate::app::forms::TunnelForm;
use crate::tunnel::{ActiveTunnel, TunnelRule};

/// Sort order for the tunnels overview screen. Cycled with `s`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TunnelSortMode {
    /// Recently-used hosts first (uses `app.history.last_connected`). Active
    /// tunnels rank by `started_at` so they always sit above idle ones.
    #[default]
    MostRecent,
    /// Alphabetical by host alias, ascending.
    AlphaHostname,
}

impl TunnelSortMode {
    /// Advance to the other mode of the two-state cycle, wrapping around.
    pub fn next(self) -> Self {
        if matches!(self, Self::MostRecent) {
            Self::AlphaHostname
        } else {
            Self::MostRecent
        }
    }

    /// Short human-readable name for the status/help line.
    pub fn label(self) -> &'static str {
        match self {
            Self::MostRecent => "most recent",
            Self::AlphaHostname => "A-Z hostname",
        }
    }
}
use crate::tunnel_live::{
    ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
    PortConflict, TunnelLiveSnapshot,
};

/// Tunnel-owned state grouped off the `App` god-struct. Contains the rule
/// list, the edit form, the live child-process map, form baseline for the
/// dirty check, the pending delete index and the cached per-host summary
/// strings. Pure state container; behaviour lives on `App` or on dedicated
/// methods here.
pub struct TunnelState {
    /// Configured tunnel rules.
    pub list: Vec<TunnelRule>,
    /// The tunnel edit form.
    pub form: TunnelForm,
    /// Running tunnel child processes, keyed by host alias (see `poll`,
    /// which looks entries up by `msg.alias`).
    pub active: HashMap<String, ActiveTunnel>,
    /// Baseline snapshot of the form, used for the dirty check.
    pub form_baseline: Option<TunnelFormBaseline>,
    /// Pending delete index (presumably into `list` — confirm at call sites).
    pub pending_delete: Option<usize>,
    /// Cached per-host summary strings.
    pub summaries_cache: HashMap<String, String>,
    /// Sort mode for the tunnels overview screen. Cycled by `s`.
    pub sort_mode: TunnelSortMode,

    // Live-data layer. Receivers are drained from `poll()`. The Sender
    // halves are cloned into per-tunnel parser threads when start_tunnel
    // succeeds.
    pub parser_tx: Sender<ParserMessage>,
    pub parser_rx: Receiver<ParserMessage>,
    pub lsof_tx: Sender<LsofMessage>,
    pub lsof_rx: Receiver<LsofMessage>,

    // Last lsof snapshot, keyed by tunnel bind port. Replaced wholesale
    // on every successful poll so closed sockets disappear.
    pub clients: HashMap<u16, Vec<ClientPeer>>,
    pub conflicts: HashMap<u16, PortConflict>,
    pub last_lsof_at: Option<Instant>,

    /// Per-peer rolling braille history, pushed once per lsof poll
    /// arrival from `poll()`. Keyed by `(bind_port, peer.src)` so it
    /// survives wholesale `clients` replacements as long as the peer is
    /// still in the new snapshot. Each cell carries one
    /// `current_rx_bps + current_tx_bps` snapshot from a single lsof
    /// poll, so the visible window covers `PEER_VIZ_BUCKETS` polls
    /// (~24-48s on Linux/macOS).
    pub peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
    /// Wall-clock of the most recent `push_peer_viz` rotation, and the
    /// one before it. The renderer divides `(now - last) / (last - prev)`
    /// to derive a smooth phase in `[0, 1]` that drifts the wave
    /// leftward by exactly one bucket between pushes — adaptive to the
    /// actual poll interval (which varies on macOS due to nettop
    /// overhead).
    pub peer_viz_last_push: Option<Instant>,
    pub peer_viz_prev_push: Option<Instant>,

    // The single shared lsof poller. Lazily started on first tunnel
    // start; lives until App::Drop. The bind_ports list is cloned on
    // every poll iteration so updates are eventually consistent.
    pub lsof: Option<LsofPollerHandle>,

    /// Demo / test seed. When `App.demo_mode == true` the detail panel
    /// reads from this map instead of the live counters, so visual
    /// regression tests are byte-deterministic.
    pub demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
}

impl Default for TunnelState {
    fn default() -> Self {
        let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
        let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
        Self {
            list: Vec::new(),
            form: TunnelForm::new(),
            active: HashMap::new(),
            form_baseline: None,
            pending_delete: None,
            summaries_cache: HashMap::new(),
            sort_mode: TunnelSortMode::default(),
            parser_tx,
            parser_rx,
            lsof_tx,
            lsof_rx,
            clients: HashMap::new(),
            conflicts: HashMap::new(),
            last_lsof_at: None,
            peer_viz: HashMap::new(),
            peer_viz_last_push: None,
            peer_viz_prev_push: None,
            lsof: None,
            demo_live_snapshots: HashMap::new(),
        }
    }
}

impl Drop for TunnelState {
    /// Shut the shared lsof poller down (if one was ever started) so its
    /// thread does not outlive the owning state.
    fn drop(&mut self) {
        let Some(mut handle) = self.lsof.take() else {
            return;
        };
        handle.shutdown();
    }
}

impl TunnelState {
    /// Ensure the shared lsof poller is running. Idempotent: a second
    /// call after the poller is already up is a noop. Caller is
    /// responsible for updating `bind_ports` afterwards.
    pub fn ensure_lsof_poller(&mut self) {
        if self.lsof.is_some() {
            return;
        }
        // Shared stop flag handed to both the poller thread and the handle.
        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
        // (alias, bind_port, tunnel_pid) tuples; empty until the caller
        // publishes the first list via `set_lsof_ports`.
        let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
        let thread = crate::tunnel_live::spawn_lsof_poller(
            bind_ports.clone(),
            self.lsof_tx.clone(),
            stop.clone(),
        );
        self.lsof = Some(LsofPollerHandle {
            stop,
            bind_ports,
            thread: Some(thread),
        });
        log::debug!("[purple] Tunnel lsof poller started");
    }

    /// Replace the lsof poller's port list. Callers compute the
    /// `(alias, bind_port, tunnel_pid)` tuples from the SSH config
    /// directives because `ActiveTunnel` does not store the rule set
    /// directly. The poller picks up the new list on its next iteration.
    pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
        if let Some(handle) = &self.lsof {
            // A poisoned lock is silently ignored (best-effort update).
            if let Ok(mut g) = handle.bind_ports.lock() {
                *g = ports;
            }
        }
    }

    /// Drain the parser channel into per-tunnel live state, drain the
    /// lsof channel into the shared `clients` and `conflicts` maps,
    /// rotate per-tunnel history buckets and finally poll every active
    /// child for exit. Returns (alias, message, is_error) tuples so the
    /// outer loop can route them through `notify_*`.
    pub fn poll(&mut self) -> Vec<(String, String, bool)> {
        let now = Instant::now();
        // Drain channel events first so any pending opens are reflected
        // before we report exits.
        while let Ok(msg) = self.parser_rx.try_recv() {
            if let Some(tunnel) = self.active.get_mut(&msg.alias) {
                tunnel.live.record_event(msg.event);
            }
        }
        // Drain lsof snapshots — keep only the freshest one. Older
        // pending messages are discarded because they would just
        // overwrite each other.
        let mut latest_lsof: Option<LsofMessage> = None;
        while let Ok(msg) = self.lsof_rx.try_recv() {
            latest_lsof = Some(msg);
        }
        if let Some(msg) = latest_lsof {
            // Wholesale replacement: closed sockets disappear immediately.
            self.clients = msg.clients;
            self.conflicts = msg.conflicts;
            self.last_lsof_at = Some(msg.at);
            // Roll the per-peer braille history forward exactly once
            // per lsof arrival. The renderer derives a smooth phase
            // from the timestamp pair below to fill in motion between
            // pushes at terminal frame rate.
            self.push_peer_viz(now);
        }
        // Rotate per-tunnel history. Bucket width is `BUCKET_SECS`
        // (currently 2s), so this is effectively per-poll rotation.
        for tunnel in self.active.values_mut() {
            tunnel.live.rotate_if_due(now);
        }
        // Build a port → alias map once and reuse it for both the
        // throughput aggregation and the concurrent-activity sampling
        // below. Source: the lsof poller's `(alias, port, pid)` view of
        // the active bind ports.
        let port_to_alias: HashMap<u16, String> = self
            .lsof
            .as_ref()
            .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
            .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
            .unwrap_or_default();
        // Aggregate per-peer current bps into per-tunnel readouts. The
        // tunnel-level value is the honest sum of every connected
        // client's flow — it matches the per-peer numbers shown in the
        // roster. The previous SSH-process sampler counted both ends
        // of the loopback hop, which doubled the displayed speed.
        let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
        for (port, peers) in &self.clients {
            let Some(alias) = port_to_alias.get(port) else {
                continue;
            };
            let entry = bps_per_alias
                .entry(alias.clone())
                .or_insert((0u64, 0u64, false));
            for peer in peers {
                entry.0 = entry.0.saturating_add(peer.current_rx_bps);
                entry.1 = entry.1.saturating_add(peer.current_tx_bps);
                // The bool marks "at least one peer has a real sample",
                // which gates the freshness timestamp below.
                if peer.last_sample_at.is_some() {
                    entry.2 = true;
                }
            }
        }
        for (alias, tunnel) in self.active.iter_mut() {
            let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
            tunnel.live.current_rx_bps = rx;
            tunnel.live.current_tx_bps = tx;
            tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
            tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
            if ready {
                tunnel.live.last_throughput_at = Some(now);
            }
        }
        // Sample concurrent activity per alias into the current bucket.
        // Source = max(lsof ESTABLISHED clients, ssh active channels)
        // summed across every bind_port that belongs to the alias. That
        // way the sparkline reflects ongoing concurrency (a long-lived
        // WebKit connection, a streaming HTTP/2 session) rather than
        // only short channel-open bursts.
        let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
        for (port, peers) in &self.clients {
            if let Some(alias) = port_to_alias.get(port) {
                *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
            }
        }
        for (alias, tunnel) in self.active.iter_mut() {
            let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
            let sample = lsof_count.max(tunnel.live.active_channels);
            tunnel.live.sample_activity(sample);
        }

        if self.active.is_empty() {
            return Vec::new();
        }
        let mut exited = Vec::new();
        let mut to_remove = Vec::new();
        for (alias, tunnel) in &mut self.active {
            match tunnel.child.try_wait() {
                Ok(Some(status)) => {
                    // The parser thread holds child.stderr; ask the
                    // shared stderr ringbuffer for the last meaningful
                    // line instead of re-reading the pipe.
                    let stderr_msg = tunnel
                        .live
                        .stderr_buffer
                        .lock()
                        .ok()
                        .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
                        .map(|s| s.trim().to_string())
                        .filter(|s| !s.is_empty());
                    // `code()` is None when the child was signal-terminated
                    // (Unix); map that to -1.
                    let exit_code = status.code().unwrap_or(-1);
                    if !status.success() {
                        log::error!(
                            "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
                        );
                        if let Some(ref err) = stderr_msg {
                            log::debug!("[external] Tunnel stderr: {}", err.trim());
                        }
                    }
                    let last_exit_line = stderr_msg
                        .clone()
                        .unwrap_or_else(|| format!("exit code {}", exit_code));
                    tunnel.live.last_exit = Some((exit_code, last_exit_line));
                    tunnel.live.parser_stop.store(true, Ordering::Relaxed);
                    // Mark all currently-open channels as closed so the
                    // active count drops to zero on exit.
                    if tunnel.live.active_channels > 0 {
                        let close_now = ChannelEventKind::Close;
                        let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
                        for id in ids {
                            tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
                                at: now,
                                channel_id: id,
                                kind: close_now,
                                channel_kind: None,
                                opened_at: None,
                            });
                        }
                    }
                    let (msg, is_error) = if status.success() {
                        (format!("Tunnel for {} closed.", alias), false)
                    } else if let Some(err) = stderr_msg {
                        (format!("Tunnel for {}: {}", alias, err), true)
                    } else {
                        (
                            format!("Tunnel for {} exited with code {}.", alias, exit_code),
                            true,
                        )
                    };
                    exited.push((alias.clone(), msg, is_error));
                    to_remove.push(alias.clone());
                }
                Ok(None) => {}
                Err(e) => {
                    // try_wait itself failed: report the tunnel as lost and
                    // drop the unusable child handle below.
                    exited.push((
                        alias.clone(),
                        format!("Tunnel for {} lost: {}", alias, e),
                        true,
                    ));
                    to_remove.push(alias.clone());
                }
            }
        }
        for alias in to_remove {
            self.active.remove(&alias);
        }
        exited
    }

    /// Push one bucket of per-peer braille history. Called exactly
    /// once per lsof arrival so the visible window encodes
    /// `PEER_VIZ_BUCKETS` consecutive poll snapshots — long enough to
    /// see the trend, short enough to react quickly to changes. The
    /// renderer fills in smooth motion between pushes via
    /// `peer_viz_last_push` / `peer_viz_prev_push`. Garbage-collects
    /// entries for peers that no longer appear in `self.clients`.
    pub fn push_peer_viz(&mut self, now: Instant) {
        // `live` collects every (port, src) key present in the current
        // snapshot; anything not in it is GC'd below.
        let mut live: HashSet<(u16, String)> = HashSet::new();
        for (port, peers) in &self.clients {
            for peer in peers {
                let key = (*port, peer.src.clone());
                live.insert(key.clone());
                let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
                let history = self
                    .peer_viz
                    .entry(key)
                    .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
                // Newest reading lands in the rightmost cell.
                history.rotate_left(1);
                history[PEER_VIZ_BUCKETS - 1] = combined;
            }
        }
        self.peer_viz.retain(|key, _| live.contains(key));
        self.peer_viz_prev_push = self.peer_viz_last_push;
        self.peer_viz_last_push = Some(now);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_state_is_empty() {
        let state = TunnelState::default();
        assert!(state.list.is_empty());
        assert!(state.active.is_empty());
        assert!(state.pending_delete.is_none());
        assert!(state.summaries_cache.is_empty());
    }

    #[test]
    fn poll_on_empty_returns_empty_vec() {
        // Fast path: with no active tunnels there are no exit events to
        // report and no children to reap. Spawning real ssh child
        // processes belongs in integration tests.
        let mut state = TunnelState::default();
        assert!(state.poll().is_empty());
        assert!(state.active.is_empty());
    }

    /// Minimal `ClientPeer` fixture with the given source address and
    /// current rx/tx readings; everything else is a plausible constant.
    fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
        ClientPeer {
            src: src.to_string(),
            process: "curl".into(),
            pid: 1234,
            since: Instant::now(),
            responsible_app: None,
            current_rx_bps: rx,
            current_tx_bps: tx,
            bytes_rcvd: None,
            bytes_sent: None,
            last_sample_at: Some(Instant::now()),
        }
    }

    #[test]
    fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
        let mut state = TunnelState::default();
        state
            .clients
            .insert(8080, vec![make_peer("127.0.0.1:1", 1_000, 500)]);
        let now = Instant::now();
        state.push_peer_viz(now);
        let key = (8080u16, "127.0.0.1:1".to_string());
        let history = state
            .peer_viz
            .get(&key)
            .expect("entry created on first push");
        // rx + tx lands in the newest (rightmost) bucket; older cells stay 0.
        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
        assert!(history[..PEER_VIZ_BUCKETS - 1].iter().all(|&cell| cell == 0));
        assert_eq!(state.peer_viz_last_push, Some(now));
        assert_eq!(state.peer_viz_prev_push, None);
    }

    #[test]
    fn push_peer_viz_rotates_left_on_each_call() {
        let mut state = TunnelState::default();
        state
            .clients
            .insert(8080, vec![make_peer("127.0.0.1:1", 100, 0)]);
        let t0 = Instant::now();
        state.push_peer_viz(t0);
        // Simulate a second lsof arrival with an updated reading.
        state.clients.get_mut(&8080).expect("port inserted above")[0].current_rx_bps = 200;
        let t1 = t0 + std::time::Duration::from_secs(2);
        state.push_peer_viz(t1);
        let history = state
            .peer_viz
            .get(&(8080u16, "127.0.0.1:1".to_string()))
            .expect("entry exists");
        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
        assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
        // Both timestamps populated so the renderer can derive a smooth
        // phase from the actual interval.
        assert_eq!(state.peer_viz_last_push, Some(t1));
        assert_eq!(state.peer_viz_prev_push, Some(t0));
    }

    #[test]
    fn push_peer_viz_garbage_collects_disappeared_peers() {
        let mut state = TunnelState::default();
        let key = (8080u16, "127.0.0.1:1".to_string());
        state.clients.insert(8080, vec![make_peer("127.0.0.1:1", 0, 0)]);
        let t0 = Instant::now();
        state.push_peer_viz(t0);
        assert!(state.peer_viz.contains_key(&key));
        // The peer vanishes from the next lsof snapshot.
        state.clients.clear();
        state.push_peer_viz(t0 + std::time::Duration::from_secs(2));
        assert!(state.peer_viz.is_empty());
    }
}