omnyssh 1.0.2

TUI SSH dashboard & server manager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
//! Background metrics polling pool.
//!
//! Each host gets its own persistent tokio task that manages its SSH
//! connection and collects metrics at a configurable interval.
//!
//! Architecture:
//! - [`PollManager`] — created by the main app, owns abort handles.
//! - One `run_host_poller` task per host — loops indefinitely until aborted.
//! - Implements exponential backoff on connection failures.
//! - One SSH connection per host, reused across polls.
//! - All data sent to the main event loop via `mpsc::Sender<AppEvent>`.

use std::collections::HashMap;
use std::time::{Duration, Instant};

use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::event::{AppEvent, Metrics, ProcessInfo};
use crate::ssh::client::{ConnectionStatus, Host};
use crate::ssh::metrics::{
    parse_cpu_proc_stat, parse_cpu_top, parse_cpu_top_macos, parse_disk_df, parse_loadavg,
    parse_ram_free, parse_ram_vmstat, parse_top_processes, parse_uptime,
};
use crate::ssh::session::SshSession;

// ---------------------------------------------------------------------------
// Backoff schedule
// ---------------------------------------------------------------------------

/// Reconnect delays (seconds) used after consecutive failures. Once the end of
/// the table is reached, the final entry is reused for every further failure.
const BACKOFF_SECS: [u64; 4] = [30, 60, 120, 300];

/// Position in [`BACKOFF_SECS`] for the next failure.
struct BackoffState {
    step: usize,
}

impl BackoffState {
    /// Begins at the shortest delay.
    fn new() -> Self {
        BackoffState { step: 0 }
    }

    /// Returns the delay for this failure and advances toward the capped
    /// (last) entry, where it then stays.
    fn next_delay(&mut self) -> Duration {
        let current = self.step;
        let last = BACKOFF_SECS.len() - 1;
        if current < last {
            self.step = current + 1;
        }
        Duration::from_secs(BACKOFF_SECS[current])
    }

    /// Rewinds to the shortest delay (called after a successful connect).
    fn reset(&mut self) {
        *self = Self::new();
    }
}

// ---------------------------------------------------------------------------
// PollManager — owned by App, drives all HostPoller tasks
// ---------------------------------------------------------------------------

/// Manages background metric polling for all hosts.
///
/// Owns one spawned poller task per host plus a per-host channel used to
/// request an immediate refresh. Dropping this struct (or calling
/// [`PollManager::shutdown`]) aborts every poller task.
pub struct PollManager {
    /// One handle per spawned poller task; aborted on drop.
    task_handles: Vec<JoinHandle<()>>,
    /// Per-host channel to send an immediate-refresh signal, keyed by host name.
    refresh_txs: HashMap<String, mpsc::Sender<()>>,
}

impl PollManager {
    /// Spawn one poller task per host.
    ///
    /// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
    /// Host names are used as refresh-channel keys; if two hosts share a name,
    /// only the last one receives manual refresh signals (a warning is logged).
    pub fn start(hosts: Vec<Host>, tx: mpsc::Sender<AppEvent>, poll_interval: Duration) -> Self {
        let mut task_handles = Vec::with_capacity(hosts.len());
        let mut refresh_txs = HashMap::with_capacity(hosts.len());

        for host in hosts {
            // Small buffer: a few queued refresh requests are plenty; extras
            // are dropped by `refresh_all`.
            let (refresh_tx, refresh_rx) = mpsc::channel::<()>(4);
            if refresh_txs.insert(host.name.clone(), refresh_tx).is_some() {
                tracing::warn!(
                    host = %host.name,
                    "duplicate host name — earlier host will not receive refresh signals"
                );
            }

            task_handles.push(tokio::spawn(run_host_poller(
                host,
                tx.clone(),
                poll_interval,
                refresh_rx,
            )));
        }

        Self {
            task_handles,
            refresh_txs,
        }
    }

    /// Trigger an immediate poll for all hosts (called on `r` key press).
    ///
    /// Best-effort: if a host's channel is full or closed the signal is
    /// dropped and only logged at debug level.
    pub fn refresh_all(&self) {
        for (name, tx) in &self.refresh_txs {
            if tx.try_send(()).is_err() {
                tracing::debug!(host = %name, "refresh signal dropped — channel full or closed");
            }
        }
    }

    /// Abort all poller tasks. Called on app exit to allow clean shutdown.
    /// SSH sessions are dropped inside the tasks, which triggers
    /// russh's graceful disconnect.
    pub fn shutdown(self) {
        // The abort logic lives in `Drop`; consuming `self` here runs it.
        drop(self);
    }
}

impl Drop for PollManager {
    /// Abort every poller task.
    ///
    /// Dropping a tokio `JoinHandle` only *detaches* its task, so without this
    /// the pollers would keep running (and reconnecting) after the manager is
    /// gone. Aborting an already-finished task is a no-op.
    fn drop(&mut self) {
        for handle in &self.task_handles {
            handle.abort();
        }
    }
}

// ---------------------------------------------------------------------------
// Per-host poller task
// ---------------------------------------------------------------------------

async fn run_host_poller(
    host: Host,
    tx: mpsc::Sender<AppEvent>,
    poll_interval: Duration,
    mut refresh_rx: mpsc::Receiver<()>,
) {
    let mut backoff = BackoffState::new();
    let mut session: Option<SshSession> = None;
    let mut discovery_done = false; // Track if we've done Quick Scan

    loop {
        // Ensure we have a live session.
        if session.is_none() {
            send_status(&tx, &host.name, ConnectionStatus::Connecting).await;
            match SshSession::connect(&host).await {
                Ok(s) => {
                    backoff.reset();
                    send_status(&tx, &host.name, ConnectionStatus::Connected).await;
                    session = Some(s);
                    discovery_done = false; // Reset discovery flag on new connection
                }
                Err(e) => {
                    tracing::debug!(host = %host.name, error = %e, "connection failed");
                    send_status(&tx, &host.name, ConnectionStatus::Failed(e.to_string())).await;
                    // Wait with backoff, allowing early refresh.
                    let delay = backoff.next_delay();
                    wait_or_refresh(delay, &mut refresh_rx).await;
                    continue;
                }
            }
        }

        // Run Quick Scan once per connection (don't block UI)
        // This happens right after connection before the first metrics poll
        if !discovery_done {
            if let Some(sess) = &session {
                // Run discovery asynchronously
                // Clone the session since Handle is Arc-based and cheap to clone
                let sess_clone = sess.clone();
                let host_name = host.name.clone();
                let tx_clone = tx.clone();
                tokio::spawn(async move {
                    match crate::ssh::discovery::quick_scan(
                        &sess_clone,
                        host_name.clone(),
                        tx_clone.clone(),
                    )
                    .await
                    {
                        Ok(()) => {
                            tracing::debug!(host = %host_name, "quick scan completed successfully");
                        }
                        Err(e) => {
                            tracing::warn!(host = %host_name, error = %e, "quick scan failed");
                            let _ = tx_clone
                                .send(AppEvent::DiscoveryFailed(host_name, e.to_string()))
                                .await;
                        }
                    }
                });
                discovery_done = true;
            }
        }

        // Collect metrics using the live session.
        // SAFETY: we only reach this point if session was set to Some above
        // (either just connected or carried over from the previous iteration).
        // This is a single-task async loop with no concurrent mutation, so
        // the expect is always satisfied.
        let sess = session.as_ref().expect("session is Some here");
        match collect_metrics(sess, &host.name).await {
            Ok(metrics) => {
                if tx
                    .send(AppEvent::MetricsUpdate(host.name.clone(), metrics))
                    .await
                    .is_err()
                {
                    break; // App has shut down.
                }
            }
            Err(e) => {
                tracing::debug!(host = %host.name, error = %e, "metric collection failed");
                // Session is broken — drop it and reconnect next iteration.
                session.take();
                send_status(&tx, &host.name, ConnectionStatus::Failed(e.to_string())).await;
                let delay = backoff.next_delay();
                wait_or_refresh(delay, &mut refresh_rx).await;
                continue;
            }
        }

        // Wait for the next poll interval or a manual refresh signal.
        wait_or_refresh(poll_interval, &mut refresh_rx).await;
    }
}

/// Wait for `delay`, but return early if a refresh signal is received.
///
/// Once every refresh sender has been dropped (for example, the manager
/// holding them went away), `recv()` resolves to `None` immediately on every
/// call. Only an actual `Some(())` signal ends the wait early. A closed channel
/// lets the full `delay` elapse, so the caller can never spin in a tight
/// poll/reconnect loop.
async fn wait_or_refresh(delay: Duration, refresh_rx: &mut mpsc::Receiver<()>) {
    tokio::select! {
        _ = tokio::time::sleep(delay) => {}
        // A non-matching result (`None`) disables this branch for the rest of
        // this `select!`, leaving only the sleep to complete it.
        Some(()) = refresh_rx.recv() => {}
    }
}

/// Best-effort notification that `name`'s connection status changed.
///
/// A send error only means the app's receiver is gone, so it is ignored.
async fn send_status(tx: &mpsc::Sender<AppEvent>, name: &str, status: ConnectionStatus) {
    let event = AppEvent::HostStatusChanged(name.to_owned(), status);
    let _ignored = tx.send(event).await;
}

// ---------------------------------------------------------------------------
// Metric collection
// ---------------------------------------------------------------------------

/// Run all metric commands and return a [`Metrics`] snapshot.
///
/// The five primary probes run concurrently on `session`. CPU and RAM output
/// that doesn't match the Linux format falls back to macOS/BSD commands
/// (graceful degradation per the risk matrix in tech.md §10). Finally the top
/// processes are gathered.
///
/// # Errors
///
/// Returns `Err` only when every primary probe fails. That indicates a dead
/// session, and the caller should reconnect. Individual failures are logged at
/// debug level and treated as empty output.
async fn collect_metrics(session: &SshSession, host_name: &str) -> anyhow::Result<Metrics> {
    let (cpu_res, mem_res, disk_res, uptime_res, loadavg_res) = tokio::join!(
        session.run_command("top -bn1 2>/dev/null | head -5"),
        session.run_command("free -b 2>/dev/null || vm_stat 2>/dev/null"),
        session.run_command("df -k / 2>/dev/null"),
        session.run_command("uptime 2>/dev/null"),
        session.run_command("cat /proc/loadavg 2>/dev/null"),
    );

    // All five erroring points at the session itself, not a missing tool.
    if let (Err(first_err), Err(_), Err(_), Err(_), Err(_)) =
        (&cpu_res, &mem_res, &disk_res, &uptime_res, &loadavg_res)
    {
        return Err(anyhow::anyhow!(
            "all metric commands failed (session may be dead): {}",
            first_err
        ));
    }

    // A single failing probe is expected on some OSes; log it so operators can
    // tell "unavailable here" from "errored", then continue with empty text.
    let cpu_text = cpu_res
        .inspect_err(|e| tracing::debug!(host = %host_name, error = %e, "cpu command failed"))
        .unwrap_or_default();
    let mem_text = mem_res
        .inspect_err(|e| tracing::debug!(host = %host_name, error = %e, "mem command failed"))
        .unwrap_or_default();
    let disk_text = disk_res
        .inspect_err(|e| tracing::debug!(host = %host_name, error = %e, "disk command failed"))
        .unwrap_or_default();
    let uptime_text = uptime_res
        .inspect_err(|e| tracing::debug!(host = %host_name, error = %e, "uptime command failed"))
        .unwrap_or_default();
    let loadavg_text = loadavg_res
        .inspect_err(|e| tracing::debug!(host = %host_name, error = %e, "loadavg command failed"))
        .unwrap_or_default();

    let cpu_percent = parse_cpu_combined(&cpu_text, session).await;
    let ram_percent = parse_ram_combined(&mem_text, session).await;

    let disk_percent = parse_disk_df(&disk_text);
    if disk_percent.is_none() && !disk_text.is_empty() {
        tracing::debug!(host = %host_name, "disk output present but parse failed");
    }

    Ok(Metrics {
        cpu_percent,
        ram_percent,
        disk_percent,
        uptime: parse_uptime(&uptime_text),
        load_avg: parse_loadavg(&loadavg_text),
        // OS info is gathered once during discovery, not on every poll.
        os_info: None,
        top_processes: collect_top_processes(session).await,
        last_updated: Instant::now(),
    })
}

/// Collects the top 3 processes by CPU usage.
///
/// Each `ps` dialect is tried in turn: GNU (Linux) with a server-side sort,
/// then BSD (macOS). A command error counts as empty output. Returns `None`
/// when no variant yields parseable output.
async fn collect_top_processes(session: &SshSession) -> Option<Vec<ProcessInfo>> {
    let ps_variants = [
        // GNU ps: sort by CPU on the server; trailing `=` suppresses headers.
        "-eo pid=,ppid=,pcpu=,pmem=,comm= --sort=-pcpu",
        // BSD ps: `-r` sorts by CPU usage.
        "-Aceo pid=,ppid=,pcpu=,pmem=,comm= -r",
    ];

    for ps_args in ps_variants {
        let output = session
            .run_command(&top_processes_command(ps_args))
            .await
            .unwrap_or_default();
        if let Some(procs) = parse_top_processes(&output) {
            return Some(procs);
        }
    }
    None
}

/// Builds the remote shell command that lists the top processes by CPU, with
/// the monitoring connection's own process chain removed. `ps_args` picks the
/// OS-specific `ps` columns and sort order and is spliced in verbatim.
///
/// The pipeline runs inside an SSH-spawned shell. Without filtering, that
/// shell's processes and the `sshd` hosting the connection would dominate the
/// snapshot on an idle server. The `awk` stage drops them strictly by PID:
///
/// - `s` (`$$`) — the shell, plus everything it forked (`ps`, `awk`, `head`);
/// - `p` (`$PPID`) — this connection's `sshd`, plus its children;
/// - `g` — the privileged `sshd` one level further up (parent of `$PPID`).
///
/// It never filters by process name, so a genuinely busy SSH session owned by
/// another user still shows up. The syntax is POSIX sh. A non-Bourne login
/// shell just yields no output, and the panel degrades to "process data
/// unavailable".
fn top_processes_command(ps_args: &str) -> String {
    // Prelude: look up the grandparent PID into `$g` before the pipeline starts.
    const PRELUDE: &str = r#"g=$(ps -o ppid= -p $PPID 2>/dev/null | tr -d ' '); "#;
    // Tail: drop our PID chain, blank out pid/ppid, trim, and keep three rows.
    const TAIL: &str = concat!(
        r#" 2>/dev/null | awk -v s=$$ -v p=$PPID -v g="$g" "#,
        r#"'$1!=s && $1!=p && $1!=g && $2!=s && $2!=p "#,
        r#"{$1="";$2="";sub(/^[ \t]+/,"");print}' | "#,
        "head -n 3",
    );

    [PRELUDE, "ps ", ps_args, TAIL].concat()
}

/// Derives a CPU-usage percentage from progressively more generic sources.
///
/// The first source is the Linux `top -bn1` text the caller already fetched.
/// Next comes macOS `top -l 1`, and last the first line of `/proc/stat`. Each
/// extra command runs only if the previous source failed to parse, and command
/// errors count as empty output. Returns `None` if nothing parses.
async fn parse_cpu_combined(top_out: &str, session: &SshSession) -> Option<f64> {
    if let found @ Some(_) = parse_cpu_top(top_out) {
        return found;
    }

    let mac_summary = session
        .run_command("top -l 1 -n 0 2>/dev/null | grep 'CPU usage'")
        .await
        .unwrap_or_default();
    if let found @ Some(_) = parse_cpu_top_macos(&mac_summary) {
        return found;
    }

    let proc_stat_head = session
        .run_command("head -1 /proc/stat 2>/dev/null")
        .await
        .unwrap_or_default();
    parse_cpu_proc_stat(&proc_stat_head)
}

/// Derives a RAM-usage percentage from the `free -b || vm_stat` output.
///
/// Linux `free -b` output is parsed directly. macOS `vm_stat` output (detected
/// by its "Mach Virtual Memory" banner) also needs `sysctl hw.memsize`, which
/// is fetched only in that case. Any other output yields `None`.
async fn parse_ram_combined(mem_out: &str, session: &SshSession) -> Option<f64> {
    match parse_ram_free(mem_out) {
        Some(percent) => Some(percent),
        None if mem_out.contains("Mach Virtual Memory") => {
            let total_bytes_out = session
                .run_command("sysctl hw.memsize 2>/dev/null")
                .await
                .unwrap_or_default();
            parse_ram_vmstat(mem_out, &total_bytes_out)
        }
        None => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn top_processes_command_excludes_monitor_pid_chain() {
        let cmd = top_processes_command("-eo pid=,ppid=,pcpu=,pmem=,comm= --sort=-pcpu");

        // The grandparent PID is resolved before the pipeline runs.
        assert!(cmd.contains("g=$(ps -o ppid= -p $PPID"));
        // The awk filter binds the shell, its parent sshd and the grandparent.
        assert!(cmd.contains("-v s=$$"));
        assert!(cmd.contains("-v p=$PPID"));
        assert!(cmd.contains("-v g=\"$g\""));
        // It drops all three PIDs (and the children of the shell and sshd).
        assert!(cmd.contains("$1!=s && $1!=p && $1!=g && $2!=s && $2!=p"));
        // Filtering is by PID only — never by process name.
        assert!(!cmd.contains("sshd"));
        // Output is capped server-side.
        assert!(cmd.trim_end().ends_with("head -n 3"));
    }

    #[test]
    fn top_processes_command_splices_ps_args_verbatim() {
        let linux = top_processes_command("-eo pid=,ppid=,pcpu=,pmem=,comm= --sort=-pcpu");
        assert!(linux.contains("ps -eo pid=,ppid=,pcpu=,pmem=,comm= --sort=-pcpu 2>/dev/null"));

        let macos = top_processes_command("-Aceo pid=,ppid=,pcpu=,pmem=,comm= -r");
        assert!(macos.contains("ps -Aceo pid=,ppid=,pcpu=,pmem=,comm= -r 2>/dev/null"));
    }

    #[test]
    fn top_processes_command_full_pipeline_is_stable() {
        // Pins the whole pipeline, including the awk body that blanks the
        // pid/ppid columns and trims leading whitespace. The substring checks
        // above do not cover that part.
        assert_eq!(
            top_processes_command("ARGS"),
            r#"g=$(ps -o ppid= -p $PPID 2>/dev/null | tr -d ' '); ps ARGS 2>/dev/null | awk -v s=$$ -v p=$PPID -v g="$g" '$1!=s && $1!=p && $1!=g && $2!=s && $2!=p {$1="";$2="";sub(/^[ \t]+/,"");print}' | head -n 3"#
        );
    }

    #[test]
    fn backoff_escalates_caps_and_resets() {
        let mut backoff = BackoffState::new();
        let observed: Vec<u64> = (0..6).map(|_| backoff.next_delay().as_secs()).collect();
        // Escalates through the schedule, then stays on the last entry.
        assert_eq!(observed, [30, 60, 120, 300, 300, 300]);

        backoff.reset();
        assert_eq!(backoff.next_delay(), Duration::from_secs(30));
    }
}