1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::Ordering;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7use crate::app::TunnelFormBaseline;
8use crate::app::forms::TunnelForm;
9use crate::tunnel::{ActiveTunnel, TunnelRule};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
13pub enum TunnelSortMode {
14 #[default]
17 MostRecent,
18 AlphaHostname,
20}
21
22impl TunnelSortMode {
23 pub fn next(self) -> Self {
24 match self {
25 TunnelSortMode::MostRecent => TunnelSortMode::AlphaHostname,
26 TunnelSortMode::AlphaHostname => TunnelSortMode::MostRecent,
27 }
28 }
29
30 pub fn label(self) -> &'static str {
31 match self {
32 TunnelSortMode::MostRecent => "most recent",
33 TunnelSortMode::AlphaHostname => "A-Z hostname",
34 }
35 }
36}
37use crate::tunnel_live::{
38 ChannelEventKind, ClientPeer, LsofMessage, LsofPollerHandle, PEER_VIZ_BUCKETS, ParserMessage,
39 PortConflict, TunnelLiveSnapshot,
40};
41
42pub struct TunnelState {
48 pub(in crate::app) list: Vec<TunnelRule>,
49 pub(in crate::app) form: TunnelForm,
50 pub(in crate::app) active: HashMap<String, ActiveTunnel>,
51 pub(in crate::app) form_baseline: Option<TunnelFormBaseline>,
52 pub(in crate::app) pending_delete: Option<usize>,
53 pub(in crate::app) summaries_cache: HashMap<String, String>,
54 pub(in crate::app) sort_mode: TunnelSortMode,
56
57 pub(in crate::app) parser_tx: Sender<ParserMessage>,
61 pub(in crate::app) parser_rx: Receiver<ParserMessage>,
62 pub(in crate::app) lsof_tx: Sender<LsofMessage>,
63 pub(in crate::app) lsof_rx: Receiver<LsofMessage>,
64
65 pub(in crate::app) clients: HashMap<u16, Vec<ClientPeer>>,
68 pub(in crate::app) conflicts: HashMap<u16, PortConflict>,
69 pub(in crate::app) last_lsof_at: Option<Instant>,
70
71 pub(in crate::app) peer_viz: HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]>,
79 pub(in crate::app) peer_viz_last_push: Option<Instant>,
86 pub(in crate::app) peer_viz_prev_push: Option<Instant>,
87
88 pub(in crate::app) lsof: Option<LsofPollerHandle>,
92
93 pub(in crate::app) demo_live_snapshots: HashMap<String, TunnelLiveSnapshot>,
97}
98
99impl Default for TunnelState {
100 fn default() -> Self {
101 let (parser_tx, parser_rx) = std::sync::mpsc::channel::<ParserMessage>();
102 let (lsof_tx, lsof_rx) = std::sync::mpsc::channel::<LsofMessage>();
103 Self {
104 list: Vec::new(),
105 form: TunnelForm::new(),
106 active: HashMap::new(),
107 form_baseline: None,
108 pending_delete: None,
109 summaries_cache: HashMap::new(),
110 sort_mode: TunnelSortMode::default(),
111 parser_tx,
112 parser_rx,
113 lsof_tx,
114 lsof_rx,
115 clients: HashMap::new(),
116 conflicts: HashMap::new(),
117 last_lsof_at: None,
118 peer_viz: HashMap::new(),
119 peer_viz_last_push: None,
120 peer_viz_prev_push: None,
121 lsof: None,
122 demo_live_snapshots: HashMap::new(),
123 }
124 }
125}
126
127impl Drop for TunnelState {
128 fn drop(&mut self) {
129 if let Some(mut handle) = self.lsof.take() {
130 handle.shutdown();
131 }
132 }
133}
134
135impl TunnelState {
136 pub fn list(&self) -> &[TunnelRule] {
137 &self.list
138 }
139
140 pub fn list_mut(&mut self) -> &mut Vec<TunnelRule> {
141 &mut self.list
142 }
143
144 pub fn form(&self) -> &TunnelForm {
145 &self.form
146 }
147
148 pub fn form_mut(&mut self) -> &mut TunnelForm {
149 &mut self.form
150 }
151
152 pub fn reset_form(&mut self) {
153 self.form = TunnelForm::new();
154 }
155
156 pub fn load_directives(
158 &mut self,
159 config: &crate::ssh_config::model::SshConfigFile,
160 alias: &str,
161 ) {
162 self.list = config.find_tunnel_directives(alias);
163 }
164
165 pub fn form_is_dirty(&self) -> bool {
167 match &self.form_baseline {
168 Some(b) => {
169 self.form.tunnel_type != b.tunnel_type
170 || self.form.bind_port != b.bind_port
171 || self.form.remote_host != b.remote_host
172 || self.form.remote_port != b.remote_port
173 || self.form.bind_address != b.bind_address
174 }
175 None => false,
176 }
177 }
178
179 pub fn active(&self) -> &HashMap<String, ActiveTunnel> {
180 &self.active
181 }
182
183 pub fn active_get(&self, alias: &str) -> Option<&ActiveTunnel> {
184 self.active.get(alias)
185 }
186
187 pub fn active_get_mut(&mut self, alias: &str) -> Option<&mut ActiveTunnel> {
188 self.active.get_mut(alias)
189 }
190
191 pub fn active_contains(&self, alias: &str) -> bool {
192 self.active.contains_key(alias)
193 }
194
195 pub fn active_insert(&mut self, alias: String, tunnel: ActiveTunnel) {
196 self.active.insert(alias, tunnel);
197 }
198
199 pub fn active_remove(&mut self, alias: &str) -> Option<ActiveTunnel> {
200 self.active.remove(alias)
201 }
202
203 pub fn drain_active(&mut self) -> std::collections::hash_map::Drain<'_, String, ActiveTunnel> {
204 self.active.drain()
205 }
206
207 pub fn clear_active(&mut self) {
208 self.active.clear();
209 }
210
211 pub fn pending_delete(&self) -> Option<usize> {
212 self.pending_delete
213 }
214
215 pub fn take_pending_delete(&mut self) -> Option<usize> {
216 self.pending_delete.take()
217 }
218
219 pub fn sort_mode(&self) -> TunnelSortMode {
220 self.sort_mode
221 }
222
223 pub fn set_sort_mode(&mut self, mode: TunnelSortMode) {
224 self.sort_mode = mode;
225 }
226
227 pub fn form_baseline(&self) -> Option<&TunnelFormBaseline> {
228 self.form_baseline.as_ref()
229 }
230
231 pub fn set_form_baseline(&mut self, baseline: Option<TunnelFormBaseline>) {
232 self.form_baseline = baseline;
233 }
234
235 pub fn demo_live_snapshots(&self) -> &HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
236 &self.demo_live_snapshots
237 }
238
239 pub fn demo_live_snapshots_mut(
240 &mut self,
241 ) -> &mut HashMap<String, crate::tunnel_live::TunnelLiveSnapshot> {
242 &mut self.demo_live_snapshots
243 }
244
245 pub fn parser_tx(&self) -> Sender<ParserMessage> {
246 self.parser_tx.clone()
247 }
248
249 pub fn clients(&self) -> &HashMap<u16, Vec<ClientPeer>> {
250 &self.clients
251 }
252
253 pub fn peer_viz(&self) -> &HashMap<(u16, String), [u64; PEER_VIZ_BUCKETS]> {
254 &self.peer_viz
255 }
256
257 pub fn peer_viz_last_push(&self) -> Option<Instant> {
258 self.peer_viz_last_push
259 }
260
261 pub fn peer_viz_prev_push(&self) -> Option<Instant> {
262 self.peer_viz_prev_push
263 }
264
265 pub fn summaries_cache(&self) -> &HashMap<String, String> {
266 &self.summaries_cache
267 }
268
269 pub fn summaries_cache_mut(&mut self) -> &mut HashMap<String, String> {
270 &mut self.summaries_cache
271 }
272
273 pub fn request_delete(&mut self, idx: usize) {
276 self.pending_delete = Some(idx);
277 }
278
279 pub fn cancel_delete(&mut self) {
281 self.pending_delete = None;
282 }
283
284 pub fn ensure_lsof_poller(&mut self) {
288 if self.lsof.is_some() {
289 return;
290 }
291 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
292 let bind_ports: Arc<Mutex<Vec<(String, u16, u32)>>> = Arc::new(Mutex::new(Vec::new()));
293 let thread = crate::tunnel_live::spawn_lsof_poller(
294 bind_ports.clone(),
295 self.lsof_tx.clone(),
296 stop.clone(),
297 );
298 self.lsof = Some(LsofPollerHandle {
299 stop,
300 bind_ports,
301 thread: Some(thread),
302 });
303 log::debug!("[purple] Tunnel lsof poller started");
304 }
305
306 pub fn set_lsof_ports(&self, ports: Vec<(String, u16, u32)>) {
311 if let Some(handle) = &self.lsof {
312 if let Ok(mut g) = handle.bind_ports.lock() {
313 *g = ports;
314 }
315 }
316 }
317
318 pub fn poll(&mut self) -> Vec<(String, String, bool)> {
324 let now = Instant::now();
325 while let Ok(msg) = self.parser_rx.try_recv() {
328 if let Some(tunnel) = self.active.get_mut(&msg.alias) {
329 tunnel.live.record_event(msg.event);
330 }
331 }
332 let mut latest_lsof: Option<LsofMessage> = None;
336 while let Ok(msg) = self.lsof_rx.try_recv() {
337 latest_lsof = Some(msg);
338 }
339 if let Some(msg) = latest_lsof {
340 self.clients = msg.clients;
341 self.conflicts = msg.conflicts;
342 self.last_lsof_at = Some(msg.at);
343 self.push_peer_viz(now);
348 }
349 for tunnel in self.active.values_mut() {
352 tunnel.live.rotate_if_due(now);
353 }
354 let port_to_alias: HashMap<u16, String> = self
359 .lsof
360 .as_ref()
361 .and_then(|h| h.bind_ports.lock().ok().map(|g| g.clone()))
362 .map(|v| v.into_iter().map(|(a, p, _)| (p, a)).collect())
363 .unwrap_or_default();
364 let mut bps_per_alias: HashMap<String, (u64, u64, bool)> = HashMap::new();
370 for (port, peers) in &self.clients {
371 let Some(alias) = port_to_alias.get(port) else {
372 continue;
373 };
374 let entry = bps_per_alias
375 .entry(alias.clone())
376 .or_insert((0u64, 0u64, false));
377 for peer in peers {
378 entry.0 = entry.0.saturating_add(peer.current_rx_bps);
379 entry.1 = entry.1.saturating_add(peer.current_tx_bps);
380 if peer.last_sample_at.is_some() {
381 entry.2 = true;
382 }
383 }
384 }
385 for (alias, tunnel) in self.active.iter_mut() {
386 let (rx, tx, ready) = bps_per_alias.get(alias).copied().unwrap_or((0, 0, false));
387 tunnel.live.current_rx_bps = rx;
388 tunnel.live.current_tx_bps = tx;
389 tunnel.live.peak_rx_bps = tunnel.live.peak_rx_bps.max(rx);
390 tunnel.live.peak_tx_bps = tunnel.live.peak_tx_bps.max(tx);
391 if ready {
392 tunnel.live.last_throughput_at = Some(now);
393 }
394 }
395 let mut concurrent_per_alias: HashMap<String, u32> = HashMap::new();
402 for (port, peers) in &self.clients {
403 if let Some(alias) = port_to_alias.get(port) {
404 *concurrent_per_alias.entry(alias.clone()).or_insert(0) += peers.len() as u32;
405 }
406 }
407 for (alias, tunnel) in self.active.iter_mut() {
408 let lsof_count = concurrent_per_alias.get(alias).copied().unwrap_or(0);
409 let sample = lsof_count.max(tunnel.live.active_channels);
410 tunnel.live.sample_activity(sample);
411 }
412
413 if self.active.is_empty() {
414 return Vec::new();
415 }
416 let mut exited = Vec::new();
417 let mut to_remove = Vec::new();
418 for (alias, tunnel) in &mut self.active {
419 match tunnel.child.try_wait() {
420 Ok(Some(status)) => {
421 let stderr_msg = tunnel
425 .live
426 .stderr_buffer
427 .lock()
428 .ok()
429 .and_then(|b| b.iter().rev().find(|s| !s.trim().is_empty()).cloned())
430 .map(|s| s.trim().to_string())
431 .filter(|s| !s.is_empty());
432 let exit_code = status.code().unwrap_or(-1);
433 if !status.success() {
434 log::error!(
435 "[external] Tunnel exited unexpectedly: alias={alias} exit={exit_code}"
436 );
437 if let Some(ref err) = stderr_msg {
438 log::debug!("[external] Tunnel stderr: {}", err.trim());
439 }
440 }
441 let last_exit_line = stderr_msg
442 .clone()
443 .unwrap_or_else(|| format!("exit code {}", exit_code));
444 tunnel.live.last_exit = Some((exit_code, last_exit_line));
445 tunnel.live.parser_stop.store(true, Ordering::Relaxed);
446 if tunnel.live.active_channels > 0 {
449 let close_now = ChannelEventKind::Close;
450 let ids: Vec<u32> = tunnel.live.channel_open.keys().copied().collect();
451 for id in ids {
452 tunnel.live.record_event(crate::tunnel_live::ChannelEvent {
453 at: now,
454 channel_id: id,
455 kind: close_now,
456 channel_kind: None,
457 opened_at: None,
458 });
459 }
460 }
461 let (msg, is_error) = if status.success() {
462 (format!("Tunnel for {} closed.", alias), false)
463 } else if let Some(err) = stderr_msg {
464 (format!("Tunnel for {}: {}", alias, err), true)
465 } else {
466 (
467 format!("Tunnel for {} exited with code {}.", alias, exit_code),
468 true,
469 )
470 };
471 exited.push((alias.clone(), msg, is_error));
472 to_remove.push(alias.clone());
473 }
474 Ok(None) => {}
475 Err(e) => {
476 exited.push((
477 alias.clone(),
478 format!("Tunnel for {} lost: {}", alias, e),
479 true,
480 ));
481 to_remove.push(alias.clone());
482 }
483 }
484 }
485 for alias in to_remove {
486 self.active.remove(&alias);
487 }
488 exited
489 }
490
491 pub fn push_peer_viz(&mut self, now: Instant) {
499 let mut live: HashSet<(u16, String)> = HashSet::new();
500 for (port, peers) in &self.clients {
501 for peer in peers {
502 let key = (*port, peer.src.clone());
503 live.insert(key.clone());
504 let combined = peer.current_rx_bps.saturating_add(peer.current_tx_bps);
505 let history = self
506 .peer_viz
507 .entry(key)
508 .or_insert_with(|| [0u64; PEER_VIZ_BUCKETS]);
509 history.rotate_left(1);
510 history[PEER_VIZ_BUCKETS - 1] = combined;
511 }
512 }
513 self.peer_viz.retain(|key, _| live.contains(key));
514 self.peer_viz_prev_push = self.peer_viz_last_push;
515 self.peer_viz_last_push = Some(now);
516 }
517
518 pub fn prune_orphans(&mut self, valid_aliases: &HashSet<&str>) {
523 let pre = self.demo_live_snapshots.len();
524 self.demo_live_snapshots
525 .retain(|alias, _| valid_aliases.contains(alias.as_str()));
526 let dropped = pre.saturating_sub(self.demo_live_snapshots.len());
527 if dropped > 0 {
528 log::debug!(
529 "[purple] reload_hosts: dropped {dropped} orphan demo_live_snapshots entrie(s)"
530 );
531 }
532 }
533
534 pub fn migrate_alias(&mut self, old: &str, new: &str) {
540 if old == new {
541 return;
542 }
543 if let Some(t) = self.active.remove(old) {
544 self.active.insert(new.to_string(), t);
545 }
546 }
547}
548
549#[cfg(test)]
550mod tests {
551 use super::*;
552
553 #[test]
554 fn default_state_is_empty() {
555 let s = TunnelState::default();
556 assert!(s.list.is_empty());
557 assert!(s.active.is_empty());
558 assert!(s.pending_delete.is_none());
559 assert!(s.summaries_cache.is_empty());
560 }
561
562 #[test]
563 fn poll_on_empty_returns_empty_vec() {
564 let mut s = TunnelState::default();
568 let result = s.poll();
569 assert!(result.is_empty());
570 assert!(s.active.is_empty());
571 }
572
573 fn make_peer(src: &str, rx: u64, tx: u64) -> ClientPeer {
574 ClientPeer {
575 src: src.to_string(),
576 process: "curl".into(),
577 pid: 1234,
578 since: Instant::now(),
579 responsible_app: None,
580 current_rx_bps: rx,
581 current_tx_bps: tx,
582 bytes_rcvd: None,
583 bytes_sent: None,
584 last_sample_at: Some(Instant::now()),
585 }
586 }
587
588 #[test]
589 fn push_peer_viz_initialises_history_and_writes_combined_bps_to_rightmost_cell() {
590 let mut s = TunnelState::default();
591 s.clients
592 .insert(8080, vec![make_peer("127.0.0.1:1", 1_000, 500)]);
593 let now = Instant::now();
594 s.push_peer_viz(now);
595 let key = (8080u16, "127.0.0.1:1".to_string());
596 let history = s.peer_viz.get(&key).expect("entry created on first push");
597 assert_eq!(history[PEER_VIZ_BUCKETS - 1], 1_500);
598 for cell in &history[..PEER_VIZ_BUCKETS - 1] {
599 assert_eq!(*cell, 0);
600 }
601 assert_eq!(s.peer_viz_last_push, Some(now));
602 assert_eq!(s.peer_viz_prev_push, None);
603 }
604
605 #[test]
606 fn push_peer_viz_rotates_left_on_each_call() {
607 let mut s = TunnelState::default();
608 s.clients
609 .insert(8080, vec![make_peer("127.0.0.1:1", 100, 0)]);
610 let t0 = Instant::now();
611 s.push_peer_viz(t0);
612 if let Some(peers) = s.clients.get_mut(&8080) {
615 peers[0].current_rx_bps = 200;
616 }
617 let t1 = t0 + std::time::Duration::from_secs(2);
618 s.push_peer_viz(t1);
619 let key = (8080u16, "127.0.0.1:1".to_string());
620 let history = s.peer_viz.get(&key).expect("entry exists");
621 assert_eq!(history[PEER_VIZ_BUCKETS - 1], 200);
622 assert_eq!(history[PEER_VIZ_BUCKETS - 2], 100);
623 assert_eq!(s.peer_viz_last_push, Some(t1));
626 assert_eq!(s.peer_viz_prev_push, Some(t0));
627 }
628
629 #[test]
630 fn push_peer_viz_garbage_collects_disappeared_peers() {
631 let mut s = TunnelState::default();
632 s.clients.insert(8080, vec![make_peer("127.0.0.1:1", 0, 0)]);
633 let t0 = Instant::now();
634 s.push_peer_viz(t0);
635 assert!(
636 s.peer_viz
637 .contains_key(&(8080u16, "127.0.0.1:1".to_string()))
638 );
639 s.clients.clear();
641 s.push_peer_viz(t0 + std::time::Duration::from_secs(2));
642 assert!(s.peer_viz.is_empty());
643 }
644
645 #[test]
646 fn request_delete_sets_pending_delete_to_some_idx() {
647 let mut s = TunnelState::default();
648 s.request_delete(3);
649 assert_eq!(s.pending_delete, Some(3));
650 }
651
652 #[test]
653 fn cancel_delete_clears_pending_delete() {
654 let mut s = TunnelState::default();
655 s.pending_delete = Some(2);
656 s.cancel_delete();
657 assert!(s.pending_delete.is_none());
658 }
659
660 #[test]
661 fn request_delete_overwrites_existing_pending() {
662 let mut s = TunnelState::default();
663 s.pending_delete = Some(1);
664 s.request_delete(7);
665 assert_eq!(s.pending_delete, Some(7));
666 }
667
668 #[test]
669 fn cancel_delete_is_idempotent_on_empty_pending() {
670 let mut s = TunnelState::default();
671 s.cancel_delete();
672 s.cancel_delete();
673 assert!(s.pending_delete.is_none());
674 }
675
676 fn empty_snapshot() -> crate::tunnel_live::TunnelLiveSnapshot {
677 crate::tunnel_live::TunnelLiveSnapshot {
678 uptime_secs: 0,
679 active_channels: 0,
680 peak_concurrent: 0,
681 total_opens: 0,
682 idle_secs: 0,
683 rx_history: [0; crate::tunnel_live::HISTORY_BUCKETS],
684 tx_history: [0; crate::tunnel_live::HISTORY_BUCKETS],
685 current_rx_bps: 0,
686 current_tx_bps: 0,
687 peak_rx_bps: 0,
688 peak_tx_bps: 0,
689 throughput_ready: false,
690 clients: vec![],
691 events: vec![],
692 currently_open: vec![],
693 conflict: None,
694 last_exit: None,
695 }
696 }
697
698 #[test]
699 fn prune_orphans_drops_unknown_demo_snapshots() {
700 let mut s = TunnelState::default();
701 s.demo_live_snapshots
702 .insert("keep".to_string(), empty_snapshot());
703 s.demo_live_snapshots
704 .insert("drop".to_string(), empty_snapshot());
705
706 let valid: HashSet<&str> = ["keep"].into_iter().collect();
707 s.prune_orphans(&valid);
708
709 assert!(s.demo_live_snapshots.contains_key("keep"));
710 assert!(!s.demo_live_snapshots.contains_key("drop"));
711 }
712
713 #[test]
714 fn migrate_alias_is_noop_on_empty_active_map() {
715 let mut s = TunnelState::default();
721 s.migrate_alias("missing", "new");
722 assert!(s.active.is_empty());
723 s.migrate_alias("same", "same");
724 assert!(s.active.is_empty());
725 }
726
727 fn state_matching_baseline() -> TunnelState {
728 let mut s = TunnelState::default();
729 s.form.tunnel_type = crate::tunnel::TunnelType::Local;
730 s.form.bind_port = "8080".into();
731 s.form.remote_host = "db.internal".into();
732 s.form.remote_port = "5432".into();
733 s.form.bind_address = "127.0.0.1".into();
734 s.set_form_baseline(Some(TunnelFormBaseline {
735 tunnel_type: crate::tunnel::TunnelType::Local,
736 bind_port: "8080".into(),
737 remote_host: "db.internal".into(),
738 remote_port: "5432".into(),
739 bind_address: "127.0.0.1".into(),
740 }));
741 s
742 }
743
744 #[test]
745 fn form_is_dirty_is_false_without_a_baseline() {
746 let mut s = TunnelState::default();
747 s.form.bind_port = "9000".into();
748 assert!(!s.form_is_dirty());
749 }
750
751 #[test]
752 fn form_is_dirty_is_false_when_form_equals_baseline() {
753 assert!(!state_matching_baseline().form_is_dirty());
754 }
755
756 fn assert_field_change_is_dirty(field: &str, mutate: impl FnOnce(&mut TunnelForm)) {
757 let mut s = state_matching_baseline();
758 mutate(&mut s.form);
759 assert!(s.form_is_dirty(), "a change in {field} must read dirty");
760 }
761
762 #[test]
763 fn form_is_dirty_detects_a_change_in_each_field() {
764 assert_field_change_is_dirty("tunnel_type", |f| {
765 f.tunnel_type = crate::tunnel::TunnelType::Remote;
766 });
767 assert_field_change_is_dirty("bind_port", |f| f.bind_port.push('1'));
768 assert_field_change_is_dirty("remote_host", |f| f.remote_host.push('x'));
769 assert_field_change_is_dirty("remote_port", |f| f.remote_port.push('1'));
770 assert_field_change_is_dirty("bind_address", |f| f.bind_address.push('9'));
771 }
772}