// purple_ssh/tunnel_live.rs
//! Live-data layer for active SSH tunnels. Two worker types:
//!
//! - One stderr-parser thread per tunnel that reads `ssh -v` output and
//!   emits `ChannelEvent`s when ssh logs `debug1: channel N: new` and
//!   `debug1: channel N: free` lines. Also keeps the last few stderr
//!   lines so `poll_tunnels()` can surface a meaningful exit reason.
//!
//! - One shared lsof poller that runs `lsof -iTCP -P -n` every 2s,
//!   filters by the active tunnel bind ports and emits `LsofMessage`
//!   snapshots with connected clients and port conflicts.
//!
//! All communication is via `std::sync::mpsc` channels. No tokio.

use std::collections::{HashMap, VecDeque};
use std::io::{BufRead, BufReader};
#[cfg(target_os = "macos")]
use std::process::Stdio;
use std::process::{ChildStderr, Command};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Cap on the per-tunnel channel-event ringbuffer.
pub const MAX_EVENTS: usize = 50;
/// Width of one rolling-activity bucket, in seconds. Chosen to match
/// the 2s lsof poll interval so the sparkline is fed at the same rate
/// it is sampled; each bucket stores the peak concurrent client count
/// observed within its window.
pub const BUCKET_SECS: u64 = 2;

/// Length of the rolling activity history. Together with `BUCKET_SECS`
/// this spans `HISTORY_BUCKETS * BUCKET_SECS = 300s ≈ 5 min`, a wide
/// window so sparse traffic still has visible content across the
/// chart. Bucket `HISTORY_BUCKETS - 1` is "now".
pub const HISTORY_BUCKETS: usize = 150;
/// How many stderr lines are retained per tunnel for exit-reason display.
pub const STDERR_BUFFER_LINES: usize = 10;
/// Upper bound on clients reported per bind port. The card renders only
/// a handful and lsof can list hundreds of peers on a busy load
/// balancer, so the cap keeps allocation bounded.
pub const MAX_CLIENTS_PER_PORT: usize = 64;

/// Whether a channel was opened or closed, as observed in `ssh -v` stderr.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelEventKind {
    Open,
    Close,
}

/// Category of ssh channel an event belongs to. The channel-type token
/// logged after `channel N: new` in `ssh -v` output ("port listener",
/// "direct-tcpip", "dynamic-tcpip", …) maps onto these variants.
///
/// Only `Direct`, `Forwarded` and `Dynamic` represent end-user traffic;
/// everything else is ssh-internal bookkeeping (the listener that binds
/// the local port, mux master channels, agent forwarding) and is
/// filtered out before the EVENTS card is rendered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelKind {
    /// `direct-tcpip` — outgoing client connection through a LocalForward.
    Direct,
    /// `forwarded-tcpip` — incoming client connection arriving via a
    /// RemoteForward.
    Forwarded,
    /// `dynamic-tcpip` — single SOCKS request through a DynamicForward.
    Dynamic,
    /// Anything else (`port listener`, `client-session`, `mux-master`,
    /// `auth-agent@openssh.com`, `x11`, …).
    Other,
}

impl ChannelKind {
    /// Map the channel-type token of a `channel N: new [<type>]` line
    /// onto a variant; unknown tokens become `Other`.
    pub fn from_bracket(token: &str) -> Self {
        match token {
            "direct-tcpip" => ChannelKind::Direct,
            "forwarded-tcpip" => ChannelKind::Forwarded,
            "dynamic-tcpip" => ChannelKind::Dynamic,
            _ => ChannelKind::Other,
        }
    }

    /// Whether this kind is end-user traffic that belongs in the
    /// EVENTS card (everything except `Other`).
    pub fn is_user_visible(self) -> bool {
        !matches!(self, ChannelKind::Other)
    }
}

92#[derive(Debug, Clone)]
93pub struct ChannelEvent {
94    pub at: Instant,
95    pub channel_id: u32,
96    pub kind: ChannelEventKind,
97    /// Bracketed channel type from the `new` line, or the kind that
98    /// was recorded when the channel opened (for `Close` events).
99    /// `None` if neither side could be parsed (defensive — the
100    /// regression suite expects every event to have a kind).
101    pub channel_kind: Option<ChannelKind>,
102    /// For `Close` events, the matching open time (so the UI can render
103    /// the channel duration). `None` if the open was missed.
104    pub opened_at: Option<Instant>,
105}
106
/// Number of buckets in the per-client throughput history. The renderer
/// shifts once per `VIZ_TICK_MS` (100ms), so 12 cells cover roughly the
/// most recent ~1.2 seconds. Every tick pushes the latest
/// `current_rx_bps + current_tx_bps` reading into the rightmost cell,
/// which keeps the braille wave moving leftward at terminal frame rate
/// even though the underlying lsof samples land only every few seconds.
pub const PEER_VIZ_BUCKETS: usize = 12;

/// One peer the lsof poller observed connected to a forwarded port.
#[derive(Debug, Clone)]
pub struct ClientPeer {
    /// Source address of the connection as reported by lsof.
    pub src: String,
    /// Name of the process owning the client socket.
    pub process: String,
    /// Owner pid of the client socket; surfaced in the CLIENTS card so
    /// users can correlate a connection with a local process.
    pub pid: u32,
    /// Timestamp associated with this peer (fed from the poller's
    /// first-seen tracking, which drives the "age" display).
    pub since: Instant,
    /// On macOS, the user-facing app that "owns" this socket per the
    /// kernel-tracked responsible pid — lets the CLIENTS card show
    /// `Safari` for a `WebKit.Networking` XPC daemon, or `Ghostty` for
    /// a `psql` started from a Ghostty terminal. `None` when responsible
    /// pid resolution is unavailable, equals self, or the lookup failed.
    pub responsible_app: Option<String>,
    /// Most-recent rx/tx bytes-per-second sample, derived by diffing
    /// per-socket cumulative byte counters between lsof polls. Zero
    /// until a second sample arrives. macOS falls back to per-pid
    /// figures when per-socket counters are not available.
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    /// Cumulative byte counters at the most recent sample, diffed
    /// against the next one. `None` until the first sample arrives.
    pub bytes_rcvd: Option<u64>,
    pub bytes_sent: Option<u64>,
    /// Time of the most recent sample; `None` until the first sample.
    pub last_sample_at: Option<Instant>,
}

/// Another process bound to the same port as our tunnel. Detected when
/// lsof shows a LISTEN row on a tunnel bind port owned by a pid other
/// than our tunnel's. Reserved for future port-conflict surfacing (see
/// `app::tunnel_state::TunnelsState::conflicts`); the minimal option-A
/// detail panel does not show conflicts inline.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PortConflict {
    pub port: u16,
    pub process: String,
    pub pid: u32,
}

157/// Per-tunnel live state. Lives on `ActiveTunnel` so a HashMap remove()
158/// drops the state and joins the parser thread (no zijkanaal map that
159/// could leak after `tunnels.active.remove()`).
160pub struct TunnelLiveState {
161    pub events: VecDeque<ChannelEvent>,
162    pub opens_history: [u8; HISTORY_BUCKETS],
163    /// Wall-clock time of the most recent rotate. Used by `rotate_if_due`
164    /// to advance the history buckets one slot per elapsed minute.
165    pub history_last_rotate: Instant,
166    pub peak_concurrent: u32,
167    pub total_opens: u32,
168    pub last_event_at: Option<Instant>,
169    pub active_channels: u32,
170    /// Map of currently-open channel id -> open time + kind, used to
171    /// compute duration and restore the kind when a `Close` arrives.
172    pub channel_open: HashMap<u32, (Instant, ChannelKind)>,
173    /// Filled by `poll_tunnels()` when the child exits unexpectedly.
174    pub last_exit: Option<(i32, String)>,
175    /// Last few stderr lines, written by the parser thread, read by
176    /// `poll_tunnels()` on exit to compose `last_exit`.
177    pub stderr_buffer: Arc<Mutex<VecDeque<String>>>,
178    /// Joined when the `ActiveTunnel` is dropped.
179    pub parser_thread: Option<JoinHandle<()>>,
180    /// Set to true on `ActiveTunnel` drop so the parser thread can exit
181    /// promptly even if stderr is still readable. The pipe close from
182    /// `child.kill()` is the primary signal; this is a belt-and-braces.
183    pub parser_stop: Arc<AtomicBool>,
184
185    /// Rolling history of bytes per second received (downstream from
186    /// remote to laptop). Same bucket layout as `opens_history`.
187    pub rx_history: [u64; HISTORY_BUCKETS],
188    /// Rolling history of bytes per second sent (upstream from laptop
189    /// to remote).
190    pub tx_history: [u64; HISTORY_BUCKETS],
191    /// Aggregated bytes-per-second across every connected client. Set
192    /// from the per-peer lsof samples in `TunnelState::poll` so this
193    /// value matches what the roster's per-peer rows report.
194    pub current_rx_bps: u64,
195    pub current_tx_bps: u64,
196    /// Peak observed since tunnel start. Tracks the running max of
197    /// `current_rx_bps` / `current_tx_bps` across polls.
198    pub peak_rx_bps: u64,
199    pub peak_tx_bps: u64,
200    /// Wall-clock of the most recent throughput aggregation that saw
201    /// at least one sampled peer. `None` until the first sample
202    /// arrives — UI shows `sampling…` in that gap.
203    pub last_throughput_at: Option<Instant>,
204}
205
206impl TunnelLiveState {
207    pub fn new(started_at: Instant) -> Self {
208        Self {
209            events: VecDeque::with_capacity(MAX_EVENTS),
210            opens_history: [0u8; HISTORY_BUCKETS],
211            history_last_rotate: started_at,
212            peak_concurrent: 0,
213            total_opens: 0,
214            last_event_at: None,
215            active_channels: 0,
216            channel_open: HashMap::new(),
217            last_exit: None,
218            stderr_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_BUFFER_LINES))),
219            parser_thread: None,
220            parser_stop: Arc::new(AtomicBool::new(false)),
221            rx_history: [0u64; HISTORY_BUCKETS],
222            tx_history: [0u64; HISTORY_BUCKETS],
223            current_rx_bps: 0,
224            current_tx_bps: 0,
225            peak_rx_bps: 0,
226            peak_tx_bps: 0,
227            last_throughput_at: None,
228        }
229    }
230
231    /// Record an open or close event from the stderr parser thread.
232    /// Updates counters and the bounded ringbuffer. The rolling
233    /// `opens_history` is fed by `sample_activity` rather than by
234    /// individual events: the sparkline tracks ongoing concurrency, not
235    /// just channel-open bursts. End-user channels (Direct, Forwarded,
236    /// Dynamic) bump `total_opens` and `peak_concurrent`; ssh-internal
237    /// listeners and master channels stay in the ringbuffer for
238    /// diagnostics only.
239    pub fn record_event(&mut self, mut event: ChannelEvent) {
240        self.rotate_if_due(event.at);
241        match event.kind {
242            ChannelEventKind::Open => {
243                let kind = event.channel_kind.unwrap_or(ChannelKind::Other);
244                if kind.is_user_visible() {
245                    self.total_opens = self.total_opens.saturating_add(1);
246                    self.active_channels = self.active_channels.saturating_add(1);
247                    self.peak_concurrent = self.peak_concurrent.max(self.active_channels);
248                }
249                self.channel_open.insert(event.channel_id, (event.at, kind));
250            }
251            ChannelEventKind::Close => {
252                if let Some((opened_at, kind)) = self.channel_open.remove(&event.channel_id) {
253                    event.opened_at = Some(opened_at);
254                    event.channel_kind = Some(kind);
255                    if kind.is_user_visible() {
256                        self.active_channels = self.active_channels.saturating_sub(1);
257                    }
258                }
259            }
260        }
261        self.last_event_at = Some(event.at);
262        if self.events.len() == MAX_EVENTS {
263            self.events.pop_front();
264        }
265        self.events.push_back(event);
266        log::debug!(
267            "[purple] Tunnel live event: total_opens={} active={} peak={}",
268            self.total_opens,
269            self.active_channels,
270            self.peak_concurrent
271        );
272    }
273
274    /// Advance the rolling history if at least one bucket-width has
275    /// elapsed since the previous rotate. Per-elapsed-bucket we shift
276    /// left by one and push a fresh `0` at the right (now) edge.
277    /// Rotates `opens_history`, `rx_history` and `tx_history` together
278    /// so the three sparklines stay aligned in the UI.
279    pub fn rotate_if_due(&mut self, now: Instant) {
280        let elapsed = now.saturating_duration_since(self.history_last_rotate);
281        let ticks = elapsed.as_secs() / BUCKET_SECS;
282        if ticks == 0 {
283            return;
284        }
285        let shift = (ticks as usize).min(HISTORY_BUCKETS);
286        if shift >= HISTORY_BUCKETS {
287            self.opens_history.fill(0);
288            self.rx_history.fill(0);
289            self.tx_history.fill(0);
290        } else {
291            self.opens_history.rotate_left(shift);
292            for slot in self.opens_history.iter_mut().rev().take(shift) {
293                *slot = 0;
294            }
295            self.rx_history.rotate_left(shift);
296            for slot in self.rx_history.iter_mut().rev().take(shift) {
297                *slot = 0;
298            }
299            self.tx_history.rotate_left(shift);
300            for slot in self.tx_history.iter_mut().rev().take(shift) {
301                *slot = 0;
302            }
303        }
304        self.history_last_rotate += Duration::from_secs(ticks * BUCKET_SECS);
305    }
306
307    /// Write the peak concurrent client count for the current bucket.
308    /// Called once per `TunnelState::poll` after the lsof snapshot has
309    /// been drained, with `concurrent` = `max(lsof_clients_for_alias,
310    /// active_channels)`. Uses `max` rather than `+=` so repeated polls
311    /// inside one bucket converge on the bucket's peak rather than
312    /// double-counting.
313    pub fn sample_activity(&mut self, concurrent: u32) {
314        let sample = u8::try_from(concurrent).unwrap_or(u8::MAX);
315        if let Some(last) = self.opens_history.last_mut() {
316            *last = (*last).max(sample);
317        }
318    }
319}
320
321/// Message sent from a per-tunnel parser thread to the main loop drain.
322#[derive(Debug, Clone)]
323pub struct ParserMessage {
324    pub alias: String,
325    pub event: ChannelEvent,
326}
327
328/// Snapshot emitted by the lsof poller every poll cycle. Keys are the
329/// tunnel bind ports.
330#[derive(Debug, Clone)]
331pub struct LsofMessage {
332    pub at: Instant,
333    pub clients: HashMap<u16, Vec<ClientPeer>>,
334    pub conflicts: HashMap<u16, PortConflict>,
335}
336
337impl LsofMessage {
338    pub fn empty(at: Instant) -> Self {
339        Self {
340            at,
341            clients: HashMap::new(),
342            conflicts: HashMap::new(),
343        }
344    }
345}
346
/// Public wrapper around an in-flight lsof poller thread: its stop
/// flag, the shared `(String, u16, u32)` bind-port entries it filters
/// by, and the join handle.
pub struct LsofPollerHandle {
    pub stop: Arc<AtomicBool>,
    pub bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
    pub thread: Option<JoinHandle<()>>,
}

impl LsofPollerHandle {
    /// Signal the poller to stop and block until its thread has joined.
    /// Idempotent: a second call finds `thread` already taken.
    pub fn shutdown(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        if let Some(worker) = self.thread.take() {
            let _ = worker.join();
        }
    }
}

363/// Spawn the per-tunnel stderr parser thread. Reads the child's stderr
364/// line-by-line, captures channel events into `tx` and stores the last
365/// few raw lines in `stderr_buffer` for exit-reason display.
366pub fn spawn_parser_thread(
367    stderr: ChildStderr,
368    alias: String,
369    tx: Sender<ParserMessage>,
370    stderr_buffer: Arc<Mutex<VecDeque<String>>>,
371    stop: Arc<AtomicBool>,
372) -> JoinHandle<()> {
373    thread::Builder::new()
374        .name(format!("purple-tunnel-parser-{alias}"))
375        .spawn(move || {
376            let reader = BufReader::new(stderr);
377            for line in reader.lines() {
378                if stop.load(Ordering::Relaxed) {
379                    break;
380                }
381                let Ok(line) = line else { break };
382                if let Ok(mut buf) = stderr_buffer.lock() {
383                    if buf.len() == STDERR_BUFFER_LINES {
384                        buf.pop_front();
385                    }
386                    buf.push_back(line.clone());
387                }
388                if let Some(event) = parse_channel_line(&line) {
389                    let msg = ParserMessage {
390                        alias: alias.clone(),
391                        event,
392                    };
393                    if tx.send(msg).is_err() {
394                        break;
395                    }
396                }
397            }
398            log::debug!("[purple] Tunnel parser thread exit: alias={alias}");
399        })
400        .expect("spawn purple-tunnel-parser thread")
401}
402
403/// Parse one stderr line. Recognises both OpenSSH log formats:
404///
405/// Modern (8.x+, 9.x, 10.x): `channel N: new <ctype> [<rname>] ...`.
406/// ctype sits BEFORE the brackets and the brackets carry the remote
407/// name; the `(inactive timeout: T)` suffix added in 9.x is ignored.
408///
409/// Legacy/test: `channel N: new [<ctype>]`. Kept for backwards
410/// compatibility with test fixtures and any patched ssh build that
411/// emits the older shape.
412///
413/// Both formats also produce `channel N: free: ...` for Close.
414/// All other lines return `None`.
415pub fn parse_channel_line(line: &str) -> Option<ChannelEvent> {
416    let trimmed = line.trim_start();
417    let rest = trimmed.strip_prefix("debug1: channel ")?;
418    let (id_str, after) = rest.split_once(':')?;
419    let channel_id: u32 = id_str.trim().parse().ok()?;
420    let after = after.trim_start();
421    let (kind, channel_kind) = if let Some(after_new) = after.strip_prefix("new") {
422        let after_new = after_new.trim_start();
423        let ctype = if let Some(rest) = after_new.strip_prefix('[') {
424            // Legacy `new [<ctype>]` — ctype is inside the brackets.
425            rest.split_once(']').map(|(t, _)| t.trim().to_string())
426        } else {
427            // Modern `new <ctype> [<rname>] (…)` — first whitespace-
428            // delimited token is the ctype; everything after is the
429            // remote name plus optional parenthesised attributes.
430            after_new
431                .split_whitespace()
432                .next()
433                .map(|s| s.to_string())
434                .filter(|s| !s.is_empty())
435        };
436        let chan_kind = ctype.as_deref().map(ChannelKind::from_bracket);
437        (ChannelEventKind::Open, chan_kind)
438    } else if after.starts_with("free") {
439        (ChannelEventKind::Close, None)
440    } else {
441        return None;
442    };
443    Some(ChannelEvent {
444        at: Instant::now(),
445        channel_id,
446        kind,
447        channel_kind,
448        opened_at: None,
449    })
450}
451
452/// Spawn the shared lsof poller. Polls `lsof -iTCP -P -n` every 2s,
453/// filters by the bind ports passed via `bind_ports`, and emits a
454/// `LsofMessage` per poll. macOS + Linux only — purple is unix-only.
455#[cfg(any(target_os = "macos", target_os = "linux"))]
456pub fn spawn_lsof_poller(
457    bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
458    tx: Sender<LsofMessage>,
459    stop: Arc<AtomicBool>,
460) -> JoinHandle<()> {
461    thread::Builder::new()
462        .name("purple-tunnel-lsof".into())
463        .spawn(move || {
464            // Track first-seen times per (port, src) so the CLIENTS card
465            // can render a real "age" value across polls.
466            let mut first_seen: HashMap<(u16, String), Instant> = HashMap::new();
467            // Cache responsible-pid lookups across polls so the same client
468            // does not pay the FFI cost every 2s tick.
469            let mut responsible_cache = ResponsibleAppCache::default();
470            // Per-peer throughput history. Key matches `first_seen` so a
471            // peer that goes away and comes back resets cleanly.
472            let mut peer_state: HashMap<(u16, String), PeerSampleCache> = HashMap::new();
473            while !stop.load(Ordering::Relaxed) {
474                let ports: Vec<(String, u16, u32)> = match bind_ports.lock() {
475                    Ok(g) => g.clone(),
476                    Err(p) => p.into_inner().clone(),
477                };
478                if ports.is_empty() {
479                    thread::sleep(Duration::from_millis(500));
480                    continue;
481                }
482                let now = Instant::now();
483                let mut msg = run_lsof_once(&ports, &mut first_seen, now);
484                annotate_responsible_apps(&mut msg, &mut responsible_cache);
485                annotate_peer_throughput(&mut msg, &mut peer_state, now);
486                if tx.send(msg).is_err() {
487                    break;
488                }
489                for _ in 0..20 {
490                    if stop.load(Ordering::Relaxed) {
491                        break;
492                    }
493                    thread::sleep(Duration::from_millis(100));
494                }
495            }
496            log::debug!("[purple] Tunnel lsof poller thread exit");
497        })
498        .expect("spawn purple-tunnel-lsof thread")
499}
500
/// Stub lsof poller for non-unix builds. Purple does not officially
/// support Windows but the cfg keeps the codebase compilable.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
pub fn spawn_lsof_poller(
    _bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
    _tx: Sender<LsofMessage>,
    stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        // Nothing to poll on this platform; idle until shutdown.
        while !stop.load(Ordering::Relaxed) {
            thread::sleep(Duration::from_millis(500));
        }
    })
}

// macOS-only FFI to the kernel-tracked "responsible pid" of a process.
// `responsibility_get_pid_responsible_for_pid` is exported by libsystem
// (libquarantine) and is what Activity Monitor and the TCC subsystem
// use. Stable across macOS releases and callable without root or
// special entitlements for processes owned by the calling user. It
// returns the pid that "speaks for" `pid` when the kernel attributes
// resource use, sandbox decisions or TCC prompts — the mapping that
// lets us label a `WebKit.Networking` daemon as `Safari`, or a
// shell/`psql` started inside Ghostty as `Ghostty`. Returns `0` or a
// negative value when no responsibility chain exists (most system
// daemons), and `pid` itself when the process is its own responsible
// (terminal apps, foreground GUI apps).
#[cfg(target_os = "macos")]
unsafe extern "C" {
    fn responsibility_get_pid_responsible_for_pid(pid: libc::pid_t) -> libc::pid_t;
}

/// Resolve a client pid to a user-facing app name via the macOS
/// responsibility API. Returns `None` when:
///   * the lookup failed (e.g. process exited or kernel returned <= 0),
///   * the responsible pid equals the client pid (self-responsible —
///     the process name from lsof is already the right label), or
///   * the responsible pid maps to the same name as the client process
///     (no extra signal to add).
#[cfg(target_os = "macos")]
fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
    // SAFETY: the FFI takes a pid_t by value and returns one; no memory
    // crosses the boundary. Calling with an exited pid is well-defined:
    // the kernel answers -1 / 0, which we treat as "no responsible".
    let rpid = unsafe { responsibility_get_pid_responsible_for_pid(pid as libc::pid_t) };
    if rpid <= 0 || rpid as u32 == pid {
        return None;
    }
    process_name(rpid as u32).filter(|name| !name.eq_ignore_ascii_case(client_process))
}

/// Look up the bare process name for a pid via libproc's `proc_name`,
/// which yields the truncated `comm` value (what `ps -o comm=` prints,
/// without the full executable path). Used to label the
/// responsible-pid in the CLIENTS card.
#[cfg(target_os = "macos")]
fn process_name(pid: u32) -> Option<String> {
    unsafe extern "C" {
        fn proc_name(pid: libc::c_int, buffer: *mut libc::c_void, buffersize: u32) -> libc::c_int;
    }
    let mut buf = [0u8; 256];
    // SAFETY: `buf` is a valid writeable region of `buf.len()` bytes.
    // `proc_name` writes a NUL-terminated string into it and returns
    // the byte count; zero or negative means failure / unknown pid.
    let written = unsafe {
        proc_name(
            pid as libc::c_int,
            buf.as_mut_ptr().cast(),
            buf.len() as u32,
        )
    };
    if written <= 0 {
        return None;
    }
    let end = (written as usize).min(buf.len());
    let name = std::str::from_utf8(&buf[..end]).ok()?.trim_end_matches('\0');
    if name.is_empty() {
        None
    } else {
        Some(beautify_process(name))
    }
}

591/// Linux: walk to the process's session leader via `/proc/PID/stat`
592/// and use its comm as the user-facing app label. The session leader
593/// is the process whose pid equals the session id; for shell-spawned
594/// tools that leader is typically the terminal (`ghostty`, `konsole`,
595/// `gnome-terminal-`); for GUI clients it is the app itself
596/// (`dbeaver`, `firefox`, `code`). Generic — any process ancestry
597/// rooted in a user session works without a hardcoded app list.
598///
599/// Returns `None` when the process exited mid-poll, when the session
600/// leader equals the client pid (already its own app), or when the
601/// leader's comm matches the client process name (no extra signal).
602#[cfg(target_os = "linux")]
603fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
604    let session_id = read_session_leader(pid)?;
605    if session_id == pid {
606        return None;
607    }
608    let comm = std::fs::read_to_string(format!("/proc/{}/comm", session_id)).ok()?;
609    let name = comm.trim();
610    if name.is_empty() || name.eq_ignore_ascii_case(client_process) {
611        return None;
612    }
613    Some(beautify_process(name))
614}
615
/// Parse `/proc/PID/stat` and return the session id — field 6 of the
/// stat line per proc(5): `pid (comm) state ppid pgrp session ...`.
/// (The original comment said "field 5", which is the pgrp; the code
/// has always read the session id.) The `comm` field is parenthesized
/// and may itself contain spaces and `)`, so we slice at the LAST `)`
/// and tokenize what follows: `state ppid pgrp session ...` → index 3
/// of the post-comm slice is the session id.
#[cfg(target_os = "linux")]
fn read_session_leader(pid: u32) -> Option<u32> {
    let stat = std::fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    let close = stat.rfind(')')?;
    let after_comm = stat[close + 1..].trim();
    // state(0) ppid(1) pgrp(2) session(3)
    after_comm.split_whitespace().nth(3)?.parse().ok()
}

/// Fallback for platforms without a responsible-pid / session-leader
/// concept: never attribute a parent app.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn lookup_responsible_app(_pid: u32, _client_process: &str) -> Option<String> {
    None
}

634/// Cache of `pid -> Option<responsible_app>` so the lsof poller does not
635/// re-resolve the same connection on every 2s tick. Keyed on pid; a
636/// stale entry is at worst a stale label until the pid is reused, which
637/// is acceptable for an observability surface. Size-bounded by the
638/// per-port client cap times a generous fan-out — in practice the lsof
639/// pid set turns over slowly.
640#[derive(Default)]
641struct ResponsibleAppCache {
642    map: HashMap<u32, Option<String>>,
643}
644
645impl ResponsibleAppCache {
646    fn resolve(&mut self, pid: u32, client_process: &str) -> Option<String> {
647        if let Some(cached) = self.map.get(&pid) {
648            return cached.clone();
649        }
650        let resolved = lookup_responsible_app(pid, client_process);
651        self.map.insert(pid, resolved.clone());
652        resolved
653    }
654
655    /// Drop entries for pids no longer present in `live`, so the cache
656    /// cannot grow unbounded across many short-lived clients.
657    fn retain_pids(&mut self, live: &std::collections::HashSet<u32>) {
658        self.map.retain(|pid, _| live.contains(pid));
659    }
660}
661
662/// Walk every client peer in `msg` and fill in `responsible_app` from
663/// the macOS responsibility API. Cache hits are O(1); cache misses pay
664/// one FFI plus one libproc call per pid. After resolution the cache is
665/// trimmed to the still-live set of pids so it cannot grow unbounded.
666fn annotate_responsible_apps(msg: &mut LsofMessage, cache: &mut ResponsibleAppCache) {
667    let mut live: std::collections::HashSet<u32> = std::collections::HashSet::new();
668    for peers in msg.clients.values_mut() {
669        for peer in peers.iter_mut() {
670            live.insert(peer.pid);
671            peer.responsible_app = cache.resolve(peer.pid, &peer.process);
672        }
673    }
674    cache.retain_pids(&live);
675}
676
/// Per-peer sampling state retained across lsof polls: the previously
/// observed cumulative byte counters, which the next poll diffs into a
/// bytes-per-second rate. The braille sparkline history itself is
/// owned by `TunnelState::peer_viz` on the main thread (ticked at
/// 100ms).
#[derive(Debug, Clone, Default)]
struct PeerSampleCache {
    last_rcvd: u64,
    last_sent: u64,
    last_at: Option<Instant>,
}

/// Per-platform sample shapes for peer throughput. Linux supplies
/// cumulative byte counters that the diff path turns into a rate;
/// macOS supplies per-pid bps deltas directly, because the underlying
/// nettop call already emits a 1s delta sample. The annotator picks
/// the right path per peer.
#[derive(Debug, Default)]
struct PerPeerSamples {
    /// Cumulative byte counters keyed by client local port, from
    /// `NETLINK_SOCK_DIAG` on Linux (see `crate::tcp_diag`). Netlink
    /// returns per-socket data without pid attribution, so we key on
    /// the local port alone and rely on the lsof-driven join in
    /// `annotate_peer_throughput` to attribute counters back to the
    /// owning client process.
    per_local_port_cumulative: HashMap<u16, (u64, u64)>,
    /// Bytes-per-second deltas keyed by pid, from `nettop` on macOS.
    /// The kernel exposes no per-socket counters there, so per-pid is
    /// the finest-grained truth available — adequate for the typical
    /// "one app, one tunnel client" case.
    per_pid_bps: HashMap<u32, (u64, u64)>,
}

708/// Walk every client peer in `msg`, run a per-platform sampler, and
709/// fill in `current_rx_bps`, `current_tx_bps`, and the cumulative
710/// `bytes_rcvd`/`bytes_sent` where available. Stale cache entries are
711/// pruned to the still-live peer set so the map cannot grow unbounded.
712/// The braille sparkline history is owned by `TunnelState::peer_viz`
713/// on the main thread; this function only refreshes the bps readout
714/// that the main thread feeds into the rolling 100ms-tick history.
715fn annotate_peer_throughput(
716    msg: &mut LsofMessage,
717    cache: &mut HashMap<(u16, String), PeerSampleCache>,
718    now: Instant,
719) {
720    let samples = sample_peer_throughput();
721    let mut live: std::collections::HashSet<(u16, String)> = std::collections::HashSet::new();
722
723    for (port, peers) in msg.clients.iter_mut() {
724        for peer in peers.iter_mut() {
725            let key = (*port, peer.src.clone());
726            live.insert(key.clone());
727            let entry = cache.entry(key).or_default();
728            let src_port = src_port_from(&peer.src);
729
730            // Path A: Linux per-socket cumulative counters → diff against cache.
731            let cumulative =
732                src_port.and_then(|p| samples.per_local_port_cumulative.get(&p).copied());
733            if let Some((rcvd, sent)) = cumulative {
734                if let Some(prev_at) = entry.last_at {
735                    let dt = now.saturating_duration_since(prev_at).as_secs_f64();
736                    if dt > 0.0 {
737                        let rx_bps = ((rcvd.saturating_sub(entry.last_rcvd)) as f64 / dt) as u64;
738                        let tx_bps = ((sent.saturating_sub(entry.last_sent)) as f64 / dt) as u64;
739                        peer.current_rx_bps = rx_bps;
740                        peer.current_tx_bps = tx_bps;
741                    }
742                }
743                entry.last_rcvd = rcvd;
744                entry.last_sent = sent;
745                entry.last_at = Some(now);
746                peer.bytes_rcvd = Some(rcvd);
747                peer.bytes_sent = Some(sent);
748                peer.last_sample_at = Some(now);
749                continue;
750            }
751
752            // Path B: macOS per-pid bps directly — no diff math needed.
753            if let Some((rx_bps, tx_bps)) = samples.per_pid_bps.get(&peer.pid).copied() {
754                peer.current_rx_bps = rx_bps;
755                peer.current_tx_bps = tx_bps;
756                entry.last_at = Some(now);
757                peer.last_sample_at = Some(now);
758            }
759        }
760    }
761
762    cache.retain(|key, _| live.contains(key));
763}
764
765/// Run the per-platform peer-throughput sampler. On Linux this talks
766/// directly to the kernel via `NETLINK_SOCK_DIAG` (see `crate::tcp_diag`)
767/// and returns per-socket cumulative byte counters. On macOS it spawns
768/// a short `nettop` subprocess that yields one 1-second delta sample
769/// per pid. Returns an empty struct on unsupported platforms or when
770/// the underlying mechanism fails — the caller falls back to
771/// status-only display.
772fn sample_peer_throughput() -> PerPeerSamples {
773    let mut out = PerPeerSamples::default();
774    #[cfg(target_os = "linux")]
775    {
776        out.per_local_port_cumulative = crate::tcp_diag::sample_per_local_port();
777    }
778    #[cfg(target_os = "macos")]
779    {
780        out.per_pid_bps = sample_nettop_per_pid_macos();
781    }
782    out
783}
784
/// Spawn `nettop -P -L 2 -d -x -s 1` to capture one 1-second delta
/// sample per process and parse it into a `pid -> (rx_bps, tx_bps)`
/// map. Blocking call: returns after ~1.2 seconds. Falls back to an
/// empty map if the subprocess fails to spawn or parse.
///
/// `-d` enables delta mode (each emitted sample is the diff over the
/// previous interval). `-L 2` exits after two samples. The first
/// sample is cumulative-since-process-start (we drop it); the second
/// is the 1-second delta we want.
#[cfg(target_os = "macos")]
fn sample_nettop_per_pid_macos() -> HashMap<u32, (u64, u64)> {
    let output = Command::new("/usr/bin/nettop")
        .args(["-P", "-d", "-x", "-s", "1", "-L", "2"])
        .stdin(Stdio::null())
        .stderr(Stdio::null())
        .output();
    let text = match output {
        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).into_owned(),
        _ => return HashMap::new(),
    };
    // Discard the warmup-sample block (everything before the second
    // header) and parse the delta block. `nettop` re-emits the
    // `time,proc.pid,...` header before each new sample, so the second
    // header line marks the start of the delta block. With only one
    // header we parse from that header; with none we parse the whole
    // text — both degrade gracefully because the row parser rejects
    // anything that is not a data row.
    let delta_block = match text.find("time,") {
        Some(first) => match text[first + 5..].find("\ntime,") {
            // `+ 1` skips the newline so the slice starts on the second
            // header line itself (header lines are dropped by the parser).
            Some(off) => &text[first + 5 + off + 1..],
            None => &text[first..],
        },
        None => text.as_str(),
    };
    let mut out: HashMap<u32, (u64, u64)> = HashMap::new();
    for line in delta_block.lines() {
        if let Some((pid, rx, tx)) = parse_nettop_csv_row_per_pid(line) {
            // nettop -d sample interval is 1s (`-s 1`), so the bytes
            // in this row are bytes-per-second already. A pid can emit
            // several rows; sum them.
            let entry = out.entry(pid).or_insert((0, 0));
            entry.0 = entry.0.saturating_add(rx);
            entry.1 = entry.1.saturating_add(tx);
        }
    }
    out
}
842
/// Parse one `nettop -P -d -x` CSV row into `(pid, rx_bytes, tx_bytes)`.
/// Returns `None` for header rows, blanks, or rows with too few
/// columns. Mirrors `parse_nettop_csv_row` but does not filter on a
/// specific pid. macOS-only because the only caller is the nettop
/// sampler.
#[cfg(target_os = "macos")]
fn parse_nettop_csv_row_per_pid(line: &str) -> Option<(u32, u64, u64)> {
    let trimmed = line.trim();
    // Header rows start with `time,`; skip those and blank lines.
    if trimmed.is_empty() || trimmed.starts_with("time,") {
        return None;
    }
    // Columns: time, proc.pid, ..., rx bytes at index 4, tx bytes at 5.
    let mut cols = trimmed.split(',').map(str::trim);
    let proc_pid = cols.nth(1)?; // column 1: `name.pid`
    let rx_col = cols.nth(2)?; // column 4
    let tx_col = cols.next()?; // column 5
    // The pid is the suffix after the LAST dot — process names may
    // themselves contain dots (`com.apple.Safari.12345`).
    let (_, pid_str) = proc_pid.rsplit_once('.')?;
    let pid: u32 = pid_str.parse().ok()?;
    let rx: u64 = rx_col.parse().ok()?;
    let tx: u64 = tx_col.parse().ok()?;
    Some((pid, rx, tx))
}
866
/// Extract the port portion of an `addr:port` socket string. Splits on
/// the LAST colon so bracketed IPv6 sources parse correctly. `None`
/// when no colon is present or the suffix is not a valid port number.
fn src_port_from(src: &str) -> Option<u16> {
    let colon = src.rfind(':')?;
    src[colon + 1..].parse().ok()
}
871
872#[cfg(any(target_os = "macos", target_os = "linux"))]
873fn run_lsof_once(
874    ports: &[(String, u16, u32)],
875    first_seen: &mut HashMap<(u16, String), Instant>,
876    now: Instant,
877) -> LsofMessage {
878    // `+c 0` widens the COMMAND column to the full process name.
879    // macOS defaults to 9 characters which collapses every Apple
880    // framework process down to `com.apple`; Linux defaults to 15.
881    // Neither is useful in the CLIENTS card.
882    let output = Command::new("lsof")
883        .args(["-iTCP", "-P", "-n", "-w", "+c", "0"])
884        .output();
885    let stdout = match output {
886        Ok(o) if o.status.success() || !o.stdout.is_empty() => o.stdout,
887        _ => return LsofMessage::empty(now),
888    };
889    let text = String::from_utf8_lossy(&stdout);
890    parse_lsof_output(&text, ports, first_seen, now)
891}
892
893/// Parse `lsof -iTCP -P -n` output into a `LsofMessage`. Public for tests.
894pub fn parse_lsof_output(
895    text: &str,
896    ports: &[(String, u16, u32)],
897    first_seen: &mut HashMap<(u16, String), Instant>,
898    now: Instant,
899) -> LsofMessage {
900    let mut clients: HashMap<u16, Vec<ClientPeer>> = HashMap::new();
901    let mut conflicts: HashMap<u16, PortConflict> = HashMap::new();
902    let bind_ports: Vec<u16> = ports.iter().map(|(_, p, _)| *p).collect();
903    let tunnel_pids: Vec<u32> = ports.iter().map(|(_, _, pid)| *pid).collect();
904
905    for line in text.lines().skip(1) {
906        let row = match parse_lsof_row(line) {
907            Some(r) => r,
908            None => continue,
909        };
910        // LISTEN rows owned by another process on a tunnel bind port → conflict.
911        if row.is_listen && bind_ports.contains(&row.local_port) && !tunnel_pids.contains(&row.pid)
912        {
913            conflicts
914                .entry(row.local_port)
915                .or_insert_with(|| PortConflict {
916                    port: row.local_port,
917                    process: row.command.clone(),
918                    pid: row.pid,
919                });
920            continue;
921        }
922        if row.is_listen {
923            continue;
924        }
925        // ESTABLISHED rows where the client process owns a socket whose
926        // remote port is one of our bind ports. The client process != ssh.
927        if let Some(remote_port) = row.remote_port {
928            if bind_ports.contains(&remote_port) && !tunnel_pids.contains(&row.pid) {
929                let src = row.local_addr_port().unwrap_or_else(|| "?".to_string());
930                let key = (remote_port, src.clone());
931                let since = *first_seen.entry(key).or_insert(now);
932                let entry = clients.entry(remote_port).or_default();
933                if entry.len() >= MAX_CLIENTS_PER_PORT {
934                    continue;
935                }
936                entry.push(ClientPeer {
937                    src,
938                    process: beautify_process(&row.command),
939                    pid: row.pid,
940                    since,
941                    responsible_app: None,
942                    current_rx_bps: 0,
943                    current_tx_bps: 0,
944                    bytes_rcvd: None,
945                    bytes_sent: None,
946                    last_sample_at: None,
947                });
948            }
949        }
950    }
951    // Drop first-seen entries for sockets we no longer see, so their age
952    // counters do not grow unbounded across reconnects.
953    let live: std::collections::HashSet<(u16, String)> = clients
954        .iter()
955        .flat_map(|(port, peers)| peers.iter().map(move |p| (*port, p.src.clone())))
956        .collect();
957    first_seen.retain(|key, _| live.contains(key));
958
959    LsofMessage {
960        at: now,
961        clients,
962        conflicts,
963    }
964}
965
/// One row of `lsof -iTCP -P -n` output, fields we care about.
#[derive(Debug)]
struct LsofRow {
    /// Process name from the COMMAND column (full-width via `+c 0`).
    command: String,
    /// Owning process id from the PID column.
    pid: u32,
    /// True when the state column contained `LISTEN`.
    is_listen: bool,
    /// Local endpoint of the socket (address and port split out of NAME).
    local_addr: String,
    local_port: u16,
    /// Remote endpoint; `None` for listener rows (no `->` in NAME).
    remote_addr: Option<String>,
    remote_port: Option<u16>,
}
977
978impl LsofRow {
979    fn local_addr_port(&self) -> Option<String> {
980        if let (Some(addr), Some(port)) = (self.remote_addr.as_deref(), self.remote_port) {
981            // The CLIENTS card shows the *peer* of our tunnel bind port,
982            // i.e. the side connecting in. From the client process row,
983            // remote = our loopback bind. So the peer to show is the row's
984            // local end.
985            let _ = (addr, port);
986        }
987        Some(format!("{}:{}", self.local_addr, self.local_port))
988    }
989}
990
991fn parse_lsof_row(line: &str) -> Option<LsofRow> {
992    if line.trim().is_empty() {
993        return None;
994    }
995    // lsof columns are whitespace-separated. Format:
996    //   COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
997    // NAME may contain spaces only inside parens (state). The address
998    // tokens never contain spaces because we pass `-n` and `-P`.
999    let mut fields = line.split_whitespace();
1000    let command = fields.next()?.to_string();
1001    let pid: u32 = fields.next()?.parse().ok()?;
1002    let _user = fields.next()?;
1003    let _fd = fields.next()?;
1004    let _ty = fields.next()?;
1005    let _dev = fields.next()?;
1006    let _size = fields.next()?;
1007    let _node = fields.next()?;
1008    let name = fields.next()?;
1009    let state = fields.next();
1010    if !name.contains(':') {
1011        return None;
1012    }
1013    let is_listen = matches!(state, Some(s) if s.contains("LISTEN"));
1014    let is_established = matches!(state, Some(s) if s.contains("ESTABLISHED"));
1015    if !is_listen && !is_established {
1016        return None;
1017    }
1018    let (local, remote) = match name.split_once("->") {
1019        Some((l, r)) => (l, Some(r)),
1020        None => (name, None),
1021    };
1022    let (local_addr, local_port) = split_addr_port(local)?;
1023    let (remote_addr, remote_port) = match remote {
1024        Some(r) => match split_addr_port(r) {
1025            Some((a, p)) => (Some(a), Some(p)),
1026            None => (None, None),
1027        },
1028        None => (None, None),
1029    };
1030    Some(LsofRow {
1031        command,
1032        pid,
1033        is_listen,
1034        local_addr,
1035        local_port,
1036        remote_addr,
1037        remote_port,
1038    })
1039}
1040
/// Trim noise that lsof reports for macOS Apple-framework processes
/// so the CLIENTS card shows useful identifiers. Examples:
///   `com.apple.WebKit.Networking` → `WebKit.Networking`
///   `com.apple.Safari`            → `Safari`
///   `curl`                        → `curl`
/// Other names — including the degenerate bare `com.apple.` — pass
/// through unchanged.
pub fn beautify_process(raw: &str) -> String {
    match raw.strip_prefix("com.apple.") {
        Some(rest) if !rest.is_empty() => rest.to_string(),
        _ => raw.to_string(),
    }
}
1055
/// Split a `host:port` token. Handles bracketed IPv6: `[::1]:8080`.
/// For unbracketed input the LAST colon is the separator, so IPv4 and
/// hostnames work; `None` when there is no separator or the port does
/// not parse.
fn split_addr_port(s: &str) -> Option<(String, u16)> {
    if let Some(inner) = s.strip_prefix('[') {
        // Bracketed form: address runs to the first `]`, then `:port`.
        let (addr, rest) = inner.split_once(']')?;
        let port: u16 = rest.strip_prefix(':')?.parse().ok()?;
        return Some((addr.to_string(), port));
    }
    let (addr, port_str) = s.rsplit_once(':')?;
    let port: u16 = port_str.parse().ok()?;
    Some((addr.to_string(), port))
}
1071
1072// ---------------------------------------------------------------------------
1073// Snapshot path for demo and visual regression tests.
1074// ---------------------------------------------------------------------------
1075
/// Deterministic snapshot of a tunnel's live state. Used in `--demo` and
/// in visual regression tests so goldens do not depend on wall-clock or
/// background workers.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TunnelLiveSnapshot {
    /// Tunnel uptime in seconds at snapshot time.
    pub uptime_secs: u64,
    /// Channels open right now.
    pub active_channels: u32,
    /// Highest concurrent channel count observed so far.
    pub peak_concurrent: u32,
    /// Total channel opens counted so far.
    pub total_opens: u32,
    /// Idle time in seconds.
    pub idle_secs: u64,
    /// Rolling rx/tx histories; the last cell is "now" (see
    /// `HISTORY_BUCKETS` / `BUCKET_SECS` for the window geometry).
    pub rx_history: [u64; HISTORY_BUCKETS],
    pub tx_history: [u64; HISTORY_BUCKETS],
    /// Latest throughput readouts, bytes per second.
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    /// Peak throughput readouts, bytes per second.
    pub peak_rx_bps: u64,
    pub peak_tx_bps: u64,
    /// True when the throughput aggregator has produced at least one
    /// sample. UI shows `sampling…` until then.
    pub throughput_ready: bool,
    /// Connected clients at snapshot time.
    pub clients: Vec<DisplayClient>,
    /// Recent channel events at snapshot time.
    pub events: Vec<DisplayEvent>,
    /// Currently-open channels at snapshot time. Each entry is
    /// `(channel_id, open_age_secs, kind)`. Drives the channel
    /// lifeline swimlane in the detail panel.
    pub currently_open: Vec<(u32, u64, ChannelKind)>,
    /// Foreign listener on the bind port, if detected.
    pub conflict: Option<PortConflict>,
    /// Last ssh exit as `(exit_code, reason_line)`, if any.
    pub last_exit: Option<(i32, String)>,
}
1105
#[derive(Debug, Clone)]
pub struct DisplayClient {
    /// Source `addr:port` of the connected client. Reserved for the
    /// expanded clients-list view; the heartbeat-dial dashboard does
    /// not show it inline.
    #[allow(dead_code)]
    pub src: String,
    /// Name of the connecting process.
    pub process: String,
    /// Seconds since this client connection was first observed.
    pub age_secs: u64,
    /// Connecting process PID. Reserved for the expanded clients-list
    /// view.
    #[allow(dead_code)]
    pub pid: u32,
    /// User-facing app that owns this socket on macOS. `None` if equal
    /// to `process` or unavailable. See `ClientPeer::responsible_app`.
    pub responsible_app: Option<String>,
    /// Per-client throughput readouts. Zero when no sample has been
    /// taken yet, or when the platform sampler does not produce
    /// per-client data.
    pub current_rx_bps: u64,
    pub current_tx_bps: u64,
    /// Rolling history of combined rx+tx bytes-per-second. Cell 0 is
    /// the oldest sample, the last cell is "now".
    pub viz_history: [u64; PEER_VIZ_BUCKETS],
    /// True after the per-client sampler has produced at least one
    /// sample. Lets the renderer distinguish "0 B/s, idle" from
    /// "no sampler available".
    pub throughput_ready: bool,
}
1135
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct DisplayEvent {
    /// Age of the event in seconds.
    pub age_secs: u64,
    /// Surfaced through the runtime path for completeness; the
    /// renderer currently does not show channel ids because the
    /// numbers are noisy without context. Reserved for future use.
    #[allow(dead_code)]
    pub channel_id: u32,
    /// Open or close.
    pub kind: ChannelEventKind,
    /// Filtered to `Direct | Forwarded | Dynamic` — internal channels
    /// are dropped by the UI before this struct is built, so this is
    /// always one of the user-visible kinds.
    pub channel_kind: ChannelKind,
    /// Open→close duration when known. Reserved for the expanded
    /// events-list view.
    #[allow(dead_code)]
    pub duration_secs: Option<u64>,
    /// Number of co-occurring events of the same kind/age that were
    /// folded into this row. `1` means "just one event"; higher values
    /// render as a `(3x)` suffix.
    pub count: u32,
}
1159
1160// ---------------------------------------------------------------------------
1161// Tests.
1162// ---------------------------------------------------------------------------
1163
1164#[cfg(test)]
1165mod tests {
1166    use super::*;
1167
1168    fn user_open(channel_id: u32, at: Instant) -> ChannelEvent {
1169        ChannelEvent {
1170            at,
1171            channel_id,
1172            kind: ChannelEventKind::Open,
1173            channel_kind: Some(ChannelKind::Direct),
1174            opened_at: None,
1175        }
1176    }
1177
1178    #[test]
1179    fn parse_channel_open_simple() {
1180        let ev = parse_channel_line("debug1: channel 0: new [direct-tcpip]").unwrap();
1181        assert_eq!(ev.channel_id, 0);
1182        assert_eq!(ev.kind, ChannelEventKind::Open);
1183        assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1184    }
1185
1186    #[test]
1187    fn parse_channel_open_records_listener_kind() {
1188        let ev = parse_channel_line("debug1: channel 1: new [port listener]").unwrap();
1189        assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1190    }
1191
1192    #[test]
1193    fn parse_channel_open_records_dynamic_kind() {
1194        let ev = parse_channel_line("debug1: channel 4: new [dynamic-tcpip]").unwrap();
1195        assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1196    }
1197
1198    #[test]
1199    fn parse_channel_close_simple() {
1200        let ev = parse_channel_line("debug1: channel 12: free: blah blah").unwrap();
1201        assert_eq!(ev.channel_id, 12);
1202        assert_eq!(ev.kind, ChannelEventKind::Close);
1203        // Close lines do not carry the kind; the recorder fills it in
1204        // from the open-channel map.
1205        assert_eq!(ev.channel_kind, None);
1206    }
1207
1208    #[test]
1209    fn parse_channel_with_leading_whitespace() {
1210        let ev = parse_channel_line("   debug1: channel 5: new [forwarded-tcpip]").unwrap();
1211        assert_eq!(ev.channel_id, 5);
1212        assert_eq!(ev.kind, ChannelEventKind::Open);
1213        assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1214    }
1215
1216    #[test]
1217    fn parse_channel_modern_openssh_format_with_inactive_timeout() {
1218        // OpenSSH 10.x: `channel N: new <ctype> [<rname>] (inactive timeout: T)`.
1219        // The ctype sits BEFORE the brackets; the brackets hold the
1220        // remote endpoint name (which we ignore). Without this branch
1221        // the parser silently set channel_kind=None and counters never
1222        // advanced past zero on real-world OpenSSH ≥ 9.x.
1223        let ev = parse_channel_line(
1224            "debug1: channel 3: new direct-tcpip [127.0.0.1:54321] (inactive timeout: 0)",
1225        )
1226        .unwrap();
1227        assert_eq!(ev.channel_id, 3);
1228        assert_eq!(ev.kind, ChannelEventKind::Open);
1229        assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1230    }
1231
1232    #[test]
1233    fn parse_channel_modern_openssh_format_forwarded() {
1234        let ev = parse_channel_line(
1235            "debug1: channel 7: new forwarded-tcpip [10.0.0.1:443] (inactive timeout: 0)",
1236        )
1237        .unwrap();
1238        assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1239    }
1240
1241    #[test]
1242    fn parse_channel_modern_openssh_format_dynamic() {
1243        let ev = parse_channel_line("debug1: channel 9: new dynamic-tcpip [client] (timeout: 5)")
1244            .unwrap();
1245        assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1246    }
1247
1248    #[test]
1249    fn parse_channel_modern_openssh_format_internal_listener_is_other() {
1250        // The local-port listener that ssh creates at startup uses
1251        // ctype "port-listener" or "listener" depending on version.
1252        // Either way it must NOT be promoted to a user-visible kind.
1253        let ev = parse_channel_line(
1254            "debug1: channel 0: new port-listener [::1:8080] (inactive timeout: 0)",
1255        )
1256        .unwrap();
1257        assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1258    }
1259
1260    #[test]
1261    fn parse_channel_unrelated_line_returns_none() {
1262        assert!(parse_channel_line("debug1: client_input_global_request").is_none());
1263        assert!(parse_channel_line("not even ssh output").is_none());
1264        assert!(parse_channel_line("debug1: channel abc: new").is_none());
1265        assert!(parse_channel_line("debug1: channel 1: confirm").is_none());
1266    }
1267
1268    #[test]
1269    fn record_event_open_increments_counters_for_user_visible_kinds() {
1270        let now = Instant::now();
1271        let mut state = TunnelLiveState::new(now);
1272        state.record_event(user_open(1, now));
1273        assert_eq!(state.total_opens, 1);
1274        assert_eq!(state.active_channels, 1);
1275        assert_eq!(state.peak_concurrent, 1);
1276        // Activity history is fed by sample_activity, not record_event.
1277        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1278    }
1279
1280    #[test]
1281    fn record_event_skips_counters_for_internal_channels() {
1282        // Listener / session / mux-master channels should NOT inflate
1283        // peak_concurrent or total_opens — only user-visible traffic
1284        // counts towards the activity figures.
1285        let now = Instant::now();
1286        let mut state = TunnelLiveState::new(now);
1287        state.record_event(ChannelEvent {
1288            at: now,
1289            channel_id: 0,
1290            kind: ChannelEventKind::Open,
1291            channel_kind: Some(ChannelKind::Other),
1292            opened_at: None,
1293        });
1294        assert_eq!(state.total_opens, 0);
1295        assert_eq!(state.active_channels, 0);
1296        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1297        // The event is still kept in the ringbuffer for diagnostics.
1298        assert_eq!(state.events.len(), 1);
1299    }
1300
1301    #[test]
1302    fn sample_activity_writes_peak_into_current_bucket() {
1303        let now = Instant::now();
1304        let mut state = TunnelLiveState::new(now);
1305        state.sample_activity(2);
1306        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1307        // A lower sample inside the same bucket must not erase the peak.
1308        state.sample_activity(1);
1309        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1310        // A higher sample raises the peak.
1311        state.sample_activity(5);
1312        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 5);
1313    }
1314
1315    #[test]
1316    fn sample_activity_clamps_to_u8_max() {
1317        let now = Instant::now();
1318        let mut state = TunnelLiveState::new(now);
1319        state.sample_activity(u32::MAX);
1320        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], u8::MAX);
1321    }
1322
1323    #[test]
1324    fn record_event_close_pairs_with_open_for_duration() {
1325        let t0 = Instant::now();
1326        let t1 = t0 + Duration::from_secs(5);
1327        let mut state = TunnelLiveState::new(t0);
1328        state.record_event(user_open(7, t0));
1329        state.record_event(ChannelEvent {
1330            at: t1,
1331            channel_id: 7,
1332            kind: ChannelEventKind::Close,
1333            channel_kind: None,
1334            opened_at: None,
1335        });
1336        assert_eq!(state.active_channels, 0);
1337        let last = state.events.back().unwrap();
1338        assert_eq!(last.kind, ChannelEventKind::Close);
1339        assert_eq!(last.opened_at, Some(t0));
1340        // The close event picks up the kind that was recorded on open.
1341        assert_eq!(last.channel_kind, Some(ChannelKind::Direct));
1342    }
1343
1344    #[test]
1345    fn record_event_caps_ringbuffer_at_max() {
1346        let now = Instant::now();
1347        let mut state = TunnelLiveState::new(now);
1348        for i in 0..(MAX_EVENTS as u32 + 5) {
1349            state.record_event(user_open(i, now));
1350        }
1351        assert_eq!(state.events.len(), MAX_EVENTS);
1352    }
1353
1354    #[test]
1355    fn rotate_if_due_shifts_buckets_per_tick() {
1356        let t0 = Instant::now();
1357        let mut state = TunnelLiveState::new(t0);
1358        state.opens_history[HISTORY_BUCKETS - 1] = 7;
1359        state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS));
1360        // After one rotate the value moved one slot left.
1361        assert_eq!(state.opens_history[HISTORY_BUCKETS - 2], 7);
1362        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1363    }
1364
1365    #[test]
1366    fn rotate_if_due_clamps_at_full_window() {
1367        let t0 = Instant::now();
1368        let mut state = TunnelLiveState::new(t0);
1369        state.opens_history[HISTORY_BUCKETS - 1] = 9;
1370        // Far beyond the window — rotation should clear it entirely.
1371        state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS * HISTORY_BUCKETS as u64 * 4));
1372        assert!(state.opens_history.iter().all(|&v| v == 0));
1373    }
1374
1375    #[test]
1376    fn rotate_if_due_noop_within_one_bucket() {
1377        let t0 = Instant::now();
1378        let mut state = TunnelLiveState::new(t0);
1379        state.opens_history[HISTORY_BUCKETS - 1] = 3;
1380        // Less than one BUCKET_SECS elapsed — no rotation should happen.
1381        state.rotate_if_due(t0 + Duration::from_millis(BUCKET_SECS * 1000 / 2));
1382        assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 3);
1383    }
1384
1385    #[test]
1386    fn parse_lsof_listen_row() {
1387        let line = "ssh    12345 user 3u IPv4 0xabc 0t0 TCP 127.0.0.1:8080 (LISTEN)";
1388        let row = parse_lsof_row(line).unwrap();
1389        assert_eq!(row.command, "ssh");
1390        assert_eq!(row.pid, 12345);
1391        assert!(row.is_listen);
1392        assert_eq!(row.local_addr, "127.0.0.1");
1393        assert_eq!(row.local_port, 8080);
1394        assert!(row.remote_port.is_none());
1395    }
1396
1397    #[test]
1398    fn parse_lsof_established_row() {
1399        let line =
1400            "curl   23456 user 4u IPv4 0xdef 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)";
1401        let row = parse_lsof_row(line).unwrap();
1402        assert_eq!(row.command, "curl");
1403        assert_eq!(row.pid, 23456);
1404        assert!(!row.is_listen);
1405        assert_eq!(row.local_port, 54321);
1406        assert_eq!(row.remote_port, Some(8080));
1407    }
1408
1409    #[test]
1410    fn parse_lsof_other_states_skipped() {
1411        // CLOSE_WAIT, TIME_WAIT, etc. are not interesting for our display.
1412        let line = "x 1 u 0u IPv4 0 0t0 TCP 1.2.3.4:1->5.6.7.8:9 (CLOSE_WAIT)";
1413        assert!(parse_lsof_row(line).is_none());
1414    }
1415
1416    #[test]
1417    fn parse_lsof_output_finds_clients_for_bind_port() {
1418        let txt = "\
1419COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1420ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1421curl   23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1422ssh    12345 u 5u IPv4 0xc 0t0 TCP 127.0.0.1:8080->127.0.0.1:54321 (ESTABLISHED)
1423";
1424        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1425        let mut seen = HashMap::new();
1426        let now = Instant::now();
1427        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1428        let peers = msg.clients.get(&8080).expect("clients on 8080");
1429        assert_eq!(peers.len(), 1);
1430        assert_eq!(peers[0].process, "curl");
1431        assert_eq!(peers[0].pid, 23456);
1432        assert!(peers[0].src.contains("54321"));
1433        assert!(msg.conflicts.is_empty());
1434    }
1435
1436    #[test]
1437    fn parse_lsof_output_detects_port_conflict() {
1438        let txt = "\
1439COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1440nginx  99999 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1441";
1442        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1443        let mut seen = HashMap::new();
1444        let now = Instant::now();
1445        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1446        let conflict = msg.conflicts.get(&8080).expect("conflict on 8080");
1447        assert_eq!(conflict.process, "nginx");
1448        assert_eq!(conflict.pid, 99999);
1449    }
1450
1451    #[test]
1452    fn parse_lsof_output_skips_own_listen() {
1453        let txt = "\
1454COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1455ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1456";
1457        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1458        let mut seen = HashMap::new();
1459        let now = Instant::now();
1460        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1461        assert!(msg.conflicts.is_empty());
1462    }
1463
1464    #[test]
1465    fn parse_lsof_output_first_seen_persists_across_polls() {
1466        let txt = "\
1467COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1468ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1469curl   23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1470";
1471        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1472        let mut seen = HashMap::new();
1473        let t0 = Instant::now();
1474        let msg1 = parse_lsof_output(txt, &ports, &mut seen, t0);
1475        let t1 = t0 + Duration::from_secs(5);
1476        let msg2 = parse_lsof_output(txt, &ports, &mut seen, t1);
1477        let p1 = &msg1.clients[&8080][0];
1478        let p2 = &msg2.clients[&8080][0];
1479        assert_eq!(p1.since, p2.since, "first_seen should be sticky");
1480    }
1481
1482    #[test]
1483    fn split_addr_port_handles_ipv6_brackets() {
1484        let (a, p) = split_addr_port("[::1]:8080").unwrap();
1485        assert_eq!(a, "::1");
1486        assert_eq!(p, 8080);
1487    }
1488
1489    #[test]
1490    fn split_addr_port_handles_ipv4() {
1491        let (a, p) = split_addr_port("127.0.0.1:8080").unwrap();
1492        assert_eq!(a, "127.0.0.1");
1493        assert_eq!(p, 8080);
1494    }
1495
1496    #[test]
1497    fn beautify_process_strips_com_apple_prefix() {
1498        assert_eq!(
1499            beautify_process("com.apple.WebKit.Networking"),
1500            "WebKit.Networking"
1501        );
1502        assert_eq!(beautify_process("com.apple.Safari"), "Safari");
1503    }
1504
1505    #[test]
1506    fn beautify_process_passes_other_names_through_unchanged() {
1507        assert_eq!(beautify_process("curl"), "curl");
1508        assert_eq!(beautify_process("nginx"), "nginx");
1509        assert_eq!(beautify_process("python3"), "python3");
1510    }
1511
1512    #[test]
1513    fn beautify_process_does_not_strip_when_only_prefix() {
1514        // Edge case: the bare `com.apple.` string would otherwise
1515        // collapse to "" and disappear from the card. Keep the raw
1516        // value so the user at least sees that lsof reported something.
1517        assert_eq!(beautify_process("com.apple."), "com.apple.");
1518    }
1519
1520    #[test]
1521    fn parse_lsof_output_unwraps_apple_framework_names() {
1522        let txt = "\
1523COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1524ssh    12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1525com.apple.WebKit.Networking 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1526";
1527        let ports = vec![("foo".into(), 8080u16, 12345u32)];
1528        let mut seen = HashMap::new();
1529        let now = Instant::now();
1530        let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1531        let peers = msg.clients.get(&8080).expect("clients on 8080");
1532        assert_eq!(peers.len(), 1);
1533        // Process name is shown without the noisy `com.apple.` prefix.
1534        assert_eq!(peers[0].process, "WebKit.Networking");
1535    }
1536}