Skip to main content

purple_ssh/app/
tunnel_state.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::Ordering;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7use crate::app::TunnelFormBaseline;
8use crate::app::forms::TunnelForm;
9use crate::tunnel::{ActiveTunnel, TunnelRule};
10
/// Ordering applied to the tunnels overview screen. The `s` key cycles
/// through the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TunnelSortMode {
    /// Most recently used hosts first (based on
    /// `app.history.last_connected`). Active tunnels are ranked by their
    /// `started_at`, which keeps them above idle ones.
    #[default]
    MostRecent,
    /// Ascending alphabetical order by host alias.
    AlphaHostname,
}

impl TunnelSortMode {
    /// Returns the mode that follows `self` in the cycle; the last mode
    /// wraps around to the first.
    pub fn next(self) -> Self {
        match self {
            Self::AlphaHostname => Self::MostRecent,
            Self::MostRecent => Self::AlphaHostname,
        }
    }

    /// Returns the short human-readable name of this mode.
    pub fn label(self) -> &'static str {
        match self {
            Self::AlphaHostname => "A-Z hostname",
            Self::MostRecent => "most recent",
        }
    }
}
37use crate::tunnel_live::{
38    ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
39    PortConflict, TunnelLiveSnapshot,
40};
41
42/// Tunnel-owned state grouped off the `App` god-struct. Contains the rule
43/// list, the edit form, the live child-process map, form baseline for the
44/// dirty check, the pending delete index and the cached per-host summary
45/// strings. Pure state container; behaviour lives on `App` or on dedicated
46/// methods here.
47pub struct TunnelState {
48    pub list: Vec<TunnelRule>,
49    pub form: TunnelForm,
50    pub active: HashMap<String, ActiveTunnel>,
51    pub form_baseline: Option<TunnelFormBaseline>,
52    pub pending_delete: Option<usize>,
53    pub summaries_cache: HashMap<String, String>,
54    /// Sort mode for the tunnels overview screen. Cycled by `s`.
55    pub sort_mode: TunnelSortMode,
56
57    // Live-data layer. Receivers are drained from `poll()`. The Sender
58    // halves are cloned into per-tunnel parser threads when start_tunnel
59    // succeeds.
60    pub parser_tx: Sender<ParserMessage>,
61    pub parser_rx: Receiver<ParserMessage>,
62    pub lsof_tx: Sender<LsofMessage>,
63    pub lsof_rx: Receiver<LsofMessage>,
64
65    // Last lsof snapshot, keyed by tunnel bind port. Replaced wholesale
66    // on every successful poll so closed sockets disappear.
67    pub clients: HashMap<u16, Vec<ClientPeer>>,
68    pub conflicts: HashMap<u16, PortConflict>,
69    pub last_lsof_at: Option<Instant>,
70
71    /// Per-peer rolling braille history, pushed once per lsof poll
72    /// arrival from `poll()`. Keyed by `(bind_port, peer.src)` so it
73    /// survives wholesale `clients` replacements as long as the peer is
74    /// still in the new snapshot. Each cell carries one
75    /// `current_rx_bps + current_tx_bps` snapshot from a single lsof
76    /// poll, so the visible window covers `PEER_VIZ_BUCKETS` polls
77    /// (~24-48s on Linux/macOS).
78    pub peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
79    /// Wall-clock of the most recent `push_peer_viz` rotation, and the
80    /// one before it. The renderer divides `(now - last) / (last - prev)`
81    /// to derive a smooth phase in `[0, 1]` that drifts the wave
82    /// leftward by exactly one bucket between pushes — adaptive to the
83    /// actual poll interval (which varies on macOS due to nettop
84    /// overhead).
85    pub peer_viz_last_push: Option<Instant>,
86    pub peer_viz_prev_push: Option<Instant>,
87
88    // The single shared lsof poller. Lazily started on first tunnel
89    // start; lives until App::Drop. The bind_ports list is cloned on
90    // every poll iteration so updates are eventually consistent.
91    pub lsof: Option<LsofPollerHandle>,
92
93    /// Demo / test seed. When `App.demo_mode == true` the detail panel
94    /// reads from this map instead of the live counters, so visual
95    /// regression tests are byte-deterministic.
96    pub demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
97}
98
99impl Default for TunnelState {
100    fn default() -> Self {
101        let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
102        let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
103        Self {
104            list: Vec::new(),
105            form: TunnelForm::new(),
106            active: HashMap::new(),
107            form_baseline: None,
108            pending_delete: None,
109            summaries_cache: HashMap::new(),
110            sort_mode: TunnelSortMode::default(),
111            parser_tx,
112            parser_rx,
113            lsof_tx,
114            lsof_rx,
115            clients: HashMap::new(),
116            conflicts: HashMap::new(),
117            last_lsof_at: None,
118            peer_viz: HashMap::new(),
119            peer_viz_last_push: None,
120            peer_viz_prev_push: None,
121            lsof: None,
122            demo_live_snapshots: HashMap::new(),
123        }
124    }
125}
126
127impl Drop for TunnelState {
128    fn drop(&mut self) {
129        if let Some(mut handle) = self.lsof.take() {
130            handle.shutdown();
131        }
132    }
133}
134
135impl TunnelState {
136    /// Open a delete confirmation for the tunnel at `idx`. The renderer
137    /// reads `pending_delete` to draw the confirm overlay.
138    pub fn request_delete(&mut self, idx: usize) {
139        self.pending_delete = Some(idx);
140    }
141
142    /// Dismiss a pending delete confirmation. Idempotent.
143    pub fn cancel_delete(&mut self) {
144        self.pending_delete = None;
145    }
146
147    /// Ensure the shared lsof poller is running. Idempotent: a second
148    /// call after the poller is already up is a noop. Caller is
149    /// responsible for updating `bind_ports` afterwards.
150    pub fn ensure_lsof_poller(&mut self) {
151        if self.lsof.is_some() {
152            return;
153        }
154        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
155        let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
156        let thread = crate::tunnel_live::spawn_lsof_poller(
157            bind_ports.clone(),
158            self.lsof_tx.clone(),
159            stop.clone(),
160        );
161        self.lsof = Some(LsofPollerHandle {
162            stop,
163            bind_ports,
164            thread: Some(thread),
165        });
166        log::debug!("[purple] Tunnel lsof poller started");
167    }
168
169    /// Replace the lsof poller's port list. Callers compute the
170    /// `(alias, bind_port, tunnel_pid)` tuples from the SSH config
171    /// directives because `ActiveTunnel` does not store the rule set
172    /// directly. The poller picks up the new list on its next iteration.
173    pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
174        if let Some(handle) = &self.lsof {
175            if let Ok(mut g) = handle.bind_ports.lock() {
176                *g = ports;
177            }
178        }
179    }
180
181    /// Drain the parser channel into per-tunnel live state, drain the
182    /// lsof channel into the shared `clients` and `conflicts` maps,
183    /// rotate per-tunnel history buckets and finally poll every active
184    /// child for exit. Returns (alias, message, is_error) tuples so the
185    /// outer loop can route them through `notify_*`.
186    pub fn poll(&mut self) -> Vec<(String, String, bool)> {
187        let now = Instant::now();
188        // Drain channel events first so any pending opens are reflected
189        // before we report exits.
190        while let Ok(msg) = self.parser_rx.try_recv() {
191            if let Some(tunnel) = self.active.get_mut(&msg.alias) {
192                tunnel.live.record_event(msg.event);
193            }
194        }
195        // Drain lsof snapshots — keep only the freshest one. Older
196        // pending messages are discarded because they would just
197        // overwrite each other.
198        let mut latest_lsof: Option<LsofMessage> = None;
199        while let Ok(msg) = self.lsof_rx.try_recv() {
200            latest_lsof = Some(msg);
201        }
202        if let Some(msg) = latest_lsof {
203            self.clients = msg.clients;
204            self.conflicts = msg.conflicts;
205            self.last_lsof_at = Some(msg.at);
206            // Roll the per-peer braille history forward exactly once
207            // per lsof arrival. The renderer derives a smooth phase
208            // from the timestamp pair below to fill in motion between
209            // pushes at terminal frame rate.
210            self.push_peer_viz(now);
211        }
212        // Rotate per-tunnel history. Bucket width is `BUCKET_SECS`
213        // (currently 2s), so this is effectively per-poll rotation.
214        for tunnel in self.active.values_mut() {
215            tunnel.live.rotate_if_due(now);
216        }
217        // Build a port → alias map once and reuse it for both the
218        // throughput aggregation and the concurrent-activity sampling
219        // below. Source: the lsof poller's `(alias, port, pid)` view of
220        // the active bind ports.
221        let port_to_alias: HashMap<u16, String> = self
222            .lsof
223            .as_ref()
224            .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
225            .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
226            .unwrap_or_default();
227        // Aggregate per-peer current bps into per-tunnel readouts. The
228        // tunnel-level value is the honest sum of every connected
229        // client's flow — it matches the per-peer numbers shown in the
230        // roster. The previous SSH-process sampler counted both ends
231        // of the loopback hop, which doubled the displayed speed.
232        let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
233        for (port, peers) in &self.clients {
234            let Some(alias) = port_to_alias.get(port) else {
235                continue;
236            };
237            let entry = bps_per_alias
238                .entry(alias.clone())
239                .or_insert((0u64, 0u64, false));
240            for peer in peers {
241                entry.0 = entry.0.saturating_add(peer.current_rx_bps);
242                entry.1 = entry.1.saturating_add(peer.current_tx_bps);
243                if peer.last_sample_at.is_some() {
244                    entry.2 = true;
245                }
246            }
247        }
248        for (alias, tunnel) in self.active.iter_mut() {
249            let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
250            tunnel.live.current_rx_bps = rx;
251            tunnel.live.current_tx_bps = tx;
252            tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
253            tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
254            if ready {
255                tunnel.live.last_throughput_at = Some(now);
256            }
257        }
258        // Sample concurrent activity per alias into the current bucket.
259        // Source = max(lsof ESTABLISHED clients, ssh active channels)
260        // summed across every bind_port that belongs to the alias. That
261        // way the sparkline reflects ongoing concurrency (a long-lived
262        // WebKit connection, a streaming HTTP/2 session) rather than
263        // only short channel-open bursts.
264        let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
265        for (port, peers) in &self.clients {
266            if let Some(alias) = port_to_alias.get(port) {
267                *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
268            }
269        }
270        for (alias, tunnel) in self.active.iter_mut() {
271            let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
272            let sample = lsof_count.max(tunnel.live.active_channels);
273            tunnel.live.sample_activity(sample);
274        }
275
276        if self.active.is_empty() {
277            return Vec::new();
278        }
279        let mut exited = Vec::new();
280        let mut to_remove = Vec::new();
281        for (alias, tunnel) in &mut self.active {
282            match tunnel.child.try_wait() {
283                Ok(Some(status)) => {
284                    // The parser thread holds child.stderr; ask the
285                    // shared stderr ringbuffer for the last meaningful
286                    // line instead of re-reading the pipe.
287                    let stderr_msg = tunnel
288                        .live
289                        .stderr_buffer
290                        .lock()
291                        .ok()
292                        .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
293                        .map(|s| s.trim().to_string())
294                        .filter(|s| !s.is_empty());
295                    let exit_code = status.code().unwrap_or(-1);
296                    if !status.success() {
297                        log::error!(
298                            "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
299                        );
300                        if let Some(ref err) = stderr_msg {
301                            log::debug!("[external] Tunnel stderr: {}", err.trim());
302                        }
303                    }
304                    let last_exit_line = stderr_msg
305                        .clone()
306                        .unwrap_or_else(|| format!("exit code {}", exit_code));
307                    tunnel.live.last_exit = Some((exit_code, last_exit_line));
308                    tunnel.live.parser_stop.store(true, Ordering::Relaxed);
309                    // Mark all currently-open channels as closed so the
310                    // active count drops to zero on exit.
311                    if tunnel.live.active_channels > 0 {
312                        let close_now = ChannelEventKind::Close;
313                        let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
314                        for id in ids {
315                            tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
316                                at: now,
317                                channel_id: id,
318                                kind: close_now,
319                                channel_kind: None,
320                                opened_at: None,
321                            });
322                        }
323                    }
324                    let (msg, is_error) = if status.success() {
325                        (format!("Tunnel for {} closed.", alias), false)
326                    } else if let Some(err) = stderr_msg {
327                        (format!("Tunnel for {}: {}", alias, err), true)
328                    } else {
329                        (
330                            format!("Tunnel for {} exited with code {}.", alias, exit_code),
331                            true,
332                        )
333                    };
334                    exited.push((alias.clone(), msg, is_error));
335                    to_remove.push(alias.clone());
336                }
337                Ok(None) => {}
338                Err(e) => {
339                    exited.push((
340                        alias.clone(),
341                        format!("Tunnel for {} lost: {}", alias, e),
342                        true,
343                    ));
344                    to_remove.push(alias.clone());
345                }
346            }
347        }
348        for alias in to_remove {
349            self.active.remove(&alias);
350        }
351        exited
352    }
353
354    /// Push one bucket of per-peer braille history. Called exactly
355    /// once per lsof arrival so the visible window encodes
356    /// `PEER_VIZ_BUCKETS` consecutive poll snapshots — long enough to
357    /// see the trend, short enough to react quickly to changes. The
358    /// renderer fills in smooth motion between pushes via
359    /// `peer_viz_last_push` / `peer_viz_prev_push`. Garbage-collects
360    /// entries for peers that no longer appear in `self.clients`.
361    pub fn push_peer_viz(&mut self, now: Instant) {
362        let mut live: HashSet<(u16, String)> = HashSet::new();
363        for (port, peers) in &self.clients {
364            for peer in peers {
365                let key = (*port, peer.src.clone());
366                live.insert(key.clone());
367                let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
368                let history = self
369                    .peer_viz
370                    .entry(key)
371                    .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
372                history.rotate_left(1);
373                history[PEER_VIZ_BUCKETS - 1] = combined;
374            }
375        }
376        self.peer_viz.retain(|key, _| live.contains(key));
377        self.peer_viz_prev_push = self.peer_viz_last_push;
378        self.peer_viz_last_push = Some(now);
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Bind port used by the peer-viz fixtures.
    const PORT: u16 = 8080;
    /// Source address of the single fixture peer.
    const PEER_SRC: &str = "127.0.0.1:1";

    /// Builds a peer with the given source and throughput readings.
    fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
        ClientPeer {
            src: src.to_string(),
            process: "curl".into(),
            pid: 1234,
            since: Instant::now(),
            responsible_app: None,
            current_rx_bps: rx,
            current_tx_bps: tx,
            bytes_rcvd: None,
            bytes_sent: None,
            last_sample_at: Some(Instant::now()),
        }
    }

    /// Default state with one fixture peer connected on `PORT`.
    fn state_with_peer(rx: u64, tx: u64) -> TunnelState {
        let mut state = TunnelState::default();
        state.clients.insert(PORT, vec![make_peer(PEER_SRC, rx, tx)]);
        state
    }

    /// `peer_viz` key of the fixture peer.
    fn viz_key() -> (u16, String) {
        (PORT, PEER_SRC.to_string())
    }

    #[test]
    fn default_state_is_empty() {
        let state = TunnelState::default();
        assert!(state.list.is_empty());
        assert!(state.active.is_empty());
        assert!(state.pending_delete.is_none());
        assert!(state.summaries_cache.is_empty());
    }

    #[test]
    fn poll_on_empty_returns_empty_vec() {
        // With no active tunnels there is nothing to reap and nothing to
        // report. Real ssh children belong in integration tests.
        let mut state = TunnelState::default();
        let reported = state.poll();
        assert!(reported.is_empty());
        assert!(state.active.is_empty());
    }

    #[test]
    fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
        let mut state = state_with_peer(1_000, 500);
        let now = Instant::now();
        state.push_peer_viz(now);

        let history = state
            .peer_viz
            .get(&viz_key())
            .expect("entry created on first push");
        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
        for cell in &history[..PEER_VIZ_BUCKETS - 1] {
            assert_eq!(*cell, 0);
        }
        assert_eq!(state.peer_viz_last_push, Some(now));
        assert_eq!(state.peer_viz_prev_push, None);
    }

    #[test]
    fn push_peer_viz_rotates_left_on_each_call() {
        let mut state = state_with_peer(100, 0);
        let t0 = Instant::now();
        state.push_peer_viz(t0);

        // A second lsof arrival with a new reading.
        if let Some(peer) = state
            .clients
            .get_mut(&PORT)
            .and_then(|peers| peers.first_mut())
        {
            peer.current_rx_bps = 200;
        }
        let t1 = t0 + Duration::from_secs(2);
        state.push_peer_viz(t1);

        let history = state.peer_viz.get(&viz_key()).expect("entry exists");
        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
        assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
        // Both timestamps are set, so the renderer can derive a smooth
        // phase from the real interval.
        assert_eq!(state.peer_viz_last_push, Some(t1));
        assert_eq!(state.peer_viz_prev_push, Some(t0));
    }

    #[test]
    fn push_peer_viz_garbage_collects_disappeared_peers() {
        let mut state = state_with_peer(0, 0);
        let t0 = Instant::now();
        state.push_peer_viz(t0);
        assert!(state.peer_viz.contains_key(&viz_key()));

        // The peer is missing from the next lsof snapshot.
        state.clients.clear();
        state.push_peer_viz(t0 + Duration::from_secs(2));
        assert!(state.peer_viz.is_empty());
    }

    #[test]
    fn request_delete_sets_pending_delete_to_some_idx() {
        let mut state = TunnelState::default();
        state.request_delete(3);
        assert_eq!(state.pending_delete, Some(3));
    }

    #[test]
    fn cancel_delete_clears_pending_delete() {
        let mut state = TunnelState::default();
        state.pending_delete = Some(2);
        state.cancel_delete();
        assert!(state.pending_delete.is_none());
    }

    #[test]
    fn request_delete_overwrites_existing_pending() {
        let mut state = TunnelState::default();
        state.pending_delete = Some(1);
        state.request_delete(7);
        assert_eq!(state.pending_delete, Some(7));
    }

    #[test]
    fn cancel_delete_is_idempotent_on_empty_pending() {
        let mut state = TunnelState::default();
        state.cancel_delete();
        state.cancel_delete();
        assert!(state.pending_delete.is_none());
    }
}