1use std::collections::{HashMap, VecDeque};
15use std::io::{BufRead, BufReader};
16#[cfg(target_os = "macos")]
17use std::process::Stdio;
18use std::process::{ChildStderr, Command};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::mpsc::Sender;
21use std::sync::{Arc, Mutex};
22use std::thread::{self, JoinHandle};
23use std::time::{Duration, Instant};
24
25pub const MAX_EVENTS: usize = 50;
27pub const BUCKET_SECS: u64 = 2;
32
33pub const HISTORY_BUCKETS: usize = 150;
38pub const STDERR_BUFFER_LINES: usize = 10;
40pub const MAX_CLIENTS_PER_PORT: usize = 64;
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum ChannelEventKind {
48 Open,
49 Close,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum ChannelKind {
62 Direct,
64 Forwarded,
67 Dynamic,
69 Other,
72}
73
74impl ChannelKind {
75 pub fn from_bracket(token: &str) -> Self {
77 match token {
78 "direct-tcpip" => Self::Direct,
79 "forwarded-tcpip" => Self::Forwarded,
80 "dynamic-tcpip" => Self::Dynamic,
81 _ => Self::Other,
82 }
83 }
84
85 pub fn is_user_visible(self) -> bool {
88 matches!(self, Self::Direct | Self::Forwarded | Self::Dynamic)
89 }
90}
91
92#[derive(Debug, Clone)]
93pub struct ChannelEvent {
94 pub at: Instant,
95 pub channel_id: u32,
96 pub kind: ChannelEventKind,
97 pub channel_kind: Option<ChannelKind>,
102 pub opened_at: Option<Instant>,
105}
106
107pub const PEER_VIZ_BUCKETS: usize = 12;
114
115#[derive(Debug, Clone)]
117pub struct ClientPeer {
118 pub src: String,
119 pub process: String,
120 pub pid: u32,
123 pub since: Instant,
124 pub responsible_app: Option<String>,
130 pub current_rx_bps: u64,
135 pub current_tx_bps: u64,
136 pub bytes_rcvd: Option<u64>,
139 pub bytes_sent: Option<u64>,
140 pub last_sample_at: Option<Instant>,
142}
143
144#[derive(Debug, Clone)]
150#[allow(dead_code)]
151pub struct PortConflict {
152 pub port: u16,
153 pub process: String,
154 pub pid: u32,
155}
156
157pub struct TunnelLiveState {
161 pub events: VecDeque<ChannelEvent>,
162 pub opens_history: [u8; HISTORY_BUCKETS],
163 pub history_last_rotate: Instant,
166 pub peak_concurrent: u32,
167 pub total_opens: u32,
168 pub last_event_at: Option<Instant>,
169 pub active_channels: u32,
170 pub channel_open: HashMap<u32, (Instant, ChannelKind)>,
173 pub last_exit: Option<(i32, String)>,
175 pub stderr_buffer: Arc<Mutex<VecDeque<String>>>,
178 pub parser_thread: Option<JoinHandle<()>>,
180 pub parser_stop: Arc<AtomicBool>,
184
185 pub rx_history: [u64; HISTORY_BUCKETS],
188 pub tx_history: [u64; HISTORY_BUCKETS],
191 pub current_rx_bps: u64,
195 pub current_tx_bps: u64,
196 pub peak_rx_bps: u64,
199 pub peak_tx_bps: u64,
200 pub last_throughput_at: Option<Instant>,
204}
205
206impl TunnelLiveState {
207 pub fn new(started_at: Instant) -> Self {
208 Self {
209 events: VecDeque::with_capacity(MAX_EVENTS),
210 opens_history: [0u8; HISTORY_BUCKETS],
211 history_last_rotate: started_at,
212 peak_concurrent: 0,
213 total_opens: 0,
214 last_event_at: None,
215 active_channels: 0,
216 channel_open: HashMap::new(),
217 last_exit: None,
218 stderr_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_BUFFER_LINES))),
219 parser_thread: None,
220 parser_stop: Arc::new(AtomicBool::new(false)),
221 rx_history: [0u64; HISTORY_BUCKETS],
222 tx_history: [0u64; HISTORY_BUCKETS],
223 current_rx_bps: 0,
224 current_tx_bps: 0,
225 peak_rx_bps: 0,
226 peak_tx_bps: 0,
227 last_throughput_at: None,
228 }
229 }
230
231 pub fn record_event(&mut self, mut event: ChannelEvent) {
240 self.rotate_if_due(event.at);
241 match event.kind {
242 ChannelEventKind::Open => {
243 let kind = event.channel_kind.unwrap_or(ChannelKind::Other);
244 if kind.is_user_visible() {
245 self.total_opens = self.total_opens.saturating_add(1);
246 self.active_channels = self.active_channels.saturating_add(1);
247 self.peak_concurrent = self.peak_concurrent.max(self.active_channels);
248 }
249 self.channel_open.insert(event.channel_id, (event.at, kind));
250 }
251 ChannelEventKind::Close => {
252 if let Some((opened_at, kind)) = self.channel_open.remove(&event.channel_id) {
253 event.opened_at = Some(opened_at);
254 event.channel_kind = Some(kind);
255 if kind.is_user_visible() {
256 self.active_channels = self.active_channels.saturating_sub(1);
257 }
258 }
259 }
260 }
261 self.last_event_at = Some(event.at);
262 if self.events.len() == MAX_EVENTS {
263 self.events.pop_front();
264 }
265 self.events.push_back(event);
266 log::debug!(
267 "[purple] Tunnel live event: total_opens={} active={} peak={}",
268 self.total_opens,
269 self.active_channels,
270 self.peak_concurrent
271 );
272 }
273
274 pub fn rotate_if_due(&mut self, now: Instant) {
280 let elapsed = now.saturating_duration_since(self.history_last_rotate);
281 let ticks = elapsed.as_secs() / BUCKET_SECS;
282 if ticks == 0 {
283 return;
284 }
285 let shift = (ticks as usize).min(HISTORY_BUCKETS);
286 if shift >= HISTORY_BUCKETS {
287 self.opens_history.fill(0);
288 self.rx_history.fill(0);
289 self.tx_history.fill(0);
290 } else {
291 self.opens_history.rotate_left(shift);
292 for slot in self.opens_history.iter_mut().rev().take(shift) {
293 *slot = 0;
294 }
295 self.rx_history.rotate_left(shift);
296 for slot in self.rx_history.iter_mut().rev().take(shift) {
297 *slot = 0;
298 }
299 self.tx_history.rotate_left(shift);
300 for slot in self.tx_history.iter_mut().rev().take(shift) {
301 *slot = 0;
302 }
303 }
304 self.history_last_rotate += Duration::from_secs(ticks * BUCKET_SECS);
305 }
306
307 pub fn sample_activity(&mut self, concurrent: u32) {
314 let sample = u8::try_from(concurrent).unwrap_or(u8::MAX);
315 if let Some(last) = self.opens_history.last_mut() {
316 *last = (*last).max(sample);
317 }
318 }
319}
320
321#[derive(Debug, Clone)]
323pub struct ParserMessage {
324 pub alias: String,
325 pub event: ChannelEvent,
326}
327
328#[derive(Debug, Clone)]
331pub struct LsofMessage {
332 pub at: Instant,
333 pub clients: HashMap<u16, Vec<ClientPeer>>,
334 pub conflicts: HashMap<u16, PortConflict>,
335}
336
337impl LsofMessage {
338 pub fn empty(at: Instant) -> Self {
339 Self {
340 at,
341 clients: HashMap::new(),
342 conflicts: HashMap::new(),
343 }
344 }
345}
346
347pub struct LsofPollerHandle {
349 pub stop: Arc<AtomicBool>,
350 pub bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
351 pub thread: Option<JoinHandle<()>>,
352}
353
354impl LsofPollerHandle {
355 pub fn shutdown(&mut self) {
356 self.stop.store(true, Ordering::Relaxed);
357 if let Some(handle) = self.thread.take() {
358 let _ = handle.join();
359 }
360 }
361}
362
363pub fn spawn_parser_thread(
367 stderr: ChildStderr,
368 alias: String,
369 tx: Sender<ParserMessage>,
370 stderr_buffer: Arc<Mutex<VecDeque<String>>>,
371 stop: Arc<AtomicBool>,
372) -> JoinHandle<()> {
373 thread::Builder::new()
374 .name(format!("purple-tunnel-parser-{alias}"))
375 .spawn(move || {
376 let reader = BufReader::new(stderr);
377 for line in reader.lines() {
378 if stop.load(Ordering::Relaxed) {
379 break;
380 }
381 let Ok(line) = line else { break };
382 if let Ok(mut buf) = stderr_buffer.lock() {
383 if buf.len() == STDERR_BUFFER_LINES {
384 buf.pop_front();
385 }
386 buf.push_back(line.clone());
387 }
388 if let Some(event) = parse_channel_line(&line) {
389 let msg = ParserMessage {
390 alias: alias.clone(),
391 event,
392 };
393 if tx.send(msg).is_err() {
394 break;
395 }
396 }
397 }
398 log::debug!("[purple] Tunnel parser thread exit: alias={alias}");
399 })
400 .expect("spawn purple-tunnel-parser thread")
401}
402
403pub fn parse_channel_line(line: &str) -> Option<ChannelEvent> {
416 let trimmed = line.trim_start();
417 let rest = trimmed.strip_prefix("debug1: channel ")?;
418 let (id_str, after) = rest.split_once(':')?;
419 let channel_id: u32 = id_str.trim().parse().ok()?;
420 let after = after.trim_start();
421 let (kind, channel_kind) = if let Some(after_new) = after.strip_prefix("new") {
422 let after_new = after_new.trim_start();
423 let ctype = if let Some(rest) = after_new.strip_prefix('[') {
424 rest.split_once(']').map(|(t, _)| t.trim().to_string())
426 } else {
427 after_new
431 .split_whitespace()
432 .next()
433 .map(|s| s.to_string())
434 .filter(|s| !s.is_empty())
435 };
436 let chan_kind = ctype.as_deref().map(ChannelKind::from_bracket);
437 (ChannelEventKind::Open, chan_kind)
438 } else if after.starts_with("free") {
439 (ChannelEventKind::Close, None)
440 } else {
441 return None;
442 };
443 Some(ChannelEvent {
444 at: Instant::now(),
445 channel_id,
446 kind,
447 channel_kind,
448 opened_at: None,
449 })
450}
451
452#[cfg(any(target_os = "macos", target_os = "linux"))]
456pub fn spawn_lsof_poller(
457 bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
458 tx: Sender<LsofMessage>,
459 stop: Arc<AtomicBool>,
460) -> JoinHandle<()> {
461 thread::Builder::new()
462 .name("purple-tunnel-lsof".into())
463 .spawn(move || {
464 let mut first_seen: HashMap<(u16, String), Instant> = HashMap::new();
467 let mut responsible_cache = ResponsibleAppCache::default();
470 let mut peer_state: HashMap<(u16, String), PeerSampleCache> = HashMap::new();
473 while !stop.load(Ordering::Relaxed) {
474 let ports: Vec<(String, u16, u32)> = match bind_ports.lock() {
475 Ok(g) => g.clone(),
476 Err(p) => p.into_inner().clone(),
477 };
478 if ports.is_empty() {
479 thread::sleep(Duration::from_millis(500));
480 continue;
481 }
482 let now = Instant::now();
483 let mut msg = run_lsof_once(&ports, &mut first_seen, now);
484 annotate_responsible_apps(&mut msg, &mut responsible_cache);
485 annotate_peer_throughput(&mut msg, &mut peer_state, now);
486 if tx.send(msg).is_err() {
487 break;
488 }
489 for _ in 0..20 {
490 if stop.load(Ordering::Relaxed) {
491 break;
492 }
493 thread::sleep(Duration::from_millis(100));
494 }
495 }
496 log::debug!("[purple] Tunnel lsof poller thread exit");
497 })
498 .expect("spawn purple-tunnel-lsof thread")
499}
500
501#[cfg(not(any(target_os = "macos", target_os = "linux")))]
504pub fn spawn_lsof_poller(
505 _bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
506 _tx: Sender<LsofMessage>,
507 stop: Arc<AtomicBool>,
508) -> JoinHandle<()> {
509 thread::spawn(move || {
510 while !stop.load(Ordering::Relaxed) {
511 thread::sleep(Duration::from_millis(500));
512 }
513 })
514}
515
516#[cfg(target_os = "macos")]
529unsafe extern "C" {
530 fn responsibility_get_pid_responsible_for_pid(pid: libc::pid_t) -> libc::pid_t;
531}
532
533#[cfg(target_os = "macos")]
541fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
542 let rpid = unsafe { responsibility_get_pid_responsible_for_pid(pid as libc::pid_t) };
546 if rpid <= 0 {
547 return None;
548 }
549 if rpid as u32 == pid {
550 return None;
551 }
552 let name = process_name(rpid as u32)?;
553 if name.eq_ignore_ascii_case(client_process) {
554 return None;
555 }
556 Some(name)
557}
558
559#[cfg(target_os = "macos")]
564fn process_name(pid: u32) -> Option<String> {
565 unsafe extern "C" {
566 fn proc_name(pid: libc::c_int, buffer: *mut libc::c_void, buffersize: u32) -> libc::c_int;
567 }
568 let mut buf = [0u8; 256];
569 let n = unsafe {
573 proc_name(
574 pid as libc::c_int,
575 buf.as_mut_ptr().cast(),
576 buf.len() as u32,
577 )
578 };
579 if n <= 0 {
580 return None;
581 }
582 let bytes = &buf[..(n as usize).min(buf.len())];
583 let s = std::str::from_utf8(bytes).ok()?.trim_end_matches('\0');
584 if s.is_empty() {
585 None
586 } else {
587 Some(beautify_process(s))
588 }
589}
590
591#[cfg(target_os = "linux")]
603fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
604 let session_id = read_session_leader(pid)?;
605 if session_id == pid {
606 return None;
607 }
608 let comm = std::fs::read_to_string(format!("/proc/{}/comm", session_id)).ok()?;
609 let name = comm.trim();
610 if name.is_empty() || name.eq_ignore_ascii_case(client_process) {
611 return None;
612 }
613 Some(beautify_process(name))
614}
615
616#[cfg(target_os = "linux")]
621fn read_session_leader(pid: u32) -> Option<u32> {
622 let stat = std::fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
623 let close = stat.rfind(')')?;
624 let after = stat[close + 1..].trim();
625 let fields: Vec<&str> = after.split_whitespace().collect();
626 fields.get(3).and_then(|s| s.parse().ok())
627}
628
629#[cfg(not(any(target_os = "macos", target_os = "linux")))]
630fn lookup_responsible_app(_pid: u32, _client_process: &str) -> Option<String> {
631 None
632}
633
634#[derive(Default)]
641struct ResponsibleAppCache {
642 map: HashMap<u32, Option<String>>,
643}
644
645impl ResponsibleAppCache {
646 fn resolve(&mut self, pid: u32, client_process: &str) -> Option<String> {
647 if let Some(cached) = self.map.get(&pid) {
648 return cached.clone();
649 }
650 let resolved = lookup_responsible_app(pid, client_process);
651 self.map.insert(pid, resolved.clone());
652 resolved
653 }
654
655 fn retain_pids(&mut self, live: &std::collections::HashSet<u32>) {
658 self.map.retain(|pid, _| live.contains(pid));
659 }
660}
661
662fn annotate_responsible_apps(msg: &mut LsofMessage, cache: &mut ResponsibleAppCache) {
667 let mut live: std::collections::HashSet<u32> = std::collections::HashSet::new();
668 for peers in msg.clients.values_mut() {
669 for peer in peers.iter_mut() {
670 live.insert(peer.pid);
671 peer.responsible_app = cache.resolve(peer.pid, &peer.process);
672 }
673 }
674 cache.retain_pids(&live);
675}
676
677#[derive(Debug, Clone, Default)]
682struct PeerSampleCache {
683 last_rcvd: u64,
684 last_sent: u64,
685 last_at: Option<Instant>,
686}
687
688#[derive(Debug, Default)]
693struct PerPeerSamples {
694 per_local_port_cumulative: HashMap<u16, (u64, u64)>,
701 per_pid_bps: HashMap<u32, (u64, u64)>,
706}
707
708fn annotate_peer_throughput(
716 msg: &mut LsofMessage,
717 cache: &mut HashMap<(u16, String), PeerSampleCache>,
718 now: Instant,
719) {
720 let samples = sample_peer_throughput();
721 let mut live: std::collections::HashSet<(u16, String)> = std::collections::HashSet::new();
722
723 for (port, peers) in msg.clients.iter_mut() {
724 for peer in peers.iter_mut() {
725 let key = (*port, peer.src.clone());
726 live.insert(key.clone());
727 let entry = cache.entry(key).or_default();
728 let src_port = src_port_from(&peer.src);
729
730 let cumulative =
732 src_port.and_then(|p| samples.per_local_port_cumulative.get(&p).copied());
733 if let Some((rcvd, sent)) = cumulative {
734 if let Some(prev_at) = entry.last_at {
735 let dt = now.saturating_duration_since(prev_at).as_secs_f64();
736 if dt > 0.0 {
737 let rx_bps = ((rcvd.saturating_sub(entry.last_rcvd)) as f64 / dt) as u64;
738 let tx_bps = ((sent.saturating_sub(entry.last_sent)) as f64 / dt) as u64;
739 peer.current_rx_bps = rx_bps;
740 peer.current_tx_bps = tx_bps;
741 }
742 }
743 entry.last_rcvd = rcvd;
744 entry.last_sent = sent;
745 entry.last_at = Some(now);
746 peer.bytes_rcvd = Some(rcvd);
747 peer.bytes_sent = Some(sent);
748 peer.last_sample_at = Some(now);
749 continue;
750 }
751
752 if let Some((rx_bps, tx_bps)) = samples.per_pid_bps.get(&peer.pid).copied() {
754 peer.current_rx_bps = rx_bps;
755 peer.current_tx_bps = tx_bps;
756 entry.last_at = Some(now);
757 peer.last_sample_at = Some(now);
758 }
759 }
760 }
761
762 cache.retain(|key, _| live.contains(key));
763}
764
765fn sample_peer_throughput() -> PerPeerSamples {
773 let mut out = PerPeerSamples::default();
774 #[cfg(target_os = "linux")]
775 {
776 out.per_local_port_cumulative = crate::tcp_diag::sample_per_local_port();
777 }
778 #[cfg(target_os = "macos")]
779 {
780 out.per_pid_bps = sample_nettop_per_pid_macos();
781 }
782 out
783}
784
785#[cfg(target_os = "macos")]
795fn sample_nettop_per_pid_macos() -> HashMap<u32, (u64, u64)> {
796 use std::time::Duration;
797 let output = Command::new("/usr/bin/nettop")
798 .args(["-P", "-d", "-x", "-s", "1", "-L", "2"])
799 .stdin(Stdio::null())
800 .stderr(Stdio::null())
801 .output();
802 let text = match output {
803 Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).into_owned(),
804 _ => return HashMap::new(),
805 };
806 let mut blocks: Vec<&str> = Vec::new();
811 let mut start = 0;
812 for (i, line) in text.lines().enumerate() {
813 if line.starts_with("time,") {
814 blocks.push(&text[start..]);
815 start = text.lines().take(i).map(|l| l.len() + 1).sum();
816 }
817 }
818 let _ = Duration::ZERO; let delta_block = if text.matches("\ntime,").count() >= 1 {
820 let first = text.find("time,").unwrap_or(0);
822 let second_rel = text[first + 5..].find("\ntime,");
823 match second_rel {
824 Some(off) => &text[first + 5 + off + 1..],
825 None => &text[first..],
826 }
827 } else {
828 text.as_str()
829 };
830 let mut out: HashMap<u32, (u64, u64)> = HashMap::new();
831 for line in delta_block.lines() {
832 if let Some((pid, rx, tx)) = parse_nettop_csv_row_per_pid(line) {
833 let entry = out.entry(pid).or_insert((0, 0));
836 entry.0 = entry.0.saturating_add(rx);
837 entry.1 = entry.1.saturating_add(tx);
838 }
839 }
840 out
841}
842
843#[cfg(target_os = "macos")]
849fn parse_nettop_csv_row_per_pid(line: &str) -> Option<(u32, u64, u64)> {
850 let line = line.trim();
851 if line.is_empty() || line.starts_with("time,") {
852 return None;
853 }
854 let cols: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
855 if cols.len() < 6 {
856 return None;
857 }
858 let proc_pid = cols[1];
859 let dot = proc_pid.rfind('.')?;
860 let pid_str = &proc_pid[dot + 1..];
861 let pid: u32 = pid_str.parse().ok()?;
862 let rx: u64 = cols[4].parse().ok()?;
863 let tx: u64 = cols[5].parse().ok()?;
864 Some((pid, rx, tx))
865}
866
867fn src_port_from(src: &str) -> Option<u16> {
869 src.rsplit_once(':').and_then(|(_, port)| port.parse().ok())
870}
871
872#[cfg(any(target_os = "macos", target_os = "linux"))]
873fn run_lsof_once(
874 ports: &[(String, u16, u32)],
875 first_seen: &mut HashMap<(u16, String), Instant>,
876 now: Instant,
877) -> LsofMessage {
878 let output = Command::new("lsof")
883 .args(["-iTCP", "-P", "-n", "-w", "+c", "0"])
884 .output();
885 let stdout = match output {
886 Ok(o) if o.status.success() || !o.stdout.is_empty() => o.stdout,
887 _ => return LsofMessage::empty(now),
888 };
889 let text = String::from_utf8_lossy(&stdout);
890 parse_lsof_output(&text, ports, first_seen, now)
891}
892
893pub fn parse_lsof_output(
895 text: &str,
896 ports: &[(String, u16, u32)],
897 first_seen: &mut HashMap<(u16, String), Instant>,
898 now: Instant,
899) -> LsofMessage {
900 let mut clients: HashMap<u16, Vec<ClientPeer>> = HashMap::new();
901 let mut conflicts: HashMap<u16, PortConflict> = HashMap::new();
902 let bind_ports: Vec<u16> = ports.iter().map(|(_, p, _)| *p).collect();
903 let tunnel_pids: Vec<u32> = ports.iter().map(|(_, _, pid)| *pid).collect();
904
905 for line in text.lines().skip(1) {
906 let row = match parse_lsof_row(line) {
907 Some(r) => r,
908 None => continue,
909 };
910 if row.is_listen && bind_ports.contains(&row.local_port) && !tunnel_pids.contains(&row.pid)
912 {
913 conflicts
914 .entry(row.local_port)
915 .or_insert_with(|| PortConflict {
916 port: row.local_port,
917 process: row.command.clone(),
918 pid: row.pid,
919 });
920 continue;
921 }
922 if row.is_listen {
923 continue;
924 }
925 if let Some(remote_port) = row.remote_port {
928 if bind_ports.contains(&remote_port) && !tunnel_pids.contains(&row.pid) {
929 let src = row.local_addr_port().unwrap_or_else(|| "?".to_string());
930 let key = (remote_port, src.clone());
931 let since = *first_seen.entry(key).or_insert(now);
932 let entry = clients.entry(remote_port).or_default();
933 if entry.len() >= MAX_CLIENTS_PER_PORT {
934 continue;
935 }
936 entry.push(ClientPeer {
937 src,
938 process: beautify_process(&row.command),
939 pid: row.pid,
940 since,
941 responsible_app: None,
942 current_rx_bps: 0,
943 current_tx_bps: 0,
944 bytes_rcvd: None,
945 bytes_sent: None,
946 last_sample_at: None,
947 });
948 }
949 }
950 }
951 let live: std::collections::HashSet<(u16, String)> = clients
954 .iter()
955 .flat_map(|(port, peers)| peers.iter().map(move |p| (*port, p.src.clone())))
956 .collect();
957 first_seen.retain(|key, _| live.contains(key));
958
959 LsofMessage {
960 at: now,
961 clients,
962 conflicts,
963 }
964}
965
966#[derive(Debug)]
968struct LsofRow {
969 command: String,
970 pid: u32,
971 is_listen: bool,
972 local_addr: String,
973 local_port: u16,
974 remote_addr: Option<String>,
975 remote_port: Option<u16>,
976}
977
978impl LsofRow {
979 fn local_addr_port(&self) -> Option<String> {
980 if let (Some(addr), Some(port)) = (self.remote_addr.as_deref(), self.remote_port) {
981 let _ = (addr, port);
986 }
987 Some(format!("{}:{}", self.local_addr, self.local_port))
988 }
989}
990
991fn parse_lsof_row(line: &str) -> Option<LsofRow> {
992 if line.trim().is_empty() {
993 return None;
994 }
995 let mut fields = line.split_whitespace();
1000 let command = fields.next()?.to_string();
1001 let pid: u32 = fields.next()?.parse().ok()?;
1002 let _user = fields.next()?;
1003 let _fd = fields.next()?;
1004 let _ty = fields.next()?;
1005 let _dev = fields.next()?;
1006 let _size = fields.next()?;
1007 let _node = fields.next()?;
1008 let name = fields.next()?;
1009 let state = fields.next();
1010 if !name.contains(':') {
1011 return None;
1012 }
1013 let is_listen = matches!(state, Some(s) if s.contains("LISTEN"));
1014 let is_established = matches!(state, Some(s) if s.contains("ESTABLISHED"));
1015 if !is_listen && !is_established {
1016 return None;
1017 }
1018 let (local, remote) = match name.split_once("->") {
1019 Some((l, r)) => (l, Some(r)),
1020 None => (name, None),
1021 };
1022 let (local_addr, local_port) = split_addr_port(local)?;
1023 let (remote_addr, remote_port) = match remote {
1024 Some(r) => match split_addr_port(r) {
1025 Some((a, p)) => (Some(a), Some(p)),
1026 None => (None, None),
1027 },
1028 None => (None, None),
1029 };
1030 Some(LsofRow {
1031 command,
1032 pid,
1033 is_listen,
1034 local_addr,
1035 local_port,
1036 remote_addr,
1037 remote_port,
1038 })
1039}
1040
1041pub fn beautify_process(raw: &str) -> String {
1048 if let Some(rest) = raw.strip_prefix("com.apple.") {
1049 if !rest.is_empty() {
1050 return rest.to_string();
1051 }
1052 }
1053 raw.to_string()
1054}
1055
1056fn split_addr_port(s: &str) -> Option<(String, u16)> {
1058 if let Some(rest) = s.strip_prefix('[') {
1059 let end = rest.find(']')?;
1060 let addr = &rest[..end];
1061 let after = &rest[end + 1..];
1062 let port_str = after.strip_prefix(':')?;
1063 let port: u16 = port_str.parse().ok()?;
1064 return Some((addr.to_string(), port));
1065 }
1066 let colon = s.rfind(':')?;
1067 let addr = &s[..colon];
1068 let port: u16 = s[colon + 1..].parse().ok()?;
1069 Some((addr.to_string(), port))
1070}
1071
1072#[derive(Debug, Clone)]
1080#[allow(dead_code)]
1081pub struct TunnelLiveSnapshot {
1082 pub uptime_secs: u64,
1083 pub active_channels: u32,
1084 pub peak_concurrent: u32,
1085 pub total_opens: u32,
1086 pub idle_secs: u64,
1087 pub rx_history: [u64; HISTORY_BUCKETS],
1088 pub tx_history: [u64; HISTORY_BUCKETS],
1089 pub current_rx_bps: u64,
1090 pub current_tx_bps: u64,
1091 pub peak_rx_bps: u64,
1092 pub peak_tx_bps: u64,
1093 pub throughput_ready: bool,
1096 pub clients: Vec<DisplayClient>,
1097 pub events: Vec<DisplayEvent>,
1098 pub currently_open: Vec<(u32, u64, ChannelKind)>,
1102 pub conflict: Option<PortConflict>,
1103 pub last_exit: Option<(i32, String)>,
1104}
1105
1106#[derive(Debug, Clone)]
1107pub struct DisplayClient {
1108 #[allow(dead_code)]
1112 pub src: String,
1113 pub process: String,
1114 pub age_secs: u64,
1115 #[allow(dead_code)]
1118 pub pid: u32,
1119 pub responsible_app: Option<String>,
1122 pub current_rx_bps: u64,
1126 pub current_tx_bps: u64,
1127 pub viz_history: [u64; PEER_VIZ_BUCKETS],
1130 pub throughput_ready: bool,
1134}
1135
1136#[derive(Debug, Clone)]
1137#[allow(dead_code)]
1138pub struct DisplayEvent {
1139 pub age_secs: u64,
1140 #[allow(dead_code)]
1144 pub channel_id: u32,
1145 pub kind: ChannelEventKind,
1146 pub channel_kind: ChannelKind,
1150 #[allow(dead_code)]
1153 pub duration_secs: Option<u64>,
1154 pub count: u32,
1158}
1159
1160#[cfg(test)]
1165mod tests {
1166 use super::*;
1167
1168 fn user_open(channel_id: u32, at: Instant) -> ChannelEvent {
1169 ChannelEvent {
1170 at,
1171 channel_id,
1172 kind: ChannelEventKind::Open,
1173 channel_kind: Some(ChannelKind::Direct),
1174 opened_at: None,
1175 }
1176 }
1177
1178 #[test]
1179 fn parse_channel_open_simple() {
1180 let ev = parse_channel_line("debug1: channel 0: new [direct-tcpip]").unwrap();
1181 assert_eq!(ev.channel_id, 0);
1182 assert_eq!(ev.kind, ChannelEventKind::Open);
1183 assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1184 }
1185
1186 #[test]
1187 fn parse_channel_open_records_listener_kind() {
1188 let ev = parse_channel_line("debug1: channel 1: new [port listener]").unwrap();
1189 assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1190 }
1191
1192 #[test]
1193 fn parse_channel_open_records_dynamic_kind() {
1194 let ev = parse_channel_line("debug1: channel 4: new [dynamic-tcpip]").unwrap();
1195 assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1196 }
1197
1198 #[test]
1199 fn parse_channel_close_simple() {
1200 let ev = parse_channel_line("debug1: channel 12: free: blah blah").unwrap();
1201 assert_eq!(ev.channel_id, 12);
1202 assert_eq!(ev.kind, ChannelEventKind::Close);
1203 assert_eq!(ev.channel_kind, None);
1206 }
1207
1208 #[test]
1209 fn parse_channel_with_leading_whitespace() {
1210 let ev = parse_channel_line(" debug1: channel 5: new [forwarded-tcpip]").unwrap();
1211 assert_eq!(ev.channel_id, 5);
1212 assert_eq!(ev.kind, ChannelEventKind::Open);
1213 assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1214 }
1215
1216 #[test]
1217 fn parse_channel_modern_openssh_format_with_inactive_timeout() {
1218 let ev = parse_channel_line(
1224 "debug1: channel 3: new direct-tcpip [127.0.0.1:54321] (inactive timeout: 0)",
1225 )
1226 .unwrap();
1227 assert_eq!(ev.channel_id, 3);
1228 assert_eq!(ev.kind, ChannelEventKind::Open);
1229 assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1230 }
1231
1232 #[test]
1233 fn parse_channel_modern_openssh_format_forwarded() {
1234 let ev = parse_channel_line(
1235 "debug1: channel 7: new forwarded-tcpip [10.0.0.1:443] (inactive timeout: 0)",
1236 )
1237 .unwrap();
1238 assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1239 }
1240
1241 #[test]
1242 fn parse_channel_modern_openssh_format_dynamic() {
1243 let ev = parse_channel_line("debug1: channel 9: new dynamic-tcpip [client] (timeout: 5)")
1244 .unwrap();
1245 assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1246 }
1247
1248 #[test]
1249 fn parse_channel_modern_openssh_format_internal_listener_is_other() {
1250 let ev = parse_channel_line(
1254 "debug1: channel 0: new port-listener [::1:8080] (inactive timeout: 0)",
1255 )
1256 .unwrap();
1257 assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1258 }
1259
1260 #[test]
1261 fn parse_channel_unrelated_line_returns_none() {
1262 assert!(parse_channel_line("debug1: client_input_global_request").is_none());
1263 assert!(parse_channel_line("not even ssh output").is_none());
1264 assert!(parse_channel_line("debug1: channel abc: new").is_none());
1265 assert!(parse_channel_line("debug1: channel 1: confirm").is_none());
1266 }
1267
1268 #[test]
1269 fn record_event_open_increments_counters_for_user_visible_kinds() {
1270 let now = Instant::now();
1271 let mut state = TunnelLiveState::new(now);
1272 state.record_event(user_open(1, now));
1273 assert_eq!(state.total_opens, 1);
1274 assert_eq!(state.active_channels, 1);
1275 assert_eq!(state.peak_concurrent, 1);
1276 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1278 }
1279
1280 #[test]
1281 fn record_event_skips_counters_for_internal_channels() {
1282 let now = Instant::now();
1286 let mut state = TunnelLiveState::new(now);
1287 state.record_event(ChannelEvent {
1288 at: now,
1289 channel_id: 0,
1290 kind: ChannelEventKind::Open,
1291 channel_kind: Some(ChannelKind::Other),
1292 opened_at: None,
1293 });
1294 assert_eq!(state.total_opens, 0);
1295 assert_eq!(state.active_channels, 0);
1296 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1297 assert_eq!(state.events.len(), 1);
1299 }
1300
1301 #[test]
1302 fn sample_activity_writes_peak_into_current_bucket() {
1303 let now = Instant::now();
1304 let mut state = TunnelLiveState::new(now);
1305 state.sample_activity(2);
1306 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1307 state.sample_activity(1);
1309 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1310 state.sample_activity(5);
1312 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 5);
1313 }
1314
1315 #[test]
1316 fn sample_activity_clamps_to_u8_max() {
1317 let now = Instant::now();
1318 let mut state = TunnelLiveState::new(now);
1319 state.sample_activity(u32::MAX);
1320 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], u8::MAX);
1321 }
1322
1323 #[test]
1324 fn record_event_close_pairs_with_open_for_duration() {
1325 let t0 = Instant::now();
1326 let t1 = t0 + Duration::from_secs(5);
1327 let mut state = TunnelLiveState::new(t0);
1328 state.record_event(user_open(7, t0));
1329 state.record_event(ChannelEvent {
1330 at: t1,
1331 channel_id: 7,
1332 kind: ChannelEventKind::Close,
1333 channel_kind: None,
1334 opened_at: None,
1335 });
1336 assert_eq!(state.active_channels, 0);
1337 let last = state.events.back().unwrap();
1338 assert_eq!(last.kind, ChannelEventKind::Close);
1339 assert_eq!(last.opened_at, Some(t0));
1340 assert_eq!(last.channel_kind, Some(ChannelKind::Direct));
1342 }
1343
1344 #[test]
1345 fn record_event_caps_ringbuffer_at_max() {
1346 let now = Instant::now();
1347 let mut state = TunnelLiveState::new(now);
1348 for i in 0..(MAX_EVENTS as u32 + 5) {
1349 state.record_event(user_open(i, now));
1350 }
1351 assert_eq!(state.events.len(), MAX_EVENTS);
1352 }
1353
1354 #[test]
1355 fn rotate_if_due_shifts_buckets_per_tick() {
1356 let t0 = Instant::now();
1357 let mut state = TunnelLiveState::new(t0);
1358 state.opens_history[HISTORY_BUCKETS - 1] = 7;
1359 state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS));
1360 assert_eq!(state.opens_history[HISTORY_BUCKETS - 2], 7);
1362 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1363 }
1364
1365 #[test]
1366 fn rotate_if_due_clamps_at_full_window() {
1367 let t0 = Instant::now();
1368 let mut state = TunnelLiveState::new(t0);
1369 state.opens_history[HISTORY_BUCKETS - 1] = 9;
1370 state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS * HISTORY_BUCKETS as u64 * 4));
1372 assert!(state.opens_history.iter().all(|&v| v == 0));
1373 }
1374
1375 #[test]
1376 fn rotate_if_due_noop_within_one_bucket() {
1377 let t0 = Instant::now();
1378 let mut state = TunnelLiveState::new(t0);
1379 state.opens_history[HISTORY_BUCKETS - 1] = 3;
1380 state.rotate_if_due(t0 + Duration::from_millis(BUCKET_SECS * 1000 / 2));
1382 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 3);
1383 }
1384
1385 #[test]
1386 fn parse_lsof_listen_row() {
1387 let line = "ssh 12345 user 3u IPv4 0xabc 0t0 TCP 127.0.0.1:8080 (LISTEN)";
1388 let row = parse_lsof_row(line).unwrap();
1389 assert_eq!(row.command, "ssh");
1390 assert_eq!(row.pid, 12345);
1391 assert!(row.is_listen);
1392 assert_eq!(row.local_addr, "127.0.0.1");
1393 assert_eq!(row.local_port, 8080);
1394 assert!(row.remote_port.is_none());
1395 }
1396
1397 #[test]
1398 fn parse_lsof_established_row() {
1399 let line =
1400 "curl 23456 user 4u IPv4 0xdef 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)";
1401 let row = parse_lsof_row(line).unwrap();
1402 assert_eq!(row.command, "curl");
1403 assert_eq!(row.pid, 23456);
1404 assert!(!row.is_listen);
1405 assert_eq!(row.local_port, 54321);
1406 assert_eq!(row.remote_port, Some(8080));
1407 }
1408
1409 #[test]
1410 fn parse_lsof_other_states_skipped() {
1411 let line = "x 1 u 0u IPv4 0 0t0 TCP 1.2.3.4:1->5.6.7.8:9 (CLOSE_WAIT)";
1413 assert!(parse_lsof_row(line).is_none());
1414 }
1415
1416 #[test]
1417 fn parse_lsof_output_finds_clients_for_bind_port() {
1418 let txt = "\
1419COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1420ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1421curl 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1422ssh 12345 u 5u IPv4 0xc 0t0 TCP 127.0.0.1:8080->127.0.0.1:54321 (ESTABLISHED)
1423";
1424 let ports = vec![("foo".into(), 8080u16, 12345u32)];
1425 let mut seen = HashMap::new();
1426 let now = Instant::now();
1427 let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1428 let peers = msg.clients.get(&8080).expect("clients on 8080");
1429 assert_eq!(peers.len(), 1);
1430 assert_eq!(peers[0].process, "curl");
1431 assert_eq!(peers[0].pid, 23456);
1432 assert!(peers[0].src.contains("54321"));
1433 assert!(msg.conflicts.is_empty());
1434 }
1435
1436 #[test]
1437 fn parse_lsof_output_detects_port_conflict() {
1438 let txt = "\
1439COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1440nginx 99999 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1441";
1442 let ports = vec![("foo".into(), 8080u16, 12345u32)];
1443 let mut seen = HashMap::new();
1444 let now = Instant::now();
1445 let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1446 let conflict = msg.conflicts.get(&8080).expect("conflict on 8080");
1447 assert_eq!(conflict.process, "nginx");
1448 assert_eq!(conflict.pid, 99999);
1449 }
1450
1451 #[test]
1452 fn parse_lsof_output_skips_own_listen() {
1453 let txt = "\
1454COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1455ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1456";
1457 let ports = vec![("foo".into(), 8080u16, 12345u32)];
1458 let mut seen = HashMap::new();
1459 let now = Instant::now();
1460 let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1461 assert!(msg.conflicts.is_empty());
1462 }
1463
1464 #[test]
1465 fn parse_lsof_output_first_seen_persists_across_polls() {
1466 let txt = "\
1467COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1468ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1469curl 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1470";
1471 let ports = vec![("foo".into(), 8080u16, 12345u32)];
1472 let mut seen = HashMap::new();
1473 let t0 = Instant::now();
1474 let msg1 = parse_lsof_output(txt, &ports, &mut seen, t0);
1475 let t1 = t0 + Duration::from_secs(5);
1476 let msg2 = parse_lsof_output(txt, &ports, &mut seen, t1);
1477 let p1 = &msg1.clients[&8080][0];
1478 let p2 = &msg2.clients[&8080][0];
1479 assert_eq!(p1.since, p2.since, "first_seen should be sticky");
1480 }
1481
1482 #[test]
1483 fn split_addr_port_handles_ipv6_brackets() {
1484 let (a, p) = split_addr_port("[::1]:8080").unwrap();
1485 assert_eq!(a, "::1");
1486 assert_eq!(p, 8080);
1487 }
1488
1489 #[test]
1490 fn split_addr_port_handles_ipv4() {
1491 let (a, p) = split_addr_port("127.0.0.1:8080").unwrap();
1492 assert_eq!(a, "127.0.0.1");
1493 assert_eq!(p, 8080);
1494 }
1495
1496 #[test]
1497 fn beautify_process_strips_com_apple_prefix() {
1498 assert_eq!(
1499 beautify_process("com.apple.WebKit.Networking"),
1500 "WebKit.Networking"
1501 );
1502 assert_eq!(beautify_process("com.apple.Safari"), "Safari");
1503 }
1504
1505 #[test]
1506 fn beautify_process_passes_other_names_through_unchanged() {
1507 assert_eq!(beautify_process("curl"), "curl");
1508 assert_eq!(beautify_process("nginx"), "nginx");
1509 assert_eq!(beautify_process("python3"), "python3");
1510 }
1511
1512 #[test]
1513 fn beautify_process_does_not_strip_when_only_prefix() {
1514 assert_eq!(beautify_process("com.apple."), "com.apple.");
1518 }
1519
1520 #[test]
1521 fn parse_lsof_output_unwraps_apple_framework_names() {
1522 let txt = "\
1523COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1524ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)
1525com.apple.WebKit.Networking 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)
1526";
1527 let ports = vec![("foo".into(), 8080u16, 12345u32)];
1528 let mut seen = HashMap::new();
1529 let now = Instant::now();
1530 let msg = parse_lsof_output(txt, &ports, &mut seen, now);
1531 let peers = msg.clients.get(&8080).expect("clients on 8080");
1532 assert_eq!(peers.len(), 1);
1533 assert_eq!(peers[0].process, "WebKit.Networking");
1535 }
1536}