1use std::collections::{HashMap, VecDeque};
15use std::io::{BufRead, BufReader};
16#[cfg(any(target_os = "macos", target_os = "linux"))]
17use std::process::Stdio;
18use std::process::{ChildStderr, Command};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::mpsc::Sender;
21use std::sync::{Arc, Mutex};
22use std::thread::{self, JoinHandle};
23use std::time::{Duration, Instant};
24
/// Maximum number of [`ChannelEvent`]s kept in `TunnelLiveState::events`;
/// `record_event` drops the oldest entry once this many are stored.
pub const MAX_EVENTS: usize = 50;
/// Width of one history bucket in seconds. `rotate_if_due` shifts the history
/// arrays by one slot for every `BUCKET_SECS` seconds that have elapsed.
pub const BUCKET_SECS: u64 = 2;

/// Number of slots in each history array (`opens_history`, `rx_history`,
/// `tx_history`), i.e. a window of `HISTORY_BUCKETS * BUCKET_SECS` seconds.
pub const HISTORY_BUCKETS: usize = 150;
/// Maximum number of recent stderr lines the parser thread keeps in the
/// shared stderr buffer before it discards the oldest one.
pub const STDERR_BUFFER_LINES: usize = 10;
/// Upper bound on the number of peers `parse_lsof_output` reports per port.
pub const MAX_CLIENTS_PER_PORT: usize = 64;
44
/// Whether a parsed channel line announced a channel being created or freed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelEventKind {
    /// Emitted by `parse_channel_line` for `debug1: channel N: new…` lines.
    Open,
    /// Emitted by `parse_channel_line` for `debug1: channel N: free…` lines.
    Close,
}
51
/// Classification of a channel by the type token found on its "new" line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelKind {
    /// Token `direct-tcpip`.
    Direct,
    /// Token `forwarded-tcpip`.
    Forwarded,
    /// Token `dynamic-tcpip`.
    Dynamic,
    /// Any other token.
    Other,
}

impl ChannelKind {
    /// Maps a channel-type token to its kind; unknown tokens become
    /// [`ChannelKind::Other`]. The comparison is exact (case-sensitive).
    pub fn from_bracket(token: &str) -> Self {
        const KNOWN: [(&str, ChannelKind); 3] = [
            ("direct-tcpip", ChannelKind::Direct),
            ("forwarded-tcpip", ChannelKind::Forwarded),
            ("dynamic-tcpip", ChannelKind::Dynamic),
        ];
        KNOWN
            .iter()
            .find(|(name, _)| *name == token)
            .map_or(Self::Other, |&(_, kind)| kind)
    }

    /// Returns `true` for every kind except [`ChannelKind::Other`]; only
    /// these kinds feed the open/active/peak counters.
    pub fn is_user_visible(self) -> bool {
        match self {
            Self::Other => false,
            Self::Direct | Self::Forwarded | Self::Dynamic => true,
        }
    }
}
91
/// One channel open/close observation.
#[derive(Debug, Clone)]
pub struct ChannelEvent {
    /// When the event was observed (`parse_channel_line` stamps
    /// `Instant::now()` at parse time).
    pub at: Instant,
    /// Numeric channel id taken from the log line.
    pub channel_id: u32,
    /// Open or close.
    pub kind: ChannelEventKind,
    /// Channel type. The parser leaves this `None` on close events;
    /// `TunnelLiveState::record_event` fills it from the matching open.
    pub channel_kind: Option<ChannelKind>,
    /// Open time of the channel, set by `TunnelLiveState::record_event` when
    /// a close is paired with a previously recorded open.
    pub opened_at: Option<Instant>,
}
106
/// Number of slots in `DisplayClient::viz_history`.
pub const PEER_VIZ_BUCKETS: usize = 12;
114
/// A local process connected to one of the tunnel's bind ports, as reported
/// by the lsof poller.
#[derive(Debug, Clone)]
pub struct ClientPeer {
    /// Client-side socket address formatted as `addr:port`
    /// (`parse_lsof_output` falls back to `"?"`).
    pub src: String,
    /// Command name after `beautify_process`.
    pub process: String,
    /// Process id of the client.
    pub pid: u32,
    /// First poll at which this `(port, src)` pair was seen.
    pub since: Instant,
    /// Filled in by `annotate_responsible_apps`; `None` until then or when no
    /// distinct responsible process was found.
    pub responsible_app: Option<String>,
    /// Receive rate set by `annotate_peer_throughput` (counter delta divided
    /// by elapsed seconds, or the per-pid sample); 0 until sampled.
    pub current_rx_bps: u64,
    /// Transmit rate, same derivation as `current_rx_bps`.
    pub current_tx_bps: u64,
    /// Latest cumulative receive counter, when a per-socket sample exists.
    pub bytes_rcvd: Option<u64>,
    /// Latest cumulative send counter, when a per-socket sample exists.
    pub bytes_sent: Option<u64>,
    /// Time of the most recent throughput sample applied to this peer.
    pub last_sample_at: Option<Instant>,
}
143
/// A process other than the tunnel that is listening on one of the tunnel's
/// bind ports (see `parse_lsof_output`).
#[derive(Debug, Clone)]
// NOTE(review): the allow suggests not every field is read by all consumers —
// confirm it is still required.
#[allow(dead_code)]
pub struct PortConflict {
    /// The contested bind port.
    pub port: u16,
    /// Raw command name from the lsof row (not passed through
    /// `beautify_process`).
    pub process: String,
    /// Process id of the conflicting listener.
    pub pid: u32,
}
156
/// Mutable live statistics for one tunnel: recent channel events, rolling
/// history buckets and throughput gauges.
pub struct TunnelLiveState {
    /// Ring buffer of the most recent events, capped at `MAX_EVENTS`.
    pub events: VecDeque<ChannelEvent>,
    /// Per-bucket activity peaks written by `sample_activity`; the last slot
    /// is the current bucket.
    pub opens_history: [u8; HISTORY_BUCKETS],
    /// Start of the current bucket; advanced by `rotate_if_due`.
    pub history_last_rotate: Instant,
    /// Highest value `active_channels` has reached.
    pub peak_concurrent: u32,
    /// Count of user-visible channel opens recorded so far.
    pub total_opens: u32,
    /// Timestamp of the most recently recorded event.
    pub last_event_at: Option<Instant>,
    /// Number of user-visible channels currently open.
    pub active_channels: u32,
    /// Open channels by id, with their open time and kind.
    pub channel_open: HashMap<u32, (Instant, ChannelKind)>,
    /// NOTE(review): presumably the tunnel process's last exit code and a
    /// message; it is only initialised to `None` here — confirm with callers.
    pub last_exit: Option<(i32, String)>,
    /// Recent stderr lines, shared with the parser thread.
    pub stderr_buffer: Arc<Mutex<VecDeque<String>>>,
    /// Join handle of the parser thread, if one is attached.
    pub parser_thread: Option<JoinHandle<()>>,
    /// Stop flag for the parser thread.
    pub parser_stop: Arc<AtomicBool>,

    /// Receive-throughput history; rotated together with `opens_history`.
    pub rx_history: [u64; HISTORY_BUCKETS],
    /// Transmit-throughput history; rotated together with `opens_history`.
    pub tx_history: [u64; HISTORY_BUCKETS],
    /// Latest receive rate (written outside this block).
    pub current_rx_bps: u64,
    /// Latest transmit rate (written outside this block).
    pub current_tx_bps: u64,
    /// Highest receive rate observed (written outside this block).
    pub peak_rx_bps: u64,
    /// Highest transmit rate observed (written outside this block).
    pub peak_tx_bps: u64,
    /// Time of the last throughput update, if any.
    pub last_throughput_at: Option<Instant>,
}
205
206impl TunnelLiveState {
207 pub fn new(started_at: Instant) -> Self {
208 Self {
209 events: VecDeque::with_capacity(MAX_EVENTS),
210 opens_history: [0u8; HISTORY_BUCKETS],
211 history_last_rotate: started_at,
212 peak_concurrent: 0,
213 total_opens: 0,
214 last_event_at: None,
215 active_channels: 0,
216 channel_open: HashMap::new(),
217 last_exit: None,
218 stderr_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_BUFFER_LINES))),
219 parser_thread: None,
220 parser_stop: Arc::new(AtomicBool::new(false)),
221 rx_history: [0u64; HISTORY_BUCKETS],
222 tx_history: [0u64; HISTORY_BUCKETS],
223 current_rx_bps: 0,
224 current_tx_bps: 0,
225 peak_rx_bps: 0,
226 peak_tx_bps: 0,
227 last_throughput_at: None,
228 }
229 }
230
231 pub fn record_event(&mut self, mut event: ChannelEvent) {
240 self.rotate_if_due(event.at);
241 match event.kind {
242 ChannelEventKind::Open => {
243 let kind = event.channel_kind.unwrap_or(ChannelKind::Other);
244 if kind.is_user_visible() {
245 self.total_opens = self.total_opens.saturating_add(1);
246 self.active_channels = self.active_channels.saturating_add(1);
247 self.peak_concurrent = self.peak_concurrent.max(self.active_channels);
248 }
249 self.channel_open.insert(event.channel_id, (event.at, kind));
250 }
251 ChannelEventKind::Close => {
252 if let Some((opened_at, kind)) = self.channel_open.remove(&event.channel_id) {
253 event.opened_at = Some(opened_at);
254 event.channel_kind = Some(kind);
255 if kind.is_user_visible() {
256 self.active_channels = self.active_channels.saturating_sub(1);
257 }
258 }
259 }
260 }
261 self.last_event_at = Some(event.at);
262 if self.events.len() == MAX_EVENTS {
263 self.events.pop_front();
264 }
265 self.events.push_back(event);
266 log::debug!(
267 "[purple] Tunnel live event: total_opens={} active={} peak={}",
268 self.total_opens,
269 self.active_channels,
270 self.peak_concurrent
271 );
272 }
273
274 pub fn rotate_if_due(&mut self, now: Instant) {
280 let elapsed = now.saturating_duration_since(self.history_last_rotate);
281 let ticks = elapsed.as_secs() / BUCKET_SECS;
282 if ticks == 0 {
283 return;
284 }
285 let shift = (ticks as usize).min(HISTORY_BUCKETS);
286 if shift >= HISTORY_BUCKETS {
287 self.opens_history.fill(0);
288 self.rx_history.fill(0);
289 self.tx_history.fill(0);
290 } else {
291 self.opens_history.rotate_left(shift);
292 for slot in self.opens_history.iter_mut().rev().take(shift) {
293 *slot = 0;
294 }
295 self.rx_history.rotate_left(shift);
296 for slot in self.rx_history.iter_mut().rev().take(shift) {
297 *slot = 0;
298 }
299 self.tx_history.rotate_left(shift);
300 for slot in self.tx_history.iter_mut().rev().take(shift) {
301 *slot = 0;
302 }
303 }
304 self.history_last_rotate += Duration::from_secs(ticks * BUCKET_SECS);
305 }
306
307 pub fn sample_activity(&mut self, concurrent: u32) {
314 let sample = u8::try_from(concurrent).unwrap_or(u8::MAX);
315 if let Some(last) = self.opens_history.last_mut() {
316 *last = (*last).max(sample);
317 }
318 }
319}
320
/// Message sent by the stderr parser thread for each recognised channel line.
#[derive(Debug, Clone)]
pub struct ParserMessage {
    /// Alias the parser thread was spawned with; identifies the tunnel.
    pub alias: String,
    /// The parsed channel event.
    pub event: ChannelEvent,
}
327
/// Result of one lsof poll: connected clients and port conflicts, keyed by
/// bind port.
#[derive(Debug, Clone)]
pub struct LsofMessage {
    /// Time the poll was taken.
    pub at: Instant,
    /// Client peers per bind port.
    pub clients: HashMap<u16, Vec<ClientPeer>>,
    /// Foreign listener per bind port, if any.
    pub conflicts: HashMap<u16, PortConflict>,
}

impl LsofMessage {
    /// Builds a message with no clients and no conflicts, stamped `at`.
    /// `run_lsof_once` returns this when lsof produced nothing usable.
    pub fn empty(at: Instant) -> Self {
        Self {
            at,
            clients: HashMap::new(),
            conflicts: HashMap::new(),
        }
    }
}
346
/// Owner-side handle for the lsof poller thread.
pub struct LsofPollerHandle {
    /// Set to `true` to ask the poller loop to exit.
    pub stop: Arc<AtomicBool>,
    /// Shared list of `(name, port, pid)` entries the poller watches.
    pub bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
    /// Join handle of the poller thread; `None` once it has been joined.
    pub thread: Option<JoinHandle<()>>,
}

impl LsofPollerHandle {
    /// Raises the stop flag and blocks until the poller thread has finished.
    /// A panic inside the thread is ignored, and repeated calls are harmless
    /// because the handle is taken on the first one.
    pub fn shutdown(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        let Some(worker) = self.thread.take() else {
            return;
        };
        let _ = worker.join();
    }
}
362
363pub fn spawn_parser_thread(
367 stderr: ChildStderr,
368 alias: String,
369 tx: Sender<ParserMessage>,
370 stderr_buffer: Arc<Mutex<VecDeque<String>>>,
371 stop: Arc<AtomicBool>,
372) -> JoinHandle<()> {
373 thread::Builder::new()
374 .name(format!("purple-tunnel-parser-{alias}"))
375 .spawn(move || {
376 let reader = BufReader::new(stderr);
377 for line in reader.lines() {
378 if stop.load(Ordering::Relaxed) {
379 break;
380 }
381 let Ok(line) = line else { break };
382 if let Ok(mut buf) = stderr_buffer.lock() {
383 if buf.len() == STDERR_BUFFER_LINES {
384 buf.pop_front();
385 }
386 buf.push_back(line.clone());
387 }
388 if let Some(event) = parse_channel_line(&line) {
389 let msg = ParserMessage {
390 alias: alias.clone(),
391 event,
392 };
393 if tx.send(msg).is_err() {
394 break;
395 }
396 }
397 }
398 log::debug!("[purple] Tunnel parser thread exit: alias={alias}");
399 })
400 .expect("spawn purple-tunnel-parser thread")
401}
402
403pub fn parse_channel_line(line: &str) -> Option<ChannelEvent> {
416 let trimmed = line.trim_start();
417 let rest = trimmed.strip_prefix("debug1: channel ")?;
418 let (id_str, after) = rest.split_once(':')?;
419 let channel_id: u32 = id_str.trim().parse().ok()?;
420 let after = after.trim_start();
421 let (kind, channel_kind) = if let Some(after_new) = after.strip_prefix("new") {
422 let after_new = after_new.trim_start();
423 let ctype = if let Some(rest) = after_new.strip_prefix('[') {
424 rest.split_once(']').map(|(t, _)| t.trim().to_string())
426 } else {
427 after_new
431 .split_whitespace()
432 .next()
433 .map(|s| s.to_string())
434 .filter(|s| !s.is_empty())
435 };
436 let chan_kind = ctype.as_deref().map(ChannelKind::from_bracket);
437 (ChannelEventKind::Open, chan_kind)
438 } else if after.starts_with("free") {
439 (ChannelEventKind::Close, None)
440 } else {
441 return None;
442 };
443 Some(ChannelEvent {
444 at: Instant::now(),
445 channel_id,
446 kind,
447 channel_kind,
448 opened_at: None,
449 })
450}
451
452#[cfg(any(target_os = "macos", target_os = "linux"))]
456pub fn spawn_lsof_poller(
457 bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
458 tx: Sender<LsofMessage>,
459 stop: Arc<AtomicBool>,
460) -> JoinHandle<()> {
461 thread::Builder::new()
462 .name("purple-tunnel-lsof".into())
463 .spawn(move || {
464 let mut first_seen: HashMap<(u16, String), Instant> = HashMap::new();
467 let mut responsible_cache = ResponsibleAppCache::default();
470 let mut peer_state: HashMap<(u16, String), PeerSampleCache> = HashMap::new();
473 while !stop.load(Ordering::Relaxed) {
474 let ports: Vec<(String, u16, u32)> = match bind_ports.lock() {
475 Ok(g) => g.clone(),
476 Err(p) => p.into_inner().clone(),
477 };
478 if ports.is_empty() {
479 thread::sleep(Duration::from_millis(500));
480 continue;
481 }
482 let now = Instant::now();
483 let mut msg = run_lsof_once(&ports, &mut first_seen, now);
484 annotate_responsible_apps(&mut msg, &mut responsible_cache);
485 annotate_peer_throughput(&mut msg, &mut peer_state, now);
486 if tx.send(msg).is_err() {
487 break;
488 }
489 for _ in 0..20 {
490 if stop.load(Ordering::Relaxed) {
491 break;
492 }
493 thread::sleep(Duration::from_millis(100));
494 }
495 }
496 log::debug!("[purple] Tunnel lsof poller thread exit");
497 })
498 .expect("spawn purple-tunnel-lsof thread")
499}
500
/// Fallback for platforms without lsof support: spawns a thread that sends
/// nothing and simply idles in 500 ms steps until `stop` is set, so callers
/// can join it like the real poller.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
pub fn spawn_lsof_poller(
    _bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>>,
    _tx: Sender<LsofMessage>,
    stop: Arc<AtomicBool>,
) -> JoinHandle<()> {
    thread::spawn(move || loop {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        thread::sleep(Duration::from_millis(500));
    })
}
515
// Foreign declaration used by the macOS `lookup_responsible_app`.
// NOTE(review): this symbol is not declared by the `libc` crate and is
// presumably an undocumented system API — confirm availability on every
// supported macOS version.
#[cfg(target_os = "macos")]
unsafe extern "C" {
    /// Given a pid, returns the pid the system considers responsible for it.
    /// The caller treats a result `<= 0` as "unknown".
    fn responsibility_get_pid_responsible_for_pid(pid: libc::pid_t) -> libc::pid_t;
}
532
/// macOS: resolves the name of the process responsible for `pid`.
///
/// Returns `None` when the lookup fails, when the process is responsible for
/// itself, when the responsible process's name cannot be read, or when that
/// name equals `client_process` (ASCII case-insensitive).
#[cfg(target_os = "macos")]
fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
    // SAFETY: the foreign function takes and returns a plain pid by value; no
    // pointers or borrowed memory cross the boundary.
    let responsible = unsafe { responsibility_get_pid_responsible_for_pid(pid as libc::pid_t) };
    if responsible <= 0 || responsible as u32 == pid {
        return None;
    }
    process_name(responsible as u32).filter(|name| !name.eq_ignore_ascii_case(client_process))
}
558
/// macOS: returns the beautified name of process `pid` via `proc_name`, or
/// `None` when the call reports no bytes, the name is not valid UTF-8, or it
/// is empty after trailing NULs are trimmed.
#[cfg(target_os = "macos")]
fn process_name(pid: u32) -> Option<String> {
    unsafe extern "C" {
        fn proc_name(pid: libc::c_int, buffer: *mut libc::c_void, buffersize: u32) -> libc::c_int;
    }

    let mut name_buf = [0u8; 256];
    let capacity = name_buf.len() as u32;
    // SAFETY: the pointer and `capacity` describe `name_buf`, a live stack
    // buffer that outlives the call.
    let written = unsafe { proc_name(pid as libc::c_int, name_buf.as_mut_ptr().cast(), capacity) };
    if written <= 0 {
        return None;
    }

    // Never trust the reported length beyond what the buffer can hold.
    let len = (written as usize).min(name_buf.len());
    let name = std::str::from_utf8(&name_buf[..len])
        .ok()?
        .trim_end_matches('\0');
    (!name.is_empty()).then(|| beautify_process(name))
}
590
591#[cfg(target_os = "linux")]
603fn lookup_responsible_app(pid: u32, client_process: &str) -> Option<String> {
604 let session_id = read_session_leader(pid)?;
605 if session_id == pid {
606 return None;
607 }
608 let comm = std::fs::read_to_string(format!("/proc/{}/comm", session_id)).ok()?;
609 let name = comm.trim();
610 if name.is_empty() || name.eq_ignore_ascii_case(client_process) {
611 return None;
612 }
613 Some(beautify_process(name))
614}
615
/// Linux: reads the session id of `pid` from `/proc/<pid>/stat`.
///
/// Everything up to the last `)` is skipped because the command name inside
/// the parentheses may itself contain spaces or parentheses. The session id
/// is the fourth whitespace-separated field after that point.
#[cfg(target_os = "linux")]
fn read_session_leader(pid: u32) -> Option<u32> {
    let stat = std::fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    let (_, tail) = stat.rsplit_once(')')?;
    tail.split_whitespace().nth(3)?.parse().ok()
}
628
/// Fallback for platforms other than macOS and Linux: no lookup is
/// implemented, so this always returns `None`.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn lookup_responsible_app(_pid: u32, _client_process: &str) -> Option<String> {
    None
}
633
634#[derive(Default)]
641struct ResponsibleAppCache {
642 map: HashMap<u32, Option<String>>,
643}
644
645impl ResponsibleAppCache {
646 fn resolve(&mut self, pid: u32, client_process: &str) -> Option<String> {
647 if let Some(cached) = self.map.get(&pid) {
648 return cached.clone();
649 }
650 let resolved = lookup_responsible_app(pid, client_process);
651 self.map.insert(pid, resolved.clone());
652 resolved
653 }
654
655 fn retain_pids(&mut self, live: &std::collections::HashSet<u32>) {
658 self.map.retain(|pid, _| live.contains(pid));
659 }
660}
661
662fn annotate_responsible_apps(msg: &mut LsofMessage, cache: &mut ResponsibleAppCache) {
667 let mut live: std::collections::HashSet<u32> = std::collections::HashSet::new();
668 for peers in msg.clients.values_mut() {
669 for peer in peers.iter_mut() {
670 live.insert(peer.pid);
671 peer.responsible_app = cache.resolve(peer.pid, &peer.process);
672 }
673 }
674 cache.retain_pids(&live);
675}
676
/// Previous throughput sample for one `(port, src)` peer, kept between polls
/// by `annotate_peer_throughput` so rates can be derived from counter deltas.
#[derive(Debug, Clone, Default)]
struct PeerSampleCache {
    /// Cumulative receive counter at the previous sample.
    last_rcvd: u64,
    /// Cumulative send counter at the previous sample.
    last_sent: u64,
    /// Time of the previous sample; `None` before the first one.
    last_at: Option<Instant>,
}
687
/// Raw throughput data gathered by `sample_peer_throughput` for one poll.
#[derive(Debug, Default)]
struct PerPeerSamples {
    /// Cumulative `(received, sent)` counters keyed by `(pid, source port)`;
    /// filled on Linux from `ss` output.
    per_socket_cumulative: HashMap<(u32, u16), (u64, u64)>,
    /// `(rx, tx)` values keyed by pid, used directly as rates; filled on
    /// macOS from nettop.
    per_pid_bps: HashMap<u32, (u64, u64)>,
}
704
705fn annotate_peer_throughput(
713 msg: &mut LsofMessage,
714 cache: &mut HashMap<(u16, String), PeerSampleCache>,
715 now: Instant,
716) {
717 let samples = sample_peer_throughput();
718 let mut live: std::collections::HashSet<(u16, String)> = std::collections::HashSet::new();
719
720 for (port, peers) in msg.clients.iter_mut() {
721 for peer in peers.iter_mut() {
722 let key = (*port, peer.src.clone());
723 live.insert(key.clone());
724 let entry = cache.entry(key).or_default();
725 let src_port = src_port_from(&peer.src);
726
727 let cumulative =
729 src_port.and_then(|p| samples.per_socket_cumulative.get(&(peer.pid, p)).copied());
730 if let Some((rcvd, sent)) = cumulative {
731 if let Some(prev_at) = entry.last_at {
732 let dt = now.saturating_duration_since(prev_at).as_secs_f64();
733 if dt > 0.0 {
734 let rx_bps = ((rcvd.saturating_sub(entry.last_rcvd)) as f64 / dt) as u64;
735 let tx_bps = ((sent.saturating_sub(entry.last_sent)) as f64 / dt) as u64;
736 peer.current_rx_bps = rx_bps;
737 peer.current_tx_bps = tx_bps;
738 }
739 }
740 entry.last_rcvd = rcvd;
741 entry.last_sent = sent;
742 entry.last_at = Some(now);
743 peer.bytes_rcvd = Some(rcvd);
744 peer.bytes_sent = Some(sent);
745 peer.last_sample_at = Some(now);
746 continue;
747 }
748
749 if let Some((rx_bps, tx_bps)) = samples.per_pid_bps.get(&peer.pid).copied() {
751 peer.current_rx_bps = rx_bps;
752 peer.current_tx_bps = tx_bps;
753 entry.last_at = Some(now);
754 peer.last_sample_at = Some(now);
755 }
756 }
757 }
758
759 cache.retain(|key, _| live.contains(key));
760}
761
/// Linux: intended to turn `ss` output into cumulative `(received, sent)`
/// counters keyed by `(pid, source port)`.
///
/// NOTE(review): this is currently a stub — the input is ignored and an
/// empty map is always returned, so no per-socket throughput is produced on
/// Linux. Confirm whether a real parser is planned.
#[cfg(target_os = "linux")]
fn parse_ss_per_socket(_input: &str) -> HashMap<(u32, u16), (u64, u64)> {
    HashMap::new()
}
772
773fn sample_peer_throughput() -> PerPeerSamples {
780 let mut out = PerPeerSamples::default();
781 #[cfg(target_os = "linux")]
782 {
783 let output = Command::new("ss")
784 .args(["-H", "-t", "-i", "-n", "-p", "state", "established"])
785 .stdin(Stdio::null())
786 .stderr(Stdio::null())
787 .output();
788 if let Ok(o) = output {
789 if o.status.success() {
790 out.per_socket_cumulative =
791 parse_ss_per_socket(&String::from_utf8_lossy(&o.stdout));
792 }
793 }
794 }
795 #[cfg(target_os = "macos")]
796 {
797 out.per_pid_bps = sample_nettop_per_pid_macos();
798 }
799 out
800}
801
/// macOS: runs `nettop` once and returns `(rx, tx)` totals per pid.
///
/// The output consists of CSV blocks, each introduced by a header line
/// starting with `time,`. When a second header exists, only the text from
/// that header onward is used. NOTE(review): presumably the second block
/// holds the per-interval delta requested by `-d` with `-L 2` — confirm
/// against nettop(1). Rows for the same pid are summed with saturation. Any
/// failure to run nettop yields an empty map.
#[cfg(target_os = "macos")]
fn sample_nettop_per_pid_macos() -> HashMap<u32, (u64, u64)> {
    const HEADER: &str = "time,";
    const NEXT_HEADER: &str = "\ntime,";

    let output = Command::new("/usr/bin/nettop")
        .args(["-P", "-d", "-x", "-s", "1", "-L", "2"])
        .stdin(Stdio::null())
        .stderr(Stdio::null())
        .output();
    let text = match output {
        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).into_owned(),
        _ => return HashMap::new(),
    };

    // The previous implementation also built an unused list of block slices
    // by re-scanning all earlier lines for every header: quadratic work, and
    // it sliced `text` at a computed offset that could miss a char boundary
    // and panic. Only the selection below ever influenced the result.
    let delta_block = if text.contains(NEXT_HEADER) {
        let first = text.find(HEADER).unwrap_or(0);
        let after_first = first + HEADER.len();
        match text[after_first..].find(NEXT_HEADER) {
            // +1 skips the newline so the block starts at the header itself.
            Some(off) => &text[after_first + off + 1..],
            None => &text[first..],
        }
    } else {
        text.as_str()
    };

    let mut out: HashMap<u32, (u64, u64)> = HashMap::new();
    for (pid, rx, tx) in delta_block.lines().filter_map(parse_nettop_csv_row_per_pid) {
        let entry = out.entry(pid).or_insert((0, 0));
        entry.0 = entry.0.saturating_add(rx);
        entry.1 = entry.1.saturating_add(tx);
    }
    out
}
859
/// macOS: parses one nettop CSV row into `(pid, rx, tx)`.
///
/// Column 1 must look like `name.pid` (the pid follows the last dot);
/// columns 4 and 5 are the two counters. Blank lines, header lines starting
/// with `time,`, rows with fewer than six columns and rows with unparsable
/// numbers yield `None`.
#[cfg(target_os = "macos")]
fn parse_nettop_csv_row_per_pid(line: &str) -> Option<(u32, u64, u64)> {
    let line = line.trim();
    if line.is_empty() || line.starts_with("time,") {
        return None;
    }

    let mut cols = line.split(',').map(str::trim);
    let proc_pid = cols.nth(1)?; // column 1
    let rx_col = cols.nth(2)?; // skips columns 2 and 3, yields column 4
    let tx_col = cols.next()?; // column 5

    let (_, pid_str) = proc_pid.rsplit_once('.')?;
    let pid = pid_str.parse::<u32>().ok()?;
    let rx = rx_col.parse::<u64>().ok()?;
    let tx = tx_col.parse::<u64>().ok()?;
    Some((pid, rx, tx))
}
883
/// Extracts the port from an `addr:port` string: the text after the last
/// `:` parsed as `u16`. Returns `None` when there is no colon or the suffix
/// is not a valid port number.
fn src_port_from(src: &str) -> Option<u16> {
    let (_, port) = src.rsplit_once(':')?;
    port.parse().ok()
}
888
889#[cfg(any(target_os = "macos", target_os = "linux"))]
890fn run_lsof_once(
891 ports: &[(String, u16, u32)],
892 first_seen: &mut HashMap<(u16, String), Instant>,
893 now: Instant,
894) -> LsofMessage {
895 let output = Command::new("lsof")
900 .args(["-iTCP", "-P", "-n", "-w", "+c", "0"])
901 .output();
902 let stdout = match output {
903 Ok(o) if o.status.success() || !o.stdout.is_empty() => o.stdout,
904 _ => return LsofMessage::empty(now),
905 };
906 let text = String::from_utf8_lossy(&stdout);
907 parse_lsof_output(&text, ports, first_seen, now)
908}
909
910pub fn parse_lsof_output(
912 text: &str,
913 ports: &[(String, u16, u32)],
914 first_seen: &mut HashMap<(u16, String), Instant>,
915 now: Instant,
916) -> LsofMessage {
917 let mut clients: HashMap<u16, Vec<ClientPeer>> = HashMap::new();
918 let mut conflicts: HashMap<u16, PortConflict> = HashMap::new();
919 let bind_ports: Vec<u16> = ports.iter().map(|(_, p, _)| *p).collect();
920 let tunnel_pids: Vec<u32> = ports.iter().map(|(_, _, pid)| *pid).collect();
921
922 for line in text.lines().skip(1) {
923 let row = match parse_lsof_row(line) {
924 Some(r) => r,
925 None => continue,
926 };
927 if row.is_listen && bind_ports.contains(&row.local_port) && !tunnel_pids.contains(&row.pid)
929 {
930 conflicts
931 .entry(row.local_port)
932 .or_insert_with(|| PortConflict {
933 port: row.local_port,
934 process: row.command.clone(),
935 pid: row.pid,
936 });
937 continue;
938 }
939 if row.is_listen {
940 continue;
941 }
942 if let Some(remote_port) = row.remote_port {
945 if bind_ports.contains(&remote_port) && !tunnel_pids.contains(&row.pid) {
946 let src = row.local_addr_port().unwrap_or_else(|| "?".to_string());
947 let key = (remote_port, src.clone());
948 let since = *first_seen.entry(key).or_insert(now);
949 let entry = clients.entry(remote_port).or_default();
950 if entry.len() >= MAX_CLIENTS_PER_PORT {
951 continue;
952 }
953 entry.push(ClientPeer {
954 src,
955 process: beautify_process(&row.command),
956 pid: row.pid,
957 since,
958 responsible_app: None,
959 current_rx_bps: 0,
960 current_tx_bps: 0,
961 bytes_rcvd: None,
962 bytes_sent: None,
963 last_sample_at: None,
964 });
965 }
966 }
967 }
968 let live: std::collections::HashSet<(u16, String)> = clients
971 .iter()
972 .flat_map(|(port, peers)| peers.iter().map(move |p| (*port, p.src.clone())))
973 .collect();
974 first_seen.retain(|key, _| live.contains(key));
975
976 LsofMessage {
977 at: now,
978 clients,
979 conflicts,
980 }
981}
982
/// One parsed lsof row describing a TCP socket in LISTEN or ESTABLISHED
/// state (see `parse_lsof_row`).
#[derive(Debug)]
struct LsofRow {
    /// COMMAND column, verbatim.
    command: String,
    /// PID column.
    pid: u32,
    /// `true` for LISTEN rows, `false` for ESTABLISHED rows.
    is_listen: bool,
    /// Address part of the local endpoint.
    local_addr: String,
    /// Port part of the local endpoint.
    local_port: u16,
    /// Address part of the remote endpoint, when the row has `->remote`.
    remote_addr: Option<String>,
    /// Port part of the remote endpoint, when the row has `->remote`.
    remote_port: Option<u16>,
}

impl LsofRow {
    /// Formats the local endpoint as `addr:port`. Always returns `Some`.
    fn local_addr_port(&self) -> Option<String> {
        // NOTE(review): this binding has no effect on the result; presumably
        // it exists only so `remote_addr`/`remote_port` count as read and do
        // not trigger dead-code warnings — confirm before removing.
        if let (Some(addr), Some(port)) = (self.remote_addr.as_deref(), self.remote_port) {
            let _ = (addr, port);
        }
        Some(format!("{}:{}", self.local_addr, self.local_port))
    }
}
1007
1008fn parse_lsof_row(line: &str) -> Option<LsofRow> {
1009 if line.trim().is_empty() {
1010 return None;
1011 }
1012 let mut fields = line.split_whitespace();
1017 let command = fields.next()?.to_string();
1018 let pid: u32 = fields.next()?.parse().ok()?;
1019 let _user = fields.next()?;
1020 let _fd = fields.next()?;
1021 let _ty = fields.next()?;
1022 let _dev = fields.next()?;
1023 let _size = fields.next()?;
1024 let _node = fields.next()?;
1025 let name = fields.next()?;
1026 let state = fields.next();
1027 if !name.contains(':') {
1028 return None;
1029 }
1030 let is_listen = matches!(state, Some(s) if s.contains("LISTEN"));
1031 let is_established = matches!(state, Some(s) if s.contains("ESTABLISHED"));
1032 if !is_listen && !is_established {
1033 return None;
1034 }
1035 let (local, remote) = match name.split_once("->") {
1036 Some((l, r)) => (l, Some(r)),
1037 None => (name, None),
1038 };
1039 let (local_addr, local_port) = split_addr_port(local)?;
1040 let (remote_addr, remote_port) = match remote {
1041 Some(r) => match split_addr_port(r) {
1042 Some((a, p)) => (Some(a), Some(p)),
1043 None => (None, None),
1044 },
1045 None => (None, None),
1046 };
1047 Some(LsofRow {
1048 command,
1049 pid,
1050 is_listen,
1051 local_addr,
1052 local_port,
1053 remote_addr,
1054 remote_port,
1055 })
1056}
1057
/// Shortens a process name for display by removing a leading `com.apple.`
/// prefix. The name is returned unchanged when it lacks the prefix or when
/// removing it would leave nothing.
pub fn beautify_process(raw: &str) -> String {
    match raw.strip_prefix("com.apple.") {
        Some(rest) if !rest.is_empty() => rest.to_owned(),
        _ => raw.to_owned(),
    }
}
1072
/// Splits an endpoint into `(address, port)`.
///
/// Accepts `[addr]:port` (the brackets are removed) and `addr:port`, where
/// the port is whatever follows the last `:`. Returns `None` on a missing
/// separator, an unterminated bracket, or a port that is not a valid `u16`.
fn split_addr_port(s: &str) -> Option<(String, u16)> {
    let (addr, port) = match s.strip_prefix('[') {
        Some(bracketed) => {
            let (addr, tail) = bracketed.split_once(']')?;
            (addr, tail.strip_prefix(':')?)
        }
        None => s.rsplit_once(':')?,
    };
    let port = port.parse::<u16>().ok()?;
    Some((addr.to_owned(), port))
}
1088
/// Immutable, display-oriented copy of a tunnel's live data.
///
/// NOTE(review): the code that builds this snapshot is outside this block;
/// the field notes below are inferred from names and from `TunnelLiveState`
/// — confirm against the builder.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TunnelLiveSnapshot {
    /// Presumably seconds since the tunnel started.
    pub uptime_secs: u64,
    /// Counterpart of `TunnelLiveState::active_channels`.
    pub active_channels: u32,
    /// Counterpart of `TunnelLiveState::peak_concurrent`.
    pub peak_concurrent: u32,
    /// Counterpart of `TunnelLiveState::total_opens`.
    pub total_opens: u32,
    /// Presumably seconds since the last recorded event.
    pub idle_secs: u64,
    /// Counterpart of `TunnelLiveState::rx_history`.
    pub rx_history: [u64; HISTORY_BUCKETS],
    /// Counterpart of `TunnelLiveState::tx_history`.
    pub tx_history: [u64; HISTORY_BUCKETS],
    /// Counterpart of `TunnelLiveState::current_rx_bps`.
    pub current_rx_bps: u64,
    /// Counterpart of `TunnelLiveState::current_tx_bps`.
    pub current_tx_bps: u64,
    /// Counterpart of `TunnelLiveState::peak_rx_bps`.
    pub peak_rx_bps: u64,
    /// Counterpart of `TunnelLiveState::peak_tx_bps`.
    pub peak_tx_bps: u64,
    /// Presumably whether any throughput sample has arrived yet.
    pub throughput_ready: bool,
    /// Connected clients prepared for display.
    pub clients: Vec<DisplayClient>,
    /// Recent events prepared for display.
    pub events: Vec<DisplayEvent>,
    /// Open channels; presumably `(channel id, age in seconds, kind)`.
    pub currently_open: Vec<(u32, u64, ChannelKind)>,
    /// Port conflict affecting this tunnel, if any.
    pub conflict: Option<PortConflict>,
    /// Counterpart of `TunnelLiveState::last_exit`.
    pub last_exit: Option<(i32, String)>,
}
1122
/// Display-oriented view of one connected client.
///
/// NOTE(review): populated outside this block; the field notes are inferred
/// from the matching [`ClientPeer`] fields — confirm against the builder.
#[derive(Debug, Clone)]
pub struct DisplayClient {
    /// Counterpart of `ClientPeer::src`.
    #[allow(dead_code)]
    pub src: String,
    /// Counterpart of `ClientPeer::process`.
    pub process: String,
    /// Presumably seconds since `ClientPeer::since`.
    pub age_secs: u64,
    /// Counterpart of `ClientPeer::pid`.
    #[allow(dead_code)]
    pub pid: u32,
    /// Counterpart of `ClientPeer::responsible_app`.
    pub responsible_app: Option<String>,
    /// Counterpart of `ClientPeer::current_rx_bps`.
    pub current_rx_bps: u64,
    /// Counterpart of `ClientPeer::current_tx_bps`.
    pub current_tx_bps: u64,
    /// Short per-peer history with `PEER_VIZ_BUCKETS` slots.
    pub viz_history: [u64; PEER_VIZ_BUCKETS],
    /// Presumably whether a throughput sample exists for this peer.
    pub throughput_ready: bool,
}
1152
/// Display-oriented view of one channel event.
///
/// NOTE(review): populated outside this block; the field notes are inferred
/// from the matching [`ChannelEvent`] fields — confirm against the builder.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct DisplayEvent {
    /// Presumably seconds since the event occurred.
    pub age_secs: u64,
    /// Counterpart of `ChannelEvent::channel_id`.
    #[allow(dead_code)]
    pub channel_id: u32,
    /// Open or close.
    pub kind: ChannelEventKind,
    /// Channel type (non-optional here, unlike `ChannelEvent::channel_kind`).
    pub channel_kind: ChannelKind,
    /// Presumably the channel's lifetime in seconds for paired closes.
    #[allow(dead_code)]
    pub duration_secs: Option<u64>,
    /// Presumably how many identical events this row stands for.
    pub count: u32,
}
1176
1177#[cfg(test)]
1182mod tests {
1183 use super::*;
1184
1185 fn user_open(channel_id: u32, at: Instant) -> ChannelEvent {
1186 ChannelEvent {
1187 at,
1188 channel_id,
1189 kind: ChannelEventKind::Open,
1190 channel_kind: Some(ChannelKind::Direct),
1191 opened_at: None,
1192 }
1193 }
1194
1195 #[test]
1196 fn parse_channel_open_simple() {
1197 let ev = parse_channel_line("debug1: channel 0: new [direct-tcpip]").unwrap();
1198 assert_eq!(ev.channel_id, 0);
1199 assert_eq!(ev.kind, ChannelEventKind::Open);
1200 assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1201 }
1202
1203 #[test]
1204 fn parse_channel_open_records_listener_kind() {
1205 let ev = parse_channel_line("debug1: channel 1: new [port listener]").unwrap();
1206 assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1207 }
1208
1209 #[test]
1210 fn parse_channel_open_records_dynamic_kind() {
1211 let ev = parse_channel_line("debug1: channel 4: new [dynamic-tcpip]").unwrap();
1212 assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1213 }
1214
1215 #[test]
1216 fn parse_channel_close_simple() {
1217 let ev = parse_channel_line("debug1: channel 12: free: blah blah").unwrap();
1218 assert_eq!(ev.channel_id, 12);
1219 assert_eq!(ev.kind, ChannelEventKind::Close);
1220 assert_eq!(ev.channel_kind, None);
1223 }
1224
1225 #[test]
1226 fn parse_channel_with_leading_whitespace() {
1227 let ev = parse_channel_line(" debug1: channel 5: new [forwarded-tcpip]").unwrap();
1228 assert_eq!(ev.channel_id, 5);
1229 assert_eq!(ev.kind, ChannelEventKind::Open);
1230 assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1231 }
1232
1233 #[test]
1234 fn parse_channel_modern_openssh_format_with_inactive_timeout() {
1235 let ev = parse_channel_line(
1241 "debug1: channel 3: new direct-tcpip [127.0.0.1:54321] (inactive timeout: 0)",
1242 )
1243 .unwrap();
1244 assert_eq!(ev.channel_id, 3);
1245 assert_eq!(ev.kind, ChannelEventKind::Open);
1246 assert_eq!(ev.channel_kind, Some(ChannelKind::Direct));
1247 }
1248
1249 #[test]
1250 fn parse_channel_modern_openssh_format_forwarded() {
1251 let ev = parse_channel_line(
1252 "debug1: channel 7: new forwarded-tcpip [10.0.0.1:443] (inactive timeout: 0)",
1253 )
1254 .unwrap();
1255 assert_eq!(ev.channel_kind, Some(ChannelKind::Forwarded));
1256 }
1257
1258 #[test]
1259 fn parse_channel_modern_openssh_format_dynamic() {
1260 let ev = parse_channel_line("debug1: channel 9: new dynamic-tcpip [client] (timeout: 5)")
1261 .unwrap();
1262 assert_eq!(ev.channel_kind, Some(ChannelKind::Dynamic));
1263 }
1264
1265 #[test]
1266 fn parse_channel_modern_openssh_format_internal_listener_is_other() {
1267 let ev = parse_channel_line(
1271 "debug1: channel 0: new port-listener [::1:8080] (inactive timeout: 0)",
1272 )
1273 .unwrap();
1274 assert_eq!(ev.channel_kind, Some(ChannelKind::Other));
1275 }
1276
1277 #[test]
1278 fn parse_channel_unrelated_line_returns_none() {
1279 assert!(parse_channel_line("debug1: client_input_global_request").is_none());
1280 assert!(parse_channel_line("not even ssh output").is_none());
1281 assert!(parse_channel_line("debug1: channel abc: new").is_none());
1282 assert!(parse_channel_line("debug1: channel 1: confirm").is_none());
1283 }
1284
1285 #[test]
1286 fn record_event_open_increments_counters_for_user_visible_kinds() {
1287 let now = Instant::now();
1288 let mut state = TunnelLiveState::new(now);
1289 state.record_event(user_open(1, now));
1290 assert_eq!(state.total_opens, 1);
1291 assert_eq!(state.active_channels, 1);
1292 assert_eq!(state.peak_concurrent, 1);
1293 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1295 }
1296
1297 #[test]
1298 fn record_event_skips_counters_for_internal_channels() {
1299 let now = Instant::now();
1303 let mut state = TunnelLiveState::new(now);
1304 state.record_event(ChannelEvent {
1305 at: now,
1306 channel_id: 0,
1307 kind: ChannelEventKind::Open,
1308 channel_kind: Some(ChannelKind::Other),
1309 opened_at: None,
1310 });
1311 assert_eq!(state.total_opens, 0);
1312 assert_eq!(state.active_channels, 0);
1313 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1314 assert_eq!(state.events.len(), 1);
1316 }
1317
1318 #[test]
1319 fn sample_activity_writes_peak_into_current_bucket() {
1320 let now = Instant::now();
1321 let mut state = TunnelLiveState::new(now);
1322 state.sample_activity(2);
1323 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1324 state.sample_activity(1);
1326 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 2);
1327 state.sample_activity(5);
1329 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 5);
1330 }
1331
1332 #[test]
1333 fn sample_activity_clamps_to_u8_max() {
1334 let now = Instant::now();
1335 let mut state = TunnelLiveState::new(now);
1336 state.sample_activity(u32::MAX);
1337 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], u8::MAX);
1338 }
1339
1340 #[test]
1341 fn record_event_close_pairs_with_open_for_duration() {
1342 let t0 = Instant::now();
1343 let t1 = t0 + Duration::from_secs(5);
1344 let mut state = TunnelLiveState::new(t0);
1345 state.record_event(user_open(7, t0));
1346 state.record_event(ChannelEvent {
1347 at: t1,
1348 channel_id: 7,
1349 kind: ChannelEventKind::Close,
1350 channel_kind: None,
1351 opened_at: None,
1352 });
1353 assert_eq!(state.active_channels, 0);
1354 let last = state.events.back().unwrap();
1355 assert_eq!(last.kind, ChannelEventKind::Close);
1356 assert_eq!(last.opened_at, Some(t0));
1357 assert_eq!(last.channel_kind, Some(ChannelKind::Direct));
1359 }
1360
1361 #[test]
1362 fn record_event_caps_ringbuffer_at_max() {
1363 let now = Instant::now();
1364 let mut state = TunnelLiveState::new(now);
1365 for i in 0..(MAX_EVENTS as u32 + 5) {
1366 state.record_event(user_open(i, now));
1367 }
1368 assert_eq!(state.events.len(), MAX_EVENTS);
1369 }
1370
1371 #[test]
1372 fn rotate_if_due_shifts_buckets_per_tick() {
1373 let t0 = Instant::now();
1374 let mut state = TunnelLiveState::new(t0);
1375 state.opens_history[HISTORY_BUCKETS - 1] = 7;
1376 state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS));
1377 assert_eq!(state.opens_history[HISTORY_BUCKETS - 2], 7);
1379 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 0);
1380 }
1381
1382 #[test]
1383 fn rotate_if_due_clamps_at_full_window() {
1384 let t0 = Instant::now();
1385 let mut state = TunnelLiveState::new(t0);
1386 state.opens_history[HISTORY_BUCKETS - 1] = 9;
1387 state.rotate_if_due(t0 + Duration::from_secs(BUCKET_SECS * HISTORY_BUCKETS as u64 * 4));
1389 assert!(state.opens_history.iter().all(|&v| v == 0));
1390 }
1391
1392 #[test]
1393 fn rotate_if_due_noop_within_one_bucket() {
1394 let t0 = Instant::now();
1395 let mut state = TunnelLiveState::new(t0);
1396 state.opens_history[HISTORY_BUCKETS - 1] = 3;
1397 state.rotate_if_due(t0 + Duration::from_millis(BUCKET_SECS * 1000 / 2));
1399 assert_eq!(state.opens_history[HISTORY_BUCKETS - 1], 3);
1400 }
1401
1402 #[test]
1403 fn parse_lsof_listen_row() {
1404 let line = "ssh 12345 user 3u IPv4 0xabc 0t0 TCP 127.0.0.1:8080 (LISTEN)";
1405 let row = parse_lsof_row(line).unwrap();
1406 assert_eq!(row.command, "ssh");
1407 assert_eq!(row.pid, 12345);
1408 assert!(row.is_listen);
1409 assert_eq!(row.local_addr, "127.0.0.1");
1410 assert_eq!(row.local_port, 8080);
1411 assert!(row.remote_port.is_none());
1412 }
1413
/// An `(ESTABLISHED)` row parses with the left-hand port as `local_port`,
/// the right-hand port as `remote_port`, and `is_listen` cleared.
#[test]
fn parse_lsof_established_row() {
    let established =
        "curl 23456 user 4u IPv4 0xdef 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)";
    let parsed = parse_lsof_row(established).unwrap();

    assert!(!parsed.is_listen);
    assert_eq!(parsed.command, "curl");
    assert_eq!(parsed.pid, 23456);
    assert_eq!(parsed.local_port, 54321);
    assert_eq!(parsed.remote_port, Some(8080));
}
1425
/// A row whose TCP state is `(CLOSE_WAIT)` yields no parsed row.
#[test]
fn parse_lsof_other_states_skipped() {
    let close_wait = "x 1 u 0u IPv4 0 0t0 TCP 1.2.3.4:1->5.6.7.8:9 (CLOSE_WAIT)";
    let parsed = parse_lsof_row(close_wait);
    assert!(parsed.is_none());
}
1432
/// With our own listener on 8080 plus both ends of one established
/// connection in the dump, exactly one client (`curl`, pid 23456, source
/// containing port 54321) is reported for port 8080 and no conflict is raised.
#[test]
fn parse_lsof_output_finds_clients_for_bind_port() {
    let lsof_dump = concat!(
        "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\n",
        "ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)\n",
        "curl 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)\n",
        "ssh 12345 u 5u IPv4 0xc 0t0 TCP 127.0.0.1:8080->127.0.0.1:54321 (ESTABLISHED)\n",
    );
    let forwards = vec![("foo".into(), 8080u16, 12345u32)];
    let mut seen_map = HashMap::new();

    let report = parse_lsof_output(lsof_dump, &forwards, &mut seen_map, Instant::now());

    assert!(report.conflicts.is_empty());
    let peers = report.clients.get(&8080).expect("clients on 8080");
    assert_eq!(peers.len(), 1);
    let client = &peers[0];
    assert_eq!(client.process, "curl");
    assert_eq!(client.pid, 23456);
    assert!(client.src.contains("54321"));
}
1452
/// A `LISTEN` row on our port owned by a different pid (`nginx`, 99999)
/// is reported as a conflict for port 8080.
#[test]
fn parse_lsof_output_detects_port_conflict() {
    let lsof_dump = concat!(
        "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\n",
        "nginx 99999 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)\n",
    );
    let forwards = vec![("foo".into(), 8080u16, 12345u32)];
    let mut seen_map = HashMap::new();

    let report = parse_lsof_output(lsof_dump, &forwards, &mut seen_map, Instant::now());

    let squatter = report.conflicts.get(&8080).expect("conflict on 8080");
    assert_eq!(squatter.pid, 99999);
    assert_eq!(squatter.process, "nginx");
}
1467
/// A `LISTEN` row on our port owned by the expected pid (12345) must not be
/// reported as a conflict.
#[test]
fn parse_lsof_output_skips_own_listen() {
    let lsof_dump = concat!(
        "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\n",
        "ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)\n",
    );
    let forwards = vec![("foo".into(), 8080u16, 12345u32)];
    let mut seen_map = HashMap::new();

    let report = parse_lsof_output(lsof_dump, &forwards, &mut seen_map, Instant::now());

    assert!(report.conflicts.is_empty());
}
1480
/// Parsing the same dump twice, five seconds apart, with the same `seen`
/// map must report the same `since` for the client on port 8080.
#[test]
fn parse_lsof_output_first_seen_persists_across_polls() {
    let lsof_dump = concat!(
        "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\n",
        "ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)\n",
        "curl 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)\n",
    );
    let forwards = vec![("foo".into(), 8080u16, 12345u32)];
    let mut seen_map = HashMap::new();

    let first_poll_at = Instant::now();
    let second_poll_at = first_poll_at + Duration::from_secs(5);
    let first_poll = parse_lsof_output(lsof_dump, &forwards, &mut seen_map, first_poll_at);
    let second_poll = parse_lsof_output(lsof_dump, &forwards, &mut seen_map, second_poll_at);

    let earlier = &first_poll.clients[&8080][0];
    let later = &second_poll.clients[&8080][0];
    assert_eq!(earlier.since, later.since, "first_seen should be sticky");
}
1498
/// A bracketed IPv6 endpoint splits into the address without brackets and
/// the numeric port.
#[test]
fn split_addr_port_handles_ipv6_brackets() {
    let (host, port) = split_addr_port("[::1]:8080").unwrap();
    assert_eq!(port, 8080);
    assert_eq!(host, "::1");
}
1505
/// A dotted-quad IPv4 endpoint splits into the address and the numeric port.
#[test]
fn split_addr_port_handles_ipv4() {
    let (host, port) = split_addr_port("127.0.0.1:8080").unwrap();
    assert_eq!(port, 8080);
    assert_eq!(host, "127.0.0.1");
}
1512
/// Names starting with `com.apple.` come back with that prefix removed.
#[test]
fn beautify_process_strips_com_apple_prefix() {
    let cases = [
        ("com.apple.WebKit.Networking", "WebKit.Networking"),
        ("com.apple.Safari", "Safari"),
    ];
    for (raw, pretty) in cases {
        assert_eq!(beautify_process(raw), pretty);
    }
}
1521
/// Names without the `com.apple.` prefix are returned as given.
#[test]
fn beautify_process_passes_other_names_through_unchanged() {
    for name in ["curl", "nginx", "python3"] {
        assert_eq!(beautify_process(name), name);
    }
}
1528
/// An input that is exactly `com.apple.` is returned unchanged.
#[test]
fn beautify_process_does_not_strip_when_only_prefix() {
    let bare_prefix = "com.apple.";
    assert_eq!(beautify_process(bare_prefix), bare_prefix);
}
1536
/// A client whose lsof command is `com.apple.WebKit.Networking` is reported
/// with the process name `WebKit.Networking`.
#[test]
fn parse_lsof_output_unwraps_apple_framework_names() {
    let lsof_dump = concat!(
        "COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME\n",
        "ssh 12345 u 3u IPv4 0xa 0t0 TCP 127.0.0.1:8080 (LISTEN)\n",
        "com.apple.WebKit.Networking 23456 u 4u IPv4 0xb 0t0 TCP 127.0.0.1:54321->127.0.0.1:8080 (ESTABLISHED)\n",
    );
    let forwards = vec![("foo".into(), 8080u16, 12345u32)];
    let mut seen_map = HashMap::new();

    let report = parse_lsof_output(lsof_dump, &forwards, &mut seen_map, Instant::now());

    let peers = report.clients.get(&8080).expect("clients on 8080");
    assert_eq!(peers.len(), 1);
    let client = &peers[0];
    assert_eq!(client.process, "WebKit.Networking");
}
1553}