1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::Ordering;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7use crate::app::TunnelFormBaseline;
8use crate::app::forms::TunnelForm;
9use crate::tunnel::{ActiveTunnel, TunnelRule};
10
/// Ordering that can be applied to the tunnel list.
///
/// The modes form a two-element cycle; see [`TunnelSortMode::next`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum TunnelSortMode {
    /// Default mode; displayed as `"most recent"`.
    #[default]
    MostRecent,
    /// Displayed as `"A-Z hostname"`.
    AlphaHostname,
}

impl TunnelSortMode {
    /// Returns the mode that follows `self`, wrapping back to the first mode
    /// after the last one.
    pub fn next(self) -> Self {
        match self {
            Self::MostRecent => Self::AlphaHostname,
            Self::AlphaHostname => Self::MostRecent,
        }
    }

    /// Returns the short, human-readable name of this mode.
    pub fn label(self) -> &'static str {
        match self {
            Self::MostRecent => "most recent",
            Self::AlphaHostname => "A-Z hostname",
        }
    }
}
37use crate::tunnel_live::{
38 ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
39 PortConflict, TunnelLiveSnapshot,
40};
41
42pub struct TunnelState {
48 pub(in crate::app) list: Vec<TunnelRule>,
49 pub(crate) form: TunnelForm,
53 pub(in crate::app) active: HashMap<String, ActiveTunnel>,
54 pub(in crate::app) form_baseline: Option<TunnelFormBaseline>,
55 pub(crate) pending_delete: Option<usize>,
59 pub(in crate::app) summaries_cache: HashMap<String, String>,
60 pub(in crate::app) sort_mode: TunnelSortMode,
62
63 pub(in crate::app) parser_tx: Sender<ParserMessage>,
67 pub(in crate::app) parser_rx: Receiver<ParserMessage>,
68 pub(in crate::app) lsof_tx: Sender<LsofMessage>,
69 pub(in crate::app) lsof_rx: Receiver<LsofMessage>,
70
71 pub(in crate::app) clients: HashMap<u16, Vec<ClientPeer>>,
74 pub(in crate::app) conflicts: HashMap<u16, PortConflict>,
75 pub(in crate::app) last_lsof_at: Option<Instant>,
76
77 pub(in crate::app) peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
85 pub(in crate::app) peer_viz_last_push: Option<Instant>,
92 pub(in crate::app) peer_viz_prev_push: Option<Instant>,
93
94 pub(in crate::app) lsof: Option<LsofPollerHandle>,
98
99 pub(in crate::app) demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
103}
104
105impl Default for TunnelState {
106 fn default() -> Self {
107 let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
108 let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
109 Self {
110 list: Vec::new(),
111 form: TunnelForm::new(),
112 active: HashMap::new(),
113 form_baseline: None,
114 pending_delete: None,
115 summaries_cache: HashMap::new(),
116 sort_mode: TunnelSortMode::default(),
117 parser_tx,
118 parser_rx,
119 lsof_tx,
120 lsof_rx,
121 clients: HashMap::new(),
122 conflicts: HashMap::new(),
123 last_lsof_at: None,
124 peer_viz: HashMap::new(),
125 peer_viz_last_push: None,
126 peer_viz_prev_push: None,
127 lsof: None,
128 demo_live_snapshots: HashMap::new(),
129 }
130 }
131}
132
133impl Drop for TunnelState {
134 fn drop(&mut self) {
135 if let Some(mut handle) = self.lsof.take() {
136 handle.shutdown();
137 }
138 }
139}
140
141impl TunnelState {
142 pub fn list(&self) -> &[TunnelRule] {
143 &self.list
144 }
145
146 pub fn list_mut(&mut self) -> &mut Vec<TunnelRule> {
147 &mut self.list
148 }
149
150 pub fn form(&self) -> &TunnelForm {
151 &self.form
152 }
153
154 pub fn form_mut(&mut self) -> &mut TunnelForm {
155 &mut self.form
156 }
157
158 pub fn reset_form(&mut self) {
159 self.form = TunnelForm::new();
160 }
161
162 pub fn active(&self) -> &HashMap<String, ActiveTunnel> {
163 &self.active
164 }
165
166 pub fn active_get(&self, alias: &str) -> Option<&ActiveTunnel> {
167 self.active.get(alias)
168 }
169
170 pub fn active_get_mut(&mut self, alias: &str) -> Option<&mut ActiveTunnel> {
171 self.active.get_mut(alias)
172 }
173
174 pub fn active_contains(&self, alias: &str) -> bool {
175 self.active.contains_key(alias)
176 }
177
178 pub fn active_insert(&mut self, alias: String, tunnel: ActiveTunnel) {
179 self.active.insert(alias, tunnel);
180 }
181
182 pub fn active_remove(&mut self, alias: &str) -> Option<ActiveTunnel> {
183 self.active.remove(alias)
184 }
185
186 pub fn drain_active(&mut self) -> std::collections::hash_map::Drain<'_, String, ActiveTunnel> {
187 self.active.drain()
188 }
189
190 pub fn clear_active(&mut self) {
191 self.active.clear();
192 }
193
194 pub fn pending_delete(&self) -> Option<usize> {
195 self.pending_delete
196 }
197
198 pub fn take_pending_delete(&mut self) -> Option<usize> {
199 self.pending_delete.take()
200 }
201
202 pub fn sort_mode(&self) -> TunnelSortMode {
203 self.sort_mode
204 }
205
206 pub fn set_sort_mode(&mut self, mode: TunnelSortMode) {
207 self.sort_mode = mode;
208 }
209
210 pub fn form_baseline(&self) -> Option<&TunnelFormBaseline> {
211 self.form_baseline.as_ref()
212 }
213
214 pub fn set_form_baseline(&mut self, baseline: Option<TunnelFormBaseline>) {
215 self.form_baseline = baseline;
216 }
217
218 pub fn demo_live_snapshots(&self) -> &HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
219 &self.demo_live_snapshots
220 }
221
222 pub fn demo_live_snapshots_mut(
223 &mut self,
224 ) -> &mut HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
225 &mut self.demo_live_snapshots
226 }
227
228 pub fn parser_tx(&self) -> Sender<ParserMessage> {
229 self.parser_tx.clone()
230 }
231
232 pub fn clients(&self) -> &HashMap<u16, Vec<ClientPeer>> {
233 &self.clients
234 }
235
236 pub fn peer_viz(&self) -> &HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]> {
237 &self.peer_viz
238 }
239
240 pub fn peer_viz_last_push(&self) -> Option<Instant> {
241 self.peer_viz_last_push
242 }
243
244 pub fn peer_viz_prev_push(&self) -> Option<Instant> {
245 self.peer_viz_prev_push
246 }
247
248 pub fn summaries_cache(&self) -> &HashMap<String, String> {
249 &self.summaries_cache
250 }
251
252 pub fn summaries_cache_mut(&mut self) -> &mut HashMap<String, String> {
253 &mut self.summaries_cache
254 }
255
256 pub fn request_delete(&mut self, idx: usize) {
259 self.pending_delete = Some(idx);
260 }
261
262 pub fn cancel_delete(&mut self) {
264 self.pending_delete = None;
265 }
266
267 pub fn ensure_lsof_poller(&mut self) {
271 if self.lsof.is_some() {
272 return;
273 }
274 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
275 let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
276 let thread = crate::tunnel_live::spawn_lsof_poller(
277 bind_ports.clone(),
278 self.lsof_tx.clone(),
279 stop.clone(),
280 );
281 self.lsof = Some(LsofPollerHandle {
282 stop,
283 bind_ports,
284 thread: Some(thread),
285 });
286 log::debug!("[purple] Tunnel lsof poller started");
287 }
288
289 pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
294 if let Some(handle) = &self.lsof {
295 if let Ok(mut g) = handle.bind_ports.lock() {
296 *g = ports;
297 }
298 }
299 }
300
301 pub fn poll(&mut self) -> Vec<(String, String, bool)> {
307 let now = Instant::now();
308 while let Ok(msg) = self.parser_rx.try_recv() {
311 if let Some(tunnel) = self.active.get_mut(&msg.alias) {
312 tunnel.live.record_event(msg.event);
313 }
314 }
315 let mut latest_lsof: Option<LsofMessage> = None;
319 while let Ok(msg) = self.lsof_rx.try_recv() {
320 latest_lsof = Some(msg);
321 }
322 if let Some(msg) = latest_lsof {
323 self.clients = msg.clients;
324 self.conflicts = msg.conflicts;
325 self.last_lsof_at = Some(msg.at);
326 self.push_peer_viz(now);
331 }
332 for tunnel in self.active.values_mut() {
335 tunnel.live.rotate_if_due(now);
336 }
337 let port_to_alias: HashMap<u16, String> = self
342 .lsof
343 .as_ref()
344 .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
345 .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
346 .unwrap_or_default();
347 let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
353 for (port, peers) in &self.clients {
354 let Some(alias) = port_to_alias.get(port) else {
355 continue;
356 };
357 let entry = bps_per_alias
358 .entry(alias.clone())
359 .or_insert((0u64, 0u64, false));
360 for peer in peers {
361 entry.0 = entry.0.saturating_add(peer.current_rx_bps);
362 entry.1 = entry.1.saturating_add(peer.current_tx_bps);
363 if peer.last_sample_at.is_some() {
364 entry.2 = true;
365 }
366 }
367 }
368 for (alias, tunnel) in self.active.iter_mut() {
369 let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
370 tunnel.live.current_rx_bps = rx;
371 tunnel.live.current_tx_bps = tx;
372 tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
373 tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
374 if ready {
375 tunnel.live.last_throughput_at = Some(now);
376 }
377 }
378 let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
385 for (port, peers) in &self.clients {
386 if let Some(alias) = port_to_alias.get(port) {
387 *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
388 }
389 }
390 for (alias, tunnel) in self.active.iter_mut() {
391 let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
392 let sample = lsof_count.max(tunnel.live.active_channels);
393 tunnel.live.sample_activity(sample);
394 }
395
396 if self.active.is_empty() {
397 return Vec::new();
398 }
399 let mut exited = Vec::new();
400 let mut to_remove = Vec::new();
401 for (alias, tunnel) in &mut self.active {
402 match tunnel.child.try_wait() {
403 Ok(Some(status)) => {
404 let stderr_msg = tunnel
408 .live
409 .stderr_buffer
410 .lock()
411 .ok()
412 .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
413 .map(|s| s.trim().to_string())
414 .filter(|s| !s.is_empty());
415 let exit_code = status.code().unwrap_or(-1);
416 if !status.success() {
417 log::error!(
418 "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
419 );
420 if let Some(ref err) = stderr_msg {
421 log::debug!("[external] Tunnel stderr: {}", err.trim());
422 }
423 }
424 let last_exit_line = stderr_msg
425 .clone()
426 .unwrap_or_else(|| format!("exit code {}", exit_code));
427 tunnel.live.last_exit = Some((exit_code, last_exit_line));
428 tunnel.live.parser_stop.store(true, Ordering::Relaxed);
429 if tunnel.live.active_channels > 0 {
432 let close_now = ChannelEventKind::Close;
433 let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
434 for id in ids {
435 tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
436 at: now,
437 channel_id: id,
438 kind: close_now,
439 channel_kind: None,
440 opened_at: None,
441 });
442 }
443 }
444 let (msg, is_error) = if status.success() {
445 (format!("Tunnel for {} closed.", alias), false)
446 } else if let Some(err) = stderr_msg {
447 (format!("Tunnel for {}: {}", alias, err), true)
448 } else {
449 (
450 format!("Tunnel for {} exited with code {}.", alias, exit_code),
451 true,
452 )
453 };
454 exited.push((alias.clone(), msg, is_error));
455 to_remove.push(alias.clone());
456 }
457 Ok(None) => {}
458 Err(e) => {
459 exited.push((
460 alias.clone(),
461 format!("Tunnel for {} lost: {}", alias, e),
462 true,
463 ));
464 to_remove.push(alias.clone());
465 }
466 }
467 }
468 for alias in to_remove {
469 self.active.remove(&alias);
470 }
471 exited
472 }
473
474 pub fn push_peer_viz(&mut self, now: Instant) {
482 let mut live: HashSet<(u16, String)> = HashSet::new();
483 for (port, peers) in &self.clients {
484 for peer in peers {
485 let key = (*port, peer.src.clone());
486 live.insert(key.clone());
487 let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
488 let history = self
489 .peer_viz
490 .entry(key)
491 .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
492 history.rotate_left(1);
493 history[PEER_VIZ_BUCKETS - 1] = combined;
494 }
495 }
496 self.peer_viz.retain(|key, _| live.contains(key));
497 self.peer_viz_prev_push = self.peer_viz_last_push;
498 self.peer_viz_last_push = Some(now);
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Port used by every peer-viz test.
    const PORT: u16 = 8080;
    /// Peer source address used by every peer-viz test.
    const PEER_SRC: &str = "127.0.0.1:1";

    /// Builds a client peer with the given source and rx/tx rates; the
    /// remaining fields hold fixed placeholder values.
    fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
        ClientPeer {
            src: src.to_string(),
            process: "curl".into(),
            pid: 1234,
            since: Instant::now(),
            responsible_app: None,
            current_rx_bps: rx,
            current_tx_bps: tx,
            bytes_rcvd: None,
            bytes_sent: None,
            last_sample_at: Some(Instant::now()),
        }
    }

    /// Default state holding exactly one peer on `PORT`.
    fn state_with_peer(rx: u64, tx: u64) -> TunnelState {
        let mut state = TunnelState::default();
        state.clients.insert(PORT, vec![make_peer(PEER_SRC, rx, tx)]);
        state
    }

    /// History key of the single test peer.
    fn peer_key() -> (u16, String) {
        (PORT, PEER_SRC.to_string())
    }

    #[test]
    fn default_state_is_empty() {
        let state = TunnelState::default();
        assert!(state.list.is_empty());
        assert!(state.active.is_empty());
        assert!(state.pending_delete.is_none());
        assert!(state.summaries_cache.is_empty());
    }

    #[test]
    fn poll_on_empty_returns_empty_vec() {
        let mut state = TunnelState::default();
        let exited = state.poll();
        assert!(exited.is_empty());
        assert!(state.active.is_empty());
    }

    #[test]
    fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
        let mut state = state_with_peer(1_000, 500);
        let now = Instant::now();
        state.push_peer_viz(now);

        let history = state
            .peer_viz
            .get(&peer_key())
            .expect("entry created on first push");
        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
        // Everything left of the newest cell is still the zero fill.
        assert!(history[..PEER_VIZ_BUCKETS - 1].iter().all(|&cell| cell == 0));
        assert_eq!(state.peer_viz_last_push, Some(now));
        assert_eq!(state.peer_viz_prev_push, None);
    }

    #[test]
    fn push_peer_viz_rotates_left_on_each_call() {
        let mut state = state_with_peer(100, 0);
        let first_push = Instant::now();
        state.push_peer_viz(first_push);

        if let Some(peer) = state
            .clients
            .get_mut(&PORT)
            .and_then(|peers| peers.first_mut())
        {
            peer.current_rx_bps = 200;
        }
        let second_push = first_push + Duration::from_secs(2);
        state.push_peer_viz(second_push);

        let history = state.peer_viz.get(&peer_key()).expect("entry exists");
        assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
        assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
        assert_eq!(state.peer_viz_last_push, Some(second_push));
        assert_eq!(state.peer_viz_prev_push, Some(first_push));
    }

    #[test]
    fn push_peer_viz_garbage_collects_disappeared_peers() {
        let mut state = state_with_peer(0, 0);
        let first_push = Instant::now();
        state.push_peer_viz(first_push);
        assert!(state.peer_viz.contains_key(&peer_key()));

        state.clients.clear();
        state.push_peer_viz(first_push + Duration::from_secs(2));
        assert!(state.peer_viz.is_empty());
    }

    #[test]
    fn request_delete_sets_pending_delete_to_some_idx() {
        let mut state = TunnelState::default();
        state.request_delete(3);
        assert_eq!(state.pending_delete, Some(3));
    }

    #[test]
    fn cancel_delete_clears_pending_delete() {
        let mut state = TunnelState::default();
        state.pending_delete = Some(2);
        state.cancel_delete();
        assert!(state.pending_delete.is_none());
    }

    #[test]
    fn request_delete_overwrites_existing_pending() {
        let mut state = TunnelState::default();
        state.pending_delete = Some(1);
        state.request_delete(7);
        assert_eq!(state.pending_delete, Some(7));
    }

    #[test]
    fn cancel_delete_is_idempotent_on_empty_pending() {
        let mut state = TunnelState::default();
        for _ in 0..2 {
            state.cancel_delete();
        }
        assert!(state.pending_delete.is_none());
    }
}