Skip to main content

purple_ssh/
tunnel_live.rs

1//! Live-data layer for active SSH tunnels. Two worker types:
2//!
3//! - One stderr-parser thread per tunnel that reads `ssh -v` output and
4//!   emits `ChannelEvent`s when ssh logs `debug1: channel N: new` and
5//!   `debug1: channel N: free` lines. Also keeps the last few stderr
6//!   lines so `poll_tunnels()` can surface a meaningful exit reason.
7//!
8//! - One shared lsof poller that runs `lsof -iTCP -P -n` every 2s,
9//!   filters by the active tunnel bind ports and emits `LsofMessage`
10//!   snapshots with connected clients and port conflicts.
11//!
12//! All communication is via `std::sync::mpsc` channels. No tokio.
13
14use std::collections::{HashMap, VecDeque};
15use std::io::{BufRead, BufReader};
16#[cfg(any(target_os = "macos", target_os = "linux"))]
17use std::process::Stdio;
18use std::process::{ChildStderr, Command};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::mpsc::Sender;
21use std::sync::{Arc, Mutex};
22use std::thread::{self, JoinHandle};
23use std::time::{Duration, Instant};
24
/// Maximum number of channel events kept in the per-tunnel ringbuffer.
pub const MAX_EVENTS: usize = 50;
/// Bucket width of the rolling activity history, in seconds. Two seconds
/// matches the lsof poll interval, so the sparkline is fed at the same
/// rate it is sampled. Each bucket carries the peak concurrent client
/// count observed within its window.
pub const BUCKET_SECS: u64 = 2;

/// Number of buckets in the rolling activity history. Combined with
/// `BUCKET_SECS` this is `HISTORY_BUCKETS * BUCKET_SECS = 300s ≈ 5 min`
/// of history. Wide window so sparse traffic still has visible content
/// across the chart. Bucket `HISTORY_BUCKETS - 1` is "now".
pub const HISTORY_BUCKETS: usize = 150;
/// Number of stderr lines kept per tunnel for exit-reason display.
pub const STDERR_BUFFER_LINES: usize = 10;
/// Maximum number of clients reported per bind port. The card only
/// renders a handful and lsof can list hundreds of peers on a busy
/// load balancer, so we cap to keep allocation bounded.
pub const MAX_CLIENTS_PER_PORT: usize = 64;
44
/// Channel-open or -close event observed in `ssh -v` stderr.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelEventKind {
    /// A `debug1: channel N: new …` line — the channel came into existence.
    Open,
    /// A `debug1: channel N: free …` line — the channel was released.
    Close,
}
51
/// What kind of ssh channel this event belongs to. The bracketed token
/// after `channel N: new` in `ssh -v` output ("port listener",
/// "direct-tcpip", "dynamic-tcpip" etc.) maps onto these variants.
///
/// Only `Direct`, `Forwarded` and `Dynamic` represent end-user traffic;
/// the rest are ssh-internal bookkeeping (the listener that binds the
/// local port, mux master channels, agent forwarding) and are filtered
/// out before rendering the EVENTS card.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelKind {
    /// `direct-tcpip` — outgoing client connection through a LocalForward.
    Direct,
    /// `forwarded-tcpip` — incoming client connection arriving via a
    /// RemoteForward.
    Forwarded,
    /// `dynamic-tcpip` — single SOCKS request through a DynamicForward.
    Dynamic,
    /// Anything else (`port listener`, `client-session`, `mux-master`,
    /// `auth-agent@openssh.com`, `x11`, …).
    Other,
}

impl ChannelKind {
    /// Map the channel-type token from a `channel N: new [<type>]` line
    /// onto a variant; unrecognised tokens collapse into `Other`.
    pub fn from_bracket(token: &str) -> Self {
        match token {
            "dynamic-tcpip" => Self::Dynamic,
            "forwarded-tcpip" => Self::Forwarded,
            "direct-tcpip" => Self::Direct,
            _ => Self::Other,
        }
    }

    /// True if this kind represents end-user traffic that should appear
    /// in the EVENTS card — i.e. every variant except `Other`.
    pub fn is_user_visible(self) -> bool {
        !matches!(self, Self::Other)
    }
}
91
/// One parsed channel event, produced by `parse_channel_line` on the
/// stderr parser thread and consumed by `TunnelLiveState::record_event`.
#[derive(Debug, Clone)]
pub struct ChannelEvent {
    /// When the line was parsed (`Instant::now()` at parse time).
    pub at: Instant,
    /// ssh's numeric channel id (`channel N:` in the log line).
    pub channel_id: u32,
    /// Whether this is a channel `new` (Open) or `free` (Close) line.
    pub kind: ChannelEventKind,
    /// Bracketed channel type from the `new` line, or the kind that
    /// was recorded when the channel opened (for `Close` events).
    /// `None` if neither side could be parsed (defensive — the
    /// regression suite expects every event to have a kind).
    pub channel_kind: Option<ChannelKind>,
    /// For `Close` events, the matching open time (so the UI can render
    /// the channel duration). `None` if the open was missed.
    pub opened_at: Option<Instant>,
}
106
/// Number of buckets in the per-client throughput history. The renderer
/// uses `VIZ_TICK_MS` cadence (one shift per 100ms), so 12 cells covers
/// the most recent ~1.2 seconds. The latest `current_rx_bps + current_tx_bps`
/// reading is pushed into the rightmost cell every tick, which gives the
/// braille wave continuous leftward motion at terminal frame rate even
/// when underlying lsof samples land only every few seconds.
pub const PEER_VIZ_BUCKETS: usize = 12;
114
/// One peer observed by the lsof poller as connected to a forwarded port.
#[derive(Debug, Clone)]
pub struct ClientPeer {
    /// Source address of the connecting peer (as reported by the poller).
    pub src: String,
    /// Name of the local process that owns the client socket.
    pub process: String,
    /// Owner pid of the client socket. Surfaced in the CLIENTS card so
    /// users can correlate a connection with a local process.
    pub pid: u32,
    /// When this peer was first observed (see `first_seen` in the poller).
    pub since: Instant,
    /// On macOS, the user-facing app that "owns" this socket according
    /// to the kernel-tracked responsible-pid. Lets the CLIENTS card show
    /// `Safari` for a `WebKit.Networking` XPC daemon, or `Ghostty` for
    /// a `psql` started from a Ghostty terminal. `None` when responsible
    /// pid resolution is unavailable, equals self, or the lookup failed.
    pub responsible_app: Option<String>,
    /// Most-recent rx/tx bytes-per-second sample, derived by diffing
    /// per-socket cumulative byte counters between lsof polls. Zero
    /// until a second sample arrives. macOS uses a per-pid fallback
    /// when per-socket counters are not available.
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    /// Cumulative byte counters at the most recent sample. Used to diff
    /// against the next sample. `None` until the first sample arrives.
    pub bytes_rcvd: Option<u64>,
    pub bytes_sent: Option<u64>,
    /// Time of the most recent sample. `None` until the first sample.
    pub last_sample_at: Option<Instant>,
}
143
/// Another process bound to the same port as our tunnel. Detected when
/// lsof shows a LISTEN row on a tunnel bind port owned by a pid that
/// is not our tunnel pid. Reserved for future port-conflict surfacing
/// (see `app::tunnel_state::TunnelsState::conflicts`); the minimal
/// option-A detail panel does not show conflicts inline.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PortConflict {
    /// The contested bind port.
    pub port: u16,
    /// Name of the foreign process holding the port.
    pub process: String,
    /// Pid of the foreign process holding the port.
    pub pid: u32,
}
156
/// Per-tunnel live state. Lives on `ActiveTunnel` so a HashMap remove()
/// drops the state and joins the parser thread (no side-channel map that
/// could leak after `tunnels.active.remove()`).
pub struct TunnelLiveState {
    /// Bounded ringbuffer of recent channel events (at most `MAX_EVENTS`).
    pub events: VecDeque<ChannelEvent>,
    /// Rolling activity history; each bucket holds the peak concurrent
    /// client count for its window, rightmost bucket is "now".
    pub opens_history: [u8; HISTORY_BUCKETS],
    /// Time of the most recent rotate (monotonic `Instant`). Used by
    /// `rotate_if_due` to advance the history buckets one slot per
    /// elapsed `BUCKET_SECS` window.
    pub history_last_rotate: Instant,
    /// Running maximum of `active_channels` since tunnel start.
    pub peak_concurrent: u32,
    /// Count of user-visible channel opens since tunnel start.
    pub total_opens: u32,
    /// Time of the most recent recorded open/close event, if any.
    pub last_event_at: Option<Instant>,
    /// Number of currently-open user-visible channels.
    pub active_channels: u32,
    /// Map of currently-open channel id -> open time + kind, used to
    /// compute duration and restore the kind when a `Close` arrives.
    pub channel_open: HashMap<u32, (Instant, ChannelKind)>,
    /// Filled by `poll_tunnels()` when the child exits unexpectedly.
    pub last_exit: Option<(i32, String)>,
    /// Last few stderr lines, written by the parser thread, read by
    /// `poll_tunnels()` on exit to compose `last_exit`.
    pub stderr_buffer: Arc<Mutex<VecDeque<String>>>,
    /// Joined when the `ActiveTunnel` is dropped.
    pub parser_thread: Option<JoinHandle<()>>,
    /// Set to true on `ActiveTunnel` drop so the parser thread can exit
    /// promptly even if stderr is still readable. The pipe close from
    /// `child.kill()` is the primary signal; this is a belt-and-braces.
    pub parser_stop: Arc<AtomicBool>,

    /// Rolling history of bytes per second received (downstream from
    /// remote to laptop). Same bucket layout as `opens_history`.
    pub rx_history: [u64; HISTORY_BUCKETS],
    /// Rolling history of bytes per second sent (upstream from laptop
    /// to remote).
    pub tx_history: [u64; HISTORY_BUCKETS],
    /// Aggregated bytes-per-second across every connected client. Set
    /// from the per-peer lsof samples in `TunnelState::poll` so this
    /// value matches what the roster's per-peer rows report.
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    /// Peak observed since tunnel start. Tracks the running max of
    /// `current_rx_bps` / `current_tx_bps` across polls.
    pub peak_rx_bps: u64,
    pub peak_tx_bps: u64,
    /// Time of the most recent throughput aggregation that saw
    /// at least one sampled peer. `None` until the first sample
    /// arrives — UI shows `sampling…` in that gap.
    pub last_throughput_at: Option<Instant>,
}
205
206impl TunnelLiveState {
207    pub fn new(started_at: Instant) -> Self {
208        Self {
209            events: VecDeque::with_capacity(MAX_EVENTS),
210            opens_history: [0u8; HISTORY_BUCKETS],
211            history_last_rotate: started_at,
212            peak_concurrent: 0,
213            total_opens: 0,
214            last_event_at: None,
215            active_channels: 0,
216            channel_open: HashMap::new(),
217            last_exit: None,
218            stderr_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_BUFFER_LINES))),
219            parser_thread: None,
220            parser_stop: Arc::new(AtomicBool::new(false)),
221            rx_history: [0u64; HISTORY_BUCKETS],
222            tx_history: [0u64; HISTORY_BUCKETS],
223            current_rx_bps: 0,
224            current_tx_bps: 0,
225            peak_rx_bps: 0,
226            peak_tx_bps: 0,
227            last_throughput_at: None,
228        }
229    }
230
231    /// Record an open or close event from the stderr parser thread.
232    /// Updates counters and the bounded ringbuffer. The rolling
233    /// `opens_history` is fed by `sample_activity` rather than by
234    /// individual events: the sparkline tracks ongoing concurrency, not
235    /// just channel-open bursts. End-user channels (Direct, Forwarded,
236    /// Dynamic) bump `total_opens` and `peak_concurrent`; ssh-internal
237    /// listeners and master channels stay in the ringbuffer for
238    /// diagnostics only.
239    pub fn record_event(&mut self, mut event: ChannelEvent) {
240        self.rotate_if_due(event.at);
241        match event.kind {
242            ChannelEventKind::Open => {
243                let kind = event.channel_kind.unwrap_or(ChannelKind::Other);
244                if kind.is_user_visible() {
245                    self.total_opens = self.total_opens.saturating_add(1);
246                    self.active_channels = self.active_channels.saturating_add(1);
247                    self.peak_concurrent = self.peak_concurrent.max(self.active_channels);
248                }
249                self.channel_open.insert(event.channel_id, (event.at, kind));
250            }
251            ChannelEventKind::Close => {
252                if let Some((opened_at, kind)) = self.channel_open.remove(&event.channel_id) {
253                    event.opened_at = Some(opened_at);
254                    event.channel_kind = Some(kind);
255                    if kind.is_user_visible() {
256                        self.active_channels = self.active_channels.saturating_sub(1);
257                    }
258                }
259            }
260        }
261        self.last_event_at = Some(event.at);
262        if self.events.len() == MAX_EVENTS {
263            self.events.pop_front();
264        }
265        self.events.push_back(event);
266        log::debug!(
267            "[purple] Tunnel live event: total_opens={} active={} peak={}",
268            self.total_opens,
269            self.active_channels,
270            self.peak_concurrent
271        );
272    }
273
274    /// Advance the rolling history if at least one bucket-width has
275    /// elapsed since the previous rotate. Per-elapsed-bucket we shift
276    /// left by one and push a fresh `0` at the right (now) edge.
277    /// Rotates `opens_history`, `rx_history` and `tx_history` together
278    /// so the three sparklines stay aligned in the UI.
279    pub fn rotate_if_due(&mut self, now: Instant) {
280        let elapsed = now.saturating_duration_since(self.history_last_rotate);
281        let ticks = elapsed.as_secs() / BUCKET_SECS;
282        if ticks == 0 {
283            return;
284        }
285        let shift = (ticks as usize).min(HISTORY_BUCKETS);
286        if shift >= HISTORY_BUCKETS {
287            self.opens_history.fill(0);
288            self.rx_history.fill(0);
289            self.tx_history.fill(0);
290        } else {
291            self.opens_history.rotate_left(shift);
292            for slot in self.opens_history.iter_mut().rev().take(shift) {
293                *slot = 0;
294            }
295            self.rx_history.rotate_left(shift);
296            for slot in self.rx_history.iter_mut().rev().take(shift) {
297                *slot = 0;
298            }
299            self.tx_history.rotate_left(shift);
300            for slot in self.tx_history.iter_mut().rev().take(shift) {
301                *slot = 0;
302            }
303        }
304        self.history_last_rotate += Duration::from_secs(ticks * BUCKET_SECS);
305    }
306
307    /// Write the peak concurrent client count for the current bucket.
308    /// Called once per `TunnelState::poll` after the lsof snapshot has
309    /// been drained, with `concurrent` = `max(lsof_clients_for_alias,
310    /// active_channels)`. Uses `max` rather than `+=` so repeated polls
311    /// inside one bucket converge on the bucket's peak rather than
312    /// double-counting.
313    pub fn sample_activity(&mut self, concurrent: u32) {
314        let sample = u8::try_from(concurrent).unwrap_or(u8::MAX);
315        if let Some(last) = self.opens_history.last_mut() {
316            *last = (*last).max(sample);
317        }
318    }
319}
320
/// Message sent from a per-tunnel parser thread to the main loop drain.
#[derive(Debug, Clone)]
pub struct ParserMessage {
    /// Alias of the tunnel whose stderr produced this event.
    pub alias: String,
    /// The parsed channel open/close event.
    pub event: ChannelEvent,
}
327
/// Snapshot emitted by the lsof poller every poll cycle. Keys are the
/// tunnel bind ports.
#[derive(Debug, Clone)]
pub struct LsofMessage {
    /// When this poll cycle ran.
    pub at: Instant,
    /// Connected client peers, keyed by tunnel bind port.
    pub clients: HashMap<u16, Vec<ClientPeer>>,
    /// Foreign LISTEN holders detected on tunnel bind ports, keyed by port.
    pub conflicts: HashMap<u16, PortConflict>,
}
336
337impl LsofMessage {
338    pub fn empty(at: Instant) -> Self {
339        Self {
340            at,
341            clients: HashMap::new(),
342            conflicts: HashMap::new(),
343        }
344    }
345}
346
/// Public wrapper around an in-flight lsof poller thread.
pub struct LsofPollerHandle {
    /// Cooperative stop flag polled by the worker loop.
    pub stop: Arc<AtomicBool>,
    /// Shared `(alias, port, pid)` list the poller filters against.
    pub bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
    /// Worker thread handle; `None` once joined.
    pub thread: Option<JoinHandle<()>>,
}

impl LsofPollerHandle {
    /// Signal the poller to stop and block until its thread has exited.
    /// Safe to call repeatedly: after the first call `thread` is `None`
    /// and the method returns immediately.
    pub fn shutdown(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        let Some(worker) = self.thread.take() else {
            return;
        };
        // Join result intentionally ignored — a panicked poller still
        // counts as shut down.
        let _ = worker.join();
    }
}
362
363/// Spawn the per-tunnel stderr parser thread. Reads the child's stderr
364/// line-by-line, captures channel events into `tx` and stores the last
365/// few raw lines in `stderr_buffer` for exit-reason display.
366pub fn spawn_parser_thread(
367    stderr: ChildStderr,
368    alias: String,
369    tx: Sender<ParserMessage>,
370    stderr_buffer: Arc<Mutex<VecDeque<String>>>,
371    stop: Arc<AtomicBool>,
372) -> JoinHandle<()> {
373    thread::Builder::new()
374        .name(format!("purple-tunnel-parser-{alias}"))
375        .spawn(move || {
376            let reader = BufReader::new(stderr);
377            for line in reader.lines() {
378                if stop.load(Ordering::Relaxed) {
379                    break;
380                }
381                let Ok(line) = line else { break };
382                if let Ok(mut buf) = stderr_buffer.lock() {
383                    if buf.len() == STDERR_BUFFER_LINES {
384                        buf.pop_front();
385                    }
386                    buf.push_back(line.clone());
387                }
388                if let Some(event) = parse_channel_line(&line) {
389                    let msg = ParserMessage {
390                        alias: alias.clone(),
391                        event,
392                    };
393                    if tx.send(msg).is_err() {
394                        break;
395                    }
396                }
397            }
398            log::debug!("[purple] Tunnel parser thread exit: alias={alias}");
399        })
400        .expect("spawn purple-tunnel-parser thread")
401}
402
403/// Parse one stderr line. Recognises both OpenSSH log formats:
404///
405/// Modern (8.x+, 9.x, 10.x): `channel N: new <ctype> [<rname>] ...`.
406/// ctype sits BEFORE the brackets and the brackets carry the remote
407/// name; the `(inactive timeout: T)` suffix added in 9.x is ignored.
408///
409/// Legacy/test: `channel N: new [<ctype>]`. Kept for backwards
410/// compatibility with test fixtures and any patched ssh build that
411/// emits the older shape.
412///
413/// Both formats also produce `channel N: free: ...` for Close.
414/// All other lines return `None`.
415pub fn parse_channel_line(line: &str) -> Option<ChannelEvent> {
416    let trimmed = line.trim_start();
417    let rest = trimmed.strip_prefix("debug1: channel ")?;
418    let (id_str, after) = rest.split_once(':')?;
419    let channel_id: u32 = id_str.trim().parse().ok()?;
420    let after = after.trim_start();
421    let (kind, channel_kind) = if let Some(after_new) = after.strip_prefix("new") {
422        let after_new = after_new.trim_start();
423        let ctype = if let Some(rest) = after_new.strip_prefix('[') {
424            // Legacy `new [<ctype>]` — ctype is inside the brackets.
425            rest.split_once(']').map(|(t, _)| t.trim().to_string())
426        } else {
427            // Modern `new <ctype> [<rname>] (…)` — first whitespace-
428            // delimited token is the ctype; everything after is the
429            // remote name plus optional parenthesised attributes.
430            after_new
431                .split_whitespace()
432                .next()
433                .map(|s| s.to_string())
434                .filter(|s| !s.is_empty())
435        };
436        let chan_kind = ctype.as_deref().map(ChannelKind::from_bracket);
437        (ChannelEventKind::Open, chan_kind)
438    } else if after.starts_with("free") {
439        (ChannelEventKind::Close, None)
440    } else {
441        return None;
442    };
443    Some(ChannelEvent {
444        at: Instant::now(),
445        channel_id,
446        kind,
447        channel_kind,
448        opened_at: None,
449    })
450}
451
452/// Spawn the shared lsof poller. Polls `lsof -iTCP -P -n` every 2s,
453/// filters by the bind ports passed via `bind_ports`, and emits a
454/// `LsofMessage` per poll. macOS + Linux only — purple is unix-only.
455#[cfg(any(target_os = "macos", target_os = "linux"))]
456pub fn spawn_lsof_poller(
457    bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
458    tx: Sender<LsofMessage>,
459    stop: Arc<AtomicBool>,
460) -> JoinHandle<()> {
461    thread::Builder::new()
462        .name("purple-tunnel-lsof".into())
463        .spawn(move || {
464            // Track first-seen times per (port, src) so the CLIENTS card
465            // can render a real "age" value across polls.
466            let mut first_seen: HashMap<(u16, String), Instant> = HashMap::new();
467            // Cache responsible-pid lookups across polls so the same client
468            // does not pay the FFI cost every 2s tick.
469            let mut responsible_cache = ResponsibleAppCache::default();
470            // Per-peer throughput history. Key matches `first_seen` so a
471            // peer that goes away and comes back resets cleanly.
472            let mut peer_state: HashMap<(u16, String), PeerSampleCache> = HashMap::new();
473            while !stop.load(Ordering::Relaxed) {
474                let ports: Vec<(String, u16, u32)> = match bind_ports.lock() {
475                    Ok(g) => g.clone(),
476                    Err(p) => p.into_inner().clone(),
477                };
478                if ports.is_empty() {
479                    thread::sleep(Duration::from_millis(500));
480                    continue;
481                }
482                let now = Instant::now();
483                let mut msg = run_lsof_once(&ports, &mut first_seen, now);
484                annotate_responsible_apps(&mut msg, &mut responsible_cache);
485                annotate_peer_throughput(&mut msg, &mut peer_state, now);
486                if tx.send(msg).is_err() {
487                    break;
488                }
489                for _ in 0..20 {
490                    if stop.load(Ordering::Relaxed) {
491                        break;
492                    }
493                    thread::sleep(Duration::from_millis(100));
494                }
495            }
496            log::debug!("[purple] Tunnel lsof poller thread exit");
497        })
498        .expect("spawn purple-tunnel-lsof thread")
499}
500
/// Stub lsof poller for non-unix builds. Purple does not officially
/// support Windows but the cfg keeps the codebase compilable.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
pub fn spawn_lsof_poller(
    _bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
    _tx: Sender<LsofMessage>,
    stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
    // Idles until asked to stop; never emits a message.
    thread::spawn(move || loop {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        thread::sleep(Duration::from_millis(500));
    })
}
515
// macOS-only FFI to the kernel-tracked "responsible pid" for a process.
// `responsibility_get_pid_responsible_for_pid` is exported by libsystem
// (libquarantine) and used by Activity Monitor and the TCC subsystem.
// Stable across macOS releases and reachable without root or special
// entitlements for processes owned by the calling user. Returns the
// pid that "speaks for" `pid` when the kernel attributes resource use,
// sandbox decisions or TCC prompts. That mapping is what lets us label
// a `WebKit.Networking` daemon as `Safari`, or a shell/`psql` started
// inside Ghostty as `Ghostty`. Returns `0` or a negative value when no
// responsibility chain exists (most system daemons), and `pid` itself
// when the process is its own responsible (terminal apps, foreground
// GUI apps).
#[cfg(target_os = "macos")]
unsafe extern "C" {
    fn responsibility_get_pid_responsible_for_pid(pid: libc::pid_t) -> libc::pid_t;
}
532
/// Resolve a client pid to a user-facing app name via the macOS
/// responsibility API. Returns `None` when:
///   * the lookup failed (eg. process exited or kernel returned <= 0),
///   * the responsible pid equals the client pid (self-responsible —
///     the process name from lsof is already the right label), or
///   * the responsible pid maps to the same name as the client process
///     (no extra signal to add).
#[cfg(target_os = "macos")]
fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
    // SAFETY: the FFI takes a pid_t by value and returns one. No memory
    // is exchanged. Calling with an exited pid is well-defined: the
    // kernel returns -1 / 0 which we treat as "no responsible".
    let rpid = unsafe { responsibility_get_pid_responsible_for_pid(pid as libc::pid_t) };
    // <= 0: no responsibility chain; == pid: self-responsible.
    if rpid <= 0 || rpid as u32 == pid {
        return None;
    }
    let label = process_name(rpid as u32)?;
    // A name identical to the client process adds no extra signal.
    if label.eq_ignore_ascii_case(client_process) {
        None
    } else {
        Some(label)
    }
}
558
/// Look up the bare process name for a pid via libproc's `proc_name`,
/// which gives the truncated `comm` value (matches what `ps -o comm=`
/// returns without the full executable path). Used to label the
/// responsible-pid in the CLIENTS card.
#[cfg(target_os = "macos")]
fn process_name(pid: u32) -> Option<String> {
    unsafe extern "C" {
        fn proc_name(pid: libc::c_int, buffer: *mut libc::c_void, buffersize: u32) -> libc::c_int;
    }
    let mut name_buf = [0u8; 256];
    // SAFETY: `name_buf` is a valid writeable region of `name_buf.len()`
    // bytes. `proc_name` writes a NUL-terminated string into it and
    // returns the byte count. Negative or zero means failure / unknown
    // pid.
    let written = unsafe {
        proc_name(
            pid as libc::c_int,
            name_buf.as_mut_ptr().cast(),
            name_buf.len() as u32,
        )
    };
    if written <= 0 {
        return None;
    }
    // Clamp defensively in case the reported count exceeds the buffer.
    let len = (written as usize).min(name_buf.len());
    let text = std::str::from_utf8(&name_buf[..len]).ok()?.trim_end_matches('\0');
    if text.is_empty() {
        None
    } else {
        Some(beautify_process(text))
    }
}
590
591/// Linux: walk to the process's session leader via `/proc/PID/stat`
592/// and use its comm as the user-facing app label. The session leader
593/// is the process whose pid equals the session id; for shell-spawned
594/// tools that leader is typically the terminal (`ghostty`, `konsole`,
595/// `gnome-terminal-`); for GUI clients it is the app itself
596/// (`dbeaver`, `firefox`, `code`). Generic — any process ancestry
597/// rooted in a user session works without a hardcoded app list.
598///
599/// Returns `None` when the process exited mid-poll, when the session
600/// leader equals the client pid (already its own app), or when the
601/// leader's comm matches the client process name (no extra signal).
602#[cfg(target_os = "linux")]
603fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
604    let session_id = read_session_leader(pid)?;
605    if session_id == pid {
606        return None;
607    }
608    let comm = std::fs::read_to_string(format!("/proc/{}/comm", session_id)).ok()?;
609    let name = comm.trim();
610    if name.is_empty() || name.eq_ignore_ascii_case(client_process) {
611        return None;
612    }
613    Some(beautify_process(name))
614}
615
/// Parse `/proc/PID/stat` and return field 5 (the session id). The
/// `comm` field is parenthesized and may contain spaces, so we slice
/// at the LAST `)` and tokenize what follows: `state ppid pgid sid ...`
/// → field index 3 of the post-comm slice is the session id.
#[cfg(target_os = "linux")]
fn read_session_leader(pid: u32) -> Option<u32> {
    let stat = std::fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    let tail = &stat[stat.rfind(')')? + 1..];
    // Post-comm layout: `state ppid pgrp session ...` — index 3 is sid.
    tail.split_whitespace().nth(3)?.parse().ok()
}
628
/// Fallback for platforms with no responsible-pid / session-leader
/// lookup: always `None`, so only the plain process name is shown.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn lookup_responsible_app(_pid: u32, _client_process: &str) -> Option<String> {
    None
}
633
/// Cache of `pid -> Option<responsible_app>` so the lsof poller does not
/// re-resolve the same connection on every 2s tick. Keyed on pid; a
/// stale entry is at worst a stale label until the pid is reused, which
/// is acceptable for an observability surface. Size-bounded by the
/// per-port client cap times a generous fan-out — in practice the lsof
/// pid set turns over slowly.
#[derive(Default)]
struct ResponsibleAppCache {
    // `None` values are cached too, so failed lookups are not retried.
    map: HashMap<u32, Option<String>>,
}
644
645impl ResponsibleAppCache {
646    fn resolve(&mut self, pid: u32, client_process: &str) -> Option<String> {
647        if let Some(cached) = self.map.get(&pid) {
648            return cached.clone();
649        }
650        let resolved = lookup_responsible_app(pid, client_process);
651        self.map.insert(pid, resolved.clone());
652        resolved
653    }
654
655    /// Drop entries for pids no longer present in `live`, so the cache
656    /// cannot grow unbounded across many short-lived clients.
657    fn retain_pids(&mut self, live: &std::collections::HashSet<u32>) {
658        self.map.retain(|pid, _| live.contains(pid));
659    }
660}
661
662/// Walk every client peer in `msg` and fill in `responsible_app` from
663/// the macOS responsibility API. Cache hits are O(1); cache misses pay
664/// one FFI plus one libproc call per pid. After resolution the cache is
665/// trimmed to the still-live set of pids so it cannot grow unbounded.
666fn annotate_responsible_apps(msg: &mut LsofMessage, cache: &mut ResponsibleAppCache) {
667    let mut live: std::collections::HashSet<u32> = std::collections::HashSet::new();
668    for peers in msg.clients.values_mut() {
669        for peer in peers.iter_mut() {
670            live.insert(peer.pid);
671            peer.responsible_app = cache.resolve(peer.pid, &peer.process);
672        }
673    }
674    cache.retain_pids(&live);
675}
676
/// Per-peer sampling state retained across lsof polls. Holds the last
/// observed cumulative byte counters so the next poll can derive a
/// bytes-per-second rate. The braille sparkline history is owned by
/// `TunnelState::peer_viz` on the main thread (ticked at 100ms).
#[derive(Debug, Clone, Default)]
struct PeerSampleCache {
    /// Cumulative received bytes at the previous sample.
    last_rcvd: u64,
    /// Cumulative sent bytes at the previous sample.
    last_sent: u64,
    /// When the previous sample was taken; `None` before the first one.
    last_at: Option<Instant>,
}
687
/// Per-peer sample shape. Linux returns cumulative byte counters that
/// the diff path turns into a rate. macOS returns per-pid bps deltas
/// directly because the underlying nettop call already emits a 1s
/// delta sample. The annotator picks the right path per peer.
#[derive(Debug, Default)]
struct PerPeerSamples {
    /// Cumulative byte counters per `(pid, local_port)` from `ss` on
    /// Linux. Keyed precisely so each socket has its own row even when
    /// a process owns several sockets through the tunnel.
    per_socket_cumulative: HashMap<(u32, u16), (u64, u64)>,
    /// Bytes-per-second deltas per pid from `nettop` on macOS. The
    /// kernel does not expose per-socket counters here, so per-pid is
    /// the finest-grained truth available — adequate for the typical
    /// "one app, one tunnel client" case.
    per_pid_bps: HashMap<u32, (u64, u64)>,
}
704
705/// Walk every client peer in `msg`, run a per-platform sampler, and
706/// fill in `current_rx_bps`, `current_tx_bps`, and the cumulative
707/// `bytes_rcvd`/`bytes_sent` where available. Stale cache entries are
708/// pruned to the still-live peer set so the map cannot grow unbounded.
709/// The braille sparkline history is owned by `TunnelState::peer_viz`
710/// on the main thread; this function only refreshes the bps readout
711/// that the main thread feeds into the rolling 100ms-tick history.
712fn annotate_peer_throughput(
713    msg: &mut LsofMessage,
714    cache: &mut HashMap<(u16, String), PeerSampleCache>,
715    now: Instant,
716) {
717    let samples = sample_peer_throughput();
718    let mut live: std::collections::HashSet<(u16, String)> = std::collections::HashSet::new();
719
720    for (port, peers) in msg.clients.iter_mut() {
721        for peer in peers.iter_mut() {
722            let key = (*port, peer.src.clone());
723            live.insert(key.clone());
724            let entry = cache.entry(key).or_default();
725            let src_port = src_port_from(&peer.src);
726
727            // Path A: Linux per-socket cumulative counters → diff against cache.
728            let cumulative =
729                src_port.and_then(|p| samples.per_socket_cumulative.get(&(peer.pid, p)).copied());
730            if let Some((rcvd, sent)) = cumulative {
731                if let Some(prev_at) = entry.last_at {
732                    let dt = now.saturating_duration_since(prev_at).as_secs_f64();
733                    if dt > 0.0 {
734                        let rx_bps = ((rcvd.saturating_sub(entry.last_rcvd)) as f64 / dt) as u64;
735                        let tx_bps = ((sent.saturating_sub(entry.last_sent)) as f64 / dt) as u64;
736                        peer.current_rx_bps = rx_bps;
737                        peer.current_tx_bps = tx_bps;
738                    }
739                }
740                entry.last_rcvd = rcvd;
741                entry.last_sent = sent;
742                entry.last_at = Some(now);
743                peer.bytes_rcvd = Some(rcvd);
744                peer.bytes_sent = Some(sent);
745                peer.last_sample_at = Some(now);
746                continue;
747            }
748
749            // Path B: macOS per-pid bps directly — no diff math needed.
750            if let Some((rx_bps, tx_bps)) = samples.per_pid_bps.get(&peer.pid).copied() {
751                peer.current_rx_bps = rx_bps;
752                peer.current_tx_bps = tx_bps;
753                entry.last_at = Some(now);
754                peer.last_sample_at = Some(now);
755            }
756        }
757    }
758
759    cache.retain(|key, _| live.contains(key));
760}
761
/// Parse `ss -H -t -i -n -p state established` output into per-socket
/// cumulative byte counters keyed by `(pid, local_port)`. The full
/// parser is not implemented yet, so this returns an empty map and the
/// caller falls back to status-only display on Linux. The Linux
/// throughput renderer therefore exercises the not-yet-throughput-ready
/// branch until a real parser lands.
#[cfg(target_os = "linux")]
fn parse_ss_per_socket(_input: &str) -> HashMap<(u32, u16), (u64, u64)> {
    // Intentionally a stub: an empty map is the documented "no
    // per-socket data" signal for the caller.
    HashMap::new()
}
772
773/// Run the per-platform peer-throughput sampler. On Linux this calls
774/// `ss` once and returns per-socket cumulative byte counters. On macOS
775/// it spawns a short `nettop` subprocess that yields one 1-second
776/// delta sample per pid. Returns an empty struct on unsupported
777/// platforms or when the underlying tool fails — the caller falls
778/// back to status-only display.
779fn sample_peer_throughput() -> PerPeerSamples {
780    let mut out = PerPeerSamples::default();
781    #[cfg(target_os = "linux")]
782    {
783        let output = Command::new("ss")
784            .args(["-H", "-t", "-i", "-n", "-p", "state", "established"])
785            .stdin(Stdio::null())
786            .stderr(Stdio::null())
787            .output();
788        if let Ok(o) = output {
789            if o.status.success() {
790                out.per_socket_cumulative =
791                    parse_ss_per_socket(&String::from_utf8_lossy(&o.stdout));
792            }
793        }
794    }
795    #[cfg(target_os = "macos")]
796    {
797        out.per_pid_bps = sample_nettop_per_pid_macos();
798    }
799    out
800}
801
/// Spawn `nettop -P -L 2 -d -x -s 1` to capture one 1-second delta
/// sample per process and parse it into a `pid -> (rx_bps, tx_bps)`
/// map. Blocking call: returns after ~1.2 seconds. Falls back to an
/// empty map if the subprocess fails to spawn or parse.
///
/// `-d` enables delta mode (each emitted sample is the diff over the
/// previous interval). `-L 2` exits after two samples. The first
/// sample is cumulative-since-process-start (we drop it); the second
/// is the 1-second delta we want.
#[cfg(target_os = "macos")]
fn sample_nettop_per_pid_macos() -> HashMap<u32, (u64, u64)> {
    let output = Command::new("/usr/bin/nettop")
        .args(["-P", "-d", "-x", "-s", "1", "-L", "2"])
        .stdin(Stdio::null())
        .stderr(Stdio::null())
        .output();
    let text = match output {
        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).into_owned(),
        _ => return HashMap::new(),
    };
    // `nettop` re-emits the `time,proc.pid,...` header before each
    // sample, so the second header line marks the start of the delta
    // block. Everything before it is the warmup (cumulative) sample we
    // discard. The header line itself is left in the slice; the row
    // parser below rejects it.
    let delta_block = match text.find("time,") {
        Some(first) => match text[first + 5..].find("\ntime,") {
            Some(off) => &text[first + 5 + off + 1..],
            // Only one header seen — parse from it and let the row
            // filter drop the header line.
            None => &text[first..],
        },
        None => text.as_str(),
    };
    let mut out: HashMap<u32, (u64, u64)> = HashMap::new();
    for line in delta_block.lines() {
        if let Some((pid, rx, tx)) = parse_nettop_csv_row_per_pid(line) {
            // nettop -d sample interval is 1s (`-s 1`), so the bytes
            // in this row are bytes-per-second already.
            let entry = out.entry(pid).or_insert((0, 0));
            entry.0 = entry.0.saturating_add(rx);
            entry.1 = entry.1.saturating_add(tx);
        }
    }
    out
}
859
/// Parse one `nettop -P -d -x` CSV row into `(pid, rx_bytes, tx_bytes)`.
/// Returns `None` for header rows, blank lines, or rows with too few
/// columns. Mirrors `parse_nettop_csv_row` but does not filter on a
/// specific pid. macOS-only because the only caller is the nettop
/// sampler.
#[cfg(target_os = "macos")]
fn parse_nettop_csv_row_per_pid(line: &str) -> Option<(u32, u64, u64)> {
    let row = line.trim();
    if row.is_empty() || row.starts_with("time,") {
        return None;
    }
    let cols: Vec<&str> = row.split(',').map(str::trim).collect();
    // Need at least six columns: time, proc.pid, then rx/tx at
    // indices 4 and 5.
    let (proc_pid, rx_col, tx_col) = match cols.as_slice() {
        [_, name, _, _, rx, tx, ..] => (*name, *rx, *tx),
        _ => return None,
    };
    // Column 1 is `procname.pid`; the pid sits after the last dot.
    let pid = proc_pid.rsplit_once('.')?.1.parse().ok()?;
    let rx = rx_col.parse().ok()?;
    let tx = tx_col.parse().ok()?;
    Some((pid, rx, tx))
}
883
/// Extract the port portion of an `addr:port` socket string. Splits on
/// the LAST colon so the address part may itself contain colons;
/// returns `None` when no colon exists or the suffix is not a valid
/// `u16`.
fn src_port_from(src: &str) -> Option<u16> {
    let colon = src.rfind(':')?;
    src[colon + 1..].parse().ok()
}
888
/// Run `lsof` once and parse its output into an `LsofMessage` snapshot
/// for the given tunnel bind ports (`(name, bind_port, ssh_pid)`
/// tuples). Returns an empty message when the subprocess cannot be run
/// or produced no output at all.
#[cfg(any(target_os = "macos", target_os = "linux"))]
fn run_lsof_once(
    ports: &[(String, u16, u32)],
    first_seen: &mut HashMap<(u16, String), Instant>,
    now: Instant,
) -> LsofMessage {
    // `+c 0` widens the COMMAND column to the full process name.
    // macOS defaults to 9 characters which collapses every Apple
    // framework process down to `com.apple`; Linux defaults to 15.
    // Neither is useful in the CLIENTS card.
    let output = Command::new("lsof")
        .args(["-iTCP", "-P", "-n", "-w", "+c", "0"])
        .output();
    // Accept a non-zero exit as long as there is stdout to parse:
    // lsof can exit with a failure status while still having printed
    // usable rows (some probes denied), and those rows are all we need.
    let stdout = match output {
        Ok(o) if o.status.success() || !o.stdout.is_empty() => o.stdout,
        _ => return LsofMessage::empty(now),
    };
    let text = String::from_utf8_lossy(&stdout);
    parse_lsof_output(&text, ports, first_seen, now)
}
909
910/// Parse `lsof -iTCP -P -n` output into a `LsofMessage`. Public for tests.
911pub fn parse_lsof_output(
912    text: &str,
913    ports: &[(String, u16, u32)],
914    first_seen: &mut HashMap<(u16, String), Instant>,
915    now: Instant,
916) -> LsofMessage {
917    let mut clients: HashMap<u16, Vec<ClientPeer>> = HashMap::new();
918    let mut conflicts: HashMap<u16, PortConflict> = HashMap::new();
919    let bind_ports: Vec<u16> = ports.iter().map(|(_, p, _)| *p).collect();
920    let tunnel_pids: Vec<u32> = ports.iter().map(|(_, _, pid)| *pid).collect();
921
922    for line in text.lines().skip(1) {
923        let row = match parse_lsof_row(line) {
924            Some(r) => r,
925            None => continue,
926        };
927        // LISTEN rows owned by another process on a tunnel bind port → conflict.
928        if row.is_listen && bind_ports.contains(&row.local_port) && !tunnel_pids.contains(&row.pid)
929        {
930            conflicts
931                .entry(row.local_port)
932                .or_insert_with(|| PortConflict {
933                    port: row.local_port,
934                    process: row.command.clone(),
935                    pid: row.pid,
936                });
937            continue;
938        }
939        if row.is_listen {
940            continue;
941        }
942        // ESTABLISHED rows where the client process owns a socket whose
943        // remote port is one of our bind ports. The client process != ssh.
944        if let Some(remote_port) = row.remote_port {
945            if bind_ports.contains(&remote_port) && !tunnel_pids.contains(&row.pid) {
946                let src = row.local_addr_port().unwrap_or_else(|| "?".to_string());
947                let key = (remote_port, src.clone());
948                let since = *first_seen.entry(key).or_insert(now);
949                let entry = clients.entry(remote_port).or_default();
950                if entry.len() >= MAX_CLIENTS_PER_PORT {
951                    continue;
952                }
953                entry.push(ClientPeer {
954                    src,
955                    process: beautify_process(&row.command),
956                    pid: row.pid,
957                    since,
958                    responsible_app: None,
959                    current_rx_bps: 0,
960                    current_tx_bps: 0,
961                    bytes_rcvd: None,
962                    bytes_sent: None,
963                    last_sample_at: None,
964                });
965            }
966        }
967    }
968    // Drop first-seen entries for sockets we no longer see, so their age
969    // counters do not grow unbounded across reconnects.
970    let live: std::collections::HashSet<(u16, String)> = clients
971        .iter()
972        .flat_map(|(port, peers)| peers.iter().map(move |p| (*port, p.src.clone())))
973        .collect();
974    first_seen.retain(|key, _| live.contains(key));
975
976    LsofMessage {
977        at: now,
978        clients,
979        conflicts,
980    }
981}
982
/// One row of `lsof -iTCP -P -n` output, fields we care about.
#[derive(Debug)]
struct LsofRow {
    /// Process name from the COMMAND column.
    command: String,
    pid: u32,
    /// True for LISTEN rows; false means ESTABLISHED (the parser
    /// drops every other TCP state).
    is_listen: bool,
    /// Local endpoint of the socket.
    local_addr: String,
    local_port: u16,
    /// Remote endpoint; `None` when the NAME column has no `->` part
    /// (LISTEN rows) or the remote token failed to parse.
    remote_addr: Option<String>,
    remote_port: Option<u16>,
}

impl LsofRow {
    /// The `addr:port` to display for this client peer. The CLIENTS
    /// card shows the *peer* of our tunnel bind port, i.e. the side
    /// connecting in. From the client process's row, remote = our
    /// loopback bind, so the peer to show is the row's LOCAL end.
    /// Always `Some`; the `Option` return keeps the caller's
    /// `unwrap_or_else` fallback site compiling unchanged.
    fn local_addr_port(&self) -> Option<String> {
        Some(format!("{}:{}", self.local_addr, self.local_port))
    }
}
1007
1008fn parse_lsof_row(line: &str) -> Option<LsofRow> {
1009    if line.trim().is_empty() {
1010        return None;
1011    }
1012    // lsof columns are whitespace-separated. Format:
1013    //   COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1014    // NAME may contain spaces only inside parens (state). The address
1015    // tokens never contain spaces because we pass `-n` and `-P`.
1016    let mut fields = line.split_whitespace();
1017    let command = fields.next()?.to_string();
1018    let pid: u32 = fields.next()?.parse().ok()?;
1019    let _user = fields.next()?;
1020    let _fd = fields.next()?;
1021    let _ty = fields.next()?;
1022    let _dev = fields.next()?;
1023    let _size = fields.next()?;
1024    let _node = fields.next()?;
1025    let name = fields.next()?;
1026    let state = fields.next();
1027    if !name.contains(':') {
1028        return None;
1029    }
1030    let is_listen = matches!(state, Some(s) if s.contains("LISTEN"));
1031    let is_established = matches!(state, Some(s) if s.contains("ESTABLISHED"));
1032    if !is_listen && !is_established {
1033        return None;
1034    }
1035    let (local, remote) = match name.split_once("->") {
1036        Some((l, r)) => (l, Some(r)),
1037        None => (name, None),
1038    };
1039    let (local_addr, local_port) = split_addr_port(local)?;
1040    let (remote_addr, remote_port) = match remote {
1041        Some(r) => match split_addr_port(r) {
1042            Some((a, p)) => (Some(a), Some(p)),
1043            None => (None, None),
1044        },
1045        None => (None, None),
1046    };
1047    Some(LsofRow {
1048        command,
1049        pid,
1050        is_listen,
1051        local_addr,
1052        local_port,
1053        remote_addr,
1054        remote_port,
1055    })
1056}
1057
/// Trim the `com.apple.` prefix that lsof reports for macOS
/// Apple-framework processes so the CLIENTS card shows useful
/// identifiers. Examples:
///   `com.apple.WebKit.Networking` → `WebKit.Networking`
///   `com.apple.Safari`            → `Safari`
///   `curl`                        → `curl`
/// Names without the prefix — and the bare prefix itself — pass
/// through unchanged.
pub fn beautify_process(raw: &str) -> String {
    match raw.strip_prefix("com.apple.") {
        Some(rest) if !rest.is_empty() => rest.to_string(),
        _ => raw.to_string(),
    }
}
1072
/// Split a `host:port` token into `(host, port)`. Handles bracketed
/// IPv6 (`[::1]:8080` → `("::1", 8080)`); for unbracketed input the
/// split happens at the LAST colon. Returns `None` on malformed input
/// or a port outside `u16`.
fn split_addr_port(s: &str) -> Option<(String, u16)> {
    if let Some(inner) = s.strip_prefix('[') {
        // Bracketed form: address is everything up to `]`, then a
        // mandatory `:port` suffix.
        let (addr, after) = inner.split_once(']')?;
        let port: u16 = after.strip_prefix(':')?.parse().ok()?;
        return Some((addr.to_string(), port));
    }
    let (addr, port_str) = s.rsplit_once(':')?;
    let port: u16 = port_str.parse().ok()?;
    Some((addr.to_string(), port))
}
1088
1089// ---------------------------------------------------------------------------
1090// Snapshot path for demo and visual regression tests.
1091// ---------------------------------------------------------------------------
1092
/// Deterministic snapshot of a tunnel's live state. Used in `--demo` and
/// in visual regression tests so goldens do not depend on wall-clock or
/// background workers.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TunnelLiveSnapshot {
    /// Tunnel uptime in seconds, fixed at snapshot build time.
    pub uptime_secs: u64,
    /// Currently-open user-visible channel count.
    pub active_channels: u32,
    /// Highest concurrent channel count observed over the tunnel's life.
    pub peak_concurrent: u32,
    /// Total channel opens observed over the tunnel's life.
    pub total_opens: u32,
    /// Seconds since the last observed activity.
    pub idle_secs: u64,
    /// Rolling rx/tx activity: `HISTORY_BUCKETS` buckets of
    /// `BUCKET_SECS` each; the last bucket is "now".
    pub rx_history: [u64; HISTORY_BUCKETS],
    pub tx_history: [u64; HISTORY_BUCKETS],
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    pub peak_rx_bps: u64,
    pub peak_tx_bps: u64,
    /// True when the throughput aggregator has produced at least one
    /// sample. UI shows `sampling…` until then.
    pub throughput_ready: bool,
    pub clients: Vec<DisplayClient>,
    pub events: Vec<DisplayEvent>,
    /// Currently-open channels at snapshot time. Each entry is
    /// `(channel_id, open_age_secs, kind)`. Drives the channel
    /// lifeline swimlane in the detail panel.
    pub currently_open: Vec<(u32, u64, ChannelKind)>,
    /// Foreign listener squatting on a bind port, if any.
    pub conflict: Option<PortConflict>,
    /// Last ssh exit, if the tunnel has died. NOTE(review): looks like
    /// `(exit_code, reason_line)` fed from the stderr ringbuffer
    /// (`STDERR_BUFFER_LINES`) — confirm against the builder.
    pub last_exit: Option<(i32, String)>,
}
1122
/// One connected client as rendered in the CLIENTS card.
#[derive(Debug, Clone)]
pub struct DisplayClient {
    /// Source `addr:port` of the connected client. Reserved for the
    /// expanded clients-list view; the heartbeat-dial dashboard does
    /// not show it inline.
    #[allow(dead_code)]
    pub src: String,
    /// Display name of the connecting process.
    pub process: String,
    /// Connection age in seconds at snapshot time.
    pub age_secs: u64,
    /// Connecting process PID. Reserved for the expanded clients-list
    /// view.
    #[allow(dead_code)]
    pub pid: u32,
    /// User-facing app that owns this socket on macOS. `None` if equal
    /// to `process` or unavailable. See `ClientPeer::responsible_app`.
    pub responsible_app: Option<String>,
    /// Per-client throughput readouts. Zero when no sample has been
    /// taken yet, or when the platform sampler does not produce
    /// per-client data.
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    /// Rolling history of combined rx+tx bytes-per-second. Cell 0 is
    /// the oldest sample, the last cell is "now".
    pub viz_history: [u64; PEER_VIZ_BUCKETS],
    /// True after the per-client sampler has produced at least one
    /// sample. Lets the renderer distinguish "0 B/s, idle" from
    /// "no sampler available".
    pub throughput_ready: bool,
}
1152
/// One channel open/close event as rendered in the events list.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct DisplayEvent {
    /// Event age in seconds at snapshot time.
    pub age_secs: u64,
    /// Surfaced through the runtime path for completeness; the
    /// renderer currently does not show channel ids because the
    /// numbers are noisy without context. Reserved for future use.
    #[allow(dead_code)]
    pub channel_id: u32,
    pub kind: ChannelEventKind,
    /// Filtered to `Direct | Forwarded | Dynamic` — internal channels
    /// are dropped by the UI before this struct is built, so this is
    /// always one of the user-visible kinds.
    pub channel_kind: ChannelKind,
    /// Open→close duration when known. Reserved for the expanded
    /// events-list view.
    #[allow(dead_code)]
    pub duration_secs: Option<u64>,
    /// Number of co-occurring events of the same kind/age that were
    /// folded into this row. `1` means "just one event"; higher values
    /// render as a `(3x)` suffix.
    pub count: u32,
}
1176
1177// ---------------------------------------------------------------------------
1178// Tests.
1179// ---------------------------------------------------------------------------
1180
1181#[cfg(test)]
1182mod tests {
1183    use super::*;
1184
1185    fn user_open(channel_id: u32, at: Instant) -> ChannelEvent {
1186        ChannelEvent {
1187            at,
1188            channel_id,
1189            kind: ChannelEventKind::Open,
1190            channel_kind: Some(ChannelKind::Direct),
1191            opened_at: None,
1192        }
1193    }
1194
1195    #[test]
1196    fn parse_channel_open_simple() {
1197        let ev = parse_channel_line("debug1: channel 0: new [direct-tcpip]").unwrap();
1198        assert_eq!(ev.channel_id, 0);
1199        assert_eq!(ev.kind, ChannelEventKind::Open);
1200        assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1201    }
1202
1203    #[test]
1204    fn parse_channel_open_records_listener_kind() {
1205        let ev = parse_channel_line("debug1: channel 1: new [port listener]").unwrap();
1206        assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1207    }
1208
1209    #[test]
1210    fn parse_channel_open_records_dynamic_kind() {
1211        let ev = parse_channel_line("debug1: channel 4: new [dynamic-tcpip]").unwrap();
1212        assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1213    }
1214
1215    #[test]
1216    fn parse_channel_close_simple() {
1217        let ev = parse_channel_line("debug1: channel 12: free: blah blah").unwrap();
1218        assert_eq!(ev.channel_id, 12);
1219        assert_eq!(ev.kind, ChannelEventKind::Close);
1220        // Close lines do not carry the kind; the recorder fills it in
1221        // from the open-channel map.
1222        assert_eq!(ev.channel_kind, None);
1223    }
1224
1225    #[test]
1226    fn parse_channel_with_leading_whitespace() {
1227        let ev = parse_channel_line("   debug1: channel 5: new [forwarded-tcpip]").unwrap();
1228        assert_eq!(ev.channel_id, 5);
1229        assert_eq!(ev.kind, ChannelEventKind::Open);
1230        assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1231    }
1232
1233    #[test]
1234    fn parse_channel_modern_openssh_format_with_inactive_timeout() {
1235        // OpenSSH 10.x: `channel N: new <ctype> [<rname>] (inactive timeout: T)`.
1236        // The ctype sits BEFORE the brackets; the brackets hold the
1237        // remote endpoint name (which we ignore). Without this branch
1238        // the parser silently set channel_kind=None and counters never
1239        // advanced past zero on real-world OpenSSH ≥ 9.x.
1240        let ev = parse_channel_line(
1241            "debug1: channel 3: new direct-tcpip [127.0.0.1:54321] (inactive timeout: 0)",
1242        )
1243        .unwrap();
1244        assert_eq!(ev.channel_id, 3);
1245        assert_eq!(ev.kind, ChannelEventKind::Open);
1246        assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1247    }
1248
1249    #[test]
1250    fn parse_channel_modern_openssh_format_forwarded() {
1251        let ev = parse_channel_line(
1252            "debug1: channel 7: new forwarded-tcpip [10.0.0.1:443] (inactive timeout: 0)",
1253        )
1254        .unwrap();
1255        assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1256    }
1257
1258    #[test]
1259    fn parse_channel_modern_openssh_format_dynamic() {
1260        let ev = parse_channel_line("debug1: channel 9: new dynamic-tcpip [client] (timeout: 5)")
1261            .unwrap();
1262        assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1263    }
1264
1265    #[test]
1266    fn parse_channel_modern_openssh_format_internal_listener_is_other() {
1267        // The local-port listener that ssh creates at startup uses
1268        // ctype "port-listener" or "listener" depending on version.
1269        // Either way it must NOT be promoted to a user-visible kind.
1270        let ev = parse_channel_line(
1271            "debug1: channel 0: new port-listener [::1:8080] (inactive timeout: 0)",
1272        )
1273        .unwrap();
1274        assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1275    }
1276
1277    #[test]
1278    fn parse_channel_unrelated_line_returns_none() {
1279        assert!(parse_channel_line("debug1: client_input_global_request").is_none());
1280        assert!(parse_channel_line("not even ssh output").is_none());
1281        assert!(parse_channel_line("debug1: channel abc: new").is_none());
1282        assert!(parse_channel_line("debug1: channel 1: confirm").is_none());
1283    }
1284
1285    #[test]
1286    fn record_event_open_increments_counters_for_user_visible_kinds() {
1287        let now = Instant::now();
1288        let mut state = TunnelLiveState::new(now);
1289        state.record_event(user_open(1, now));
1290        assert_eq!(state.total_opens, 1);
1291        assert_eq!(state.active_channels, 1);
1292        assert_eq!(state.peak_concurrent, 1);
1293        // Activity history is fed by sample_activity, not record_event.
1294        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1295    }
1296
1297    #[test]
1298    fn record_event_skips_counters_for_internal_channels() {
1299        // Listener / session / mux-master channels should NOT inflate
1300        // peak_concurrent or total_opens — only user-visible traffic
1301        // counts towards the activity figures.
1302        let now = Instant::now();
1303        let mut state = TunnelLiveState::new(now);
1304        state.record_event(ChannelEvent {
1305            at: now,
1306            channel_id: 0,
1307            kind: ChannelEventKind::Open,
1308            channel_kind: Some(ChannelKind::Other),
1309            opened_at: None,
1310        });
1311        assert_eq!(state.total_opens, 0);
1312        assert_eq!(state.active_channels, 0);
1313        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1314        // The event is still kept in the ringbuffer for diagnostics.
1315        assert_eq!(state.events.len(), 1);
1316    }
1317
1318    #[test]
1319    fn sample_activity_writes_peak_into_current_bucket() {
1320        let now = Instant::now();
1321        let mut state = TunnelLiveState::new(now);
1322        state.sample_activity(2);
1323        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1324        // A lower sample inside the same bucket must not erase the peak.
1325        state.sample_activity(1);
1326        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1327        // A higher sample raises the peak.
1328        state.sample_activity(5);
1329        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 5);
1330    }
1331
1332    #[test]
1333    fn sample_activity_clamps_to_u8_max() {
1334        let now = Instant::now();
1335        let mut state = TunnelLiveState::new(now);
1336        state.sample_activity(u32::MAX);
1337        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], u8::MAX);
1338    }
1339
1340    #[test]
1341    fn record_event_close_pairs_with_open_for_duration() {
1342        let t0 = Instant::now();
1343        let t1 = t0 + Duration::from_secs(5);
1344        let mut state = TunnelLiveState::new(t0);
1345        state.record_event(user_open(7, t0));
1346        state.record_event(ChannelEvent {
1347            at: t1,
1348            channel_id: 7,
1349            kind: ChannelEventKind::Close,
1350            channel_kind: None,
1351            opened_at: None,
1352        });
1353        assert_eq!(state.active_channels, 0);
1354        let last = state.events.back().unwrap();
1355        assert_eq!(last.kind, ChannelEventKind::Close);
1356        assert_eq!(last.opened_at, Some(t0));
1357        // The close event picks up the kind that was recorded on open.
1358        assert_eq!(last.channel_kind, Some(ChannelKind::Direct));
1359    }
1360
1361    #[test]
1362    fn record_event_caps_ringbuffer_at_max() {
1363        let now = Instant::now();
1364        let mut state = TunnelLiveState::new(now);
1365        for i in 0..(MAX_EVENTS as u32 + 5) {
1366            state.record_event(user_open(i, now));
1367        }
1368        assert_eq!(state.events.len(), MAX_EVENTS);
1369    }
1370
1371    #[test]
1372    fn rotate_if_due_shifts_buckets_per_tick() {
1373        let t0 = Instant::now();
1374        let mut state = TunnelLiveState::new(t0);
1375        state.opens_history[HISTORY_BUCKETS - 1] = 7;
1376        state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS));
1377        // After one rotate the value moved one slot left.
1378        assert_eq!(state.opens_history[HISTORY_BUCKETS - 2], 7);
1379        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1380    }
1381
1382    #[test]
1383    fn rotate_if_due_clamps_at_full_window() {
1384        let t0 = Instant::now();
1385        let mut state = TunnelLiveState::new(t0);
1386        state.opens_history[HISTORY_BUCKETS - 1] = 9;
1387        // Far beyond the window — rotation should clear it entirely.
1388        state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS * HISTORY_BUCKETS as u64 * 4));
1389        assert!(state.opens_history.iter().all(|&v| v == 0));
1390    }
1391
1392    #[test]
1393    fn rotate_if_due_noop_within_one_bucket() {
1394        let t0 = Instant::now();
1395        let mut state = TunnelLiveState::new(t0);
1396        state.opens_history[HISTORY_BUCKETS - 1] = 3;
1397        // Less than one BUCKET_SECS elapsed — no rotation should happen.
1398        state.rotate_if_due(t0 + Duration::from_millis(BUCKET_SECS * 1000 / 2));
1399        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 3);
1400    }
1401
1402    #[test]
1403    fn parse_lsof_listen_row() {
1404        let line = "ssh    12345 user 3u IPv4 0xabc 0t0 TCP 127.0.0.1:8080 (LISTEN)";
1405        let row = parse_lsof_row(line).unwrap();
1406        assert_eq!(row.command, "ssh");
1407        assert_eq!(row.pid, 12345);
1408        assert!(row.is_listen);
1409        assert_eq!(row.local_addr, "127.0.0.1");
1410        assert_eq!(row.local_port, 8080);
1411        assert!(row.remote_port.is_none());
1412    }
1413
1414    #[test]
1415    fn parse_lsof_established_row() {
1416        let line =
1417            "curl   23456 user 4u IPv4 0xdef 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)";
1418        let row = parse_lsof_row(line).unwrap();
1419        assert_eq!(row.command, "curl");
1420        assert_eq!(row.pid, 23456);
1421        assert!(!row.is_listen);
1422        assert_eq!(row.local_port, 54321);
1423        assert_eq!(row.remote_port, Some(8080));
1424    }
1425
1426    #[test]
1427    fn parse_lsof_other_states_skipped() {
1428        // CLOSE_WAIT, TIME_WAIT, etc. are not interesting for our display.
1429        let line = "x 1 u 0u IPv4 0 0t0 TCP 1.2.3.4:1->5.6.7.8:9 (CLOSE_WAIT)";
1430        assert!(parse_lsof_row(line).is_none());
1431    }
1432
1433    #[test]
1434    fn parse_lsof_output_finds_clients_for_bind_port() {
1435        let txt = "\
1436COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1437ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1438curl   23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1439ssh    12345 u 5u IPv4 0xc 0t0 TCP 127.0.0.1:8080->127.0.0.1:54321 (ESTABLISHED)
1440";
1441        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1442        let mut seen = HashMap::new();
1443        let now = Instant::now();
1444        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1445        let peers = msg.clients.get(&8080).expect("clients on 8080");
1446        assert_eq!(peers.len(), 1);
1447        assert_eq!(peers[0].process, "curl");
1448        assert_eq!(peers[0].pid, 23456);
1449        assert!(peers[0].src.contains("54321"));
1450        assert!(msg.conflicts.is_empty());
1451    }
1452
1453    #[test]
1454    fn parse_lsof_output_detects_port_conflict() {
1455        let txt = "\
1456COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1457nginx  99999 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1458";
1459        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1460        let mut seen = HashMap::new();
1461        let now = Instant::now();
1462        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1463        let conflict = msg.conflicts.get(&8080).expect("conflict on 8080");
1464        assert_eq!(conflict.process, "nginx");
1465        assert_eq!(conflict.pid, 99999);
1466    }
1467
1468    #[test]
1469    fn parse_lsof_output_skips_own_listen() {
1470        let txt = "\
1471COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1472ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1473";
1474        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1475        let mut seen = HashMap::new();
1476        let now = Instant::now();
1477        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1478        assert!(msg.conflicts.is_empty());
1479    }
1480
1481    #[test]
1482    fn parse_lsof_output_first_seen_persists_across_polls() {
1483        let txt = "\
1484COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1485ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1486curl   23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1487";
1488        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1489        let mut seen = HashMap::new();
1490        let t0 = Instant::now();
1491        let msg1 = parse_lsof_output(txt, &ports, &mut seen, t0);
1492        let t1 = t0 + Duration::from_secs(5);
1493        let msg2 = parse_lsof_output(txt, &ports, &mut seen, t1);
1494        let p1 = &msg1.clients[&8080][0];
1495        let p2 = &msg2.clients[&8080][0];
1496        assert_eq!(p1.since, p2.since, "first_seen should be sticky");
1497    }
1498
1499    #[test]
1500    fn split_addr_port_handles_ipv6_brackets() {
1501        let (a, p) = split_addr_port("[::1]:8080").unwrap();
1502        assert_eq!(a, "::1");
1503        assert_eq!(p, 8080);
1504    }
1505
1506    #[test]
1507    fn split_addr_port_handles_ipv4() {
1508        let (a, p) = split_addr_port("127.0.0.1:8080").unwrap();
1509        assert_eq!(a, "127.0.0.1");
1510        assert_eq!(p, 8080);
1511    }
1512
1513    #[test]
1514    fn beautify_process_strips_com_apple_prefix() {
1515        assert_eq!(
1516            beautify_process("com.apple.WebKit.Networking"),
1517            "WebKit.Networking"
1518        );
1519        assert_eq!(beautify_process("com.apple.Safari"), "Safari");
1520    }
1521
1522    #[test]
1523    fn beautify_process_passes_other_names_through_unchanged() {
1524        assert_eq!(beautify_process("curl"), "curl");
1525        assert_eq!(beautify_process("nginx"), "nginx");
1526        assert_eq!(beautify_process("python3"), "python3");
1527    }
1528
1529    #[test]
1530    fn beautify_process_does_not_strip_when_only_prefix() {
1531        // Edge case: the bare `com.apple.` string would otherwise
1532        // collapse to "" and disappear from the card. Keep the raw
1533        // value so the user at least sees that lsof reported something.
1534        assert_eq!(beautify_process("com.apple."), "com.apple.");
1535    }
1536
1537    #[test]
1538    fn parse_lsof_output_unwraps_apple_framework_names() {
1539        let txt = "\
1540COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1541ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1542com.apple.WebKit.Networking 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1543";
1544        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1545        let mut seen = HashMap::new();
1546        let now = Instant::now();
1547        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1548        let peers = msg.clients.get(&8080).expect("clients on 8080");
1549        assert_eq!(peers.len(), 1);
1550        // Process name is shown without the noisy `com.apple.` prefix.
1551        assert_eq!(peers[0].process, "WebKit.Networking");
1552    }
1553}