1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::Ordering;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7use crate::app::TunnelFormBaseline;
8use crate::app::forms::TunnelForm;
9use crate::tunnel::{ActiveTunnel, TunnelRule};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
13pub enum TunnelSortMode {
14 #[default]
17 MostRecent,
18 AlphaHostname,
20}
21
22impl TunnelSortMode {
23 pub fn next(self) -> Self {
24 match self {
25 TunnelSortMode::MostRecent => TunnelSortMode::AlphaHostname,
26 TunnelSortMode::AlphaHostname => TunnelSortMode::MostRecent,
27 }
28 }
29
30 pub fn label(self) -> &'static str {
31 match self {
32 TunnelSortMode::MostRecent => "most recent",
33 TunnelSortMode::AlphaHostname => "A-Z hostname",
34 }
35 }
36}
37use crate::tunnel_live::{
38 ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
39 PortConflict, TunnelLiveSnapshot,
40};
41
42pub struct TunnelState {
48 pub(in crate::app) list: Vec<TunnelRule>,
49 pub(in crate::app) form: TunnelForm,
50 pub(in crate::app) active: HashMap<String, ActiveTunnel>,
51 pub(in crate::app) form_baseline: Option<TunnelFormBaseline>,
52 pub(in crate::app) pending_delete: Option<usize>,
53 pub(in crate::app) summaries_cache: HashMap<String, String>,
54 pub(in crate::app) sort_mode: TunnelSortMode,
56
57 pub(in crate::app) parser_tx: Sender<ParserMessage>,
61 pub(in crate::app) parser_rx: Receiver<ParserMessage>,
62 pub(in crate::app) lsof_tx: Sender<LsofMessage>,
63 pub(in crate::app) lsof_rx: Receiver<LsofMessage>,
64
65 pub(in crate::app) clients: HashMap<u16, Vec<ClientPeer>>,
68 pub(in crate::app) conflicts: HashMap<u16, PortConflict>,
69 pub(in crate::app) last_lsof_at: Option<Instant>,
70
71 pub(in crate::app) peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
79 pub(in crate::app) peer_viz_last_push: Option<Instant>,
86 pub(in crate::app) peer_viz_prev_push: Option<Instant>,
87
88 pub(in crate::app) lsof: Option<LsofPollerHandle>,
92
93 pub(in crate::app) demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
97}
98
99impl Default for TunnelState {
100 fn default() -> Self {
101 let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
102 let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
103 Self {
104 list: Vec::new(),
105 form: TunnelForm::new(),
106 active: HashMap::new(),
107 form_baseline: None,
108 pending_delete: None,
109 summaries_cache: HashMap::new(),
110 sort_mode: TunnelSortMode::default(),
111 parser_tx,
112 parser_rx,
113 lsof_tx,
114 lsof_rx,
115 clients: HashMap::new(),
116 conflicts: HashMap::new(),
117 last_lsof_at: None,
118 peer_viz: HashMap::new(),
119 peer_viz_last_push: None,
120 peer_viz_prev_push: None,
121 lsof: None,
122 demo_live_snapshots: HashMap::new(),
123 }
124 }
125}
126
127impl Drop for TunnelState {
128 fn drop(&mut self) {
129 if let Some(mut handle) = self.lsof.take() {
130 handle.shutdown();
131 }
132 }
133}
134
135impl TunnelState {
136 pub fn list(&self) -> &[TunnelRule] {
137 &self.list
138 }
139
140 pub fn list_mut(&mut self) -> &mut Vec<TunnelRule> {
141 &mut self.list
142 }
143
144 pub fn form(&self) -> &TunnelForm {
145 &self.form
146 }
147
148 pub fn form_mut(&mut self) -> &mut TunnelForm {
149 &mut self.form
150 }
151
152 pub fn reset_form(&mut self) {
153 self.form = TunnelForm::new();
154 }
155
156 pub fn active(&self) -> &HashMap<String, ActiveTunnel> {
157 &self.active
158 }
159
160 pub fn active_get(&self, alias: &str) -> Option<&ActiveTunnel> {
161 self.active.get(alias)
162 }
163
164 pub fn active_get_mut(&mut self, alias: &str) -> Option<&mut ActiveTunnel> {
165 self.active.get_mut(alias)
166 }
167
168 pub fn active_contains(&self, alias: &str) -> bool {
169 self.active.contains_key(alias)
170 }
171
172 pub fn active_insert(&mut self, alias: String, tunnel: ActiveTunnel) {
173 self.active.insert(alias, tunnel);
174 }
175
176 pub fn active_remove(&mut self, alias: &str) -> Option<ActiveTunnel> {
177 self.active.remove(alias)
178 }
179
180 pub fn drain_active(&mut self) -> std::collections::hash_map::Drain<'_, String, ActiveTunnel> {
181 self.active.drain()
182 }
183
184 pub fn clear_active(&mut self) {
185 self.active.clear();
186 }
187
188 pub fn pending_delete(&self) -> Option<usize> {
189 self.pending_delete
190 }
191
192 pub fn take_pending_delete(&mut self) -> Option<usize> {
193 self.pending_delete.take()
194 }
195
196 pub fn sort_mode(&self) -> TunnelSortMode {
197 self.sort_mode
198 }
199
200 pub fn set_sort_mode(&mut self, mode: TunnelSortMode) {
201 self.sort_mode = mode;
202 }
203
204 pub fn form_baseline(&self) -> Option<&TunnelFormBaseline> {
205 self.form_baseline.as_ref()
206 }
207
208 pub fn set_form_baseline(&mut self, baseline: Option<TunnelFormBaseline>) {
209 self.form_baseline = baseline;
210 }
211
212 pub fn demo_live_snapshots(&self) -> &HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
213 &self.demo_live_snapshots
214 }
215
216 pub fn demo_live_snapshots_mut(
217 &mut self,
218 ) -> &mut HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
219 &mut self.demo_live_snapshots
220 }
221
222 pub fn parser_tx(&self) -> Sender<ParserMessage> {
223 self.parser_tx.clone()
224 }
225
226 pub fn clients(&self) -> &HashMap<u16, Vec<ClientPeer>> {
227 &self.clients
228 }
229
230 pub fn peer_viz(&self) -> &HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]> {
231 &self.peer_viz
232 }
233
234 pub fn peer_viz_last_push(&self) -> Option<Instant> {
235 self.peer_viz_last_push
236 }
237
238 pub fn peer_viz_prev_push(&self) -> Option<Instant> {
239 self.peer_viz_prev_push
240 }
241
242 pub fn summaries_cache(&self) -> &HashMap<String, String> {
243 &self.summaries_cache
244 }
245
246 pub fn summaries_cache_mut(&mut self) -> &mut HashMap<String, String> {
247 &mut self.summaries_cache
248 }
249
250 pub fn request_delete(&mut self, idx: usize) {
253 self.pending_delete = Some(idx);
254 }
255
256 pub fn cancel_delete(&mut self) {
258 self.pending_delete = None;
259 }
260
261 pub fn ensure_lsof_poller(&mut self) {
265 if self.lsof.is_some() {
266 return;
267 }
268 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
269 let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
270 let thread = crate::tunnel_live::spawn_lsof_poller(
271 bind_ports.clone(),
272 self.lsof_tx.clone(),
273 stop.clone(),
274 );
275 self.lsof = Some(LsofPollerHandle {
276 stop,
277 bind_ports,
278 thread: Some(thread),
279 });
280 log::debug!("[purple] Tunnel lsof poller started");
281 }
282
283 pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
288 if let Some(handle) = &self.lsof {
289 if let Ok(mut g) = handle.bind_ports.lock() {
290 *g = ports;
291 }
292 }
293 }
294
295 pub fn poll(&mut self) -> Vec<(String, String, bool)> {
301 let now = Instant::now();
302 while let Ok(msg) = self.parser_rx.try_recv() {
305 if let Some(tunnel) = self.active.get_mut(&msg.alias) {
306 tunnel.live.record_event(msg.event);
307 }
308 }
309 let mut latest_lsof: Option<LsofMessage> = None;
313 while let Ok(msg) = self.lsof_rx.try_recv() {
314 latest_lsof = Some(msg);
315 }
316 if let Some(msg) = latest_lsof {
317 self.clients = msg.clients;
318 self.conflicts = msg.conflicts;
319 self.last_lsof_at = Some(msg.at);
320 self.push_peer_viz(now);
325 }
326 for tunnel in self.active.values_mut() {
329 tunnel.live.rotate_if_due(now);
330 }
331 let port_to_alias: HashMap<u16, String> = self
336 .lsof
337 .as_ref()
338 .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
339 .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
340 .unwrap_or_default();
341 let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
347 for (port, peers) in &self.clients {
348 let Some(alias) = port_to_alias.get(port) else {
349 continue;
350 };
351 let entry = bps_per_alias
352 .entry(alias.clone())
353 .or_insert((0u64, 0u64, false));
354 for peer in peers {
355 entry.0 = entry.0.saturating_add(peer.current_rx_bps);
356 entry.1 = entry.1.saturating_add(peer.current_tx_bps);
357 if peer.last_sample_at.is_some() {
358 entry.2 = true;
359 }
360 }
361 }
362 for (alias, tunnel) in self.active.iter_mut() {
363 let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
364 tunnel.live.current_rx_bps = rx;
365 tunnel.live.current_tx_bps = tx;
366 tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
367 tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
368 if ready {
369 tunnel.live.last_throughput_at = Some(now);
370 }
371 }
372 let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
379 for (port, peers) in &self.clients {
380 if let Some(alias) = port_to_alias.get(port) {
381 *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
382 }
383 }
384 for (alias, tunnel) in self.active.iter_mut() {
385 let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
386 let sample = lsof_count.max(tunnel.live.active_channels);
387 tunnel.live.sample_activity(sample);
388 }
389
390 if self.active.is_empty() {
391 return Vec::new();
392 }
393 let mut exited = Vec::new();
394 let mut to_remove = Vec::new();
395 for (alias, tunnel) in &mut self.active {
396 match tunnel.child.try_wait() {
397 Ok(Some(status)) => {
398 let stderr_msg = tunnel
402 .live
403 .stderr_buffer
404 .lock()
405 .ok()
406 .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
407 .map(|s| s.trim().to_string())
408 .filter(|s| !s.is_empty());
409 let exit_code = status.code().unwrap_or(-1);
410 if !status.success() {
411 log::error!(
412 "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
413 );
414 if let Some(ref err) = stderr_msg {
415 log::debug!("[external] Tunnel stderr: {}", err.trim());
416 }
417 }
418 let last_exit_line = stderr_msg
419 .clone()
420 .unwrap_or_else(|| format!("exit code {}", exit_code));
421 tunnel.live.last_exit = Some((exit_code, last_exit_line));
422 tunnel.live.parser_stop.store(true, Ordering::Relaxed);
423 if tunnel.live.active_channels > 0 {
426 let close_now = ChannelEventKind::Close;
427 let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
428 for id in ids {
429 tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
430 at: now,
431 channel_id: id,
432 kind: close_now,
433 channel_kind: None,
434 opened_at: None,
435 });
436 }
437 }
438 let (msg, is_error) = if status.success() {
439 (format!("Tunnel for {} closed.", alias), false)
440 } else if let Some(err) = stderr_msg {
441 (format!("Tunnel for {}: {}", alias, err), true)
442 } else {
443 (
444 format!("Tunnel for {} exited with code {}.", alias, exit_code),
445 true,
446 )
447 };
448 exited.push((alias.clone(), msg, is_error));
449 to_remove.push(alias.clone());
450 }
451 Ok(None) => {}
452 Err(e) => {
453 exited.push((
454 alias.clone(),
455 format!("Tunnel for {} lost: {}", alias, e),
456 true,
457 ));
458 to_remove.push(alias.clone());
459 }
460 }
461 }
462 for alias in to_remove {
463 self.active.remove(&alias);
464 }
465 exited
466 }
467
468 pub fn push_peer_viz(&mut self, now: Instant) {
476 let mut live: HashSet<(u16, String)> = HashSet::new();
477 for (port, peers) in &self.clients {
478 for peer in peers {
479 let key = (*port, peer.src.clone());
480 live.insert(key.clone());
481 let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
482 let history = self
483 .peer_viz
484 .entry(key)
485 .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
486 history.rotate_left(1);
487 history[PEER_VIZ_BUCKETS - 1] = combined;
488 }
489 }
490 self.peer_viz.retain(|key, _| live.contains(key));
491 self.peer_viz_prev_push = self.peer_viz_last_push;
492 self.peer_viz_last_push = Some(now);
493 }
494}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499
500 #[test]
501 fn default_state_is_empty() {
502 let s = TunnelState::default();
503 assert!(s.list.is_empty());
504 assert!(s.active.is_empty());
505 assert!(s.pending_delete.is_none());
506 assert!(s.summaries_cache.is_empty());
507 }
508
509 #[test]
510 fn poll_on_empty_returns_empty_vec() {
511 let mut s = TunnelState::default();
515 let result = s.poll();
516 assert!(result.is_empty());
517 assert!(s.active.is_empty());
518 }
519
520 fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
521 ClientPeer {
522 src: src.to_string(),
523 process: "curl".into(),
524 pid: 1234,
525 since: Instant::now(),
526 responsible_app: None,
527 current_rx_bps: rx,
528 current_tx_bps: tx,
529 bytes_rcvd: None,
530 bytes_sent: None,
531 last_sample_at: Some(Instant::now()),
532 }
533 }
534
535 #[test]
536 fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
537 let mut s = TunnelState::default();
538 s.clients
539 .insert(8080, vec![make_peer("127.0.0.1:1", 1_000, 500)]);
540 let now = Instant::now();
541 s.push_peer_viz(now);
542 let key = (8080u16, "127.0.0.1:1".to_string());
543 let history = s.peer_viz.get(&key).expect("entry created on first push");
544 assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
545 for cell in &history[..PEER_VIZ_BUCKETS - 1] {
546 assert_eq!(*cell, 0);
547 }
548 assert_eq!(s.peer_viz_last_push, Some(now));
549 assert_eq!(s.peer_viz_prev_push, None);
550 }
551
552 #[test]
553 fn push_peer_viz_rotates_left_on_each_call() {
554 let mut s = TunnelState::default();
555 s.clients
556 .insert(8080, vec![make_peer("127.0.0.1:1", 100, 0)]);
557 let t0 = Instant::now();
558 s.push_peer_viz(t0);
559 if let Some(peers) = s.clients.get_mut(&8080) {
562 peers[0].current_rx_bps = 200;
563 }
564 let t1 = t0 + std::time::Duration::from_secs(2);
565 s.push_peer_viz(t1);
566 let key = (8080u16, "127.0.0.1:1".to_string());
567 let history = s.peer_viz.get(&key).expect("entry exists");
568 assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
569 assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
570 assert_eq!(s.peer_viz_last_push, Some(t1));
573 assert_eq!(s.peer_viz_prev_push, Some(t0));
574 }
575
576 #[test]
577 fn push_peer_viz_garbage_collects_disappeared_peers() {
578 let mut s = TunnelState::default();
579 s.clients.insert(8080, vec![make_peer("127.0.0.1:1", 0, 0)]);
580 let t0 = Instant::now();
581 s.push_peer_viz(t0);
582 assert!(
583 s.peer_viz
584 .contains_key(&(8080u16, "127.0.0.1:1".to_string()))
585 );
586 s.clients.clear();
588 s.push_peer_viz(t0 + std::time::Duration::from_secs(2));
589 assert!(s.peer_viz.is_empty());
590 }
591
592 #[test]
593 fn request_delete_sets_pending_delete_to_some_idx() {
594 let mut s = TunnelState::default();
595 s.request_delete(3);
596 assert_eq!(s.pending_delete, Some(3));
597 }
598
599 #[test]
600 fn cancel_delete_clears_pending_delete() {
601 let mut s = TunnelState::default();
602 s.pending_delete = Some(2);
603 s.cancel_delete();
604 assert!(s.pending_delete.is_none());
605 }
606
607 #[test]
608 fn request_delete_overwrites_existing_pending() {
609 let mut s = TunnelState::default();
610 s.pending_delete = Some(1);
611 s.request_delete(7);
612 assert_eq!(s.pending_delete, Some(7));
613 }
614
615 #[test]
616 fn cancel_delete_is_idempotent_on_empty_pending() {
617 let mut s = TunnelState::default();
618 s.cancel_delete();
619 s.cancel_delete();
620 assert!(s.pending_delete.is_none());
621 }
622}