use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::app::TunnelFormBaseline;
use crate::app::forms::TunnelForm;
use crate::tunnel::{ActiveTunnel, TunnelRule};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TunnelSortMode {
#[default]
MostRecent,
AlphaHostname,
}
impl TunnelSortMode {
pub fn next(self) -> Self {
match self {
TunnelSortMode::MostRecent => TunnelSortMode::AlphaHostname,
TunnelSortMode::AlphaHostname => TunnelSortMode::MostRecent,
}
}
pub fn label(self) -> &'static str {
match self {
TunnelSortMode::MostRecent => "most recent",
TunnelSortMode::AlphaHostname => "A-Z hostname",
}
}
}
use crate::tunnel_live::{
ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
PortConflict, TunnelLiveSnapshot,
};
pub struct TunnelState {
pub(in crate::app) list: Vec<TunnelRule>,
pub(in crate::app) form: TunnelForm,
pub(in crate::app) active: HashMap<String, ActiveTunnel>,
pub(in crate::app) form_baseline: Option<TunnelFormBaseline>,
pub(in crate::app) pending_delete: Option<usize>,
pub(in crate::app) summaries_cache: HashMap<String, String>,
pub(in crate::app) sort_mode: TunnelSortMode,
pub(in crate::app) parser_tx: Sender<ParserMessage>,
pub(in crate::app) parser_rx: Receiver<ParserMessage>,
pub(in crate::app) lsof_tx: Sender<LsofMessage>,
pub(in crate::app) lsof_rx: Receiver<LsofMessage>,
pub(in crate::app) clients: HashMap<u16, Vec<ClientPeer>>,
pub(in crate::app) conflicts: HashMap<u16, PortConflict>,
pub(in crate::app) last_lsof_at: Option<Instant>,
pub(in crate::app) peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
pub(in crate::app) peer_viz_last_push: Option<Instant>,
pub(in crate::app) peer_viz_prev_push: Option<Instant>,
pub(in crate::app) lsof: Option<LsofPollerHandle>,
pub(in crate::app) demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
}
impl Default for TunnelState {
fn default() -> Self {
let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
Self {
list: Vec::new(),
form: TunnelForm::new(),
active: HashMap::new(),
form_baseline: None,
pending_delete: None,
summaries_cache: HashMap::new(),
sort_mode: TunnelSortMode::default(),
parser_tx,
parser_rx,
lsof_tx,
lsof_rx,
clients: HashMap::new(),
conflicts: HashMap::new(),
last_lsof_at: None,
peer_viz: HashMap::new(),
peer_viz_last_push: None,
peer_viz_prev_push: None,
lsof: None,
demo_live_snapshots: HashMap::new(),
}
}
}
impl Drop for TunnelState {
fn drop(&mut self) {
if let Some(mut handle) = self.lsof.take() {
handle.shutdown();
}
}
}
impl TunnelState {
pub fn list(&self) -> &[TunnelRule] {
&self.list
}
pub fn list_mut(&mut self) -> &mut Vec<TunnelRule> {
&mut self.list
}
pub fn form(&self) -> &TunnelForm {
&self.form
}
pub fn form_mut(&mut self) -> &mut TunnelForm {
&mut self.form
}
pub fn reset_form(&mut self) {
self.form = TunnelForm::new();
}
pub fn active(&self) -> &HashMap<String, ActiveTunnel> {
&self.active
}
pub fn active_get(&self, alias: &str) -> Option<&ActiveTunnel> {
self.active.get(alias)
}
pub fn active_get_mut(&mut self, alias: &str) -> Option<&mut ActiveTunnel> {
self.active.get_mut(alias)
}
pub fn active_contains(&self, alias: &str) -> bool {
self.active.contains_key(alias)
}
pub fn active_insert(&mut self, alias: String, tunnel: ActiveTunnel) {
self.active.insert(alias, tunnel);
}
pub fn active_remove(&mut self, alias: &str) -> Option<ActiveTunnel> {
self.active.remove(alias)
}
pub fn drain_active(&mut self) -> std::collections::hash_map::Drain<'_, String, ActiveTunnel> {
self.active.drain()
}
pub fn clear_active(&mut self) {
self.active.clear();
}
pub fn pending_delete(&self) -> Option<usize> {
self.pending_delete
}
pub fn take_pending_delete(&mut self) -> Option<usize> {
self.pending_delete.take()
}
pub fn sort_mode(&self) -> TunnelSortMode {
self.sort_mode
}
pub fn set_sort_mode(&mut self, mode: TunnelSortMode) {
self.sort_mode = mode;
}
pub fn form_baseline(&self) -> Option<&TunnelFormBaseline> {
self.form_baseline.as_ref()
}
pub fn set_form_baseline(&mut self, baseline: Option<TunnelFormBaseline>) {
self.form_baseline = baseline;
}
pub fn demo_live_snapshots(&self) -> &HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
&self.demo_live_snapshots
}
pub fn demo_live_snapshots_mut(
&mut self,
) -> &mut HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
&mut self.demo_live_snapshots
}
pub fn parser_tx(&self) -> Sender<ParserMessage> {
self.parser_tx.clone()
}
pub fn clients(&self) -> &HashMap<u16, Vec<ClientPeer>> {
&self.clients
}
pub fn peer_viz(&self) -> &HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]> {
&self.peer_viz
}
pub fn peer_viz_last_push(&self) -> Option<Instant> {
self.peer_viz_last_push
}
pub fn peer_viz_prev_push(&self) -> Option<Instant> {
self.peer_viz_prev_push
}
pub fn summaries_cache(&self) -> &HashMap<String, String> {
&self.summaries_cache
}
pub fn summaries_cache_mut(&mut self) -> &mut HashMap<String, String> {
&mut self.summaries_cache
}
pub fn request_delete(&mut self, idx: usize) {
self.pending_delete = Some(idx);
}
pub fn cancel_delete(&mut self) {
self.pending_delete = None;
}
pub fn ensure_lsof_poller(&mut self) {
if self.lsof.is_some() {
return;
}
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
let thread = crate::tunnel_live::spawn_lsof_poller(
bind_ports.clone(),
self.lsof_tx.clone(),
stop.clone(),
);
self.lsof = Some(LsofPollerHandle {
stop,
bind_ports,
thread: Some(thread),
});
log::debug!("[purple] Tunnel lsof poller started");
}
pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
if let Some(handle) = &self.lsof {
if let Ok(mut g) = handle.bind_ports.lock() {
*g = ports;
}
}
}
pub fn poll(&mut self) -> Vec<(String, String, bool)> {
let now = Instant::now();
while let Ok(msg) = self.parser_rx.try_recv() {
if let Some(tunnel) = self.active.get_mut(&msg.alias) {
tunnel.live.record_event(msg.event);
}
}
let mut latest_lsof: Option<LsofMessage> = None;
while let Ok(msg) = self.lsof_rx.try_recv() {
latest_lsof = Some(msg);
}
if let Some(msg) = latest_lsof {
self.clients = msg.clients;
self.conflicts = msg.conflicts;
self.last_lsof_at = Some(msg.at);
self.push_peer_viz(now);
}
for tunnel in self.active.values_mut() {
tunnel.live.rotate_if_due(now);
}
let port_to_alias: HashMap<u16, String> = self
.lsof
.as_ref()
.and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
.map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
.unwrap_or_default();
let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
for (port, peers) in &self.clients {
let Some(alias) = port_to_alias.get(port) else {
continue;
};
let entry = bps_per_alias
.entry(alias.clone())
.or_insert((0u64, 0u64, false));
for peer in peers {
entry.0 = entry.0.saturating_add(peer.current_rx_bps);
entry.1 = entry.1.saturating_add(peer.current_tx_bps);
if peer.last_sample_at.is_some() {
entry.2 = true;
}
}
}
for (alias, tunnel) in self.active.iter_mut() {
let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
tunnel.live.current_rx_bps = rx;
tunnel.live.current_tx_bps = tx;
tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
if ready {
tunnel.live.last_throughput_at = Some(now);
}
}
let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
for (port, peers) in &self.clients {
if let Some(alias) = port_to_alias.get(port) {
*concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
}
}
for (alias, tunnel) in self.active.iter_mut() {
let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
let sample = lsof_count.max(tunnel.live.active_channels);
tunnel.live.sample_activity(sample);
}
if self.active.is_empty() {
return Vec::new();
}
let mut exited = Vec::new();
let mut to_remove = Vec::new();
for (alias, tunnel) in &mut self.active {
match tunnel.child.try_wait() {
Ok(Some(status)) => {
let stderr_msg = tunnel
.live
.stderr_buffer
.lock()
.ok()
.and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty());
let exit_code = status.code().unwrap_or(-1);
if !status.success() {
log::error!(
"[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
);
if let Some(ref err) = stderr_msg {
log::debug!("[external] Tunnel stderr: {}", err.trim());
}
}
let last_exit_line = stderr_msg
.clone()
.unwrap_or_else(|| format!("exit code {}", exit_code));
tunnel.live.last_exit = Some((exit_code, last_exit_line));
tunnel.live.parser_stop.store(true, Ordering::Relaxed);
if tunnel.live.active_channels > 0 {
let close_now = ChannelEventKind::Close;
let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
for id in ids {
tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
at: now,
channel_id: id,
kind: close_now,
channel_kind: None,
opened_at: None,
});
}
}
let (msg, is_error) = if status.success() {
(format!("Tunnel for {} closed.", alias), false)
} else if let Some(err) = stderr_msg {
(format!("Tunnel for {}: {}", alias, err), true)
} else {
(
format!("Tunnel for {} exited with code {}.", alias, exit_code),
true,
)
};
exited.push((alias.clone(), msg, is_error));
to_remove.push(alias.clone());
}
Ok(None) => {}
Err(e) => {
exited.push((
alias.clone(),
format!("Tunnel for {} lost: {}", alias, e),
true,
));
to_remove.push(alias.clone());
}
}
}
for alias in to_remove {
self.active.remove(&alias);
}
exited
}
pub fn push_peer_viz(&mut self, now: Instant) {
let mut live: HashSet<(u16, String)> = HashSet::new();
for (port, peers) in &self.clients {
for peer in peers {
let key = (*port, peer.src.clone());
live.insert(key.clone());
let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
let history = self
.peer_viz
.entry(key)
.or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
history.rotate_left(1);
history[PEER_VIZ_BUCKETS - 1] = combined;
}
}
self.peer_viz.retain(|key, _| live.contains(key));
self.peer_viz_prev_push = self.peer_viz_last_push;
self.peer_viz_last_push = Some(now);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_state_is_empty() {
let s = TunnelState::default();
assert!(s.list.is_empty());
assert!(s.active.is_empty());
assert!(s.pending_delete.is_none());
assert!(s.summaries_cache.is_empty());
}
#[test]
fn poll_on_empty_returns_empty_vec() {
let mut s = TunnelState::default();
let result = s.poll();
assert!(result.is_empty());
assert!(s.active.is_empty());
}
fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
ClientPeer {
src: src.to_string(),
process: "curl".into(),
pid: 1234,
since: Instant::now(),
responsible_app: None,
current_rx_bps: rx,
current_tx_bps: tx,
bytes_rcvd: None,
bytes_sent: None,
last_sample_at: Some(Instant::now()),
}
}
#[test]
fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
let mut s = TunnelState::default();
s.clients
.insert(8080, vec![make_peer("127.0.0.1:1", 1_000, 500)]);
let now = Instant::now();
s.push_peer_viz(now);
let key = (8080u16, "127.0.0.1:1".to_string());
let history = s.peer_viz.get(&key).expect("entry created on first push");
assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
for cell in &history[..PEER_VIZ_BUCKETS - 1] {
assert_eq!(*cell, 0);
}
assert_eq!(s.peer_viz_last_push, Some(now));
assert_eq!(s.peer_viz_prev_push, None);
}
#[test]
fn push_peer_viz_rotates_left_on_each_call() {
let mut s = TunnelState::default();
s.clients
.insert(8080, vec![make_peer("127.0.0.1:1", 100, 0)]);
let t0 = Instant::now();
s.push_peer_viz(t0);
if let Some(peers) = s.clients.get_mut(&8080) {
peers[0].current_rx_bps = 200;
}
let t1 = t0 + std::time::Duration::from_secs(2);
s.push_peer_viz(t1);
let key = (8080u16, "127.0.0.1:1".to_string());
let history = s.peer_viz.get(&key).expect("entry exists");
assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
assert_eq!(s.peer_viz_last_push, Some(t1));
assert_eq!(s.peer_viz_prev_push, Some(t0));
}
#[test]
fn push_peer_viz_garbage_collects_disappeared_peers() {
let mut s = TunnelState::default();
s.clients.insert(8080, vec![make_peer("127.0.0.1:1", 0, 0)]);
let t0 = Instant::now();
s.push_peer_viz(t0);
assert!(
s.peer_viz
.contains_key(&(8080u16, "127.0.0.1:1".to_string()))
);
s.clients.clear();
s.push_peer_viz(t0 + std::time::Duration::from_secs(2));
assert!(s.peer_viz.is_empty());
}
#[test]
fn request_delete_sets_pending_delete_to_some_idx() {
let mut s = TunnelState::default();
s.request_delete(3);
assert_eq!(s.pending_delete, Some(3));
}
#[test]
fn cancel_delete_clears_pending_delete() {
let mut s = TunnelState::default();
s.pending_delete = Some(2);
s.cancel_delete();
assert!(s.pending_delete.is_none());
}
#[test]
fn request_delete_overwrites_existing_pending() {
let mut s = TunnelState::default();
s.pending_delete = Some(1);
s.request_delete(7);
assert_eq!(s.pending_delete, Some(7));
}
#[test]
fn cancel_delete_is_idempotent_on_empty_pending() {
let mut s = TunnelState::default();
s.cancel_delete();
s.cancel_delete();
assert!(s.pending_delete.is_none());
}
}