Skip to main content

purple_ssh/app/
tunnel_state.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::Ordering;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7use crate::app::TunnelFormBaseline;
8use crate::app::forms::TunnelForm;
9use crate::tunnel::{ActiveTunnel, TunnelRule};
10
11/// Sort order for the tunnels overview screen. Cycled with `s`.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
13pub enum TunnelSortMode {
14    /// Recently-used hosts first (uses `app.history.last_connected`). Active
15    /// tunnels rank by `started_at` so they always sit above idle ones.
16    #[default]
17    MostRecent,
18    /// Alphabetical by host alias, ascending.
19    AlphaHostname,
20}
21
22impl TunnelSortMode {
23    pub fn next(self) -> Self {
24        match self {
25            TunnelSortMode::MostRecent => TunnelSortMode::AlphaHostname,
26            TunnelSortMode::AlphaHostname => TunnelSortMode::MostRecent,
27        }
28    }
29
30    pub fn label(self) -> &'static str {
31        match self {
32            TunnelSortMode::MostRecent => "most recent",
33            TunnelSortMode::AlphaHostname => "A-Z hostname",
34        }
35    }
36}
37use crate::tunnel_live::{
38    ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
39    PortConflict, TunnelLiveSnapshot,
40};
41
42/// Tunnel-owned state grouped off the `App` god-struct. Contains the rule
43/// list, the edit form, the live child-process map, form baseline for the
44/// dirty check, the pending delete index and the cached per-host summary
45/// strings. Pure state container; behaviour lives on `App` or on dedicated
46/// methods here.
47pub struct TunnelState {
48    pub(in crate::app) list: Vec<TunnelRule>,
49    pub(in crate::app) form: TunnelForm,
50    pub(in crate::app) active: HashMap<String, ActiveTunnel>,
51    pub(in crate::app) form_baseline: Option<TunnelFormBaseline>,
52    pub(in crate::app) pending_delete: Option<usize>,
53    pub(in crate::app) summaries_cache: HashMap<String, String>,
54    /// Sort mode for the tunnels overview screen. Cycled by `s`.
55    pub(in crate::app) sort_mode: TunnelSortMode,
56
57    // Live-data layer. Receivers are drained from `poll()`. The Sender
58    // halves are cloned into per-tunnel parser threads when start_tunnel
59    // succeeds.
60    pub(in crate::app) parser_tx: Sender<ParserMessage>,
61    pub(in crate::app) parser_rx: Receiver<ParserMessage>,
62    pub(in crate::app) lsof_tx: Sender<LsofMessage>,
63    pub(in crate::app) lsof_rx: Receiver<LsofMessage>,
64
65    // Last lsof snapshot, keyed by tunnel bind port. Replaced wholesale
66    // on every successful poll so closed sockets disappear.
67    pub(in crate::app) clients: HashMap<u16, Vec<ClientPeer>>,
68    pub(in crate::app) conflicts: HashMap<u16, PortConflict>,
69    pub(in crate::app) last_lsof_at: Option<Instant>,
70
71    /// Per-peer rolling braille history, pushed once per lsof poll
72    /// arrival from `poll()`. Keyed by `(bind_port, peer.src)` so it
73    /// survives wholesale `clients` replacements as long as the peer is
74    /// still in the new snapshot. Each cell carries one
75    /// `current_rx_bps + current_tx_bps` snapshot from a single lsof
76    /// poll, so the visible window covers `PEER_VIZ_BUCKETS` polls
77    /// (~24-48s on Linux/macOS).
78    pub(in crate::app) peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
79    /// Wall-clock of the most recent `push_peer_viz` rotation, and the
80    /// one before it. The renderer divides `(now - last) / (last - prev)`
81    /// to derive a smooth phase in `[0, 1]` that drifts the wave
82    /// leftward by exactly one bucket between pushes — adaptive to the
83    /// actual poll interval (which varies on macOS due to nettop
84    /// overhead).
85    pub(in crate::app) peer_viz_last_push: Option<Instant>,
86    pub(in crate::app) peer_viz_prev_push: Option<Instant>,
87
88    // The single shared lsof poller. Lazily started on first tunnel
89    // start; lives until App::Drop. The bind_ports list is cloned on
90    // every poll iteration so updates are eventually consistent.
91    pub(in crate::app) lsof: Option<LsofPollerHandle>,
92
93    /// Demo / test seed. When `App.demo_mode == true` the detail panel
94    /// reads from this map instead of the live counters, so visual
95    /// regression tests are byte-deterministic.
96    pub(in crate::app) demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
97}
98
99impl Default for TunnelState {
100    fn default() -> Self {
101        let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
102        let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
103        Self {
104            list: Vec::new(),
105            form: TunnelForm::new(),
106            active: HashMap::new(),
107            form_baseline: None,
108            pending_delete: None,
109            summaries_cache: HashMap::new(),
110            sort_mode: TunnelSortMode::default(),
111            parser_tx,
112            parser_rx,
113            lsof_tx,
114            lsof_rx,
115            clients: HashMap::new(),
116            conflicts: HashMap::new(),
117            last_lsof_at: None,
118            peer_viz: HashMap::new(),
119            peer_viz_last_push: None,
120            peer_viz_prev_push: None,
121            lsof: None,
122            demo_live_snapshots: HashMap::new(),
123        }
124    }
125}
126
127impl Drop for TunnelState {
128    fn drop(&mut self) {
129        if let Some(mut handle) = self.lsof.take() {
130            handle.shutdown();
131        }
132    }
133}
134
135impl TunnelState {
136    pub fn list(&self) -> &[TunnelRule] {
137        &self.list
138    }
139
140    pub fn list_mut(&mut self) -> &mut Vec<TunnelRule> {
141        &mut self.list
142    }
143
144    pub fn form(&self) -> &TunnelForm {
145        &self.form
146    }
147
148    pub fn form_mut(&mut self) -> &mut TunnelForm {
149        &mut self.form
150    }
151
152    pub fn reset_form(&mut self) {
153        self.form = TunnelForm::new();
154    }
155
156    /// Load this host's tunnel directives from the SSH config into `list`.
157    pub fn load_directives(
158        &mut self,
159        config: &crate::ssh_config::model::SshConfigFile,
160        alias: &str,
161    ) {
162        self.list = config.find_tunnel_directives(alias);
163    }
164
165    /// True if the tunnel form differs from its captured baseline.
166    pub fn form_is_dirty(&self) -> bool {
167        match &self.form_baseline {
168            Some(b) => {
169                self.form.tunnel_type != b.tunnel_type
170                    || self.form.bind_port != b.bind_port
171                    || self.form.remote_host != b.remote_host
172                    || self.form.remote_port != b.remote_port
173                    || self.form.bind_address != b.bind_address
174            }
175            None => false,
176        }
177    }
178
179    pub fn active(&self) -> &HashMap<String, ActiveTunnel> {
180        &self.active
181    }
182
183    pub fn active_get(&self, alias: &str) -> Option<&ActiveTunnel> {
184        self.active.get(alias)
185    }
186
187    pub fn active_get_mut(&mut self, alias: &str) -> Option<&mut ActiveTunnel> {
188        self.active.get_mut(alias)
189    }
190
191    pub fn active_contains(&self, alias: &str) -> bool {
192        self.active.contains_key(alias)
193    }
194
195    pub fn active_insert(&mut self, alias: String, tunnel: ActiveTunnel) {
196        self.active.insert(alias, tunnel);
197    }
198
199    pub fn active_remove(&mut self, alias: &str) -> Option<ActiveTunnel> {
200        self.active.remove(alias)
201    }
202
203    pub fn drain_active(&mut self) -> std::collections::hash_map::Drain<'_, String, ActiveTunnel> {
204        self.active.drain()
205    }
206
207    pub fn clear_active(&mut self) {
208        self.active.clear();
209    }
210
211    pub fn pending_delete(&self) -> Option<usize> {
212        self.pending_delete
213    }
214
215    pub fn take_pending_delete(&mut self) -> Option<usize> {
216        self.pending_delete.take()
217    }
218
219    pub fn sort_mode(&self) -> TunnelSortMode {
220        self.sort_mode
221    }
222
223    pub fn set_sort_mode(&mut self, mode: TunnelSortMode) {
224        self.sort_mode = mode;
225    }
226
227    pub fn form_baseline(&self) -> Option<&TunnelFormBaseline> {
228        self.form_baseline.as_ref()
229    }
230
231    pub fn set_form_baseline(&mut self, baseline: Option<TunnelFormBaseline>) {
232        self.form_baseline = baseline;
233    }
234
235    pub fn demo_live_snapshots(&self) -> &HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
236        &self.demo_live_snapshots
237    }
238
239    pub fn demo_live_snapshots_mut(
240        &mut self,
241    ) -> &mut HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
242        &mut self.demo_live_snapshots
243    }
244
245    pub fn parser_tx(&self) -> Sender<ParserMessage> {
246        self.parser_tx.clone()
247    }
248
249    pub fn clients(&self) -> &HashMap<u16, Vec<ClientPeer>> {
250        &self.clients
251    }
252
253    pub fn peer_viz(&self) -> &HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]> {
254        &self.peer_viz
255    }
256
257    pub fn peer_viz_last_push(&self) -> Option<Instant> {
258        self.peer_viz_last_push
259    }
260
261    pub fn peer_viz_prev_push(&self) -> Option<Instant> {
262        self.peer_viz_prev_push
263    }
264
265    pub fn summaries_cache(&self) -> &HashMap<String, String> {
266        &self.summaries_cache
267    }
268
269    pub fn summaries_cache_mut(&mut self) -> &mut HashMap<String, String> {
270        &mut self.summaries_cache
271    }
272
273    /// Open a delete confirmation for the tunnel at `idx`. The renderer
274    /// reads `pending_delete` to draw the confirm overlay.
275    pub fn request_delete(&mut self, idx: usize) {
276        self.pending_delete = Some(idx);
277    }
278
279    /// Dismiss a pending delete confirmation. Idempotent.
280    pub fn cancel_delete(&mut self) {
281        self.pending_delete = None;
282    }
283
284    /// Ensure the shared lsof poller is running. Idempotent: a second
285    /// call after the poller is already up is a noop. Caller is
286    /// responsible for updating `bind_ports` afterwards.
287    pub fn ensure_lsof_poller(&mut self) {
288        if self.lsof.is_some() {
289            return;
290        }
291        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
292        let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
293        let thread = crate::tunnel_live::spawn_lsof_poller(
294            bind_ports.clone(),
295            self.lsof_tx.clone(),
296            stop.clone(),
297        );
298        self.lsof = Some(LsofPollerHandle {
299            stop,
300            bind_ports,
301            thread: Some(thread),
302        });
303        log::debug!("[purple] Tunnel lsof poller started");
304    }
305
306    /// Replace the lsof poller's port list. Callers compute the
307    /// `(alias, bind_port, tunnel_pid)` tuples from the SSH config
308    /// directives because `ActiveTunnel` does not store the rule set
309    /// directly. The poller picks up the new list on its next iteration.
310    pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
311        if let Some(handle) = &self.lsof {
312            if let Ok(mut g) = handle.bind_ports.lock() {
313                *g = ports;
314            }
315        }
316    }
317
318    /// Drain the parser channel into per-tunnel live state, drain the
319    /// lsof channel into the shared `clients` and `conflicts` maps,
320    /// rotate per-tunnel history buckets and finally poll every active
321    /// child for exit. Returns (alias, message, is_error) tuples so the
322    /// outer loop can route them through `notify_*`.
323    pub fn poll(&mut self) -> Vec<(String, String, bool)> {
324        let now = Instant::now();
325        // Drain channel events first so any pending opens are reflected
326        // before we report exits.
327        while let Ok(msg) = self.parser_rx.try_recv() {
328            if let Some(tunnel) = self.active.get_mut(&msg.alias) {
329                tunnel.live.record_event(msg.event);
330            }
331        }
332        // Drain lsof snapshots — keep only the freshest one. Older
333        // pending messages are discarded because they would just
334        // overwrite each other.
335        let mut latest_lsof: Option<LsofMessage> = None;
336        while let Ok(msg) = self.lsof_rx.try_recv() {
337            latest_lsof = Some(msg);
338        }
339        if let Some(msg) = latest_lsof {
340            self.clients = msg.clients;
341            self.conflicts = msg.conflicts;
342            self.last_lsof_at = Some(msg.at);
343            // Roll the per-peer braille history forward exactly once
344            // per lsof arrival. The renderer derives a smooth phase
345            // from the timestamp pair below to fill in motion between
346            // pushes at terminal frame rate.
347            self.push_peer_viz(now);
348        }
349        // Rotate per-tunnel history. Bucket width is `BUCKET_SECS`
350        // (currently 2s), so this is effectively per-poll rotation.
351        for tunnel in self.active.values_mut() {
352            tunnel.live.rotate_if_due(now);
353        }
354        // Build a port → alias map once and reuse it for both the
355        // throughput aggregation and the concurrent-activity sampling
356        // below. Source: the lsof poller's `(alias, port, pid)` view of
357        // the active bind ports.
358        let port_to_alias: HashMap<u16, String> = self
359            .lsof
360            .as_ref()
361            .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
362            .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
363            .unwrap_or_default();
364        // Aggregate per-peer current bps into per-tunnel readouts. The
365        // tunnel-level value is the honest sum of every connected
366        // client's flow — it matches the per-peer numbers shown in the
367        // roster. The previous SSH-process sampler counted both ends
368        // of the loopback hop, which doubled the displayed speed.
369        let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
370        for (port, peers) in &self.clients {
371            let Some(alias) = port_to_alias.get(port) else {
372                continue;
373            };
374            let entry = bps_per_alias
375                .entry(alias.clone())
376                .or_insert((0u64, 0u64, false));
377            for peer in peers {
378                entry.0 = entry.0.saturating_add(peer.current_rx_bps);
379                entry.1 = entry.1.saturating_add(peer.current_tx_bps);
380                if peer.last_sample_at.is_some() {
381                    entry.2 = true;
382                }
383            }
384        }
385        for (alias, tunnel) in self.active.iter_mut() {
386            let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
387            tunnel.live.current_rx_bps = rx;
388            tunnel.live.current_tx_bps = tx;
389            tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
390            tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
391            if ready {
392                tunnel.live.last_throughput_at = Some(now);
393            }
394        }
395        // Sample concurrent activity per alias into the current bucket.
396        // Source = max(lsof ESTABLISHED clients, ssh active channels)
397        // summed across every bind_port that belongs to the alias. That
398        // way the sparkline reflects ongoing concurrency (a long-lived
399        // WebKit connection, a streaming HTTP/2 session) rather than
400        // only short channel-open bursts.
401        let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
402        for (port, peers) in &self.clients {
403            if let Some(alias) = port_to_alias.get(port) {
404                *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
405            }
406        }
407        for (alias, tunnel) in self.active.iter_mut() {
408            let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
409            let sample = lsof_count.max(tunnel.live.active_channels);
410            tunnel.live.sample_activity(sample);
411        }
412
413        if self.active.is_empty() {
414            return Vec::new();
415        }
416        let mut exited = Vec::new();
417        let mut to_remove = Vec::new();
418        for (alias, tunnel) in &mut self.active {
419            match tunnel.child.try_wait() {
420                Ok(Some(status)) => {
421                    // The parser thread holds child.stderr; ask the
422                    // shared stderr ringbuffer for the last meaningful
423                    // line instead of re-reading the pipe.
424                    let stderr_msg = tunnel
425                        .live
426                        .stderr_buffer
427                        .lock()
428                        .ok()
429                        .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
430                        .map(|s| s.trim().to_string())
431                        .filter(|s| !s.is_empty());
432                    let exit_code = status.code().unwrap_or(-1);
433                    if !status.success() {
434                        log::error!(
435                            "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
436                        );
437                        if let Some(ref err) = stderr_msg {
438                            log::debug!("[external] Tunnel stderr: {}", err.trim());
439                        }
440                    }
441                    let last_exit_line = stderr_msg
442                        .clone()
443                        .unwrap_or_else(|| format!("exit code {}", exit_code));
444                    tunnel.live.last_exit = Some((exit_code, last_exit_line));
445                    tunnel.live.parser_stop.store(true, Ordering::Relaxed);
446                    // Mark all currently-open channels as closed so the
447                    // active count drops to zero on exit.
448                    if tunnel.live.active_channels > 0 {
449                        let close_now = ChannelEventKind::Close;
450                        let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
451                        for id in ids {
452                            tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
453                                at: now,
454                                channel_id: id,
455                                kind: close_now,
456                                channel_kind: None,
457                                opened_at: None,
458                            });
459                        }
460                    }
461                    let (msg, is_error) = if status.success() {
462                        (format!("Tunnel for {} closed.", alias), false)
463                    } else if let Some(err) = stderr_msg {
464                        (format!("Tunnel for {}: {}", alias, err), true)
465                    } else {
466                        (
467                            format!("Tunnel for {} exited with code {}.", alias, exit_code),
468                            true,
469                        )
470                    };
471                    exited.push((alias.clone(), msg, is_error));
472                    to_remove.push(alias.clone());
473                }
474                Ok(None) => {}
475                Err(e) => {
476                    exited.push((
477                        alias.clone(),
478                        format!("Tunnel for {} lost: {}", alias, e),
479                        true,
480                    ));
481                    to_remove.push(alias.clone());
482                }
483            }
484        }
485        for alias in to_remove {
486            self.active.remove(&alias);
487        }
488        exited
489    }
490
491    /// Push one bucket of per-peer braille history. Called exactly
492    /// once per lsof arrival so the visible window encodes
493    /// `PEER_VIZ_BUCKETS` consecutive poll snapshots — long enough to
494    /// see the trend, short enough to react quickly to changes. The
495    /// renderer fills in smooth motion between pushes via
496    /// `peer_viz_last_push` / `peer_viz_prev_push`. Garbage-collects
497    /// entries for peers that no longer appear in `self.clients`.
498    pub fn push_peer_viz(&mut self, now: Instant) {
499        let mut live: HashSet<(u16, String)> = HashSet::new();
500        for (port, peers) in &self.clients {
501            for peer in peers {
502                let key = (*port, peer.src.clone());
503                live.insert(key.clone());
504                let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
505                let history = self
506                    .peer_viz
507                    .entry(key)
508                    .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
509                history.rotate_left(1);
510                history[PEER_VIZ_BUCKETS - 1] = combined;
511            }
512        }
513        self.peer_viz.retain(|key, _| live.contains(key));
514        self.peer_viz_prev_push = self.peer_viz_last_push;
515        self.peer_viz_last_push = Some(now);
516    }
517
518    /// Drop demo-mode tunnel snapshots whose alias is no longer in
519    /// `valid_aliases`. Called from `App::reload_hosts`. Outside demo
520    /// the map stays empty so this is a no-op, but a demo workflow that
521    /// renames or deletes a host should not leak the old snapshot.
522    pub fn prune_orphans(&mut self, valid_aliases: &HashSet<&str>) {
523        let pre = self.demo_live_snapshots.len();
524        self.demo_live_snapshots
525            .retain(|alias, _| valid_aliases.contains(alias.as_str()));
526        let dropped = pre.saturating_sub(self.demo_live_snapshots.len());
527        if dropped > 0 {
528            log::debug!(
529                "[purple] reload_hosts: dropped {dropped} orphan demo_live_snapshots entrie(s)"
530            );
531        }
532    }
533
534    /// Move the active-tunnel handle from `old` to `new` on host
535    /// rename. Called from `App::migrate_alias_keyed_caches` before
536    /// `reload_hosts`, whose prune step would otherwise drop entries
537    /// still under the old alias. No-op when `old == new` or no
538    /// active tunnel exists under `old`.
539    pub fn migrate_alias(&mut self, old: &str, new: &str) {
540        if old == new {
541            return;
542        }
543        if let Some(t) = self.active.remove(old) {
544            self.active.insert(new.to_string(), t);
545        }
546    }
547}
548
549#[cfg(test)]
550mod tests {
551    use super::*;
552
553    #[test]
554    fn default_state_is_empty() {
555        let s = TunnelState::default();
556        assert!(s.list.is_empty());
557        assert!(s.active.is_empty());
558        assert!(s.pending_delete.is_none());
559        assert!(s.summaries_cache.is_empty());
560    }
561
562    #[test]
563    fn poll_on_empty_returns_empty_vec() {
564        // Fast path: no active tunnels means no exit events to report and
565        // no child processes to reap. Spawning real ssh child processes
566        // belongs in integration tests.
567        let mut s = TunnelState::default();
568        let result = s.poll();
569        assert!(result.is_empty());
570        assert!(s.active.is_empty());
571    }
572
573    fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
574        ClientPeer {
575            src: src.to_string(),
576            process: "curl".into(),
577            pid: 1234,
578            since: Instant::now(),
579            responsible_app: None,
580            current_rx_bps: rx,
581            current_tx_bps: tx,
582            bytes_rcvd: None,
583            bytes_sent: None,
584            last_sample_at: Some(Instant::now()),
585        }
586    }
587
588    #[test]
589    fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
590        let mut s = TunnelState::default();
591        s.clients
592            .insert(8080, vec![make_peer("127.0.0.1:1", 1_000, 500)]);
593        let now = Instant::now();
594        s.push_peer_viz(now);
595        let key = (8080u16, "127.0.0.1:1".to_string());
596        let history = s.peer_viz.get(&key).expect("entry created on first push");
597        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
598        for cell in &history[..PEER_VIZ_BUCKETS - 1] {
599            assert_eq!(*cell, 0);
600        }
601        assert_eq!(s.peer_viz_last_push, Some(now));
602        assert_eq!(s.peer_viz_prev_push, None);
603    }
604
605    #[test]
606    fn push_peer_viz_rotates_left_on_each_call() {
607        let mut s = TunnelState::default();
608        s.clients
609            .insert(8080, vec![make_peer("127.0.0.1:1", 100, 0)]);
610        let t0 = Instant::now();
611        s.push_peer_viz(t0);
612        // Update the bps reading and push again to simulate a second
613        // lsof arrival.
614        if let Some(peers) = s.clients.get_mut(&8080) {
615            peers[0].current_rx_bps = 200;
616        }
617        let t1 = t0 + std::time::Duration::from_secs(2);
618        s.push_peer_viz(t1);
619        let key = (8080u16, "127.0.0.1:1".to_string());
620        let history = s.peer_viz.get(&key).expect("entry exists");
621        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
622        assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
623        // Both timestamps populated so the renderer can derive a
624        // smooth phase from the actual interval.
625        assert_eq!(s.peer_viz_last_push, Some(t1));
626        assert_eq!(s.peer_viz_prev_push, Some(t0));
627    }
628
629    #[test]
630    fn push_peer_viz_garbage_collects_disappeared_peers() {
631        let mut s = TunnelState::default();
632        s.clients.insert(8080, vec![make_peer("127.0.0.1:1", 0, 0)]);
633        let t0 = Instant::now();
634        s.push_peer_viz(t0);
635        assert!(
636            s.peer_viz
637                .contains_key(&(8080u16, "127.0.0.1:1".to_string()))
638        );
639        // Peer disappears from the lsof snapshot on the next poll.
640        s.clients.clear();
641        s.push_peer_viz(t0 + std::time::Duration::from_secs(2));
642        assert!(s.peer_viz.is_empty());
643    }
644
645    #[test]
646    fn request_delete_sets_pending_delete_to_some_idx() {
647        let mut s = TunnelState::default();
648        s.request_delete(3);
649        assert_eq!(s.pending_delete, Some(3));
650    }
651
652    #[test]
653    fn cancel_delete_clears_pending_delete() {
654        let mut s = TunnelState::default();
655        s.pending_delete = Some(2);
656        s.cancel_delete();
657        assert!(s.pending_delete.is_none());
658    }
659
660    #[test]
661    fn request_delete_overwrites_existing_pending() {
662        let mut s = TunnelState::default();
663        s.pending_delete = Some(1);
664        s.request_delete(7);
665        assert_eq!(s.pending_delete, Some(7));
666    }
667
668    #[test]
669    fn cancel_delete_is_idempotent_on_empty_pending() {
670        let mut s = TunnelState::default();
671        s.cancel_delete();
672        s.cancel_delete();
673        assert!(s.pending_delete.is_none());
674    }
675
676    fn empty_snapshot() -> crate::tunnel_live::TunnelLiveSnapshot {
677        crate::tunnel_live::TunnelLiveSnapshot {
678            uptime_secs: 0,
679            active_channels: 0,
680            peak_concurrent: 0,
681            total_opens: 0,
682            idle_secs: 0,
683            rx_history: [0; crate::tunnel_live::HISTORY_BUCKETS],
684            tx_history: [0; crate::tunnel_live::HISTORY_BUCKETS],
685            current_rx_bps: 0,
686            current_tx_bps: 0,
687            peak_rx_bps: 0,
688            peak_tx_bps: 0,
689            throughput_ready: false,
690            clients: vec![],
691            events: vec![],
692            currently_open: vec![],
693            conflict: None,
694            last_exit: None,
695        }
696    }
697
698    #[test]
699    fn prune_orphans_drops_unknown_demo_snapshots() {
700        let mut s = TunnelState::default();
701        s.demo_live_snapshots
702            .insert("keep".to_string(), empty_snapshot());
703        s.demo_live_snapshots
704            .insert("drop".to_string(), empty_snapshot());
705
706        let valid: HashSet<&str> = ["keep"].into_iter().collect();
707        s.prune_orphans(&valid);
708
709        assert!(s.demo_live_snapshots.contains_key("keep"));
710        assert!(!s.demo_live_snapshots.contains_key("drop"));
711    }
712
713    #[test]
714    fn migrate_alias_is_noop_on_empty_active_map() {
715        // Constructing a real ActiveTunnel requires a spawned child
716        // process; the active-map rename is covered by the
717        // `migrate_alias_keyed_caches_*` integration tests, which build
718        // a full App. Here we just verify the no-op contract on absent
719        // and self-rename inputs.
720        let mut s = TunnelState::default();
721        s.migrate_alias("missing", "new");
722        assert!(s.active.is_empty());
723        s.migrate_alias("same", "same");
724        assert!(s.active.is_empty());
725    }
726
727    fn state_matching_baseline() -> TunnelState {
728        let mut s = TunnelState::default();
729        s.form.tunnel_type = crate::tunnel::TunnelType::Local;
730        s.form.bind_port = "8080".into();
731        s.form.remote_host = "db.internal".into();
732        s.form.remote_port = "5432".into();
733        s.form.bind_address = "127.0.0.1".into();
734        s.set_form_baseline(Some(TunnelFormBaseline {
735            tunnel_type: crate::tunnel::TunnelType::Local,
736            bind_port: "8080".into(),
737            remote_host: "db.internal".into(),
738            remote_port: "5432".into(),
739            bind_address: "127.0.0.1".into(),
740        }));
741        s
742    }
743
744    #[test]
745    fn form_is_dirty_is_false_without_a_baseline() {
746        let mut s = TunnelState::default();
747        s.form.bind_port = "9000".into();
748        assert!(!s.form_is_dirty());
749    }
750
751    #[test]
752    fn form_is_dirty_is_false_when_form_equals_baseline() {
753        assert!(!state_matching_baseline().form_is_dirty());
754    }
755
756    fn assert_field_change_is_dirty(field: &str, mutate: impl FnOnce(&mut TunnelForm)) {
757        let mut s = state_matching_baseline();
758        mutate(&mut s.form);
759        assert!(s.form_is_dirty(), "a change in {field} must read dirty");
760    }
761
762    #[test]
763    fn form_is_dirty_detects_a_change_in_each_field() {
764        assert_field_change_is_dirty("tunnel_type", |f| {
765            f.tunnel_type = crate::tunnel::TunnelType::Remote;
766        });
767        assert_field_change_is_dirty("bind_port", |f| f.bind_port.push('1'));
768        assert_field_change_is_dirty("remote_host", |f| f.remote_host.push('x'));
769        assert_field_change_is_dirty("remote_port", |f| f.remote_port.push('1'));
770        assert_field_change_is_dirty("bind_address", |f| f.bind_address.push('9'));
771    }
772}