use std::collections::{HashMap, VecDeque};
use std::io::{BufRead, BufReader};
#[cfg(target_os = "macos")]
use std::process::Stdio;
use std::process::{ChildStderr, Command};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
pub const MAX_EVENTS: usize = 50;
pub const BUCKET_SECS: u64 = 2;
pub const HISTORY_BUCKETS: usize = 150;
pub const STDERR_BUFFER_LINES: usize = 10;
pub const MAX_CLIENTS_PER_PORT: usize = 64;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelEventKind {
Open,
Close,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelKind {
Direct,
Forwarded,
Dynamic,
Other,
}
impl ChannelKind {
pub fn from_bracket(token: &str) -> Self {
match token {
"direct-tcpip" => Self::Direct,
"forwarded-tcpip" => Self::Forwarded,
"dynamic-tcpip" => Self::Dynamic,
_ => Self::Other,
}
}
pub fn is_user_visible(self) -> bool {
matches!(self, Self::Direct | Self::Forwarded | Self::Dynamic)
}
}
#[derive(Debug, Clone)]
pub struct ChannelEvent {
pub at: Instant,
pub channel_id: u32,
pub kind: ChannelEventKind,
pub channel_kind: Option<ChannelKind>,
pub opened_at: Option<Instant>,
}
pub const PEER_VIZ_BUCKETS: usize = 12;
#[derive(Debug, Clone)]
pub struct ClientPeer {
pub src: String,
pub process: String,
pub pid: u32,
pub since: Instant,
pub responsible_app: Option<String>,
pub current_rx_bps: u64,
pub current_tx_bps: u64,
pub bytes_rcvd: Option<u64>,
pub bytes_sent: Option<u64>,
pub last_sample_at: Option<Instant>,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PortConflict {
pub port: u16,
pub process: String,
pub pid: u32,
}
pub struct TunnelLiveState {
pub events: VecDeque<ChannelEvent>,
pub opens_history: [u8; HISTORY_BUCKETS],
pub history_last_rotate: Instant,
pub peak_concurrent: u32,
pub total_opens: u32,
pub last_event_at: Option<Instant>,
pub active_channels: u32,
pub channel_open: HashMap<u32, (Instant, ChannelKind)>,
pub last_exit: Option<(i32, String)>,
pub stderr_buffer: Arc<Mutex<VecDeque<String>>>,
pub parser_thread: Option<JoinHandle<()>>,
pub parser_stop: Arc<AtomicBool>,
pub rx_history: [u64; HISTORY_BUCKETS],
pub tx_history: [u64; HISTORY_BUCKETS],
pub current_rx_bps: u64,
pub current_tx_bps: u64,
pub peak_rx_bps: u64,
pub peak_tx_bps: u64,
pub last_throughput_at: Option<Instant>,
}
impl TunnelLiveState {
pub fn new(started_at: Instant) -> Self {
Self {
events: VecDeque::with_capacity(MAX_EVENTS),
opens_history: [0u8; HISTORY_BUCKETS],
history_last_rotate: started_at,
peak_concurrent: 0,
total_opens: 0,
last_event_at: None,
active_channels: 0,
channel_open: HashMap::new(),
last_exit: None,
stderr_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_BUFFER_LINES))),
parser_thread: None,
parser_stop: Arc::new(AtomicBool::new(false)),
rx_history: [0u64; HISTORY_BUCKETS],
tx_history: [0u64; HISTORY_BUCKETS],
current_rx_bps: 0,
current_tx_bps: 0,
peak_rx_bps: 0,
peak_tx_bps: 0,
last_throughput_at: None,
}
}
pub fn record_event(&mut self, mut event: ChannelEvent) {
self.rotate_if_due(event.at);
match event.kind {
ChannelEventKind::Open => {
let kind = event.channel_kind.unwrap_or(ChannelKind::Other);
if kind.is_user_visible() {
self.total_opens = self.total_opens.saturating_add(1);
self.active_channels = self.active_channels.saturating_add(1);
self.peak_concurrent = self.peak_concurrent.max(self.active_channels);
}
self.channel_open.insert(event.channel_id, (event.at, kind));
}
ChannelEventKind::Close => {
if let Some((opened_at, kind)) = self.channel_open.remove(&event.channel_id) {
event.opened_at = Some(opened_at);
event.channel_kind = Some(kind);
if kind.is_user_visible() {
self.active_channels = self.active_channels.saturating_sub(1);
}
}
}
}
self.last_event_at = Some(event.at);
if self.events.len() == MAX_EVENTS {
self.events.pop_front();
}
self.events.push_back(event);
log::debug!(
"[purple] Tunnel live event: total_opens={} active={} peak={}",
self.total_opens,
self.active_channels,
self.peak_concurrent
);
}
pub fn rotate_if_due(&mut self, now: Instant) {
let elapsed = now.saturating_duration_since(self.history_last_rotate);
let ticks = elapsed.as_secs() / BUCKET_SECS;
if ticks == 0 {
return;
}
let shift = (ticks as usize).min(HISTORY_BUCKETS);
if shift >= HISTORY_BUCKETS {
self.opens_history.fill(0);
self.rx_history.fill(0);
self.tx_history.fill(0);
} else {
self.opens_history.rotate_left(shift);
for slot in self.opens_history.iter_mut().rev().take(shift) {
*slot = 0;
}
self.rx_history.rotate_left(shift);
for slot in self.rx_history.iter_mut().rev().take(shift) {
*slot = 0;
}
self.tx_history.rotate_left(shift);
for slot in self.tx_history.iter_mut().rev().take(shift) {
*slot = 0;
}
}
self.history_last_rotate += Duration::from_secs(ticks * BUCKET_SECS);
}
pub fn sample_activity(&mut self, concurrent: u32) {
let sample = u8::try_from(concurrent).unwrap_or(u8::MAX);
if let Some(last) = self.opens_history.last_mut() {
*last = (*last).max(sample);
}
}
}
#[derive(Debug, Clone)]
pub struct ParserMessage {
pub alias: String,
pub event: ChannelEvent,
}
#[derive(Debug, Clone)]
pub struct LsofMessage {
pub at: Instant,
pub clients: HashMap<u16, Vec<ClientPeer>>,
pub conflicts: HashMap<u16, PortConflict>,
}
impl LsofMessage {
pub fn empty(at: Instant) -> Self {
Self {
at,
clients: HashMap::new(),
conflicts: HashMap::new(),
}
}
}
pub struct LsofPollerHandle {
pub stop: Arc<AtomicBool>,
pub bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
pub thread: Option<JoinHandle<()>>,
}
impl LsofPollerHandle {
pub fn shutdown(&mut self) {
self.stop.store(true, Ordering::Relaxed);
if let Some(handle) = self.thread.take() {
let _ = handle.join();
}
}
}
pub fn spawn_parser_thread(
stderr: ChildStderr,
alias: String,
tx: Sender<ParserMessage>,
stderr_buffer: Arc<Mutex<VecDeque<String>>>,
stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
thread::Builder::new()
.name(format!("purple-tunnel-parser-{alias}"))
.spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines() {
if stop.load(Ordering::Relaxed) {
break;
}
let Ok(line) = line else { break };
if let Ok(mut buf) = stderr_buffer.lock() {
if buf.len() == STDERR_BUFFER_LINES {
buf.pop_front();
}
buf.push_back(line.clone());
}
if let Some(event) = parse_channel_line(&line) {
let msg = ParserMessage {
alias: alias.clone(),
event,
};
if tx.send(msg).is_err() {
break;
}
}
}
log::debug!("[purple] Tunnel parser thread exit: alias={alias}");
})
.expect("spawn purple-tunnel-parser thread")
}
pub fn parse_channel_line(line: &str) -> Option<ChannelEvent> {
let trimmed = line.trim_start();
let rest = trimmed.strip_prefix("debug1: channel ")?;
let (id_str, after) = rest.split_once(':')?;
let channel_id: u32 = id_str.trim().parse().ok()?;
let after = after.trim_start();
let (kind, channel_kind) = if let Some(after_new) = after.strip_prefix("new") {
let after_new = after_new.trim_start();
let ctype = if let Some(rest) = after_new.strip_prefix('[') {
rest.split_once(']').map(|(t, _)| t.trim().to_string())
} else {
after_new
.split_whitespace()
.next()
.map(|s| s.to_string())
.filter(|s| !s.is_empty())
};
let chan_kind = ctype.as_deref().map(ChannelKind::from_bracket);
(ChannelEventKind::Open, chan_kind)
} else if after.starts_with("free") {
(ChannelEventKind::Close, None)
} else {
return None;
};
Some(ChannelEvent {
at: Instant::now(),
channel_id,
kind,
channel_kind,
opened_at: None,
})
}
#[cfg(any(target_os = "macos", target_os = "linux"))]
pub fn spawn_lsof_poller(
bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
tx: Sender<LsofMessage>,
stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
thread::Builder::new()
.name("purple-tunnel-lsof".into())
.spawn(move || {
let mut first_seen: HashMap<(u16, String), Instant> = HashMap::new();
let mut responsible_cache = ResponsibleAppCache::default();
let mut peer_state: HashMap<(u16, String), PeerSampleCache> = HashMap::new();
while !stop.load(Ordering::Relaxed) {
let ports: Vec<(String, u16, u32)> = match bind_ports.lock() {
Ok(g) => g.clone(),
Err(p) => p.into_inner().clone(),
};
if ports.is_empty() {
thread::sleep(Duration::from_millis(500));
continue;
}
let now = Instant::now();
let mut msg = run_lsof_once(&ports, &mut first_seen, now);
annotate_responsible_apps(&mut msg, &mut responsible_cache);
annotate_peer_throughput(&mut msg, &mut peer_state, now);
if tx.send(msg).is_err() {
break;
}
for _ in 0..20 {
if stop.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(100));
}
}
log::debug!("[purple] Tunnel lsof poller thread exit");
})
.expect("spawn purple-tunnel-lsof thread")
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
pub fn spawn_lsof_poller(
_bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
_tx: Sender<LsofMessage>,
stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(500));
}
})
}
#[cfg(target_os = "macos")]
unsafe extern "C" {
fn responsibility_get_pid_responsible_for_pid(pid: libc::pid_t) -> libc::pid_t;
}
#[cfg(target_os = "macos")]
fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
let rpid = unsafe { responsibility_get_pid_responsible_for_pid(pid as libc::pid_t) };
if rpid <= 0 {
return None;
}
if rpid as u32 == pid {
return None;
}
let name = process_name(rpid as u32)?;
if name.eq_ignore_ascii_case(client_process) {
return None;
}
Some(name)
}
#[cfg(target_os = "macos")]
fn process_name(pid: u32) -> Option<String> {
unsafe extern "C" {
fn proc_name(pid: libc::c_int, buffer: *mut libc::c_void, buffersize: u32) -> libc::c_int;
}
let mut buf = [0u8; 256];
let n = unsafe {
proc_name(
pid as libc::c_int,
buf.as_mut_ptr().cast(),
buf.len() as u32,
)
};
if n <= 0 {
return None;
}
let bytes = &buf[..(n as usize).min(buf.len())];
let s = std::str::from_utf8(bytes).ok()?.trim_end_matches('\0');
if s.is_empty() {
None
} else {
Some(beautify_process(s))
}
}
#[cfg(target_os = "linux")]
fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
let session_id = read_session_leader(pid)?;
if session_id == pid {
return None;
}
let comm = std::fs::read_to_string(format!("/proc/{}/comm", session_id)).ok()?;
let name = comm.trim();
if name.is_empty() || name.eq_ignore_ascii_case(client_process) {
return None;
}
Some(beautify_process(name))
}
#[cfg(target_os = "linux")]
fn read_session_leader(pid: u32) -> Option<u32> {
let stat = std::fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
let close = stat.rfind(')')?;
let after = stat[close + 1..].trim();
let fields: Vec<&str> = after.split_whitespace().collect();
fields.get(3).and_then(|s| s.parse().ok())
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn lookup_responsible_app(_pid: u32, _client_process: &str) -> Option<String> {
None
}
#[derive(Default)]
struct ResponsibleAppCache {
map: HashMap<u32, Option<String>>,
}
impl ResponsibleAppCache {
fn resolve(&mut self, pid: u32, client_process: &str) -> Option<String> {
if let Some(cached) = self.map.get(&pid) {
return cached.clone();
}
let resolved = lookup_responsible_app(pid, client_process);
self.map.insert(pid, resolved.clone());
resolved
}
fn retain_pids(&mut self, live: &std::collections::HashSet<u32>) {
self.map.retain(|pid, _| live.contains(pid));
}
}
fn annotate_responsible_apps(msg: &mut LsofMessage, cache: &mut ResponsibleAppCache) {
let mut live: std::collections::HashSet<u32> = std::collections::HashSet::new();
for peers in msg.clients.values_mut() {
for peer in peers.iter_mut() {
live.insert(peer.pid);
peer.responsible_app = cache.resolve(peer.pid, &peer.process);
}
}
cache.retain_pids(&live);
}
#[derive(Debug, Clone, Default)]
struct PeerSampleCache {
last_rcvd: u64,
last_sent: u64,
last_at: Option<Instant>,
}
#[derive(Debug, Default)]
struct PerPeerSamples {
per_local_port_cumulative: HashMap<u16, (u64, u64)>,
per_pid_bps: HashMap<u32, (u64, u64)>,
}
fn annotate_peer_throughput(
msg: &mut LsofMessage,
cache: &mut HashMap<(u16, String), PeerSampleCache>,
now: Instant,
) {
let samples = sample_peer_throughput();
let mut live: std::collections::HashSet<(u16, String)> = std::collections::HashSet::new();
for (port, peers) in msg.clients.iter_mut() {
for peer in peers.iter_mut() {
let key = (*port, peer.src.clone());
live.insert(key.clone());
let entry = cache.entry(key).or_default();
let src_port = src_port_from(&peer.src);
let cumulative =
src_port.and_then(|p| samples.per_local_port_cumulative.get(&p).copied());
if let Some((rcvd, sent)) = cumulative {
if let Some(prev_at) = entry.last_at {
let dt = now.saturating_duration_since(prev_at).as_secs_f64();
if dt > 0.0 {
let rx_bps = ((rcvd.saturating_sub(entry.last_rcvd)) as f64 / dt) as u64;
let tx_bps = ((sent.saturating_sub(entry.last_sent)) as f64 / dt) as u64;
peer.current_rx_bps = rx_bps;
peer.current_tx_bps = tx_bps;
}
}
entry.last_rcvd = rcvd;
entry.last_sent = sent;
entry.last_at = Some(now);
peer.bytes_rcvd = Some(rcvd);
peer.bytes_sent = Some(sent);
peer.last_sample_at = Some(now);
continue;
}
if let Some((rx_bps, tx_bps)) = samples.per_pid_bps.get(&peer.pid).copied() {
peer.current_rx_bps = rx_bps;
peer.current_tx_bps = tx_bps;
entry.last_at = Some(now);
peer.last_sample_at = Some(now);
}
}
}
cache.retain(|key, _| live.contains(key));
}
fn sample_peer_throughput() -> PerPeerSamples {
let mut out = PerPeerSamples::default();
#[cfg(target_os = "linux")]
{
out.per_local_port_cumulative = crate::tcp_diag::sample_per_local_port();
}
#[cfg(target_os = "macos")]
{
out.per_pid_bps = sample_nettop_per_pid_macos();
}
out
}
#[cfg(target_os = "macos")]
fn sample_nettop_per_pid_macos() -> HashMap<u32, (u64, u64)> {
use std::time::Duration;
let output = Command::new("/usr/bin/nettop")
.args(["-P", "-d", "-x", "-s", "1", "-L", "2"])
.stdin(Stdio::null())
.stderr(Stdio::null())
.output();
let text = match output {
Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).into_owned(),
_ => return HashMap::new(),
};
let mut blocks: Vec<&str> = Vec::new();
let mut start = 0;
for (i, line) in text.lines().enumerate() {
if line.starts_with("time,") {
blocks.push(&text[start..]);
start = text.lines().take(i).map(|l| l.len() + 1).sum();
}
}
let _ = Duration::ZERO; let delta_block = if text.matches("\ntime,").count() >= 1 {
let first = text.find("time,").unwrap_or(0);
let second_rel = text[first + 5..].find("\ntime,");
match second_rel {
Some(off) => &text[first + 5 + off + 1..],
None => &text[first..],
}
} else {
text.as_str()
};
let mut out: HashMap<u32, (u64, u64)> = HashMap::new();
for line in delta_block.lines() {
if let Some((pid, rx, tx)) = parse_nettop_csv_row_per_pid(line) {
let entry = out.entry(pid).or_insert((0, 0));
entry.0 = entry.0.saturating_add(rx);
entry.1 = entry.1.saturating_add(tx);
}
}
out
}
#[cfg(target_os = "macos")]
fn parse_nettop_csv_row_per_pid(line: &str) -> Option<(u32, u64, u64)> {
let line = line.trim();
if line.is_empty() || line.starts_with("time,") {
return None;
}
let cols: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
if cols.len() < 6 {
return None;
}
let proc_pid = cols[1];
let dot = proc_pid.rfind('.')?;
let pid_str = &proc_pid[dot + 1..];
let pid: u32 = pid_str.parse().ok()?;
let rx: u64 = cols[4].parse().ok()?;
let tx: u64 = cols[5].parse().ok()?;
Some((pid, rx, tx))
}
fn src_port_from(src: &str) -> Option<u16> {
src.rsplit_once(':').and_then(|(_, port)| port.parse().ok())
}
#[cfg(any(target_os = "macos", target_os = "linux"))]
fn run_lsof_once(
ports: &[(String, u16, u32)],
first_seen: &mut HashMap<(u16, String), Instant>,
now: Instant,
) -> LsofMessage {
let output = Command::new("lsof")
.args(["-iTCP", "-P", "-n", "-w", "+c", "0"])
.output();
let stdout = match output {
Ok(o) if o.status.success() || !o.stdout.is_empty() => o.stdout,
_ => return LsofMessage::empty(now),
};
let text = String::from_utf8_lossy(&stdout);
parse_lsof_output(&text, ports, first_seen, now)
}
pub fn parse_lsof_output(
text: &str,
ports: &[(String, u16, u32)],
first_seen: &mut HashMap<(u16, String), Instant>,
now: Instant,
) -> LsofMessage {
let mut clients: HashMap<u16, Vec<ClientPeer>> = HashMap::new();
let mut conflicts: HashMap<u16, PortConflict> = HashMap::new();
let bind_ports: Vec<u16> = ports.iter().map(|(_, p, _)| *p).collect();
let tunnel_pids: Vec<u32> = ports.iter().map(|(_, _, pid)| *pid).collect();
for line in text.lines().skip(1) {
let row = match parse_lsof_row(line) {
Some(r) => r,
None => continue,
};
if row.is_listen && bind_ports.contains(&row.local_port) && !tunnel_pids.contains(&row.pid)
{
conflicts
.entry(row.local_port)
.or_insert_with(|| PortConflict {
port: row.local_port,
process: row.command.clone(),
pid: row.pid,
});
continue;
}
if row.is_listen {
continue;
}
if let Some(remote_port) = row.remote_port {
if bind_ports.contains(&remote_port) && !tunnel_pids.contains(&row.pid) {
let src = row.local_addr_port().unwrap_or_else(|| "?".to_string());
let key = (remote_port, src.clone());
let since = *first_seen.entry(key).or_insert(now);
let entry = clients.entry(remote_port).or_default();
if entry.len() >= MAX_CLIENTS_PER_PORT {
continue;
}
entry.push(ClientPeer {
src,
process: beautify_process(&row.command),
pid: row.pid,
since,
responsible_app: None,
current_rx_bps: 0,
current_tx_bps: 0,
bytes_rcvd: None,
bytes_sent: None,
last_sample_at: None,
});
}
}
}
let live: std::collections::HashSet<(u16, String)> = clients
.iter()
.flat_map(|(port, peers)| peers.iter().map(move |p| (*port, p.src.clone())))
.collect();
first_seen.retain(|key, _| live.contains(key));
LsofMessage {
at: now,
clients,
conflicts,
}
}
#[derive(Debug)]
struct LsofRow {
command: String,
pid: u32,
is_listen: bool,
local_addr: String,
local_port: u16,
remote_addr: Option<String>,
remote_port: Option<u16>,
}
impl LsofRow {
fn local_addr_port(&self) -> Option<String> {
if let (Some(addr), Some(port)) = (self.remote_addr.as_deref(), self.remote_port) {
let _ = (addr, port);
}
Some(format!("{}:{}", self.local_addr, self.local_port))
}
}
fn parse_lsof_row(line: &str) -> Option<LsofRow> {
if line.trim().is_empty() {
return None;
}
let mut fields = line.split_whitespace();
let command = fields.next()?.to_string();
let pid: u32 = fields.next()?.parse().ok()?;
let _user = fields.next()?;
let _fd = fields.next()?;
let _ty = fields.next()?;
let _dev = fields.next()?;
let _size = fields.next()?;
let _node = fields.next()?;
let name = fields.next()?;
let state = fields.next();
if !name.contains(':') {
return None;
}
let is_listen = matches!(state, Some(s) if s.contains("LISTEN"));
let is_established = matches!(state, Some(s) if s.contains("ESTABLISHED"));
if !is_listen && !is_established {
return None;
}
let (local, remote) = match name.split_once("->") {
Some((l, r)) => (l, Some(r)),
None => (name, None),
};
let (local_addr, local_port) = split_addr_port(local)?;
let (remote_addr, remote_port) = match remote {
Some(r) => match split_addr_port(r) {
Some((a, p)) => (Some(a), Some(p)),
None => (None, None),
},
None => (None, None),
};
Some(LsofRow {
command,
pid,
is_listen,
local_addr,
local_port,
remote_addr,
remote_port,
})
}
pub fn beautify_process(raw: &str) -> String {
if let Some(rest) = raw.strip_prefix("com.apple.") {
if !rest.is_empty() {
return rest.to_string();
}
}
raw.to_string()
}
fn split_addr_port(s: &str) -> Option<(String, u16)> {
if let Some(rest) = s.strip_prefix('[') {
let end = rest.find(']')?;
let addr = &rest[..end];
let after = &rest[end + 1..];
let port_str = after.strip_prefix(':')?;
let port: u16 = port_str.parse().ok()?;
return Some((addr.to_string(), port));
}
let colon = s.rfind(':')?;
let addr = &s[..colon];
let port: u16 = s[colon + 1..].parse().ok()?;
Some((addr.to_string(), port))
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TunnelLiveSnapshot {
pub uptime_secs: u64,
pub active_channels: u32,
pub peak_concurrent: u32,
pub total_opens: u32,
pub idle_secs: u64,
pub rx_history: [u64; HISTORY_BUCKETS],
pub tx_history: [u64; HISTORY_BUCKETS],
pub current_rx_bps: u64,
pub current_tx_bps: u64,
pub peak_rx_bps: u64,
pub peak_tx_bps: u64,
pub throughput_ready: bool,
pub clients: Vec<DisplayClient>,
pub events: Vec<DisplayEvent>,
pub currently_open: Vec<(u32, u64, ChannelKind)>,
pub conflict: Option<PortConflict>,
pub last_exit: Option<(i32, String)>,
}
#[derive(Debug, Clone)]
pub struct DisplayClient {
#[allow(dead_code)]
pub src: String,
pub process: String,
pub age_secs: u64,
#[allow(dead_code)]
pub pid: u32,
pub responsible_app: Option<String>,
pub current_rx_bps: u64,
pub current_tx_bps: u64,
pub viz_history: [u64; PEER_VIZ_BUCKETS],
pub throughput_ready: bool,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct DisplayEvent {
pub age_secs: u64,
#[allow(dead_code)]
pub channel_id: u32,
pub kind: ChannelEventKind,
pub channel_kind: ChannelKind,
#[allow(dead_code)]
pub duration_secs: Option<u64>,
pub count: u32,
}
#[cfg(test)]
mod tests {
use super::*;
fn user_open(channel_id: u32, at: Instant) -> ChannelEvent {
ChannelEvent {
at,
channel_id,
kind: ChannelEventKind::Open,
channel_kind: Some(ChannelKind::Direct),
opened_at: None,
}
}
#[test]
fn parse_channel_open_simple() {
let ev = parse_channel_line("debug1: channel 0: new [direct-tcpip]").unwrap();
assert_eq!(ev.channel_id, 0);
assert_eq!(ev.kind, ChannelEventKind::Open);
assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
}
#[test]
fn parse_channel_open_records_listener_kind() {
let ev = parse_channel_line("debug1: channel 1: new [port listener]").unwrap();
assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
}
#[test]
fn parse_channel_open_records_dynamic_kind() {
let ev = parse_channel_line("debug1: channel 4: new [dynamic-tcpip]").unwrap();
assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
}
#[test]
fn parse_channel_close_simple() {
let ev = parse_channel_line("debug1: channel 12: free: blah blah").unwrap();
assert_eq!(ev.channel_id, 12);
assert_eq!(ev.kind, ChannelEventKind::Close);
assert_eq!(ev.channel_kind, None);
}
#[test]
fn parse_channel_with_leading_whitespace() {
let ev = parse_channel_line(" debug1: channel 5: new [forwarded-tcpip]").unwrap();
assert_eq!(ev.channel_id, 5);
assert_eq!(ev.kind, ChannelEventKind::Open);
assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
}
#[test]
fn parse_channel_modern_openssh_format_with_inactive_timeout() {
let ev = parse_channel_line(
"debug1: channel 3: new direct-tcpip [127.0.0.1:54321] (inactive timeout: 0)",
)
.unwrap();
assert_eq!(ev.channel_id, 3);
assert_eq!(ev.kind, ChannelEventKind::Open);
assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
}
#[test]
fn parse_channel_modern_openssh_format_forwarded() {
let ev = parse_channel_line(
"debug1: channel 7: new forwarded-tcpip [10.0.0.1:443] (inactive timeout: 0)",
)
.unwrap();
assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
}
#[test]
fn parse_channel_modern_openssh_format_dynamic() {
let ev = parse_channel_line("debug1: channel 9: new dynamic-tcpip [client] (timeout: 5)")
.unwrap();
assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
}
#[test]
fn parse_channel_modern_openssh_format_internal_listener_is_other() {
let ev = parse_channel_line(
"debug1: channel 0: new port-listener [::1:8080] (inactive timeout: 0)",
)
.unwrap();
assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
}
#[test]
fn parse_channel_unrelated_line_returns_none() {
assert!(parse_channel_line("debug1: client_input_global_request").is_none());
assert!(parse_channel_line("not even ssh output").is_none());
assert!(parse_channel_line("debug1: channel abc: new").is_none());
assert!(parse_channel_line("debug1: channel 1: confirm").is_none());
}
#[test]
fn record_event_open_increments_counters_for_user_visible_kinds() {
let now = Instant::now();
let mut state = TunnelLiveState::new(now);
state.record_event(user_open(1, now));
assert_eq!(state.total_opens, 1);
assert_eq!(state.active_channels, 1);
assert_eq!(state.peak_concurrent, 1);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
}
#[test]
fn record_event_skips_counters_for_internal_channels() {
let now = Instant::now();
let mut state = TunnelLiveState::new(now);
state.record_event(ChannelEvent {
at: now,
channel_id: 0,
kind: ChannelEventKind::Open,
channel_kind: Some(ChannelKind::Other),
opened_at: None,
});
assert_eq!(state.total_opens, 0);
assert_eq!(state.active_channels, 0);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
assert_eq!(state.events.len(), 1);
}
#[test]
fn sample_activity_writes_peak_into_current_bucket() {
let now = Instant::now();
let mut state = TunnelLiveState::new(now);
state.sample_activity(2);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
state.sample_activity(1);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
state.sample_activity(5);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 5);
}
#[test]
fn sample_activity_clamps_to_u8_max() {
let now = Instant::now();
let mut state = TunnelLiveState::new(now);
state.sample_activity(u32::MAX);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], u8::MAX);
}
#[test]
fn record_event_close_pairs_with_open_for_duration() {
let t0 = Instant::now();
let t1 = t0 + Duration::from_secs(5);
let mut state = TunnelLiveState::new(t0);
state.record_event(user_open(7, t0));
state.record_event(ChannelEvent {
at: t1,
channel_id: 7,
kind: ChannelEventKind::Close,
channel_kind: None,
opened_at: None,
});
assert_eq!(state.active_channels, 0);
let last = state.events.back().unwrap();
assert_eq!(last.kind, ChannelEventKind::Close);
assert_eq!(last.opened_at, Some(t0));
assert_eq!(last.channel_kind, Some(ChannelKind::Direct));
}
#[test]
fn record_event_caps_ringbuffer_at_max() {
let now = Instant::now();
let mut state = TunnelLiveState::new(now);
for i in 0..(MAX_EVENTS as u32 + 5) {
state.record_event(user_open(i, now));
}
assert_eq!(state.events.len(), MAX_EVENTS);
}
#[test]
fn rotate_if_due_shifts_buckets_per_tick() {
let t0 = Instant::now();
let mut state = TunnelLiveState::new(t0);
state.opens_history[HISTORY_BUCKETS - 1] = 7;
state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS));
assert_eq!(state.opens_history[HISTORY_BUCKETS - 2], 7);
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
}
#[test]
fn rotate_if_due_clamps_at_full_window() {
let t0 = Instant::now();
let mut state = TunnelLiveState::new(t0);
state.opens_history[HISTORY_BUCKETS - 1] = 9;
state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS * HISTORY_BUCKETS as u64 * 4));
assert!(state.opens_history.iter().all(|&v| v == 0));
}
#[test]
fn rotate_if_due_noop_within_one_bucket() {
let t0 = Instant::now();
let mut state = TunnelLiveState::new(t0);
state.opens_history[HISTORY_BUCKETS - 1] = 3;
state.rotate_if_due(t0 + Duration::from_millis(BUCKET_SECS * 1000 / 2));
assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 3);
}
#[test]
fn parse_lsof_listen_row() {
let line = "ssh 12345 user 3u IPv4 0xabc 0t0 TCP 127.0.0.1:8080 (LISTEN)";
let row = parse_lsof_row(line).unwrap();
assert_eq!(row.command, "ssh");
assert_eq!(row.pid, 12345);
assert!(row.is_listen);
assert_eq!(row.local_addr, "127.0.0.1");
assert_eq!(row.local_port, 8080);
assert!(row.remote_port.is_none());
}
#[test]
fn parse_lsof_established_row() {
let line =
"curl 23456 user 4u IPv4 0xdef 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)";
let row = parse_lsof_row(line).unwrap();
assert_eq!(row.command, "curl");
assert_eq!(row.pid, 23456);
assert!(!row.is_listen);
assert_eq!(row.local_port, 54321);
assert_eq!(row.remote_port, Some(8080));
}
#[test]
fn parse_lsof_other_states_skipped() {
let line = "x 1 u 0u IPv4 0 0t0 TCP 1.2.3.4:1->5.6.7.8:9 (CLOSE_WAIT)";
assert!(parse_lsof_row(line).is_none());
}
#[test]
fn parse_lsof_output_finds_clients_for_bind_port() {
let txt = "\
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
curl 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
ssh 12345 u 5u IPv4 0xc 0t0 TCP 127.0.0.1:8080->127.0.0.1:54321 (ESTABLISHED)
";
let ports = vec![("foo".into(), 8080u16, 12345u32)];
let mut seen = HashMap::new();
let now = Instant::now();
let msg = parse_lsof_output(txt, &ports, &mut seen, now);
let peers = msg.clients.get(&8080).expect("clients on 8080");
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].process, "curl");
assert_eq!(peers[0].pid, 23456);
assert!(peers[0].src.contains("54321"));
assert!(msg.conflicts.is_empty());
}
#[test]
fn parse_lsof_output_detects_port_conflict() {
let txt = "\
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
nginx 99999 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
";
let ports = vec![("foo".into(), 8080u16, 12345u32)];
let mut seen = HashMap::new();
let now = Instant::now();
let msg = parse_lsof_output(txt, &ports, &mut seen, now);
let conflict = msg.conflicts.get(&8080).expect("conflict on 8080");
assert_eq!(conflict.process, "nginx");
assert_eq!(conflict.pid, 99999);
}
#[test]
fn parse_lsof_output_skips_own_listen() {
let txt = "\
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
";
let ports = vec![("foo".into(), 8080u16, 12345u32)];
let mut seen = HashMap::new();
let now = Instant::now();
let msg = parse_lsof_output(txt, &ports, &mut seen, now);
assert!(msg.conflicts.is_empty());
}
#[test]
fn parse_lsof_output_first_seen_persists_across_polls() {
let txt = "\
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
curl 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
";
let ports = vec![("foo".into(), 8080u16, 12345u32)];
let mut seen = HashMap::new();
let t0 = Instant::now();
let msg1 = parse_lsof_output(txt, &ports, &mut seen, t0);
let t1 = t0 + Duration::from_secs(5);
let msg2 = parse_lsof_output(txt, &ports, &mut seen, t1);
let p1 = &msg1.clients[&8080][0];
let p2 = &msg2.clients[&8080][0];
assert_eq!(p1.since, p2.since, "first_seen should be sticky");
}
#[test]
fn split_addr_port_handles_ipv6_brackets() {
let (a, p) = split_addr_port("[::1]:8080").unwrap();
assert_eq!(a, "::1");
assert_eq!(p, 8080);
}
#[test]
fn split_addr_port_handles_ipv4() {
let (a, p) = split_addr_port("127.0.0.1:8080").unwrap();
assert_eq!(a, "127.0.0.1");
assert_eq!(p, 8080);
}
#[test]
fn beautify_process_strips_com_apple_prefix() {
assert_eq!(
beautify_process("com.apple.WebKit.Networking"),
"WebKit.Networking"
);
assert_eq!(beautify_process("com.apple.Safari"), "Safari");
}
#[test]
fn beautify_process_passes_other_names_through_unchanged() {
assert_eq!(beautify_process("curl"), "curl");
assert_eq!(beautify_process("nginx"), "nginx");
assert_eq!(beautify_process("python3"), "python3");
}
#[test]
fn beautify_process_does_not_strip_when_only_prefix() {
assert_eq!(beautify_process("com.apple."), "com.apple.");
}
#[test]
fn parse_lsof_output_unwraps_apple_framework_names() {
let txt = "\
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
com.apple.WebKit.Networking 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
";
let ports = vec![("foo".into(), 8080u16, 12345u32)];
let mut seen = HashMap::new();
let now = Instant::now();
let msg = parse_lsof_output(txt, &ports, &mut seen, now);
let peers = msg.clients.get(&8080).expect("clients on 8080");
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].process, "WebKit.Networking");
}
}