1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::Ordering;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7use crate::app::TunnelFormBaseline;
8use crate::app::forms::TunnelForm;
9use crate::tunnel::{ActiveTunnel, TunnelRule};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
13pub enum TunnelSortMode {
14 #[default]
17 MostRecent,
18 AlphaHostname,
20}
21
22impl TunnelSortMode {
23 pub fn next(self) -> Self {
24 match self {
25 TunnelSortMode::MostRecent => TunnelSortMode::AlphaHostname,
26 TunnelSortMode::AlphaHostname => TunnelSortMode::MostRecent,
27 }
28 }
29
30 pub fn label(self) -> &'static str {
31 match self {
32 TunnelSortMode::MostRecent => "most recent",
33 TunnelSortMode::AlphaHostname => "A-Z hostname",
34 }
35 }
36}
37use crate::tunnel_live::{
38 ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
39 PortConflict, TunnelLiveSnapshot,
40};
41
42pub struct TunnelState {
48 pub list: Vec<TunnelRule>,
49 pub form: TunnelForm,
50 pub active: HashMap<String, ActiveTunnel>,
51 pub form_baseline: Option<TunnelFormBaseline>,
52 pub pending_delete: Option<usize>,
53 pub summaries_cache: HashMap<String, String>,
54 pub sort_mode: TunnelSortMode,
56
57 pub parser_tx: Sender<ParserMessage>,
61 pub parser_rx: Receiver<ParserMessage>,
62 pub lsof_tx: Sender<LsofMessage>,
63 pub lsof_rx: Receiver<LsofMessage>,
64
65 pub clients: HashMap<u16, Vec<ClientPeer>>,
68 pub conflicts: HashMap<u16, PortConflict>,
69 pub last_lsof_at: Option<Instant>,
70
71 pub peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
79 pub peer_viz_last_push: Option<Instant>,
86 pub peer_viz_prev_push: Option<Instant>,
87
88 pub lsof: Option<LsofPollerHandle>,
92
93 pub demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
97}
98
99impl Default for TunnelState {
100 fn default() -> Self {
101 let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
102 let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
103 Self {
104 list: Vec::new(),
105 form: TunnelForm::new(),
106 active: HashMap::new(),
107 form_baseline: None,
108 pending_delete: None,
109 summaries_cache: HashMap::new(),
110 sort_mode: TunnelSortMode::default(),
111 parser_tx,
112 parser_rx,
113 lsof_tx,
114 lsof_rx,
115 clients: HashMap::new(),
116 conflicts: HashMap::new(),
117 last_lsof_at: None,
118 peer_viz: HashMap::new(),
119 peer_viz_last_push: None,
120 peer_viz_prev_push: None,
121 lsof: None,
122 demo_live_snapshots: HashMap::new(),
123 }
124 }
125}
126
127impl Drop for TunnelState {
128 fn drop(&mut self) {
129 if let Some(mut handle) = self.lsof.take() {
130 handle.shutdown();
131 }
132 }
133}
134
135impl TunnelState {
136 pub fn request_delete(&mut self, idx: usize) {
139 self.pending_delete = Some(idx);
140 }
141
142 pub fn cancel_delete(&mut self) {
144 self.pending_delete = None;
145 }
146
147 pub fn ensure_lsof_poller(&mut self) {
151 if self.lsof.is_some() {
152 return;
153 }
154 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
155 let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
156 let thread = crate::tunnel_live::spawn_lsof_poller(
157 bind_ports.clone(),
158 self.lsof_tx.clone(),
159 stop.clone(),
160 );
161 self.lsof = Some(LsofPollerHandle {
162 stop,
163 bind_ports,
164 thread: Some(thread),
165 });
166 log::debug!("[purple] Tunnel lsof poller started");
167 }
168
169 pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
174 if let Some(handle) = &self.lsof {
175 if let Ok(mut g) = handle.bind_ports.lock() {
176 *g = ports;
177 }
178 }
179 }
180
181 pub fn poll(&mut self) -> Vec<(String, String, bool)> {
187 let now = Instant::now();
188 while let Ok(msg) = self.parser_rx.try_recv() {
191 if let Some(tunnel) = self.active.get_mut(&msg.alias) {
192 tunnel.live.record_event(msg.event);
193 }
194 }
195 let mut latest_lsof: Option<LsofMessage> = None;
199 while let Ok(msg) = self.lsof_rx.try_recv() {
200 latest_lsof = Some(msg);
201 }
202 if let Some(msg) = latest_lsof {
203 self.clients = msg.clients;
204 self.conflicts = msg.conflicts;
205 self.last_lsof_at = Some(msg.at);
206 self.push_peer_viz(now);
211 }
212 for tunnel in self.active.values_mut() {
215 tunnel.live.rotate_if_due(now);
216 }
217 let port_to_alias: HashMap<u16, String> = self
222 .lsof
223 .as_ref()
224 .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
225 .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
226 .unwrap_or_default();
227 let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
233 for (port, peers) in &self.clients {
234 let Some(alias) = port_to_alias.get(port) else {
235 continue;
236 };
237 let entry = bps_per_alias
238 .entry(alias.clone())
239 .or_insert((0u64, 0u64, false));
240 for peer in peers {
241 entry.0 = entry.0.saturating_add(peer.current_rx_bps);
242 entry.1 = entry.1.saturating_add(peer.current_tx_bps);
243 if peer.last_sample_at.is_some() {
244 entry.2 = true;
245 }
246 }
247 }
248 for (alias, tunnel) in self.active.iter_mut() {
249 let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
250 tunnel.live.current_rx_bps = rx;
251 tunnel.live.current_tx_bps = tx;
252 tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
253 tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
254 if ready {
255 tunnel.live.last_throughput_at = Some(now);
256 }
257 }
258 let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
265 for (port, peers) in &self.clients {
266 if let Some(alias) = port_to_alias.get(port) {
267 *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
268 }
269 }
270 for (alias, tunnel) in self.active.iter_mut() {
271 let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
272 let sample = lsof_count.max(tunnel.live.active_channels);
273 tunnel.live.sample_activity(sample);
274 }
275
276 if self.active.is_empty() {
277 return Vec::new();
278 }
279 let mut exited = Vec::new();
280 let mut to_remove = Vec::new();
281 for (alias, tunnel) in &mut self.active {
282 match tunnel.child.try_wait() {
283 Ok(Some(status)) => {
284 let stderr_msg = tunnel
288 .live
289 .stderr_buffer
290 .lock()
291 .ok()
292 .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
293 .map(|s| s.trim().to_string())
294 .filter(|s| !s.is_empty());
295 let exit_code = status.code().unwrap_or(-1);
296 if !status.success() {
297 log::error!(
298 "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
299 );
300 if let Some(ref err) = stderr_msg {
301 log::debug!("[external] Tunnel stderr: {}", err.trim());
302 }
303 }
304 let last_exit_line = stderr_msg
305 .clone()
306 .unwrap_or_else(|| format!("exit code {}", exit_code));
307 tunnel.live.last_exit = Some((exit_code, last_exit_line));
308 tunnel.live.parser_stop.store(true, Ordering::Relaxed);
309 if tunnel.live.active_channels > 0 {
312 let close_now = ChannelEventKind::Close;
313 let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
314 for id in ids {
315 tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
316 at: now,
317 channel_id: id,
318 kind: close_now,
319 channel_kind: None,
320 opened_at: None,
321 });
322 }
323 }
324 let (msg, is_error) = if status.success() {
325 (format!("Tunnel for {} closed.", alias), false)
326 } else if let Some(err) = stderr_msg {
327 (format!("Tunnel for {}: {}", alias, err), true)
328 } else {
329 (
330 format!("Tunnel for {} exited with code {}.", alias, exit_code),
331 true,
332 )
333 };
334 exited.push((alias.clone(), msg, is_error));
335 to_remove.push(alias.clone());
336 }
337 Ok(None) => {}
338 Err(e) => {
339 exited.push((
340 alias.clone(),
341 format!("Tunnel for {} lost: {}", alias, e),
342 true,
343 ));
344 to_remove.push(alias.clone());
345 }
346 }
347 }
348 for alias in to_remove {
349 self.active.remove(&alias);
350 }
351 exited
352 }
353
354 pub fn push_peer_viz(&mut self, now: Instant) {
362 let mut live: HashSet<(u16, String)> = HashSet::new();
363 for (port, peers) in &self.clients {
364 for peer in peers {
365 let key = (*port, peer.src.clone());
366 live.insert(key.clone());
367 let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
368 let history = self
369 .peer_viz
370 .entry(key)
371 .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
372 history.rotate_left(1);
373 history[PEER_VIZ_BUCKETS - 1] = combined;
374 }
375 }
376 self.peer_viz.retain(|key, _| live.contains(key));
377 self.peer_viz_prev_push = self.peer_viz_last_push;
378 self.peer_viz_last_push = Some(now);
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385
386 #[test]
387 fn default_state_is_empty() {
388 let s = TunnelState::default();
389 assert!(s.list.is_empty());
390 assert!(s.active.is_empty());
391 assert!(s.pending_delete.is_none());
392 assert!(s.summaries_cache.is_empty());
393 }
394
395 #[test]
396 fn poll_on_empty_returns_empty_vec() {
397 let mut s = TunnelState::default();
401 let result = s.poll();
402 assert!(result.is_empty());
403 assert!(s.active.is_empty());
404 }
405
406 fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
407 ClientPeer {
408 src: src.to_string(),
409 process: "curl".into(),
410 pid: 1234,
411 since: Instant::now(),
412 responsible_app: None,
413 current_rx_bps: rx,
414 current_tx_bps: tx,
415 bytes_rcvd: None,
416 bytes_sent: None,
417 last_sample_at: Some(Instant::now()),
418 }
419 }
420
421 #[test]
422 fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
423 let mut s = TunnelState::default();
424 s.clients
425 .insert(8080, vec![make_peer("127.0.0.1:1", 1_000, 500)]);
426 let now = Instant::now();
427 s.push_peer_viz(now);
428 let key = (8080u16, "127.0.0.1:1".to_string());
429 let history = s.peer_viz.get(&key).expect("entry created on first push");
430 assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
431 for cell in &history[..PEER_VIZ_BUCKETS - 1] {
432 assert_eq!(*cell, 0);
433 }
434 assert_eq!(s.peer_viz_last_push, Some(now));
435 assert_eq!(s.peer_viz_prev_push, None);
436 }
437
438 #[test]
439 fn push_peer_viz_rotates_left_on_each_call() {
440 let mut s = TunnelState::default();
441 s.clients
442 .insert(8080, vec![make_peer("127.0.0.1:1", 100, 0)]);
443 let t0 = Instant::now();
444 s.push_peer_viz(t0);
445 if let Some(peers) = s.clients.get_mut(&8080) {
448 peers[0].current_rx_bps = 200;
449 }
450 let t1 = t0 + std::time::Duration::from_secs(2);
451 s.push_peer_viz(t1);
452 let key = (8080u16, "127.0.0.1:1".to_string());
453 let history = s.peer_viz.get(&key).expect("entry exists");
454 assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
455 assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
456 assert_eq!(s.peer_viz_last_push, Some(t1));
459 assert_eq!(s.peer_viz_prev_push, Some(t0));
460 }
461
462 #[test]
463 fn push_peer_viz_garbage_collects_disappeared_peers() {
464 let mut s = TunnelState::default();
465 s.clients.insert(8080, vec![make_peer("127.0.0.1:1", 0, 0)]);
466 let t0 = Instant::now();
467 s.push_peer_viz(t0);
468 assert!(
469 s.peer_viz
470 .contains_key(&(8080u16, "127.0.0.1:1".to_string()))
471 );
472 s.clients.clear();
474 s.push_peer_viz(t0 + std::time::Duration::from_secs(2));
475 assert!(s.peer_viz.is_empty());
476 }
477
478 #[test]
479 fn request_delete_sets_pending_delete_to_some_idx() {
480 let mut s = TunnelState::default();
481 s.request_delete(3);
482 assert_eq!(s.pending_delete, Some(3));
483 }
484
485 #[test]
486 fn cancel_delete_clears_pending_delete() {
487 let mut s = TunnelState::default();
488 s.pending_delete = Some(2);
489 s.cancel_delete();
490 assert!(s.pending_delete.is_none());
491 }
492
493 #[test]
494 fn request_delete_overwrites_existing_pending() {
495 let mut s = TunnelState::default();
496 s.pending_delete = Some(1);
497 s.request_delete(7);
498 assert_eq!(s.pending_delete, Some(7));
499 }
500
501 #[test]
502 fn cancel_delete_is_idempotent_on_empty_pending() {
503 let mut s = TunnelState::default();
504 s.cancel_delete();
505 s.cancel_delete();
506 assert!(s.pending_delete.is_none());
507 }
508}